2026-02-04 15:49:25 +01:00

17 KiB

| name | description | argument-hint | allowed-tools |
|---|---|---|---|
| lp-celery-task | Creates Celery 5.5 tasks for league-planner with AbortableTask, progress tracking via taskmanager, queue routing, and retry strategies. Use for async/background tasks. | `<task-name> [queue]` | Read, Write, Edit, Glob, Grep |

League-Planner Celery Task Generator

Creates production-ready Celery tasks following league-planner patterns: AbortableTask base, progress tracking with taskmanager.Task model, proper queue routing, and robust retry strategies.

When to Use

  • Creating long-running background tasks (optimization, simulations)
  • Implementing async operations triggered by API or UI
  • Setting up periodic/scheduled tasks
  • Building task chains or workflows

Prerequisites

  • Celery is configured in leagues/celery.py
  • Redis broker is available
  • taskmanager app is installed for progress tracking
  • Task queues are defined (celery, q_sim, q_court, subqueue)

Instructions

Step 1: Define Task

Create task in the appropriate location:

  • common/tasks.py - General utility tasks
  • scheduler/simulations/tasks.py - Simulation tasks
  • {app}/tasks.py - App-specific tasks
from celery import shared_task
from celery.contrib.abortable import AbortableTask
from celery.exceptions import SoftTimeLimitExceeded
from django.db import transaction

from taskmanager.models import Task as TaskRecord


@shared_task(
    bind=True,
    name='scheduler.process_scenario',
    base=AbortableTask,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_backoff_max=600,
    time_limit=3600,      # Hard limit: 1 hour
    soft_time_limit=3300,  # Soft limit: 55 minutes (allows cleanup)
    acks_late=True,
    reject_on_worker_lost=True,
)
def process_scenario(self, scenario_id: int, user_id: int = None, options: dict = None):
    """
    Process a scenario with optimization.

    Progress is mirrored into both the Celery result backend (update_state)
    and the taskmanager.Task DB record, so UIs can poll either source.

    Args:
        scenario_id: ID of the scenario to process
        user_id: Optional user ID for notifications
        options: Optional configuration dict

    Returns:
        dict: Result with status ('success' / 'aborted' / 'timeout') and details
    """
    options = options or {}

    # Create task record for tracking.
    # Fix: delivery_info is None when the task runs eagerly (tests,
    # CELERY_TASK_ALWAYS_EAGER), so guard before .get().
    task_record = TaskRecord.objects.create(
        task_id=self.request.id,
        task_name='scheduler.process_scenario',
        scenario_id=scenario_id,
        user_id=user_id,
        queue=(self.request.delivery_info or {}).get('routing_key', 'celery'),
        host_name=self.request.hostname,
        worker=self.request.hostname,
    )

    def _report(progress: int, status: str):
        """Mirror a progress update into backend state and the DB record."""
        self.update_state(state='PROGRESS', meta={'progress': progress, 'status': status})
        task_record.update_progress(progress, status)

    def _aborted():
        """Record the abort in the DB record and build the abort result.

        Fix: previously the record was left with its last 'running' status
        forever when a task was aborted.
        """
        task_record.update_progress(task_record.progress, 'Aborted')
        return {'status': 'aborted', 'scenario_id': scenario_id}

    try:
        _report(0, 'Starting...')

        # Check for abort signal before doing any work
        if self.is_aborted():
            return _aborted()

        # Main processing logic
        from scheduler.models import Scenario
        scenario = Scenario.objects.select_related('season').get(pk=scenario_id)

        # Step 1: Prepare data (20%)
        _report(20, 'Preparing data...')
        data = prepare_scenario_data(scenario)

        if self.is_aborted():
            return _aborted()

        # Step 2: Run optimization (20-80%)
        _report(40, 'Running optimization...')
        result = run_optimization(
            data,
            # Map the optimizer's 0-100% progress onto the 20-80% band.
            progress_callback=lambda p, s: _report(20 + int(p * 0.6), s),
            abort_check=self.is_aborted,
        )

        if self.is_aborted():
            return _aborted()

        # Step 3: Save results (80-100%)
        _report(90, 'Saving results...')
        with transaction.atomic():
            save_optimization_results(scenario, result)

        # Complete
        self.update_state(
            state='SUCCESS',
            meta={'progress': 100, 'status': 'Complete'}
        )
        task_record.update_progress(100, 'Complete')
        task_record.mark_completed()

        return {
            'status': 'success',
            'scenario_id': scenario_id,
            'result': result.summary(),
        }

    except SoftTimeLimitExceeded:
        # Graceful handling of the soft time limit; -1 is the project's
        # error/timeout marker in the progress field.
        task_record.update_progress(-1, 'Time limit exceeded')
        return {
            'status': 'timeout',
            'scenario_id': scenario_id,
            'message': 'Task exceeded time limit',
        }

    except self.MaxRetriesExceededError:
        task_record.update_progress(-1, 'Max retries exceeded')
        raise

    except Exception as exc:
        task_record.update_progress(-1, f'Error: {str(exc)}')
        # Re-raise for Celery's error handling
        raise


def prepare_scenario_data(scenario):
    """Collect and normalize everything the optimizer needs for *scenario*.

    Placeholder: the real implementation builds the optimizer input here.
    """
    return None


def run_optimization(data, progress_callback, abort_check):
    """Execute the optimization algorithm over *data*.

    Placeholder: the real implementation reports progress through
    ``progress_callback(percent, status)`` and polls ``abort_check()``
    to stop early when the task is aborted.
    """
    return None


def save_optimization_results(scenario, result):
    """Persist the optimizer's *result* for *scenario*.

    Placeholder: the real implementation writes results to the database
    (callers wrap this in ``transaction.atomic()``).
    """
    return None

Step 2: Register Task and Configure Queue

In leagues/celery.py add queue routing:

from celery import Celery

# Single Celery application instance for the project.
celery = Celery('leagues')

# Route tasks to queues by task-name glob pattern.
# NOTE(review): the catch-all '*' entry is placed last, which assumes
# patterns are consulted in insertion order -- confirm against Celery's
# task_routes documentation before reordering.
celery.conf.task_routes = {
    # Simulation tasks to dedicated queue
    'scheduler.simulations.*': {'queue': 'q_sim'},
    'scheduler.process_scenario': {'queue': 'q_sim'},

    # Court optimization tasks
    'api.court.*': {'queue': 'q_court'},

    # Default queue for everything else
    '*': {'queue': 'celery'},
}

# Declare the queues workers may consume from; each queue uses an
# exchange and routing key of the same name.
celery.conf.task_queues = {
    'celery': {'exchange': 'celery', 'routing_key': 'celery'},
    'q_sim': {'exchange': 'q_sim', 'routing_key': 'q_sim'},
    'q_court': {'exchange': 'q_court', 'routing_key': 'q_court'},
    'subqueue': {'exchange': 'subqueue', 'routing_key': 'subqueue'},
}

Step 3: Add Progress Tracking Model Methods

The taskmanager.Task model provides these methods:

# In taskmanager/models.py (already exists)
class Task(models.Model):
    """Tracking record mirroring a Celery task's identity, progress and status."""

    # Celery task UUID (self.request.id); unique => one row per execution.
    task_id = models.CharField(max_length=255, unique=True)
    # Registered task name, e.g. 'scheduler.process_scenario'.
    task_name = models.CharField(max_length=255)
    # Optional link to the scenario being processed (plain int, not a FK).
    scenario_id = models.IntegerField(null=True, blank=True)
    # Optional user to notify (plain int, not a FK).
    user_id = models.IntegerField(null=True, blank=True)
    # Routing key the task was delivered on; defaults to the main queue.
    queue = models.CharField(max_length=100, default='celery')
    host_name = models.CharField(max_length=255, null=True)
    worker = models.CharField(max_length=255, null=True)
    # 0-100; task code in this project also writes -1 to flag error/timeout.
    progress = models.IntegerField(default=0)
    status_message = models.CharField(max_length=500, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True)

    def update_progress(self, progress: int, message: str = None):
        """Persist progress (and optionally a status message) for this task.

        Note: callers pass -1 as an error/timeout marker; a falsy message
        leaves the previous status_message in place.
        """
        self.progress = progress
        if message:
            self.status_message = message
        self.save(update_fields=['progress', 'status_message'])

    def mark_completed(self):
        """Stamp completion time and force progress to 100%."""
        from django.utils import timezone
        self.completed_at = timezone.now()
        self.progress = 100
        self.save(update_fields=['completed_at', 'progress'])

    def is_running(self) -> bool:
        """Check whether the Celery backend still reports the task as active.

        NOTE(review): PENDING is also Celery's state for unknown task ids,
        so an id that was never enqueued will read as "running" here.
        """
        from celery.result import AsyncResult
        result = AsyncResult(self.task_id)
        return result.state in ('PENDING', 'STARTED', 'PROGRESS', 'RETRY')

    def get_status(self) -> dict:
        """Return the backend state combined with the stored progress/message."""
        from celery.result import AsyncResult
        result = AsyncResult(self.task_id)
        return {
            'state': result.state,
            'progress': self.progress,
            'message': self.status_message,
            # Only expose the result once the task has finished.
            'result': result.result if result.ready() else None,
        }

    def revoke(self, terminate: bool = False):
        """Request a cooperative abort; optionally hard-revoke the task.

        abort() only takes effect for tasks on the AbortableTask base that
        poll self.is_aborted(); terminate=True additionally force-revokes.
        """
        from celery.contrib.abortable import AbortableAsyncResult
        result = AbortableAsyncResult(self.task_id)
        result.abort()
        if terminate:
            result.revoke(terminate=True)

Patterns & Best Practices

Task Chain Pattern

from celery import chain, group, chord


def run_simulation_workflow(scenario_id: int, iterations: int = 10):
    """Run a complete simulation workflow: prepare -> N iterations -> aggregate -> cleanup.

    NOTE(review): a group followed by another task inside a chain is
    auto-upgraded by Celery into a chord (hence the `chord` import above);
    each downstream signature receives the previous step's result prepended
    as its first positional argument -- confirm against the Canvas docs.
    """
    workflow = chain(
        # Step 1: Prepare -- its return dict is fed to every iteration
        prepare_simulation.s(scenario_id),
        # Step 2: Run iterations in parallel (the chord header)
        group(
            run_iteration.s(i) for i in range(iterations)
        ),
        # Step 3: Aggregate results (chord callback; gets the list of
        # per-iteration results as its first argument)
        aggregate_results.s(scenario_id),
        # Step 4: Cleanup
        # NOTE(review): cleanup_simulation is referenced but not defined
        # in this snippet.
        cleanup_simulation.s(),
    )
    return workflow.apply_async()


@shared_task(bind=True, name='scheduler.prepare_simulation')
def prepare_simulation(self, scenario_id: int):
    """Build the payload handed to every simulation iteration in the chain."""
    # The returned dict becomes the first argument of the next chain step.
    payload = {'scenario_id': scenario_id, 'prepared': True}
    return payload


@shared_task(bind=True, name='scheduler.run_iteration')
def run_iteration(self, preparation_data: dict, iteration: int):
    """Run a single simulation iteration.

    `preparation_data` is prepended by the chain from prepare_simulation's
    return value; `iteration` was bound via `.s(i)` when the group was built.
    """
    scenario_id = preparation_data['scenario_id']  # placeholder; real logic would use it
    # Run iteration logic
    # NOTE(review): calculate_score() is not defined anywhere in this snippet.
    return {'iteration': iteration, 'score': calculate_score()}


@shared_task(bind=True, name='scheduler.aggregate_results')
def aggregate_results(self, iteration_results: list, scenario_id: int):
    """Aggregate scores from all simulation iterations.

    Args:
        iteration_results: Per-iteration dicts with a 'score' key (prepended
            by the chord callback mechanism).
        scenario_id: Scenario the iterations belong to.

    Returns:
        dict with average and best score; both None when no iterations ran.
    """
    scores = [r['score'] for r in iteration_results]

    # Guard: the original crashed with ZeroDivisionError / max() on an
    # empty sequence when the workflow was built with iterations=0.
    if not scores:
        return {
            'scenario_id': scenario_id,
            'avg_score': None,
            'best_score': None,
        }

    return {
        'scenario_id': scenario_id,
        'avg_score': sum(scores) / len(scores),
        'best_score': max(scores),
    }

Periodic Task Pattern

from celery.schedules import crontab

# Periodic schedule consumed by `celery -A leagues beat`; task names must
# match the names the tasks were registered under.
celery.conf.beat_schedule = {
    # Daily cleanup at 2 AM
    'cleanup-old-tasks': {
        'task': 'taskmanager.cleanup_old_tasks',
        'schedule': crontab(hour=2, minute=0),
        'args': (30,),  # Days to keep
    },

    # Every 5 minutes: check stuck tasks (plain number = interval in seconds)
    'check-stuck-tasks': {
        'task': 'taskmanager.check_stuck_tasks',
        'schedule': 300,  # seconds
    },

    # Weekly report on Mondays at 8 AM
    'weekly-report': {
        'task': 'scheduler.generate_weekly_report',
        'schedule': crontab(day_of_week='monday', hour=8, minute=0),
    },
}


@shared_task(name='taskmanager.cleanup_old_tasks')
def cleanup_old_tasks(days_to_keep: int = 30):
    """Delete task records completed more than *days_to_keep* days ago."""
    from datetime import timedelta

    from django.utils import timezone

    threshold = timezone.now() - timedelta(days=days_to_keep)
    # completed_at__lt excludes NULLs, so still-running tasks are untouched.
    stale = TaskRecord.objects.filter(completed_at__lt=threshold)
    deleted_count, _ = stale.delete()

    return {'deleted': deleted_count}

Idempotent Task Pattern

@shared_task(
    bind=True,
    name='scheduler.idempotent_update',
    autoretry_for=(Exception,),
    max_retries=5,
)
def idempotent_update(self, scenario_id: int, version: int):
    """
    Idempotent task - safe to retry.

    Uses optimistic locking via version field.

    NOTE(review): autoretry_for=(Exception,) retries on *every* failure,
    including permanent ones such as Scenario.DoesNotExist that can never
    succeed -- consider narrowing to transient exception types.
    """
    from scheduler.models import Scenario
    from django.db import transaction

    with transaction.atomic():
        # Row lock serializes concurrent workers on the same scenario;
        # the version check below then turns duplicates into no-ops.
        scenario = Scenario.objects.select_for_update().get(pk=scenario_id)

        # Check version to prevent duplicate processing
        if scenario.version != version:
            return {
                'status': 'skipped',
                'reason': 'Version mismatch - already processed',
            }

        # Process
        result = do_processing(scenario)  # NOTE(review): not defined in this snippet

        # Increment version (persisted inside the same transaction as the lock)
        scenario.version = version + 1
        scenario.save(update_fields=['version'])

    return {'status': 'success', 'new_version': version + 1}

Django Transaction Integration (Celery 5.4+)

from django.db import transaction


def create_scenario_and_optimize(data: dict):
    """
    Create a scenario and queue its optimization only after commit.

    The enqueue is registered via Django's ``transaction.on_commit`` hook,
    so the Celery task is sent only if the transaction commits; on rollback
    nothing is queued.
    """
    with transaction.atomic():
        scenario = Scenario.objects.create(**data)

        def _enqueue_optimization():
            # Runs after COMMIT, so the new row is already visible to the
            # worker that picks up the task.
            process_scenario.delay(scenario.id)

        transaction.on_commit(_enqueue_optimization)

    return scenario

Soft Shutdown Handling (Celery 5.5+)

# In leagues/celery.py -- grace period workers get to wind down on shutdown
celery.conf.worker_soft_shutdown_timeout = 60  # seconds

@shared_task(bind=True, name='scheduler.long_running_task')
def long_running_task(self, data_id: int):
    """Task that handles soft shutdown gracefully.

    Processes 100 chunks, checkpointing on worker shutdown and honouring
    cooperative aborts between chunks.

    NOTE(review): catching WorkerShuttingDown around process_chunk assumes
    Celery raises it inside the task body during soft shutdown -- confirm
    against the Celery 5.5 soft-shutdown docs before relying on this.
    """
    from celery.exceptions import WorkerShuttingDown

    for i in range(100):
        try:
            process_chunk(i)  # NOTE(review): not defined in this snippet
        except WorkerShuttingDown:
            # Save checkpoint for resumption
            save_checkpoint(data_id, i)
            raise  # Re-raise to allow re-queue

        # Check if abort requested
        if self.is_aborted():
            return {'status': 'aborted', 'progress': i}

    return {'status': 'complete'}

Queue Routing Table

| Task Pattern | Queue | Timeout | Use Case |
|---|---|---|---|
| `scheduler.*` | celery | 2h | General scheduling |
| `scheduler.simulations.*` | q_sim | 24h | Long simulations |
| `api.court.*` | q_court | 4h | Court optimization |
| `common.*` | celery | 30m | Utility tasks |
| `*.send_notification` | subqueue | 5m | Quick notifications |

Examples

Example 1: Simulation Task with Progress

@shared_task(
    bind=True,
    name='scheduler.simulations.run_batch',
    base=AbortableTask,
    time_limit=86400,  # 24 hours
    soft_time_limit=85800,  # 23h 50m
)
def run_simulation_batch(
    self,
    scenario_id: int,
    num_iterations: int = 100,
    random_seed: int = None,
):
    """Run a batch simulation with progress tracking and abort support.

    Args:
        scenario_id: Scenario to simulate.
        num_iterations: Number of iterations to run.
        random_seed: Optional seed for reproducible runs (0 is a valid seed).

    Returns:
        dict with final status and aggregate scores.
    """
    from scheduler.models import Scenario
    import random

    # Bug fix: the original `if random_seed:` silently skipped seeding
    # for the perfectly valid seed value 0.
    if random_seed is not None:
        random.seed(random_seed)

    scenario = Scenario.objects.get(pk=scenario_id)
    results = []

    for i in range(num_iterations):
        if self.is_aborted():
            return {
                'status': 'aborted',
                'completed': i,
                'total': num_iterations,
            }

        # Update progress
        progress = int((i / num_iterations) * 100)
        self.update_state(
            state='PROGRESS',
            meta={
                'progress': progress,
                'current': i,
                'total': num_iterations,
                'status': f'Running iteration {i+1}/{num_iterations}',
            }
        )

        # Run single iteration
        result = run_single_simulation(scenario)
        results.append(result)

    # Guard: max()/division below would raise on an empty result set
    # when the task is invoked with num_iterations=0.
    if not results:
        return {
            'status': 'success',
            'iterations': 0,
            'best_score': None,
            'avg_score': None,
        }

    return {
        'status': 'success',
        'iterations': num_iterations,
        'best_score': max(r['score'] for r in results),
        'avg_score': sum(r['score'] for r in results) / len(results),
    }

Example 2: Task with Telegram Notification

@shared_task(bind=True, name='common.notify_completion')
def notify_completion(self, task_name: str, result: dict, user_id: int = None):
    """Notify about a finished task via Telegram and, optionally, in-app."""
    from common.tasks import send_telegram_message
    from common.models import User

    parts = [
        f"Task '{task_name}' completed.\n",
        f"Status: {result.get('status', 'unknown')}\n",
    ]
    if 'score' in result:
        parts.append(f"Score: {result['score']}\n")
    message = ''.join(parts)

    # Send to Telegram (project pattern)
    send_telegram_message.delay(message)

    # Also notify user if specified
    if user_id:
        try:
            recipient = User.objects.get(pk=user_id)
            from scheduler.helpers import notify
            notify(recipient, 'Task Complete', message)
        except User.DoesNotExist:
            # Unknown user id: best-effort notification, skip silently.
            pass

    return {'notified': True}

Common Pitfalls

  • Passing model instances: Always pass IDs, not model objects (they can't be serialized properly)
  • No abort checking: Long tasks must check self.is_aborted() periodically
  • Missing transaction handling: Database operations should use transaction.atomic()
  • Forgetting bind=True: Required to access self for progress updates and abort checking
  • No soft time limit: Always set soft_time_limit slightly less than time_limit for cleanup
  • Ignoring acks_late: Set to True for critical tasks to prevent loss on worker crash

Verification

  1. Check task is registered: celery -A leagues inspect registered
  2. Monitor with Flower: celery -A leagues flower
  3. Test task manually:
    from scheduler.tasks import process_scenario
    result = process_scenario.delay(scenario_id=1)
    print(result.status, result.result)
    
  4. Check queue routing: celery -A leagues inspect active_queues