Example: Celery Task Patterns Skill

This is a complete example of a Fullstack skill for Celery task development.


---
name: celery-task
description: Creates Celery tasks with retry logic, error handling, and monitoring. Use when implementing async background tasks.
argument-hint: [task-name] [task-type: simple|periodic|chain]
allowed-tools: Read, Write, Edit, Glob, Grep
---

# Celery Task Generator

Generate production-ready Celery tasks with proper retry logic, error handling, result tracking, and monitoring.

## When to Use

- Creating background tasks for long-running operations
- Setting up periodic/scheduled tasks
- Implementing task chains and workflows
- Adding monitoring and observability to tasks

## Prerequisites

- Celery installed and configured
- Redis or RabbitMQ as message broker
- Django-Celery-Beat for periodic tasks (optional)

## Instructions

### Step 1: Determine Task Type

Based on `$ARGUMENTS`, select the appropriate pattern:
- **simple**: One-off background task
- **periodic**: Scheduled recurring task
- **chain**: Multi-step workflow
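
The two tokens in `$ARGUMENTS` map directly onto these patterns; a minimal sketch of the split, using a hypothetical `parse_task_arguments` helper (not part of the skill runtime):

```python
def parse_task_arguments(arguments: str) -> tuple[str, str]:
    """Split the raw "$ARGUMENTS" string into (task_name, task_type)."""
    parts = arguments.split()
    task_name = parts[0] if parts else "unnamed-task"
    task_type = parts[1] if len(parts) > 1 else "simple"  # default: one-off task
    if task_type not in {"simple", "periodic", "chain"}:
        raise ValueError(f"unknown task type: {task_type}")
    return task_name, task_type
```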

### Step 2: Create Task Definition

Generate task in `tasks.py`:

```python
import logging
from typing import Any

from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
from django.core.cache import cache

logger = logging.getLogger(__name__)


@shared_task(
    bind=True,
    name='app.process_data',
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
    acks_late=True,
    reject_on_worker_lost=True,
    time_limit=300,
    soft_time_limit=240,
)
def process_data(
    self,
    data_id: int,
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Process data asynchronously.

    Args:
        data_id: ID of the data to process
        options: Optional processing options

    Returns:
        dict with processing results
    """
    options = options or {}
    task_id = self.request.id

    # Idempotency check
    cache_key = f"task:{task_id}:completed"
    if cache.get(cache_key):
        logger.info(f"Task {task_id} already completed, skipping")
        return {"status": "skipped", "reason": "already_completed"}

    try:
        logger.info(f"Starting task {task_id} for data {data_id}")

        # Your processing logic here
        result = do_processing(data_id, options)

        # Mark as completed
        cache.set(cache_key, True, timeout=86400)

        logger.info(f"Task {task_id} completed successfully")
        return {"status": "success", "result": result}

    except (ConnectionError, TimeoutError) as exc:
        logger.warning(f"Task {task_id} failed with {exc}, will retry")
        raise  # autoretry_for handles this

    except Exception as exc:
        logger.exception(f"Task {task_id} failed with unexpected error")
        try:
            self.retry(exc=exc, countdown=120)
        except MaxRetriesExceededError:
            logger.error(f"Task {task_id} max retries exceeded")
            return {"status": "failed", "error": str(exc)}
```

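With `retry_backoff=True`, `retry_backoff_max=600`, and `retry_jitter=True`, Celery spaces retries out with capped exponential backoff plus jitter; a standalone sketch of that arithmetic (a simplification for illustration, not Celery's own code):

```python
import random


def backoff_delay(retries: int, base: int = 1, cap: int = 600, jitter: bool = True) -> float:
    """Delay before the next retry: base * 2**retries seconds, capped at `cap`.

    With jitter, a uniform value below the capped delay is drawn so that many
    failing tasks do not all retry at the same instant.
    """
    delay = min(cap, base * 2 ** retries)
    return random.uniform(0, delay) if jitter else float(delay)


# Without jitter the delays are deterministic: 1, 2, 4, 8, ... capped at 600.
deterministic = [backoff_delay(n, jitter=False) for n in range(4)]
```
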
### Step 3: Create Periodic Task (if needed)

For scheduled tasks, add the beat schedule in `celery.py`:

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    'process-daily-reports': {
        'task': 'app.generate_daily_report',
        'schedule': crontab(hour=6, minute=0),
        'options': {'expires': 3600},
    },
    'cleanup-old-data': {
        'task': 'app.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0, day_of_week='sunday'),
        'args': (30,),  # days to keep
    },
}
```

### Step 4: Create Task Chain (if needed)

For multi-step workflows:

```python
from celery import chord, group, shared_task


def create_processing_workflow(data_ids: list[int]) -> None:
    """Create a workflow that processes data in parallel then aggregates."""
    workflow = chord(
        group(process_single.s(data_id) for data_id in data_ids),
        aggregate_results.s(),
    )
    workflow.apply_async()


@shared_task
def process_single(data_id: int) -> dict:
    """Process a single data item."""
    return {"data_id": data_id, "processed": True}


@shared_task
def aggregate_results(results: list[dict]) -> dict:
    """Aggregate results from parallel processing."""
    return {
        "total": len(results),
        "successful": sum(1 for r in results if r.get("processed")),
    }
```
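
A chord means "run the group in parallel, collect the results into a list, then call the callback once with that list"; the dataflow can be simulated in plain Python with undecorated stand-ins for the two tasks (no broker or worker involved):

```python
def process_single(data_id: int) -> dict:
    """Stand-in for the @shared_task of the same name."""
    return {"data_id": data_id, "processed": True}


def aggregate_results(results: list[dict]) -> dict:
    """Stand-in for the chord callback."""
    return {
        "total": len(results),
        "successful": sum(1 for r in results if r.get("processed")),
    }


# The group's results arrive at the callback as one list, in task order.
header_results = [process_single(data_id) for data_id in (1, 2, 3)]
summary = aggregate_results(header_results)
```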

### Step 5: Add Progress Reporting

For long-running tasks with progress updates:

```python
from celery import shared_task


@shared_task(bind=True)
def long_running_task(self, items: list[int]) -> dict:
    """Task with progress reporting."""
    total = len(items)

    for i, item in enumerate(items):
        # Process item
        process_item(item)

        # Update progress
        self.update_state(
            state='PROGRESS',
            meta={
                'current': i + 1,
                'total': total,
                'percent': int((i + 1) / total * 100),
            },
        )

    return {'status': 'completed', 'processed': total}
```
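
Callers read this metadata back through `AsyncResult(task_id).state` and `.info`; the meta payload itself is plain arithmetic, sketched here as a standalone helper (hypothetical, mirroring the fields used above):

```python
def progress_meta(current: int, total: int) -> dict:
    """Build the meta payload reported via update_state above."""
    return {
        'current': current,
        'total': total,
        'percent': int(current / total * 100),
    }
```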

## Patterns & Best Practices

### Task Naming Convention

Use descriptive, namespaced task names:

```python
@shared_task(name='scheduler.scenarios.generate_schedule')
def generate_schedule(scenario_id: int) -> dict:
    ...
```

### Idempotency

Always design tasks to be safely re-runnable:

```python
def process_order(order_id: int) -> dict:
    order = Order.objects.get(id=order_id)
    if order.status == 'processed':
        return {'status': 'already_processed'}
    # ... process order
```
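
When no natural status field like `order.status` exists, an idempotency key can be derived from the task name and its arguments; a sketch using a hypothetical `idempotency_key` helper (the `tasks.py` example earlier keys on the task id instead):

```python
import hashlib
import json


def idempotency_key(task_name: str, *args, **kwargs) -> str:
    """Stable cache key for 'has this exact invocation already run?' checks."""
    payload = json.dumps({"args": args, "kwargs": kwargs},
                         sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode()).hexdigest()[:16]
    return f"task:{task_name}:{digest}"
```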

### Result Expiration

Set an appropriate result expiration so the result backend does not grow without bound:

```python
app.conf.result_expires = 86400  # 24 hours
```

### Dead Letter Queue

Route permanently failed tasks to a dead-letter handler once retries are exhausted:

```python
@shared_task(bind=True, max_retries=3)
def task_with_dlq(self, data):
    try:
        process(data)
    except Exception as exc:
        try:
            self.retry(exc=exc)
        except MaxRetriesExceededError:
            # Send to dead letter queue
            dead_letter_task.delay(self.name, data, self.request.id)
            raise
```

## Monitoring

### Prometheus Metrics

Add task metrics:

```python
from prometheus_client import Counter, Histogram

TASK_COUNTER = Counter(
    'celery_task_total',
    'Total Celery tasks',
    ['task_name', 'status'],
)

TASK_DURATION = Histogram(
    'celery_task_duration_seconds',
    'Task duration in seconds',
    ['task_name'],
)
```
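
The counter is incremented per task with a `status` label, e.g. `TASK_COUNTER.labels(task_name=..., status=...).inc()`; the labelling idea can be demonstrated without `prometheus_client` using an in-memory stand-in:

```python
from collections import Counter

TASK_COUNTS: Counter = Counter()  # stand-in for TASK_COUNTER


def record_task(task_name: str, status: str) -> None:
    """Increment one (task_name, status) series, like .labels(...).inc()."""
    TASK_COUNTS[(task_name, status)] += 1


record_task('app.process_data', 'success')
record_task('app.process_data', 'success')
record_task('app.process_data', 'failure')
```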

### Logging

Use structured logging:

```python
logger.info(
    "Task completed",
    extra={
        "task_id": self.request.id,
        "task_name": self.name,
        "duration": duration,
        "result_status": result["status"],
    },
)
```
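
Fields passed via `extra` become attributes on the `LogRecord`, so a formatter can emit them as structured output; a self-contained demonstration with a minimal JSON formatter (field names match the example above):

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render the message plus selected extra fields as one JSON line."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "message": record.getMessage(),
            "task_id": getattr(record, "task_id", None),
            "task_name": getattr(record, "task_name", None),
        })


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
log = logging.getLogger("celery.structured.example")
log.addHandler(handler)
log.setLevel(logging.INFO)

log.info("Task completed", extra={"task_id": "abc-123", "task_name": "app.process_data"})
entry = json.loads(stream.getvalue())
```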

## Common Pitfalls

- **No Timeout**: Always set `time_limit` and `soft_time_limit`
- **Missing Idempotency**: Tasks may run multiple times
- **Large Arguments**: Don't pass large objects; pass IDs instead
- **No Error Handling**: Always handle exceptions gracefully
- **Blocking Operations**: Use async I/O where possible

## Verification

```bash
# Send task
celery -A config call app.process_data --args='[123]'

# Monitor tasks
celery -A config inspect active
celery -A config inspect reserved

# Check results (if using Redis)
redis-cli GET celery-task-meta-<task-id>
```