# Example: Celery Task Patterns Skill

This is a complete example of a Fullstack skill for Celery task development.

---

```yaml
---
name: celery-task
description: Creates Celery tasks with retry logic, error handling, and monitoring. Use when implementing async background tasks.
argument-hint: [task-name] [task-type: simple|periodic|chain]
allowed-tools: Read, Write, Edit, Glob, Grep
---
```

# Celery Task Generator

Generate production-ready Celery tasks with proper retry logic, error handling, result tracking, and monitoring.

## When to Use

- Creating background tasks for long-running operations
- Setting up periodic/scheduled tasks
- Implementing task chains and workflows
- Adding monitoring and observability to tasks

## Prerequisites

- Celery installed and configured
- Redis or RabbitMQ as message broker
- Django-Celery-Beat for periodic tasks (optional)

## Instructions

### Step 1: Determine Task Type

Based on `$ARGUMENTS`, select the appropriate pattern:

- **simple**: One-off background task
- **periodic**: Scheduled recurring task
- **chain**: Multi-step workflow

### Step 2: Create Task Definition

Generate task in `tasks.py`:

```python
import logging
from typing import Any

from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
from django.core.cache import cache

logger = logging.getLogger(__name__)


@shared_task(
    bind=True,
    name='app.process_data',
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
    acks_late=True,
    reject_on_worker_lost=True,
    time_limit=300,
    soft_time_limit=240,
)
def process_data(
    self,
    data_id: int,
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Process data asynchronously.

    Args:
        data_id: ID of the data to process
        options: Optional processing options

    Returns:
        dict with processing results
    """
    options = options or {}
    task_id = self.request.id

    # Idempotency check
    cache_key = f"task:{task_id}:completed"
    if cache.get(cache_key):
        logger.info(f"Task {task_id} already completed, skipping")
        return {"status": "skipped", "reason": "already_completed"}

    try:
        logger.info(f"Starting task {task_id} for data {data_id}")

        # Your processing logic here
        result = do_processing(data_id, options)

        # Mark as completed
        cache.set(cache_key, True, timeout=86400)
        logger.info(f"Task {task_id} completed successfully")
        return {"status": "success", "result": result}

    except (ConnectionError, TimeoutError) as exc:
        logger.warning(f"Task {task_id} failed with {exc}, will retry")
        raise  # autoretry_for handles this

    except Exception as exc:
        logger.exception(f"Task {task_id} failed with unexpected error")
        try:
            self.retry(exc=exc, countdown=120)
        except MaxRetriesExceededError:
            logger.error(f"Task {task_id} max retries exceeded")
            return {"status": "failed", "error": str(exc)}
```

### Step 3: Create Periodic Task (if needed)

For scheduled tasks, add beat schedule in `celery.py`:

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    'process-daily-reports': {
        'task': 'app.generate_daily_report',
        'schedule': crontab(hour=6, minute=0),
        'options': {'expires': 3600},
    },
    'cleanup-old-data': {
        'task': 'app.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0, day_of_week='sunday'),
        'args': (30,),  # days to keep
    },
}
```

### Step 4: Create Task Chain (if needed)

For multi-step workflows:

```python
from celery import chord, group, shared_task


def create_processing_workflow(data_ids: list[int]) -> None:
    """Create a workflow that processes data in parallel then aggregates."""
    workflow = chord(
        group(process_single.s(data_id) for data_id in data_ids),
        aggregate_results.s()
    )
    workflow.apply_async()


@shared_task
def process_single(data_id: int) -> dict:
    """Process a single data item."""
    return {"data_id": data_id, "processed": True}


@shared_task
def aggregate_results(results: list[dict]) -> dict:
    """Aggregate results from parallel processing."""
    return {
        "total": len(results),
        "successful": sum(1 for r in results if r.get("processed")),
    }
```

### Step 5: Add Progress Reporting

For long-running tasks with progress updates:

```python
@shared_task(bind=True)
def long_running_task(self, items: list[int]) -> dict:
    """Task with progress reporting."""
    total = len(items)
    for i, item in enumerate(items):
        # Process item
        process_item(item)

        # Update progress
        self.update_state(
            state='PROGRESS',
            meta={
                'current': i + 1,
                'total': total,
                'percent': int((i + 1) / total * 100),
            }
        )
    return {'status': 'completed', 'processed': total}
```

## Patterns & Best Practices

### Task Naming Convention

Use descriptive, namespaced task names:

```python
@shared_task(name='scheduler.scenarios.generate_schedule')
def generate_schedule(scenario_id: int) -> dict:
    ...
```

### Idempotency

Always design tasks to be safely re-runnable:

```python
def process_order(order_id: int) -> dict:
    order = Order.objects.get(id=order_id)
    if order.status == 'processed':
        return {'status': 'already_processed'}
    # ... process order
```

### Result Expiration

Set appropriate result expiration:

```python
app.conf.result_expires = 86400  # 24 hours
```

### Dead Letter Queue

Handle permanently failed tasks. Note that `MaxRetriesExceededError` is raised by `self.retry()`, not by the processing code itself, so catch it around the retry call:

```python
@shared_task(bind=True, max_retries=3)
def task_with_dlq(self, data):
    try:
        process(data)
    except Exception as exc:
        try:
            raise self.retry(exc=exc)
        except MaxRetriesExceededError:
            # Route permanently failed work to a dead letter task
            dead_letter_task.delay(self.name, data, self.request.id)
            raise
```

## Monitoring

### Prometheus Metrics

Add task metrics:

```python
from prometheus_client import Counter, Histogram

TASK_COUNTER = Counter(
    'celery_task_total',
    'Total Celery tasks',
    ['task_name', 'status']
)

TASK_DURATION = Histogram(
    'celery_task_duration_seconds',
    'Task duration in seconds',
    ['task_name']
)
```

### Logging

Use structured logging:

```python
logger.info(
    "Task completed",
    extra={
        "task_id": self.request.id,
        "task_name": self.name,
        "duration": duration,
        "result_status": result["status"],
    }
)
```

## Common Pitfalls

- **No Timeout**: Always set `time_limit` and `soft_time_limit`
- **Missing Idempotency**: Tasks may run multiple times
- **Large Arguments**: Don't pass large objects; pass IDs instead
- **No Error Handling**: Always handle exceptions gracefully
- **Blocking Operations**: Use async I/O where possible

## Verification

```bash
# Send task
celery -A config call app.process_data --args='[123]'

# Monitor tasks
celery -A config inspect active
celery -A config inspect reserved

# Check results (if using Redis)
redis-cli GET celery-task-meta-<task-id>
```
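As a closing illustration, the `retry_backoff=True` / `retry_backoff_max=600` / `retry_jitter=True` settings from Step 2 produce a doubling delay between retries, capped at the maximum, with full jitter applied on top. The sketch below is a simplified model of that behavior (loosely following Celery's `get_exponential_backoff_interval`), not the library's exact implementation:

```python
import random


def backoff_delay(retries: int, factor: int = 1,
                  maximum: int = 600, jitter: bool = True) -> int:
    """Simplified model of Celery-style exponential backoff.

    retries: how many retries have already happened (0 for the first retry)
    factor:  base delay multiplier (retry_backoff=True implies 1 second)
    maximum: cap on the delay (retry_backoff_max)
    jitter:  full jitter picks a random delay in [0, countdown]
    """
    countdown = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter spreads retries out to avoid thundering herds
        return random.randrange(countdown + 1)
    return countdown


# Without jitter, the schedule doubles each retry and caps at 600s:
# 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 600, 600, ...
print([backoff_delay(r, jitter=False) for r in range(12)])
```

With jitter enabled, each delay is instead drawn uniformly from zero up to the computed countdown, which is why retrying workers do not all hammer a recovering service at the same instant.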