---
name: lp-celery-task
description: Creates Celery 5.5 tasks for league-planner with AbortableTask, progress tracking via taskmanager, queue routing, and retry strategies. Use for async/background tasks.
argument-hint: [queue]
allowed-tools: Read, Write, Edit, Glob, Grep
---

# League-Planner Celery Task Generator

Creates production-ready Celery tasks following league-planner patterns: AbortableTask base, progress tracking with taskmanager.Task model, proper queue routing, and robust retry strategies.

## When to Use

- Creating long-running background tasks (optimization, simulations)
- Implementing async operations triggered by API or UI
- Setting up periodic/scheduled tasks
- Building task chains or workflows

## Prerequisites

- Celery is configured in `leagues/celery.py`
- Redis broker is available
- taskmanager app is installed for progress tracking
- Task queues are defined (celery, q_sim, q_court, subqueue)

## Instructions

### Step 1: Define Task

Create task in the appropriate location:

- `common/tasks.py` - General utility tasks
- `scheduler/simulations/tasks.py` - Simulation tasks
- `{app}/tasks.py` - App-specific tasks

```python
from celery import shared_task
from celery.contrib.abortable import AbortableTask
from celery.exceptions import SoftTimeLimitExceeded
from django.db import transaction

from taskmanager.models import Task as TaskRecord


@shared_task(
    bind=True,
    name='scheduler.process_scenario',
    base=AbortableTask,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_backoff_max=600,
    time_limit=3600,       # Hard limit: 1 hour
    soft_time_limit=3300,  # Soft limit: 55 minutes (allows cleanup)
    acks_late=True,
    reject_on_worker_lost=True,
)
def process_scenario(self, scenario_id: int, user_id: int = None, options: dict = None):
    """
    Process a scenario with optimization.

    Args:
        scenario_id: ID of the scenario to process
        user_id: Optional user ID for notifications
        options: Optional configuration dict

    Returns:
        dict: Result with status and details. ``status`` is one of
        ``'success'``, ``'aborted'`` or ``'timeout'``.

    Raises:
        Exception: Any error from the processing steps is recorded on the
            task record and re-raised so Celery can retry or fail the task.
    """
    options = options or {}

    # delivery_info is None when the task function is called directly
    # (outside a worker), so never call .get() on it unguarded.
    delivery_info = self.request.delivery_info or {}

    # Create (or reuse) the task record for tracking. Retries and
    # redeliveries (autoretry_for, acks_late, reject_on_worker_lost) run
    # again with the SAME task id, and Task.task_id is unique, so a plain
    # create() would raise IntegrityError on the second attempt.
    task_record, _ = TaskRecord.objects.update_or_create(
        task_id=self.request.id,
        defaults={
            'task_name': 'scheduler.process_scenario',
            'scenario_id': scenario_id,
            'user_id': user_id,
            'queue': delivery_info.get('routing_key', 'celery'),
            'host_name': self.request.hostname,
            'worker': self.request.hostname,
        },
    )

    def report_progress(percent: int, message: str) -> None:
        """Publish progress to the result backend and the task record."""
        self.update_state(
            state='PROGRESS',
            meta={'progress': percent, 'status': message},
        )
        task_record.update_progress(percent, message)

    try:
        # Update progress
        report_progress(0, 'Starting...')

        # Check for abort signal periodically
        if self.is_aborted():
            return {'status': 'aborted', 'scenario_id': scenario_id}

        # Main processing logic
        from scheduler.models import Scenario
        scenario = Scenario.objects.select_related('season').get(pk=scenario_id)

        # Step 1: Prepare data (20%)
        report_progress(20, 'Preparing data...')
        data = prepare_scenario_data(scenario)

        if self.is_aborted():
            return {'status': 'aborted', 'scenario_id': scenario_id}

        # Step 2: Run optimization (20-80%)
        report_progress(40, 'Running optimization...')
        result = run_optimization(
            data,
            # Map the optimizer's 0-100 progress onto the 20-80% band.
            progress_callback=lambda p, s: report_progress(20 + int(p * 0.6), s),
            abort_check=self.is_aborted,
        )

        if self.is_aborted():
            return {'status': 'aborted', 'scenario_id': scenario_id}

        # Step 3: Save results (80-100%)
        report_progress(90, 'Saving results...')
        with transaction.atomic():
            save_optimization_results(scenario, result)

        # Complete. Report 100% as PROGRESS: Celery stores SUCCESS (with the
        # return value as the result) itself once this function returns, so
        # setting state='SUCCESS' by hand would expose a half-finished result.
        report_progress(100, 'Complete')
        task_record.mark_completed()

        return {
            'status': 'success',
            'scenario_id': scenario_id,
            'result': result.summary(),
        }

    except SoftTimeLimitExceeded:
        # Graceful handling of time limit
        task_record.update_progress(-1, 'Time limit exceeded')
        return {
            'status': 'timeout',
            'scenario_id': scenario_id,
            'message': 'Task exceeded time limit',
        }

    except self.MaxRetriesExceededError:
        task_record.update_progress(-1, 'Max retries exceeded')
        raise

    except Exception as exc:
        task_record.update_progress(-1, f'Error: {str(exc)}')
        # Re-raise for Celery's error handling
        raise


def prepare_scenario_data(scenario):
    """Prepare data for optimization."""
    # Implementation
    pass


def run_optimization(data, progress_callback, abort_check):
    """Run the optimization algorithm."""
    # Implementation with progress reporting
    pass


def save_optimization_results(scenario, result):
    """Save optimization results to database."""
    # Implementation
    pass
```

### Step 2: Register Task and Configure Queue

In `leagues/celery.py` add queue routing:

```python
from celery import Celery

celery = Celery('leagues')

# Glob patterns are tried in the order they are listed, so the catch-all
# '*' entry must stay last.
celery.conf.task_routes = {
    # Simulation tasks to dedicated queue
    'scheduler.simulations.*': {'queue': 'q_sim'},
    'scheduler.process_scenario': {'queue': 'q_sim'},

    # Court optimization tasks
    'api.court.*': {'queue': 'q_court'},

    # Default queue for everything else
    '*': {'queue': 'celery'},
}

celery.conf.task_queues = {
    'celery': {'exchange': 'celery', 'routing_key': 'celery'},
    'q_sim': {'exchange': 'q_sim', 'routing_key': 'q_sim'},
    'q_court': {'exchange': 'q_court', 'routing_key': 'q_court'},
    'subqueue': {'exchange': 'subqueue', 'routing_key': 'subqueue'},
}
```

### Step 3: Add Progress Tracking Model Methods

The taskmanager.Task model provides these methods:

```python
# In taskmanager/models.py (already exists)
class Task(models.Model):
    task_id = models.CharField(max_length=255, unique=True)
    task_name = models.CharField(max_length=255)
    scenario_id = models.IntegerField(null=True, blank=True)
    user_id = models.IntegerField(null=True, blank=True)
    queue = models.CharField(max_length=100, default='celery')
    host_name = models.CharField(max_length=255, null=True)
    worker = models.CharField(max_length=255, null=True)
    progress = models.IntegerField(default=0)
    status_message = models.CharField(max_length=500, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True)

    def update_progress(self, progress: int, message: str = None):
        """Update task progress."""
        self.progress = progress
        if message:
            self.status_message = message
        self.save(update_fields=['progress', 'status_message'])

    def mark_completed(self):
        """Mark task as completed."""
        from django.utils import timezone
        self.completed_at = timezone.now()
        self.progress = 100
        self.save(update_fields=['completed_at', 'progress'])

    def is_running(self) -> bool:
        """Check if task is still running."""
        from celery.result import AsyncResult
        result = AsyncResult(self.task_id)
        return result.state in ('PENDING', 'STARTED', 'PROGRESS', 'RETRY')

    def get_status(self) -> dict:
        """Get current task status."""
        from celery.result import AsyncResult
        result = AsyncResult(self.task_id)
        return {
            'state': result.state,
            'progress': self.progress,
            'message': self.status_message,
            'result': result.result if result.ready() else None,
        }

    def revoke(self, terminate: bool = False):
        """Cancel/abort the task."""
        from celery.contrib.abortable import AbortableAsyncResult
        result = AbortableAsyncResult(self.task_id)
        result.abort()
        if terminate:
            result.revoke(terminate=True)
```

## Patterns & Best Practices

### Task Chain Pattern

```python
from celery import chain, group, chord


def run_simulation_workflow(scenario_id: int, iterations: int = 10):
    """Run a complete simulation workflow.

    Args:
        scenario_id: ID of the scenario to simulate
        iterations: Number of parallel iterations; must be at least 1

    Returns:
        The AsyncResult returned by ``workflow.apply_async()``.

    Raises:
        ValueError: If ``iterations`` is less than 1.
    """
    # An empty group would leave aggregate_results with nothing to average.
    if iterations < 1:
        raise ValueError('iterations must be at least 1')

    workflow = chain(
        # Step 1: Prepare
        prepare_simulation.s(scenario_id),

        # Step 2: Run iterations in parallel. A group followed by another
        # task inside a chain is upgraded to a chord, which requires a
        # result backend.
        group(
            run_iteration.s(i) for i in range(iterations)
        ),

        # Step 3: Aggregate results
        aggregate_results.s(scenario_id),

        # Step 4: Cleanup (cleanup_simulation is a project task, not shown)
        cleanup_simulation.s(),
    )
    return workflow.apply_async()


@shared_task(bind=True, name='scheduler.prepare_simulation')
def prepare_simulation(self, scenario_id: int):
    """Prepare simulation data."""
    # Returns data passed to next task
    return {'scenario_id': scenario_id, 'prepared': True}


@shared_task(bind=True, name='scheduler.run_iteration')
def run_iteration(self, preparation_data: dict, iteration: int):
    """Run single simulation iteration.

    ``preparation_data`` is the return value of the previous task in the
    chain; Celery prepends it to the arguments given to ``.s()``.
    """
    scenario_id = preparation_data['scenario_id']
    # Run iteration logic (calculate_score is a project helper, not shown)
    return {'iteration': iteration, 'score': calculate_score()}


@shared_task(bind=True, name='scheduler.aggregate_results')
def aggregate_results(self, iteration_results: list, scenario_id: int):
    """Aggregate results from all iterations.

    Returns:
        dict: ``scenario_id``, ``avg_score`` and ``best_score``; both scores
        are ``None`` when there are no iteration results.
    """
    scores = [r['score'] for r in iteration_results]
    if not scores:
        # Avoid ZeroDivisionError / max() on an empty sequence.
        return {
            'scenario_id': scenario_id,
            'avg_score': None,
            'best_score': None,
        }
    return {
        'scenario_id': scenario_id,
        'avg_score': sum(scores) / len(scores),
        'best_score': max(scores),
    }
```

### Periodic Task Pattern

```python
from celery.schedules import crontab

celery.conf.beat_schedule = {
    # Daily cleanup at 2 AM
    'cleanup-old-tasks': {
        'task': 'taskmanager.cleanup_old_tasks',
        'schedule': crontab(hour=2, minute=0),
        'args': (30,),  # Days to keep
    },

    # Every 5 minutes: check stuck tasks
    'check-stuck-tasks': {
        'task': 'taskmanager.check_stuck_tasks',
        'schedule': 300,  # seconds
    },

    # Weekly report on Mondays at 8 AM
    'weekly-report': {
        'task': 'scheduler.generate_weekly_report',
        'schedule': crontab(day_of_week='monday', hour=8, minute=0),
    },
}


@shared_task(name='taskmanager.cleanup_old_tasks')
def cleanup_old_tasks(days_to_keep: int = 30):
    """Clean up old completed tasks."""
    from django.utils import timezone
    from datetime import timedelta

    cutoff = timezone.now() - timedelta(days=days_to_keep)
    deleted, _ = TaskRecord.objects.filter(
        completed_at__lt=cutoff
    ).delete()

    return {'deleted': deleted}
```

### Idempotent Task Pattern

```python
@shared_task(
    bind=True,
    name='scheduler.idempotent_update',
    autoretry_for=(Exception,),
    max_retries=5,
)
def idempotent_update(self, scenario_id: int, version: int):
    """
    Idempotent task - safe to retry.

    Takes a row lock with ``select_for_update()`` and compares the stored
    ``version`` with the expected one, so a retry or duplicate delivery that
    arrives after the update has been applied is skipped.

    Args:
        scenario_id: ID of the scenario to update
        version: Version the caller expects the scenario to be at

    Returns:
        dict: ``{'status': 'success', 'new_version': ...}`` or
        ``{'status': 'skipped', 'reason': ...}`` on a version mismatch.
    """
    from scheduler.models import Scenario
    from django.db import transaction

    with transaction.atomic():
        scenario = Scenario.objects.select_for_update().get(pk=scenario_id)

        # Check version to prevent duplicate processing
        if scenario.version != version:
            return {
                'status': 'skipped',
                'reason': 'Version mismatch - already processed',
            }

        # Process (do_processing is a project helper, not shown)
        do_processing(scenario)

        # Increment version
        scenario.version = version + 1
        scenario.save(update_fields=['version'])

    return {'status': 'success', 'new_version': version + 1}
```

### Django Transaction Integration (Celery 5.4+)

```python
from django.db import transaction


def create_scenario_and_optimize(data: dict):
    """
    Create scenario and trigger optimization only after commit.

    Uses Django's on_commit to ensure task is sent only after
    the transaction is committed successfully.
    """
    # NOTE(review): Celery 5.4+ also offers `delay_on_commit()` on tasks that
    # use the Django-specific task base; process_scenario uses AbortableTask,
    # so the explicit on_commit hook is kept here - confirm before switching.
    with transaction.atomic():
        scenario = Scenario.objects.create(**data)

        # Task will only be sent if transaction commits
        transaction.on_commit(
            lambda: process_scenario.delay(scenario.id)
        )

    return scenario
```

### Soft Shutdown Handling (Celery 5.5+)

```python
# In leagues/celery.py
celery.conf.worker_soft_shutdown_timeout = 60  # seconds


@shared_task(
    bind=True,
    name='scheduler.long_running_task',
    base=AbortableTask,  # required for self.is_aborted()
)
def long_running_task(self, data_id: int):
    """Task that survives a worker shutdown by checkpointing its progress.

    During a soft shutdown the worker gives running tasks up to
    ``worker_soft_shutdown_timeout`` seconds to finish; no exception is
    delivered to the task body. Saving a checkpoint after every chunk means
    a run that is cut off can be resumed from the last completed chunk.

    Args:
        data_id: ID of the data set to process

    Returns:
        dict: ``{'status': 'complete'}``, or ``{'status': 'aborted',
        'progress': i}`` when an abort was requested.
    """
    for i in range(100):
        process_chunk(i)

        # Save checkpoint for resumption
        save_checkpoint(data_id, i)

        # Check if abort requested
        if self.is_aborted():
            return {'status': 'aborted', 'progress': i}

    return {'status': 'complete'}
```

## Queue Routing Table

| Task Pattern | Queue | Timeout | Use Case |
|-------------|-------|---------|----------|
| `scheduler.*` | `celery` | 2h | General scheduling |
| `scheduler.process_scenario` | `q_sim` | 1h | Scenario optimization (Step 1) |
| `scheduler.simulations.*` | `q_sim` | 24h | Long simulations |
| `api.court.*` | `q_court` | 4h | Court optimization |
| `common.*` | `celery` | 30m | Utility tasks |
| `*.send_notification` | `subqueue` | 5m | Quick notifications |

## Examples

### Example 1: Simulation Task with Progress

```python
@shared_task(
    bind=True,
    name='scheduler.simulations.run_batch',
    base=AbortableTask,
    time_limit=86400,       # 24 hours
    soft_time_limit=85800,  # 23h 50m
)
def run_simulation_batch(
    self,
    scenario_id: int,
    num_iterations: int = 100,
    random_seed: int = None,
):
    """Run batch simulation with progress tracking.

    Args:
        scenario_id: ID of the scenario to simulate
        num_iterations: Number of iterations to run; must be at least 1
        random_seed: Optional seed for the global ``random`` module
            (``0`` is a valid seed)

    Returns:
        dict: Status plus ``best_score``/``avg_score`` on success, or the
        number of completed iterations when aborted.

    Raises:
        ValueError: If ``num_iterations`` is less than 1.
    """
    from scheduler.models import Scenario
    import random

    # Fail early with a clear message instead of max() on an empty sequence.
    if num_iterations < 1:
        raise ValueError('num_iterations must be at least 1')

    # Compare against None so that a seed of 0 is honoured.
    if random_seed is not None:
        random.seed(random_seed)

    scenario = Scenario.objects.get(pk=scenario_id)
    results = []

    for i in range(num_iterations):
        if self.is_aborted():
            return {
                'status': 'aborted',
                'completed': i,
                'total': num_iterations,
            }

        # Update progress
        progress = int((i / num_iterations) * 100)
        self.update_state(
            state='PROGRESS',
            meta={
                'progress': progress,
                'current': i,
                'total': num_iterations,
                'status': f'Running iteration {i+1}/{num_iterations}',
            }
        )

        # Run single iteration
        result = run_single_simulation(scenario)
        results.append(result)

    scores = [r['score'] for r in results]
    return {
        'status': 'success',
        'iterations': num_iterations,
        'best_score': max(scores),
        'avg_score': sum(scores) / len(scores),
    }
```

### Example 2: Task with Telegram Notification

```python
@shared_task(bind=True, name='common.notify_completion')
def notify_completion(self, task_name: str, result: dict, user_id: int = None):
    """Send notification when task completes.

    Args:
        task_name: Name of the task that finished
        result: Result dict of that task (``status`` and optional ``score``)
        user_id: Optional ID of a user to notify in addition to Telegram

    Returns:
        dict: ``{'notified': True}``
    """
    import logging

    from common.tasks import send_telegram_message
    from common.models import User

    message = f"Task '{task_name}' completed.\n"
    message += f"Status: {result.get('status', 'unknown')}\n"

    if 'score' in result:
        message += f"Score: {result['score']}\n"

    # Send to Telegram (project pattern)
    send_telegram_message.delay(message)

    # Also notify user if specified
    if user_id:
        try:
            user = User.objects.get(pk=user_id)
            from scheduler.helpers import notify
            notify(user, 'Task Complete', message)
        except User.DoesNotExist:
            # Best-effort: a missing user must not fail the notification
            # task, but leave a trace instead of swallowing it silently.
            logging.getLogger(__name__).warning(
                'notify_completion: user %s does not exist', user_id
            )

    return {'notified': True}
```

## Common Pitfalls

- **Passing model instances**: Always pass IDs, not model objects (they can't be serialized properly)
- **No abort checking**: Long tasks must check `self.is_aborted()` periodically
- **Missing transaction handling**: Database operations should use `transaction.atomic()`
- **Forgetting `bind=True`**: Required to access `self` for progress updates and abort checking
- **No soft time limit**: Always set `soft_time_limit` slightly less than `time_limit` for cleanup
- **Ignoring `acks_late`**: Set to `True` for critical tasks to prevent loss on worker crash

## Verification

1. Check task is registered: `celery -A leagues inspect registered`
2. Monitor with Flower: `celery -A leagues flower`
3. Test task manually:

   ```python
   from scheduler.tasks import process_scenario
   result = process_scenario.delay(scenario_id=1)
   print(result.status, result.result)
   ```

4. Check queue routing: `celery -A leagues inspect active_queues`