Cancelling Long-Running Tasks
When working with large datasets or complex evaluations, some Ragas operations can take significant time to complete. The cancellation feature allows you to gracefully terminate these long-running tasks when needed, which is especially important in production environments.
Overview
Ragas provides cancellation support for:
- evaluate() - Evaluation of datasets with metrics
- generate_with_langchain_docs() - Test set generation from documents
The cancellation mechanism is thread-safe and allows for graceful termination with partial results when possible.
Basic Usage
Cancellable Evaluation
Instead of running evaluation directly, you can get an executor that allows cancellation:
from ragas import evaluate
from ragas.dataset_schema import EvaluationDataset
# Your dataset and metrics (the `...` placeholders stand in for real arguments)
dataset = EvaluationDataset(...)
metrics = [...]
# Ask for an executor instead of running the evaluation immediately
executor = evaluate(
dataset=dataset,
metrics=metrics,
return_executor=True # Key parameter: hand back the executor so the run can be cancelled
)
# With the executor you can:
# - Cancel: executor.cancel()
# - Check status: executor.is_cancelled()
# - Get results: executor.results() # This blocks until completion
Cancellable Test Set Generation
Similar approach for test set generation:
from ragas.testset.synthesizers.generate import TestsetGenerator
generator = TestsetGenerator(...)
# Get executor for cancellable generation
# NOTE(review): `documents` is not defined in this snippet - presumably the LangChain documents loaded by the caller; confirm
executor = generator.generate_with_langchain_docs(
documents=documents,
testset_size=100,
return_executor=True # Allow access to Executor to cancel
)
# Use the same cancellation interface as for evaluation
executor.cancel()
Production Patterns
1. Timeout Pattern
Automatically cancel operations that exceed a time limit:
import threading
import time
def evaluate_with_timeout(dataset, metrics, timeout_seconds=300, cancel_grace_seconds=10):
    """Run an evaluation and cancel it automatically if it exceeds a time limit.

    Args:
        dataset: Dataset passed through to ``evaluate``.
        metrics: Metrics passed through to ``evaluate``.
        timeout_seconds: Maximum time to wait for the evaluation to finish.
        cancel_grace_seconds: How long to wait, after requesting cancellation,
            for the worker thread to wind down before returning.

    Returns:
        A ``(results, error)`` tuple: ``(results, None)`` on success,
        ``(None, exception)`` if the evaluation raised, or
        ``(None, "timeout")`` if it was cancelled for running too long.
    """
    # Get cancellable executor
    executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
    results = None
    exception = None

    def run_evaluation():
        nonlocal results, exception
        try:
            results = executor.results()
        except Exception as e:
            exception = e

    # Run in a daemon thread: a worker that never honours the cancellation
    # request must not keep the interpreter alive after we give up on it.
    thread = threading.Thread(target=run_evaluation, daemon=True)
    thread.start()

    # Wait for completion or timeout
    thread.join(timeout=timeout_seconds)
    if thread.is_alive():
        print(f"Evaluation exceeded {timeout_seconds}s timeout, cancelling...")
        executor.cancel()
        # Bounded wait so the caller is never blocked indefinitely
        thread.join(timeout=cancel_grace_seconds)
        return None, "timeout"
    return results, exception
# Usage
results, error = evaluate_with_timeout(dataset, metrics, timeout_seconds=600)
if error == "timeout":
    print("Evaluation was cancelled due to timeout")
elif error is not None:
    # evaluate_with_timeout returns a raised exception as the second element,
    # so a non-timeout error must not be reported as a successful run.
    print(f"Evaluation failed: {error}")
else:
    print(f"Evaluation completed: {results}")
2. Signal Handler Pattern (Ctrl+C)
Allow users to cancel with keyboard interrupt:
import signal
import sys
def setup_cancellation_handler():
    """Set up graceful cancellation on Ctrl+C.

    Installs a SIGINT handler that cancels the registered executor (if any)
    and then exits the process via ``sys.exit(0)``.

    Returns:
        A one-argument callable used to register the executor the handler
        should cancel. It returns ``None``.
    """
    executor = None

    def signal_handler(signum, frame):
        if executor is not None and not executor.is_cancelled():
            print("\nReceived interrupt signal, cancelling evaluation...")
            executor.cancel()
            print("Cancellation requested. Waiting for graceful shutdown...")
        sys.exit(0)

    def set_executor(new_executor):
        # Rebind the closure variable that signal_handler reads; setting an
        # attribute on the handler function would leave `executor` as None.
        nonlocal executor
        executor = new_executor

    # Register signal handler
    signal.signal(signal.SIGINT, signal_handler)
    return set_executor
# Usage
set_executor = setup_cancellation_handler()
executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
set_executor(executor)
print("Running evaluation... Press Ctrl+C to cancel gracefully")
try:
    results = executor.results()
    print("Evaluation completed successfully")
except KeyboardInterrupt:
    print("Evaluation was cancelled")
except SystemExit:
    # The SIGINT handler installed by setup_cancellation_handler() calls
    # sys.exit(0), so Ctrl+C arrives here as SystemExit rather than
    # KeyboardInterrupt.
    print("Evaluation was cancelled")
    raise  # Let the requested exit proceed
3. Web Application Pattern
For web applications, run evaluations in the background and expose an endpoint that lets clients cancel them:
from flask import Flask, request
import threading
import uuid
app = Flask(__name__)
# Maps evaluation ID -> executor; entries are added when an evaluation starts
# and removed by its background thread once it finishes or fails
active_evaluations = {}
@app.route('/evaluate', methods=['POST'])
def start_evaluation():
    """Launch a cancellable evaluation in a background thread and return its ID."""
    # Every evaluation is tracked under its own UUID
    eval_id = str(uuid.uuid4())

    # Inputs come from the incoming request
    dataset = get_dataset_from_request(request)
    metrics = get_metrics_from_request(request)

    # Register the executor so the cancel endpoint can find it
    executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
    active_evaluations[eval_id] = executor

    def _worker():
        # Persist the outcome either way, then drop the registry entry
        try:
            outcome = executor.results()
            store_results(eval_id, outcome)
        except Exception as err:
            store_error(eval_id, str(err))
        finally:
            active_evaluations.pop(eval_id, None)

    background = threading.Thread(target=_worker)
    background.start()
    return {"evaluation_id": eval_id, "status": "started"}
@app.route('/evaluate/<eval_id>/cancel', methods=['POST'])
def cancel_evaluation(eval_id):
    """Request cancellation of a tracked evaluation, or respond 404 if it is unknown."""
    running = active_evaluations.get(eval_id)
    if not running:
        # Nothing registered under this ID
        return {"error": "Evaluation not found"}, 404
    running.cancel()
    return {"status": "cancelled"}
Advanced Usage
Checking Cancellation Status
executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)


# Start in background
def monitor_evaluation():
    """Print a status line every 5 seconds until cancellation is requested."""
    while not executor.is_cancelled():
        print("Evaluation still running...")
        time.sleep(5)
    print("Evaluation was cancelled")


# Daemon thread: the loop above only ends on cancellation, so a non-daemon
# thread would keep the process alive after an evaluation that is never
# cancelled.
threading.Thread(target=monitor_evaluation, daemon=True).start()
# Cancel after some condition
if some_condition():
    executor.cancel()
Partial Results
When cancellation occurs during execution, you may get partial results:
executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
try:
    results = executor.results()
    print(f"Completed {len(results)} evaluations")
except Exception as exc:
    # Tell a genuine failure apart from an exception caused by cancellation
    if not executor.is_cancelled():
        print(f"Evaluation failed: {exc}")
    else:
        print("Evaluation was cancelled - may have partial results")
Custom Cancellation Logic
class EvaluationManager:
    """Track evaluation executors so they can be cancelled as a group."""

    def __init__(self):
        # Executors created through start_evaluation()
        self.executors = []

    def start_evaluation(self, dataset, metrics):
        """Create a cancellable evaluation executor, track it, and return it."""
        executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
        self.executors.append(executor)
        return executor

    def cancel_all(self):
        """Cancel every tracked evaluation that has not been cancelled yet."""
        cancelled = 0
        for executor in self.executors:
            if not executor.is_cancelled():
                executor.cancel()
                cancelled += 1
        # Report only the executors cancelled by this call, not the total
        # number tracked (some may already have been cancelled earlier).
        print(f"Cancelled {cancelled} evaluations")

    def cleanup_completed(self):
        """Drop cancelled executors from the tracking list.

        Only the cancellation flag is consulted; executors that were never
        cancelled stay in the list.
        """
        self.executors = [ex for ex in self.executors if not ex.is_cancelled()]
# Usage
manager = EvaluationManager()
# Start multiple evaluations; each returned executor is also tracked by the manager
exec1 = manager.start_evaluation(dataset1, metrics)
exec2 = manager.start_evaluation(dataset2, metrics)
# Cancel all tracked evaluations if needed
manager.cancel_all()
Best Practices
1. Always Use Timeouts in Production
# Good: bound the run with a timeout so an evaluation that overruns gets cancelled
results, error = evaluate_with_timeout(dataset, metrics, timeout_seconds=1800) # 30 minutes
# Avoid: calling results() directly with no time limit
results = executor.results() # Could block forever
2. Handle Cancellation Gracefully
try:
    results = executor.results()
    process_results(results)
except Exception as exc:
    # A real failure and a cancellation need different follow-up
    if not executor.is_cancelled():
        log_error(exc)
        handle_failure()
    else:
        log_cancellation()
        cleanup_partial_work()
3. Provide User Feedback
def run_with_progress_and_cancellation(executor):
    """Run an executor to completion while printing a progress indicator.

    Args:
        executor: Object exposing ``results()``, ``cancel()`` and
            ``is_cancelled()``.

    Returns:
        Whatever ``executor.results()`` returns, or ``None`` if the run was
        interrupted with Ctrl+C (in which case the executor is cancelled).
    """
    print("Starting evaluation... Press Ctrl+C to cancel")
    # Set once the run is over so the progress thread stops on normal
    # completion too, not only when the executor is cancelled.
    finished = threading.Event()

    # Monitor progress in background
    def show_progress():
        while not finished.is_set() and not executor.is_cancelled():
            # Show some progress indication
            print(".", end="", flush=True)
            # Wakes immediately when `finished` is set, otherwise after 1s
            finished.wait(1)

    progress_thread = threading.Thread(target=show_progress, daemon=True)
    progress_thread.start()
    try:
        return executor.results()
    except KeyboardInterrupt:
        print("\nCancelling...")
        executor.cancel()
        return None
    finally:
        finished.set()
4. Clean Up Resources
def managed_evaluation(dataset, metrics):
    """Run an evaluation, cancelling it and cleaning up if anything goes wrong.

    Args:
        dataset: Dataset passed through to ``evaluate``.
        metrics: Metrics passed through to ``evaluate``.

    Returns:
        The value returned by ``executor.results()``.

    Raises:
        Re-raises whatever interrupted the evaluation, after requesting
        cancellation of the executor.
    """
    executor = None
    try:
        executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
        return executor.results()
    except BaseException:
        # BaseException rather than Exception so that KeyboardInterrupt and
        # SystemExit also cancel the in-flight evaluation before propagating.
        if executor is not None:
            executor.cancel()
        raise
    finally:
        # Clean up any temporary resources
        cleanup_temp_files()
Limitations
- Async Operations: Cancellation works at the task level, not within individual LLM calls
- Partial State: Cancelled operations may leave partial results or temporary files
- Timing: Cancellation is cooperative - tasks need to check for cancellation periodically
- Dependencies: Some external services may not respect cancellation immediately
Troubleshooting
Cancellation Not Working
# Report whether a cancellation request has been registered so far
print(
    "Cancellation was requested"
    if executor.is_cancelled()
    else "Cancellation not requested yet"
)
# Make sure cancel() is actually invoked, then confirm the flag is set
executor.cancel()
assert executor.is_cancelled()
Tasks Still Running After Cancellation
# Give time for graceful shutdown
executor.cancel()
time.sleep(2)  # Allow tasks to detect cancellation
# Force cleanup if needed
import asyncio

try:
    # get_running_loop() only succeeds when called from code that is itself
    # running inside an event loop; otherwise it raises RuntimeError.
    loop = asyncio.get_running_loop()
except RuntimeError:
    pass  # No event loop running
else:
    current = asyncio.current_task(loop)
    for task in asyncio.all_tasks(loop):
        # all_tasks() includes the task running this cleanup; skip it so the
        # cleanup does not cancel itself.
        if task is not current:
            task.cancel()
The cancellation feature provides robust control over long-running Ragas operations, enabling production-ready deployments with proper resource management and user experience.