# Example: Celery Task Patterns Skill

This is a complete example of a Fullstack skill for Celery task development.

---

```yaml
---
name: celery-task
description: Creates Celery tasks with retry logic, error handling, and monitoring. Use when implementing async background tasks.
argument-hint: [task-name] [task-type: simple|periodic|chain]
allowed-tools: Read, Write, Edit, Glob, Grep
---
```

# Celery Task Generator

Generate production-ready Celery tasks with proper retry logic, error handling, result tracking, and monitoring.

## When to Use

- Creating background tasks for long-running operations
- Setting up periodic/scheduled tasks
- Implementing task chains and workflows
- Adding monitoring and observability to tasks

## Prerequisites

- Celery installed and configured
- Redis or RabbitMQ as the message broker
- django-celery-beat for periodic tasks (optional)

## Instructions

### Step 1: Determine Task Type

Based on `$ARGUMENTS`, select the appropriate pattern:

- **simple**: One-off background task
- **periodic**: Scheduled recurring task
- **chain**: Multi-step workflow
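
The dispatch logic can be sketched as a small parser; the helper name and the `"my_task"` default below are illustrative, not part of the skill format:

```python
def parse_arguments(arguments: str) -> tuple[str, str]:
    """Split a raw "$ARGUMENTS" string into (task_name, task_type).

    task_type defaults to "simple" when omitted; names are illustrative.
    """
    parts = arguments.split()
    task_name = parts[0] if parts else "my_task"
    task_type = parts[1] if len(parts) > 1 else "simple"
    if task_type not in {"simple", "periodic", "chain"}:
        raise ValueError(f"unknown task type: {task_type}")
    return task_name, task_type
```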

### Step 2: Create Task Definition

Generate the task in `tasks.py`:

```python
import logging
from typing import Any

from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
from django.core.cache import cache

logger = logging.getLogger(__name__)


@shared_task(
    bind=True,
    name='app.process_data',
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
    acks_late=True,
    reject_on_worker_lost=True,
    time_limit=300,
    soft_time_limit=240,
)
def process_data(
    self,
    data_id: int,
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Process data asynchronously.

    Args:
        data_id: ID of the data to process
        options: Optional processing options

    Returns:
        dict with processing results
    """
    options = options or {}
    task_id = self.request.id

    # Idempotency check
    cache_key = f"task:{task_id}:completed"
    if cache.get(cache_key):
        logger.info(f"Task {task_id} already completed, skipping")
        return {"status": "skipped", "reason": "already_completed"}

    try:
        logger.info(f"Starting task {task_id} for data {data_id}")

        # Your processing logic here
        result = do_processing(data_id, options)

        # Mark as completed
        cache.set(cache_key, True, timeout=86400)

        logger.info(f"Task {task_id} completed successfully")
        return {"status": "success", "result": result}

    except (ConnectionError, TimeoutError) as exc:
        logger.warning(f"Task {task_id} failed with {exc}, will retry")
        raise  # autoretry_for handles this

    except Exception as exc:
        logger.exception(f"Task {task_id} failed with unexpected error")
        try:
            self.retry(exc=exc, countdown=120)
        except MaxRetriesExceededError:
            logger.error(f"Task {task_id} max retries exceeded")
            return {"status": "failed", "error": str(exc)}
```

### Step 3: Create Periodic Task (if needed)

For scheduled tasks, add a beat schedule in `celery.py`:

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    'process-daily-reports': {
        'task': 'app.generate_daily_report',
        'schedule': crontab(hour=6, minute=0),
        'options': {'expires': 3600},
    },
    'cleanup-old-data': {
        'task': 'app.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0, day_of_week='sunday'),
        'args': (30,),  # days to keep
    },
}
```

### Step 4: Create Task Chain (if needed)

For multi-step workflows:

```python
from celery import chord, group, shared_task


def create_processing_workflow(data_ids: list[int]) -> None:
    """Create a workflow that processes data in parallel, then aggregates."""
    workflow = chord(
        group(process_single.s(data_id) for data_id in data_ids),
        aggregate_results.s(),
    )
    workflow.apply_async()


@shared_task
def process_single(data_id: int) -> dict:
    """Process a single data item."""
    return {"data_id": data_id, "processed": True}


@shared_task
def aggregate_results(results: list[dict]) -> dict:
    """Aggregate results from parallel processing."""
    return {
        "total": len(results),
        "successful": sum(1 for r in results if r.get("processed")),
    }
```

### Step 5: Add Progress Reporting

For long-running tasks with progress updates:

```python
@shared_task(bind=True)
def long_running_task(self, items: list[int]) -> dict:
    """Task with progress reporting."""
    total = len(items)

    for i, item in enumerate(items):
        # Process item
        process_item(item)

        # Update progress
        self.update_state(
            state='PROGRESS',
            meta={
                'current': i + 1,
                'total': total,
                'percent': int((i + 1) / total * 100),
            },
        )

    return {'status': 'completed', 'processed': total}
```

## Patterns & Best Practices

### Task Naming Convention

Use descriptive, namespaced task names:

```python
@shared_task(name='scheduler.scenarios.generate_schedule')
def generate_schedule(scenario_id: int) -> dict:
    ...
```

### Idempotency

Always design tasks to be safely re-runnable:

```python
def process_order(order_id: int) -> dict:
    order = Order.objects.get(id=order_id)
    if order.status == 'processed':
        return {'status': 'already_processed'}
    # ... process order
```

### Result Expiration

Set an appropriate result expiration:

```python
app.conf.result_expires = 86400  # 24 hours
```

### Dead Letter Queue

Handle permanently failed tasks:

```python
@shared_task(bind=True, max_retries=3)
def task_with_dlq(self, data):
    try:
        process(data)
    except Exception as exc:
        try:
            # retry() raises MaxRetriesExceededError once retries are exhausted
            raise self.retry(exc=exc)
        except MaxRetriesExceededError:
            # Send to dead letter queue for later inspection
            dead_letter_task.delay(self.name, data, self.request.id)
            raise
```

## Monitoring

### Prometheus Metrics

Add task metrics:

```python
from prometheus_client import Counter, Histogram

TASK_COUNTER = Counter(
    'celery_task_total',
    'Total Celery tasks',
    ['task_name', 'status'],
)

TASK_DURATION = Histogram(
    'celery_task_duration_seconds',
    'Task duration in seconds',
    ['task_name'],
)
```
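
One way to feed these metrics is a small decorator around the task body. This is a sketch, not part of Celery's API: the `instrumented` helper is hypothetical, and the metrics are repeated so the block is self-contained (it assumes `prometheus_client` is installed).

```python
import time
from functools import wraps

from prometheus_client import Counter, Histogram

# Repeated from above so this sketch stands alone.
TASK_COUNTER = Counter('celery_task_total', 'Total Celery tasks', ['task_name', 'status'])
TASK_DURATION = Histogram('celery_task_duration_seconds', 'Task duration in seconds', ['task_name'])


def instrumented(task_name: str):
    """Record a status count and a duration sample for every call (illustrative helper)."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.monotonic()
            try:
                result = fn(*args, **kwargs)
            except Exception:
                TASK_COUNTER.labels(task_name=task_name, status='failure').inc()
                raise
            else:
                TASK_COUNTER.labels(task_name=task_name, status='success').inc()
                return result
            finally:
                # Duration is recorded on both success and failure
                TASK_DURATION.labels(task_name=task_name).observe(time.monotonic() - start)
        return wrapper
    return decorator
```

Apply it between `@shared_task` and the function body, or record the same metrics from Celery's `task_prerun`/`task_postrun` signals if you prefer not to touch task code.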

### Logging

Use structured logging:

```python
logger.info(
    "Task completed",
    extra={
        "task_id": self.request.id,
        "task_name": self.name,
        "duration": duration,
        "result_status": result["status"],
    },
)
```

## Common Pitfalls

- **No Timeout**: Always set `time_limit` and `soft_time_limit`
- **Missing Idempotency**: Tasks may run multiple times, so re-runs must be safe
- **Large Arguments**: Don't pass large objects through the broker; pass IDs instead
- **No Error Handling**: Always handle exceptions gracefully
- **Blocking Operations**: Use async I/O where possible
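
The "pass IDs, not objects" pitfall can be sketched as follows; the in-memory `ORDERS` dict stands in for your ORM, and all names here are illustrative:

```python
from typing import Any

# Stand-in for a database/ORM, purely for illustration.
ORDERS: dict[int, dict[str, Any]] = {1: {"status": "new", "total": 99.0}}


def enqueue_payload(order_id: int) -> dict:
    """Good: the broker message carries only a small, stable ID."""
    return {"order_id": order_id}


def run_order_task(payload: dict) -> dict:
    """The task reloads fresh state by ID rather than trusting a serialized snapshot."""
    order = ORDERS[payload["order_id"]]
    return {"order_id": payload["order_id"], "status": order["status"]}
```

Serializing the full record into the message would both bloat the broker and hand the worker stale data if the record changes before the task runs.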

## Verification

```bash
# Send task
celery -A config call app.process_data --args='[123]'

# Monitor tasks
celery -A config inspect active
celery -A config inspect reserved

# Check results (if using Redis)
redis-cli GET celery-task-meta-<task-id>
```