# Example: Celery Task Patterns Skill
This is a complete example of a Fullstack skill for Celery task development.
---
name: celery-task
description: Creates Celery tasks with retry logic, error handling, and monitoring. Use when implementing async background tasks.
argument-hint: [task-name] [task-type: simple|periodic|chain]
allowed-tools: Read, Write, Edit, Glob, Grep
---
# Celery Task Generator
Generate production-ready Celery tasks with proper retry logic, error handling, result tracking, and monitoring.
## When to Use
- Creating background tasks for long-running operations
- Setting up periodic/scheduled tasks
- Implementing task chains and workflows
- Adding monitoring and observability to tasks
## Prerequisites
- Celery installed and configured
- Redis or RabbitMQ as message broker
- Django-Celery-Beat for periodic tasks (optional)
## Instructions
### Step 1: Determine Task Type
Based on `$ARGUMENTS`, select the appropriate pattern:
- **simple**: One-off background task
- **periodic**: Scheduled recurring task
- **chain**: Multi-step workflow
### Step 2: Create Task Definition
Generate task in `tasks.py`:
```python
import logging
from typing import Any

from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
from django.core.cache import cache

logger = logging.getLogger(__name__)


@shared_task(
    bind=True,
    name='app.process_data',
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
    acks_late=True,
    reject_on_worker_lost=True,
    time_limit=300,
    soft_time_limit=240,
)
def process_data(
    self,
    data_id: int,
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Process data asynchronously.

    Args:
        data_id: ID of the data to process
        options: Optional processing options

    Returns:
        dict with processing results
    """
    options = options or {}
    task_id = self.request.id

    # Idempotency check
    cache_key = f"task:{task_id}:completed"
    if cache.get(cache_key):
        logger.info(f"Task {task_id} already completed, skipping")
        return {"status": "skipped", "reason": "already_completed"}

    try:
        logger.info(f"Starting task {task_id} for data {data_id}")

        # Your processing logic here
        result = do_processing(data_id, options)

        # Mark as completed
        cache.set(cache_key, True, timeout=86400)
        logger.info(f"Task {task_id} completed successfully")
        return {"status": "success", "result": result}

    except (ConnectionError, TimeoutError) as exc:
        logger.warning(f"Task {task_id} failed with {exc}, will retry")
        raise  # autoretry_for handles this

    except Exception as exc:
        logger.exception(f"Task {task_id} failed with unexpected error")
        try:
            # Without exc=, exhausting retries raises MaxRetriesExceededError
            self.retry(countdown=120)
        except MaxRetriesExceededError:
            logger.error(f"Task {task_id} max retries exceeded")
            return {"status": "failed", "error": str(exc)}
```
### Step 3: Create Periodic Task (if needed)
For scheduled tasks, add a beat schedule in `celery.py`:
```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    'process-daily-reports': {
        'task': 'app.generate_daily_report',
        'schedule': crontab(hour=6, minute=0),
        'options': {'expires': 3600},
    },
    'cleanup-old-data': {
        'task': 'app.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0, day_of_week='sunday'),
        'args': (30,),  # days to keep
    },
}
```
### Step 4: Create Task Chain (if needed)
For multi-step workflows:
```python
from celery import chain, chord, group, shared_task


def create_processing_workflow(data_ids: list[int]) -> None:
    """Create a workflow that processes data in parallel, then aggregates."""
    workflow = chord(
        group(process_single.s(data_id) for data_id in data_ids),
        aggregate_results.s(),
    )
    workflow.apply_async()


@shared_task
def process_single(data_id: int) -> dict:
    """Process a single data item."""
    return {"data_id": data_id, "processed": True}


@shared_task
def aggregate_results(results: list[dict]) -> dict:
    """Aggregate results from parallel processing."""
    return {
        "total": len(results),
        "successful": sum(1 for r in results if r.get("processed")),
    }
```
### Step 5: Add Progress Reporting
For long-running tasks with progress updates:
```python
@shared_task(bind=True)
def long_running_task(self, items: list[int]) -> dict:
    """Task with progress reporting."""
    total = len(items)
    for i, item in enumerate(items):
        # Process item
        process_item(item)
        # Update progress
        self.update_state(
            state='PROGRESS',
            meta={
                'current': i + 1,
                'total': total,
                'percent': int((i + 1) / total * 100),
            },
        )
    return {'status': 'completed', 'processed': total}
```
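The meta dict above is what clients read back (via `AsyncResult(task_id).info` while the state is `PROGRESS`). Factoring its construction into a small helper (hypothetical, not part of Celery) keeps the logic testable and guards against division by zero on empty input:

```python
def progress_meta(current: int, total: int) -> dict:
    """Build the PROGRESS meta dict; clamps current and handles total == 0."""
    current = max(0, min(current, total))
    return {
        'current': current,
        'total': total,
        'percent': int(current / total * 100) if total else 100,
    }


# In the task body: self.update_state(state='PROGRESS', meta=progress_meta(i + 1, total))
print(progress_meta(5, 10))  # {'current': 5, 'total': 10, 'percent': 50}
```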
## Patterns & Best Practices

### Task Naming Convention
Use descriptive, namespaced task names:
```python
@shared_task(name='scheduler.scenarios.generate_schedule')
def generate_schedule(scenario_id: int) -> dict:
    ...
```

### Idempotency
Always design tasks to be safely re-runnable:
```python
def process_order(order_id: int) -> dict:
    order = Order.objects.get(id=order_id)
    if order.status == 'processed':
        return {'status': 'already_processed'}
    # ... process order
```

### Result Expiration
Set an appropriate result expiration:
```python
app.conf.result_expires = 86400  # 24 hours
```
### Dead Letter Queue
Handle permanently failed tasks. Note that `MaxRetriesExceededError` is raised by `self.retry()` once retries are exhausted, so the catch must wrap the retry call, not the processing itself:
```python
@shared_task(bind=True, max_retries=3)
def task_with_dlq(self, data):
    try:
        process(data)
    except Exception:
        try:
            self.retry()
        except MaxRetriesExceededError:
            # Out of retries: send to dead letter queue for inspection/replay
            dead_letter_task.delay(self.name, data, self.request.id)
            raise
```
## Monitoring

### Prometheus Metrics
Add task metrics:
```python
from prometheus_client import Counter, Histogram

TASK_COUNTER = Counter(
    'celery_task_total',
    'Total Celery tasks',
    ['task_name', 'status'],
)
TASK_DURATION = Histogram(
    'celery_task_duration_seconds',
    'Task duration in seconds',
    ['task_name'],
)
```
### Logging
Use structured logging:
```python
logger.info(
    "Task completed",
    extra={
        "task_id": self.request.id,
        "task_name": self.name,
        "duration": duration,
        "result_status": result["status"],
    },
)
```
## Common Pitfalls
- **No Timeout**: Always set `time_limit` and `soft_time_limit`
- **Missing Idempotency**: Tasks may run multiple times
- **Large Arguments**: Don't pass large objects; pass IDs instead
- **No Error Handling**: Always handle exceptions gracefully
- **Blocking Operations**: Use async I/O where possible
## Verification
```bash
# Send task
celery -A config call app.process_data --args='[123]'

# Monitor tasks
celery -A config inspect active
celery -A config inspect reserved

# Check results (if using Redis)
redis-cli GET celery-task-meta-<task-id>
```