---
name: lp-celery-task
description: Creates Celery 5.5 tasks for league-planner with AbortableTask, progress tracking via taskmanager, queue routing, and retry strategies. Use for async/background tasks.
argument-hint: <task-name> [queue]
allowed-tools: Read, Write, Edit, Glob, Grep
---
# League-Planner Celery Task Generator
Creates production-ready Celery tasks following league-planner patterns: AbortableTask base, progress tracking with taskmanager.Task model, proper queue routing, and robust retry strategies.
## When to Use
- Creating long-running background tasks (optimization, simulations)
- Implementing async operations triggered by API or UI
- Setting up periodic/scheduled tasks
- Building task chains or workflows
## Prerequisites
- Celery is configured in `leagues/celery.py`
- Redis broker is available
- taskmanager app is installed for progress tracking
- Task queues are defined (celery, q_sim, q_court, subqueue)
## Instructions
### Step 1: Define Task
Create task in the appropriate location:
- `common/tasks.py` - General utility tasks
- `scheduler/simulations/tasks.py` - Simulation tasks
- `{app}/tasks.py` - App-specific tasks
```python
from celery import shared_task
from celery.contrib.abortable import AbortableTask
from celery.exceptions import SoftTimeLimitExceeded
from django.db import transaction
from taskmanager.models import Task as TaskRecord
@shared_task(
bind=True,
name='scheduler.process_scenario',
base=AbortableTask,
max_retries=3,
default_retry_delay=60,
autoretry_for=(ConnectionError, TimeoutError),
retry_backoff=True,
retry_backoff_max=600,
time_limit=3600, # Hard limit: 1 hour
soft_time_limit=3300, # Soft limit: 55 minutes (allows cleanup)
acks_late=True,
reject_on_worker_lost=True,
)
def process_scenario(self, scenario_id: int, user_id: int | None = None, options: dict | None = None):
"""
Process a scenario with optimization.
Args:
scenario_id: ID of the scenario to process
user_id: Optional user ID for notifications
options: Optional configuration dict
Returns:
dict: Result with status and details
"""
options = options or {}
# Create task record for tracking
task_record = TaskRecord.objects.create(
task_id=self.request.id,
task_name='scheduler.process_scenario',
scenario_id=scenario_id,
user_id=user_id,
queue=self.request.delivery_info.get('routing_key', 'celery'),
host_name=self.request.hostname,
worker=self.request.hostname,
)
try:
# Update progress
self.update_state(
state='PROGRESS',
meta={'progress': 0, 'status': 'Starting...'}
)
task_record.update_progress(0, 'Starting...')
# Check for abort signal periodically
if self.is_aborted():
return {'status': 'aborted', 'scenario_id': scenario_id}
# Main processing logic
from scheduler.models import Scenario
scenario = Scenario.objects.select_related('season').get(pk=scenario_id)
# Step 1: Prepare data (20%)
self.update_state(
state='PROGRESS',
meta={'progress': 20, 'status': 'Preparing data...'}
)
task_record.update_progress(20, 'Preparing data...')
data = prepare_scenario_data(scenario)
if self.is_aborted():
return {'status': 'aborted', 'scenario_id': scenario_id}
# Step 2: Run optimization (20-80%)
self.update_state(
state='PROGRESS',
meta={'progress': 40, 'status': 'Running optimization...'}
)
task_record.update_progress(40, 'Running optimization...')
result = run_optimization(
data,
progress_callback=lambda p, s: (
self.update_state(state='PROGRESS', meta={'progress': 20 + int(p * 0.6), 'status': s}),
task_record.update_progress(20 + int(p * 0.6), s)
),
abort_check=self.is_aborted,
)
if self.is_aborted():
return {'status': 'aborted', 'scenario_id': scenario_id}
# Step 3: Save results (80-100%)
self.update_state(
state='PROGRESS',
meta={'progress': 90, 'status': 'Saving results...'}
)
task_record.update_progress(90, 'Saving results...')
with transaction.atomic():
save_optimization_results(scenario, result)
# Complete
self.update_state(
state='SUCCESS',
meta={'progress': 100, 'status': 'Complete'}
)
task_record.update_progress(100, 'Complete')
task_record.mark_completed()
return {
'status': 'success',
'scenario_id': scenario_id,
'result': result.summary(),
}
except SoftTimeLimitExceeded:
# Graceful handling of time limit
task_record.update_progress(-1, 'Time limit exceeded')
return {
'status': 'timeout',
'scenario_id': scenario_id,
'message': 'Task exceeded time limit',
}
except self.MaxRetriesExceededError:
task_record.update_progress(-1, 'Max retries exceeded')
raise
except Exception as exc:
task_record.update_progress(-1, f'Error: {str(exc)}')
# Re-raise for Celery's error handling
raise
def prepare_scenario_data(scenario):
"""Prepare data for optimization."""
# Implementation
pass
def run_optimization(data, progress_callback, abort_check):
"""Run the optimization algorithm."""
# Implementation with progress reporting
pass
def save_optimization_results(scenario, result):
"""Save optimization results to database."""
# Implementation
pass
```
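The `progress_callback` above maps the optimizer's own 0-100 progress into the task's 20-80 window. Factored into a standalone helper (a sketch; `scale_progress` is not part of the project code), the mapping is:

```python
def scale_progress(window_start: int, window_end: int, sub_progress: float) -> int:
    """Map a sub-step's 0-100 progress into a window of overall task progress."""
    span = window_end - window_start
    return window_start + int(sub_progress / 100 * span)

# Optimizer halfway done -> overall progress is halfway through the 20-80 window
assert scale_progress(20, 80, 50) == 50
assert scale_progress(20, 80, 0) == 20
assert scale_progress(20, 80, 100) == 80
```

Keeping the window arithmetic in one place avoids the easy off-by-one where a sub-step reports 100% but the overall bar jumps backwards on the next step.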
### Step 2: Register Task and Configure Queue
In `leagues/celery.py` add queue routing:
```python
from celery import Celery
celery = Celery('leagues')
celery.conf.task_routes = {
# Simulation tasks to dedicated queue
'scheduler.simulations.*': {'queue': 'q_sim'},
'scheduler.process_scenario': {'queue': 'q_sim'},
# Court optimization tasks
'api.court.*': {'queue': 'q_court'},
# Default queue for everything else
'*': {'queue': 'celery'},
}
celery.conf.task_queues = {
'celery': {'exchange': 'celery', 'routing_key': 'celery'},
'q_sim': {'exchange': 'q_sim', 'routing_key': 'q_sim'},
'q_court': {'exchange': 'q_court', 'routing_key': 'q_court'},
'subqueue': {'exchange': 'subqueue', 'routing_key': 'subqueue'},
}
```
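Queues only drain if a worker consumes them, so each dedicated queue needs a worker (or a `-Q` list) that subscribes to it. A typical startup might look like the following; the concurrency values and node names are assumptions about the deployment, not project configuration:

```shell
# Default worker for the general queue
celery -A leagues worker -Q celery -c 4 -n general@%h

# Dedicated worker for long-running simulations (low concurrency)
celery -A leagues worker -Q q_sim -c 2 -n sim@%h

# Court optimization and the notification subqueue on one worker
celery -A leagues worker -Q q_court,subqueue -c 2 -n court@%h
```

Tasks routed to a queue no worker subscribes to sit in Redis indefinitely, which looks identical to a "stuck" task from the outside.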
### Step 3: Add Progress Tracking Model Methods
The taskmanager.Task model provides these methods:
```python
# In taskmanager/models.py (already exists)
class Task(models.Model):
task_id = models.CharField(max_length=255, unique=True)
task_name = models.CharField(max_length=255)
scenario_id = models.IntegerField(null=True, blank=True)
user_id = models.IntegerField(null=True, blank=True)
queue = models.CharField(max_length=100, default='celery')
host_name = models.CharField(max_length=255, null=True)
worker = models.CharField(max_length=255, null=True)
progress = models.IntegerField(default=0)
status_message = models.CharField(max_length=500, null=True)
created_at = models.DateTimeField(auto_now_add=True)
completed_at = models.DateTimeField(null=True)
def update_progress(self, progress: int, message: str = None):
"""Update task progress."""
self.progress = progress
if message:
self.status_message = message
self.save(update_fields=['progress', 'status_message'])
def mark_completed(self):
"""Mark task as completed."""
from django.utils import timezone
self.completed_at = timezone.now()
self.progress = 100
self.save(update_fields=['completed_at', 'progress'])
def is_running(self) -> bool:
"""Check if task is still running."""
from celery.result import AsyncResult
result = AsyncResult(self.task_id)
return result.state in ('PENDING', 'STARTED', 'PROGRESS', 'RETRY')
def get_status(self) -> dict:
"""Get current task status."""
from celery.result import AsyncResult
result = AsyncResult(self.task_id)
return {
'state': result.state,
'progress': self.progress,
'message': self.status_message,
'result': result.result if result.ready() else None,
}
def revoke(self, terminate: bool = False):
"""Cancel/abort the task."""
from celery.contrib.abortable import AbortableAsyncResult
result = AbortableAsyncResult(self.task_id)
result.abort()
if terminate:
result.revoke(terminate=True)
```
## Patterns & Best Practices
### Task Chain Pattern
```python
from celery import chain, group, chord
def run_simulation_workflow(scenario_id: int, iterations: int = 10):
"""Run a complete simulation workflow."""
workflow = chain(
# Step 1: Prepare
prepare_simulation.s(scenario_id),
# Step 2: Run iterations in parallel
group(
run_iteration.s(i) for i in range(iterations)
),
# Step 3: Aggregate results
aggregate_results.s(scenario_id),
# Step 4: Cleanup
cleanup_simulation.s(),
)
return workflow.apply_async()
@shared_task(bind=True, name='scheduler.prepare_simulation')
def prepare_simulation(self, scenario_id: int):
"""Prepare simulation data."""
# Returns data passed to next task
return {'scenario_id': scenario_id, 'prepared': True}
@shared_task(bind=True, name='scheduler.run_iteration')
def run_iteration(self, preparation_data: dict, iteration: int):
"""Run single simulation iteration."""
scenario_id = preparation_data['scenario_id']
# Run iteration logic
return {'iteration': iteration, 'score': calculate_score()}
@shared_task(bind=True, name='scheduler.aggregate_results')
def aggregate_results(self, iteration_results: list, scenario_id: int):
"""Aggregate results from all iterations."""
scores = [r['score'] for r in iteration_results]
return {
'scenario_id': scenario_id,
'avg_score': sum(scores) / len(scores),
'best_score': max(scores),
}
```
### Periodic Task Pattern
```python
from celery.schedules import crontab
celery.conf.beat_schedule = {
# Daily cleanup at 2 AM
'cleanup-old-tasks': {
'task': 'taskmanager.cleanup_old_tasks',
'schedule': crontab(hour=2, minute=0),
'args': (30,), # Days to keep
},
# Every 5 minutes: check stuck tasks
'check-stuck-tasks': {
'task': 'taskmanager.check_stuck_tasks',
'schedule': 300, # seconds
},
# Weekly report on Mondays at 8 AM
'weekly-report': {
'task': 'scheduler.generate_weekly_report',
'schedule': crontab(day_of_week='monday', hour=8, minute=0),
},
}
@shared_task(name='taskmanager.cleanup_old_tasks')
def cleanup_old_tasks(days_to_keep: int = 30):
"""Clean up old completed tasks."""
from django.utils import timezone
from datetime import timedelta
cutoff = timezone.now() - timedelta(days=days_to_keep)
deleted, _ = TaskRecord.objects.filter(
completed_at__lt=cutoff
).delete()
return {'deleted': deleted}
```
### Idempotent Task Pattern
```python
@shared_task(
bind=True,
name='scheduler.idempotent_update',
autoretry_for=(Exception,),
max_retries=5,
)
def idempotent_update(self, scenario_id: int, version: int):
"""
Idempotent task - safe to retry.
Uses optimistic locking via version field.
"""
from scheduler.models import Scenario
from django.db import transaction
with transaction.atomic():
scenario = Scenario.objects.select_for_update().get(pk=scenario_id)
# Check version to prevent duplicate processing
if scenario.version != version:
return {
'status': 'skipped',
'reason': 'Version mismatch - already processed',
}
# Process
result = do_processing(scenario)
# Increment version
scenario.version = version + 1
scenario.save(update_fields=['version'])
return {'status': 'success', 'new_version': version + 1}
```
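The version gate is what makes the retries safe: a duplicate delivery sees a bumped version and becomes a no-op. Stripped of the ORM, the logic looks like this (`FakeScenario` and `versioned_update` are illustrative stand-ins; the real task additionally holds a `select_for_update()` row lock):

```python
class FakeScenario:
    """Hypothetical in-memory stand-in for the Scenario row."""
    def __init__(self, version: int = 0):
        self.version = version

def versioned_update(scenario: FakeScenario, expected_version: int) -> dict:
    """Apply the update only if the row is still at the expected version."""
    if scenario.version != expected_version:
        # A retry or duplicate delivery already processed this message.
        return {'status': 'skipped', 'reason': 'version mismatch'}
    # ... do the actual processing here, then bump the version ...
    scenario.version = expected_version + 1
    return {'status': 'success', 'new_version': scenario.version}
```

The first delivery succeeds and bumps the version; any redelivery of the same message (same `expected_version`) is skipped rather than applied twice.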
### Django Transaction Integration (Celery 5.4+)
```python
from django.db import transaction
def create_scenario_and_optimize(data: dict):
"""
Create scenario and trigger optimization only after commit.
Uses Django's on_commit to ensure task is sent only after
the transaction is committed successfully.
"""
with transaction.atomic():
scenario = Scenario.objects.create(**data)
# Task will only be sent if transaction commits
transaction.on_commit(
lambda: process_scenario.delay(scenario.id)
)
return scenario
```
### Soft Shutdown Handling (Celery 5.5+)
```python
# In leagues/celery.py
celery.conf.worker_soft_shutdown_timeout = 60 # seconds
@shared_task(bind=True, name='scheduler.long_running_task')
def long_running_task(self, data_id: int):
"""Task that handles soft shutdown gracefully."""
from celery.exceptions import WorkerShuttingDown
for i in range(100):
try:
process_chunk(i)
except WorkerShuttingDown:
# Save checkpoint for resumption
save_checkpoint(data_id, i)
raise # Re-raise to allow re-queue
# Check if abort requested
if self.is_aborted():
return {'status': 'aborted', 'progress': i}
return {'status': 'complete'}
```
## Queue Routing Table
Routes are matched top to bottom and the first match wins, so specific patterns must precede general ones:

| Task Pattern | Queue | Timeout | Use Case |
|-------------|-------|---------|----------|
| `scheduler.simulations.*` | `q_sim` | 24h | Long simulations |
| `api.court.*` | `q_court` | 4h | Court optimization |
| `*.send_notification` | `subqueue` | 5m | Quick notifications |
| `scheduler.*` | `celery` | 2h | General scheduling |
| `common.*` | `celery` | 30m | Utility tasks |
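Celery evaluates `task_routes` glob patterns in insertion order, which a small `fnmatch`-based resolver can illustrate (a simplified sketch, not the actual Celery router):

```python
from fnmatch import fnmatch

ROUTES = [  # order matters: most specific patterns first
    ('scheduler.simulations.*', 'q_sim'),
    ('api.court.*', 'q_court'),
    ('*.send_notification', 'subqueue'),
    ('*', 'celery'),  # catch-all default
]

def resolve_queue(task_name: str) -> str:
    """Return the queue for the first pattern matching the task name."""
    for pattern, queue in ROUTES:
        if fnmatch(task_name, pattern):
            return queue
    return 'celery'
```

If the catch-all `'*'` were listed first, every task would land on the default queue, which is the usual symptom of mis-ordered routes.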
## Examples
### Example 1: Simulation Task with Progress
```python
@shared_task(
bind=True,
name='scheduler.simulations.run_batch',
base=AbortableTask,
time_limit=86400, # 24 hours
soft_time_limit=85800, # 23h 50m
)
def run_simulation_batch(
self,
scenario_id: int,
num_iterations: int = 100,
random_seed: int = None,
):
"""Run batch simulation with progress tracking."""
from scheduler.models import Scenario
import random
    if random_seed is not None:  # explicit check: seed 0 is falsy but valid
        random.seed(random_seed)
scenario = Scenario.objects.get(pk=scenario_id)
results = []
for i in range(num_iterations):
if self.is_aborted():
return {
'status': 'aborted',
'completed': i,
'total': num_iterations,
}
# Update progress
progress = int((i / num_iterations) * 100)
self.update_state(
state='PROGRESS',
meta={
'progress': progress,
'current': i,
'total': num_iterations,
'status': f'Running iteration {i+1}/{num_iterations}',
}
)
# Run single iteration
result = run_single_simulation(scenario)
results.append(result)
return {
'status': 'success',
'iterations': num_iterations,
'best_score': max(r['score'] for r in results),
'avg_score': sum(r['score'] for r in results) / len(results),
}
```
### Example 2: Task with Telegram Notification
```python
@shared_task(bind=True, name='common.notify_completion')
def notify_completion(self, task_name: str, result: dict, user_id: int = None):
"""Send notification when task completes."""
from common.tasks import send_telegram_message
from common.models import User
message = f"Task '{task_name}' completed.\n"
message += f"Status: {result.get('status', 'unknown')}\n"
if 'score' in result:
message += f"Score: {result['score']}\n"
# Send to Telegram (project pattern)
send_telegram_message.delay(message)
# Also notify user if specified
if user_id:
try:
user = User.objects.get(pk=user_id)
from scheduler.helpers import notify
notify(user, 'Task Complete', message)
except User.DoesNotExist:
pass
return {'notified': True}
```
## Common Pitfalls
- **Passing model instances**: Always pass IDs, not model objects (they can't be serialized properly)
- **No abort checking**: Long tasks must check `self.is_aborted()` periodically
- **Missing transaction handling**: Database operations should use `transaction.atomic()`
- **Forgetting `bind=True`**: Required to access `self` for progress updates and abort checking
- **No soft time limit**: Always set `soft_time_limit` slightly less than `time_limit` for cleanup
- **Ignoring `acks_late`**: Set to `True` for critical tasks to prevent loss on worker crash
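The first pitfall can be checked mechanically: task arguments must survive Celery's default JSON serializer, so plain IDs pass and ORM instances do not. A minimal sketch (`is_json_safe` and the `Scenario` stand-in here are illustrative, not project code):

```python
import json

class Scenario:
    """Stand-in for a Django model instance (hypothetical)."""
    def __init__(self, pk: int):
        self.pk = pk

def is_json_safe(payload: dict) -> bool:
    """True if the payload survives a JSON round-trip like Celery's default serializer."""
    try:
        json.dumps(payload)
        return True
    except TypeError:
        return False
```

This is why every task in this document takes `scenario_id: int` and re-fetches the row inside the task, rather than accepting the model object.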
## Verification
1. Check task is registered: `celery -A leagues inspect registered`
2. Monitor with Flower: `celery -A leagues flower`
3. Test task manually:
```python
from scheduler.tasks import process_scenario
result = process_scenario.delay(scenario_id=1)
print(result.status)            # likely PENDING or PROGRESS right after dispatch
print(result.get(timeout=600))  # block until the task returns its result dict
```
4. Check queue routing: `celery -A leagues inspect active_queues`