
Celery Monitoring Guide: Track Your Python Background Jobs
Complete guide to monitoring Celery tasks in production: Flower dashboard, Prometheus metrics, django-celery-results, custom monitoring, and framework-native solutions. Real Python code examples and best practices.
Celery is the de facto standard for background task processing in Python applications—asynchronous task queues, scheduled jobs with Celery Beat, distributed processing across workers. But in production, distributed systems fail in distributed ways.
Workers crash. Brokers (Redis, RabbitMQ) become unavailable. Tasks get stuck. Beat schedules drift. And when Celery tasks fail silently, your application keeps running while critical background processes stop—payments don't process, emails don't send, data synchronization halts.
This guide covers production-grade monitoring strategies for Celery, from Flower dashboards to Prometheus metrics, custom signals, and framework-native monitoring.
How Celery Architecture Works
Before diving into monitoring, let's review Celery's distributed architecture.
Core Components
1. Celery Workers: process tasks from queues. Multiple workers can run in parallel.
# Start a worker
celery -A myapp worker --loglevel=info
2. Message Broker: stores tasks waiting to be processed (Redis, RabbitMQ, etc.).
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
3. Result Backend: stores task results and state (optional, but recommended for monitoring).
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
4. Celery Beat: scheduler for periodic tasks (like cron).
# Start the beat scheduler
celery -A myapp beat --loglevel=info
Task Types
from celery import shared_task
# Regular async task
@shared_task
def send_email(user_id):
    user = User.objects.get(id=user_id)
    send_email_to(user.email)
# Scheduled periodic task
from celery.schedules import crontab
app.conf.beat_schedule = {
    'backup-every-night': {
        'task': 'tasks.backup_database',
        'schedule': crontab(hour=2, minute=0),
    },
    'process-payments-every-5-minutes': {
        'task': 'tasks.process_payments',
        'schedule': 300.0,  # Every 5 minutes
    },
}
Why Monitoring is Critical
Celery's distributed nature creates failure modes that don't exist in synchronous code:
- Worker failures - Process crashes, OOM kills, hardware failures
- Broker issues - Redis/RabbitMQ downtime, connection timeouts
- Task failures - Exceptions, timeouts, infinite loops
- Beat failures - Scheduler stops, schedules drift, duplicate tasks
- Queue buildup - Tasks arriving faster than workers process them
- Silent failures - Tasks fail or swallow their exceptions, and nobody is alerted
Without monitoring, these failures are invisible until customers complain.
Method 1: Flower (Real-Time Web Dashboard)
Flower is the most popular Celery monitoring tool—a real-time web dashboard for Celery clusters.
Installation
pip install flower
Starting Flower
# Basic
celery -A myapp flower
# With authentication
celery -A myapp flower --basic_auth=user:password
# Custom port
celery -A myapp flower --port=5555
Access dashboard at http://localhost:5555
Flower Features
Real-time monitoring:
- Active workers and their status
- Task execution rates (per second)
- Task success/failure counts
- Worker resource usage (CPU, memory)
- Queue lengths
- Task history and details
Task management:
- View running tasks
- Revoke/terminate tasks
- Retry failed tasks
- View task arguments and results
Programmatic Access
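Flower also provides an HTTP API (depending on your Flower version and settings, it may require the same authentication as the dashboard):
import requests
FLOWER_URL = 'http://localhost:5555'
# Worker details (stats, active tasks, registered tasks, ...) keyed by worker name
workers = requests.get(f'{FLOWER_URL}/api/workers', timeout=5).json()
# With status=true, Flower returns only {worker_name: is_online}
status = requests.get(f'{FLOWER_URL}/api/workers', params={'status': 'true'}, timeout=5).json()
for worker_name, worker_data in workers.items():
    print(f"Worker: {worker_name}")
    print(f"Online: {status.get(worker_name)}")
    print(f"Active tasks: {len(worker_data.get('active', []))}")
# Get task info
task_id = 'abc-123-def-456'
response = requests.get(f'{FLOWER_URL}/api/task/info/{task_id}', timeout=5)
task_info = response.json()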
Building Alerts with Flower API
import requests
from datetime import datetime, timedelta
class FlowerMonitor:
    def __init__(self, flower_url='http://localhost:5555', timeout=5):
        self.base_url = flower_url
        self.timeout = timeout
    def get_failed_tasks(self, hours=24):
        """Get failed tasks in the last N hours"""
        # /api/tasks accepts a state filter. Flower only knows about tasks it
        # has seen via events, held in memory (capped by --max_tasks)
        response = requests.get(
            f'{self.base_url}/api/tasks',
            params={'state': 'FAILURE'},
            timeout=self.timeout,
        )
        tasks = response.json()
        cutoff = datetime.now() - timedelta(hours=hours)
        failed = []
        for task_id, task_data in tasks.items():
            if task_data.get('state') != 'FAILURE':
                continue
            timestamp = task_data.get('timestamp')
            if timestamp and datetime.fromtimestamp(timestamp) > cutoff:
                failed.append({
                    'id': task_id,
                    'name': task_data.get('name'),
                    'exception': task_data.get('exception'),
                    'timestamp': timestamp,
                })
        return failed
    def check_worker_health(self):
        """Return the names of workers Flower considers offline"""
        # status=true returns {worker_name: is_online}
        response = requests.get(
            f'{self.base_url}/api/workers',
            params={'status': 'true'},
            timeout=self.timeout,
        )
        return [name for name, online in response.json().items() if not online]
    def get_active_task_counts(self):
        """Count tasks currently executing on each worker.
        This is not queue length: waiting messages live in the broker,
        so query the broker directly to measure backlog.
        """
        response = requests.get(f'{self.base_url}/api/workers', timeout=self.timeout)
        workers = response.json()
        return {
            worker_name: len(worker_data.get('active', []))
            for worker_name, worker_data in workers.items()
        }
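Flower's /api/tasks response can also be turned into per-task failure ratios, which are often more useful for alerting than raw failure counts. A minimal sketch, assuming each entry carries 'name' and 'state' keys as in get_failed_tasks above; the function name failure_ratios is hypothetical:

```python
from collections import defaultdict


def failure_ratios(tasks: dict) -> dict:
    """Group a Flower /api/tasks-style payload by task name.

    Returns {task_name: {"total": int, "failed": int, "ratio": float}}.
    """
    counts = defaultdict(lambda: {"total": 0, "failed": 0})
    for task in tasks.values():
        name = task.get("name") or "<unknown>"
        state = task.get("state")
        # Only count tasks that finished as SUCCESS or FAILURE; other states
        # (PENDING, STARTED, RETRY, REVOKED, ...) are skipped.
        if state not in ("SUCCESS", "FAILURE"):
            continue
        counts[name]["total"] += 1
        if state == "FAILURE":
            counts[name]["failed"] += 1
    return {
        name: {**c, "ratio": c["failed"] / c["total"]}
        for name, c in counts.items()
    }
```

For example, failure_ratios(requests.get('http://localhost:5555/api/tasks', timeout=5).json()). Because Flower only remembers tasks it has seen since it started (up to its in-memory limit), ratios computed this way cover a rolling window rather than full history.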
Pros and Cons
Pros:
- ✅ Beautiful, real-time web UI
- ✅ Easy to install and use
- ✅ Task revocation and retry
- ✅ HTTP API for automation
- ✅ Worker monitoring
Cons:
- ❌ No built-in alerting
- ❌ Requires separate service to run
- ❌ Limited historical data (task history is held in memory and capped, unless persistent mode is enabled)
- ❌ Doesn't monitor Beat scheduler specifically
- ❌ No dead man's switch for periodic tasks
Best for: Development, debugging, and manual production monitoring.
Method 2: Prometheus + Grafana
For teams using Prometheus, integrate Celery metrics for comprehensive observability.
Installing Celery Exporter
pip install celery-exporter
Running the Exporter
celery-exporter --broker-url=redis://localhost:6379/0
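This exposes Prometheus metrics on http://localhost:9808/metrics. Exporters like this typically derive task metrics from Celery events, so make sure workers send task events (start them with -E, or set CELERY_WORKER_SEND_TASK_EVENTS = True).
Example Metrics
Metric names and labels differ between exporters and versions; treat the names below as illustrative and check your exporter's /metrics output before writing queries or alerts.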
# Task metrics
celery_tasks_total{state="SUCCESS"} 1234
celery_tasks_total{state="FAILURE"} 56
celery_tasks_total{state="RETRY"} 12
# Worker metrics
celery_workers{state="online"} 4
celery_workers{state="offline"} 0
# Queue metrics
celery_queue_length{queue_name="default"} 42
celery_queue_length{queue_name="priority"} 5
# Task runtime
celery_task_runtime_seconds{task="tasks.send_email"} 1.234
Prometheus Configuration
# prometheus.yml
scrape_configs:
  - job_name: 'celery'
    static_configs:
      - targets: ['localhost:9808']
Custom Metrics in Tasks
Add custom instrumentation:
import time
from celery import shared_task
from prometheus_client import Counter, Histogram
# Define metrics (prometheus_client exposes this counter as myapp_task_processed_total)
task_counter = Counter(
    'myapp_task_processed',
    'Number of tasks processed',
    ['task_name', 'status']
)
task_duration = Histogram(
    'myapp_task_duration_seconds',
    'Task execution time',
    ['task_name']
)
@shared_task
def process_payment(payment_id):
    start_time = time.perf_counter()  # monotonic, unaffected by clock changes
    try:
        # Process payment logic
        result = do_payment_processing(payment_id)
        task_counter.labels(task_name='process_payment', status='success').inc()
        return result
    except Exception:
        task_counter.labels(task_name='process_payment', status='failure').inc()
        raise
    finally:
        duration = time.perf_counter() - start_time
        task_duration.labels(task_name='process_payment').observe(duration)
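Repeating the try/except/finally from process_payment in every task is easy to get wrong. One alternative is a decorator applied beneath @shared_task. The sketch below uses a plain collections.Counter and dict as stand-ins so it runs without dependencies; in a real setup you would call task_counter.labels(...).inc() and task_duration.labels(...).observe(...) in their place. The instrumented name is hypothetical.

```python
import functools
import time
from collections import Counter

# Stand-ins for prometheus_client metrics (assumption: swap in the real ones).
outcomes = Counter()  # (task_name, status) -> count
durations = {}        # task_name -> list of durations in seconds


def instrumented(task_name):
    """Record the outcome and wall-clock duration of each call."""
    def decorator(func):
        @functools.wraps(func)  # keep __name__/__module__ for Celery's task naming
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            status = "failure"  # stays "failure" if func raises
            try:
                result = func(*args, **kwargs)
                status = "success"
                return result
            finally:
                outcomes[(task_name, status)] += 1
                elapsed = time.perf_counter() - start
                durations.setdefault(task_name, []).append(elapsed)
        return wrapper
    return decorator
```

Stack it as @shared_task above @instrumented('process_payment') above the function. One caveat that also applies to the metrics above: with Celery's default prefork pool, tasks run in child processes, so prometheus_client needs its multiprocess mode (the PROMETHEUS_MULTIPROC_DIR environment variable) or a push-based setup for the values to be scraped reliably.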
Grafana Dashboards
Create dashboards with:
- Task success/failure rates over time
- Queue lengths by queue name
- Worker count and status
- Task execution time percentiles
- Error rate by task type
Alerting with Prometheus
# alerts.yml
groups:
  - name: celery_alerts
    rules:
      - alert: CeleryHighFailureRate
        expr: rate(celery_tasks_total{state="FAILURE"}[5m]) > 0.1
        for: 5m
        annotations:
          summary: "High Celery task failure rate"
      - alert: CeleryQueueBuildup
        expr: celery_queue_length > 1000
        for: 10m
        annotations:
          summary: "Celery queue has {{ $value }} pending tasks"
      # Won't fire if the metric disappears entirely (e.g. exporter down);
      # consider pairing it with an absent() rule
      - alert: CeleryWorkersDown
        expr: celery_workers{state="online"} == 0
        for: 2m
        annotations:
          summary: "No Celery workers are online"
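The CeleryHighFailureRate rule compares rate(...[5m]) against 0.1, meaning more than 0.1 failures per second, or roughly 30 failures in a 5-minute window. rate() measures how fast a counter increases and treats any drop in value as a counter reset (for example, an exporter restart). The simplified sketch below reproduces that arithmetic over (timestamp, value) samples; unlike PromQL it does not extrapolate to the edges of the window, and simple_rate is a hypothetical name.

```python
def simple_rate(samples):
    """Per-second increase of a monotonic counter.

    samples: list of (unix_timestamp, counter_value), oldest first.
    A drop in value is treated as a reset: the post-reset value counts
    as fresh increase. No window-edge extrapolation (PromQL does this).
    """
    if len(samples) < 2:
        return 0.0
    increase = 0.0
    for (_, prev), (_, curr) in zip(samples, samples[1:]):
        increase += curr - prev if curr >= prev else curr
    elapsed = samples[-1][0] - samples[0][0]
    return increase / elapsed if elapsed > 0 else 0.0
```

For example, a counter sampled every minute as 100, 110, 130, then reset to 5, 20, 35 increased by 65 over 300 seconds, about 0.22 per second, which would trip the 0.1 threshold.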
Pros and Cons
Pros:
- ✅ Industry-standard metrics stack
- ✅ Powerful querying with PromQL
- ✅ Flexible alerting
- ✅ Historical data and trends
- ✅ Integrates with existing observability
Cons:
- ❌ Requires Prometheus + Grafana infrastructure
- ❌ Complex setup for small teams
- ❌ Doesn't monitor Beat scheduler directly
- ❌ No task-level lifecycle tracking
- ❌ Manual dashboard/alert configuration
Best for: Teams with existing Prometheus/Grafana who want unified metrics.
Method 3: Django-Celery-Results + Custom Monitoring
For Django applications, use django-celery-results with custom monitoring.
Installation
pip install django-celery-results
Django Configuration
# settings.py
INSTALLED_APPS = [
    # ... your other apps
    'django_celery_results',
]
# Use Django ORM as result backend
CELERY_RESULT_BACKEND = 'django-db'
# Enable extended task tracking
CELERY_RESULT_EXTENDED = True
Run migrations:
python manage.py migrate django_celery_results
Querying Task Results
from datetime import timedelta
from django.db.models import DurationField, ExpressionWrapper, F
from django.utils import timezone
from django_celery_results.models import TaskResult
class CeleryMonitor:
    @staticmethod
    def get_failed_tasks(hours=24):
        """Get failed tasks in the last N hours"""
        cutoff = timezone.now() - timedelta(hours=hours)
        return TaskResult.objects.filter(
            status='FAILURE',
            date_done__gte=cutoff
        ).values('task_name', 'task_args', 'result', 'date_done')
    @staticmethod
    def get_task_stats(task_name, hours=24):
        """Get success/failure stats for a specific task"""
        cutoff = timezone.now() - timedelta(hours=hours)
        tasks = TaskResult.objects.filter(
            task_name=task_name,
            date_done__gte=cutoff
        )
        total = tasks.count()
        successful = tasks.filter(status='SUCCESS').count()
        failed = tasks.filter(status='FAILURE').count()
        return {
            'task_name': task_name,
            'total': total,
            'successful': successful,
            'failed': failed,
            'success_rate': (successful / total * 100) if total > 0 else 0
        }
    @staticmethod
    def get_slow_tasks(task_name, threshold_seconds=10, hours=24):
        """Find tasks that took longer than threshold"""
        cutoff = timezone.now() - timedelta(hours=hours)
        # django-celery-results doesn't store duration directly, so derive it
        # from date_created and date_done. date_created is set when the result
        # row is first written; with CELERY_TASK_TRACK_STARTED = True that
        # happens when the task starts. Without it, the row is usually created
        # at completion and the computed duration is close to zero.
        slow_tasks = TaskResult.objects.filter(
            task_name=task_name,
            status='SUCCESS',
            date_done__gte=cutoff
        ).annotate(
            duration=ExpressionWrapper(
                F('date_done') - F('date_created'),
                output_field=DurationField()
            )
        ).filter(
            duration__gt=timedelta(seconds=threshold_seconds)
        )
        return slow_tasks
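A fixed threshold finds individual slow runs; percentiles summarize the whole distribution and make a gradual slowdown visible. A minimal nearest-rank sketch over durations in seconds (for instance, (task.date_done - task.date_created).total_seconds() for each TaskResult row); the function names are hypothetical:

```python
def percentile(values, p):
    """Nearest-rank percentile: the smallest value such that at least
    p percent of values are <= it. p is an integer from 1 to 100."""
    if not values:
        raise ValueError("percentile() needs at least one value")
    if not 1 <= p <= 100:
        raise ValueError("p must be between 1 and 100")
    ordered = sorted(values)
    rank = -(-p * len(ordered) // 100)  # integer ceil(p * n / 100), no float error
    return ordered[rank - 1]


def duration_summary(durations):
    """Summary suitable for a dashboard row or an alert check."""
    return {
        "count": len(durations),
        "p50": percentile(durations, 50),
        "p95": percentile(durations, 95),
        "max": max(durations),
    }
```

Alerting on p95 rather than on any single slow run tends to be less noisy, since one outlier does not move it much.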
Building a Monitoring Dashboard
# views.py
from django.views.generic import TemplateView
from django_celery_results.models import TaskResult
class CeleryMonitorView(TemplateView):
template_name = 'monitoring/celery.html'
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
# Recent failures
context['recent_failures'] = TaskResult.objects.filter(
status='FAILURE'
).order_by('-date_done')[:20]
# Task stats by name
from django.db.models import Count
context['task_stats'] = TaskResult.objects.values(
'task_name', 'status'
).annotate(count=Count('id'))
return context
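The task_stats query returns one row per (task_name, status) pair, which is awkward to render as a table. A small pivot in the view can reshape it into one row per task. A sketch, assuming the rows are dicts with task_name, status, and count keys as produced by .values(...).annotate(count=...); pivot_task_stats is a hypothetical helper:

```python
def pivot_task_stats(rows):
    """Reshape [{'task_name', 'status', 'count'}, ...] into one dict per task:
    {'task_name': ..., 'total': N, 'SUCCESS': n1, 'FAILURE': n2, ...}."""
    table = {}
    for row in rows:
        name = row["task_name"]
        entry = table.setdefault(name, {"task_name": name, "total": 0})
        entry[row["status"]] = entry.get(row["status"], 0) + row["count"]
        entry["total"] += row["count"]
    # Stable, alphabetical order for display
    return sorted(table.values(), key=lambda e: e["task_name"])
```

In the view, wrap the same queryset: context['task_stats'] = pivot_task_stats(...). A template can then read row.SUCCESS or row.FAILURE through Django's dictionary lookup.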
Periodic Health Checks
# tasks.py
from celery import shared_task
from django.core.mail import mail_admins
from myapp.monitoring import CeleryMonitor  # hypothetical path: import from wherever you defined it
@shared_task
def check_celery_health():
    """Run every 10 minutes to check for issues"""
    monitor = CeleryMonitor()
    # Check for high failure rate
    failed_count = monitor.get_failed_tasks(hours=1).count()
    if failed_count > 10:
        mail_admins(
            'Celery Alert: High Failure Rate',
            f'Found {failed_count} failed tasks in the last hour'
        )
    # Check for specific critical tasks. Skip when nothing ran, otherwise a
    # quiet hour reports a 0% success rate and triggers a false alert
    payment_stats = monitor.get_task_stats('tasks.process_payment', hours=1)
    if payment_stats['total'] > 0 and payment_stats['success_rate'] < 95:
        mail_admins(
            'Celery Alert: Payment Processing Issues',
            f"Payment success rate: {payment_stats['success_rate']:.1f}%"
        )
# Schedule the health check
app.conf.beat_schedule = {
    'celery-health-check': {
        'task': 'tasks.check_celery_health',
        'schedule': 600.0,  # Every 10 minutes
    },
}
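Because check_celery_health runs every 10 minutes, a sustained problem produces the same email every 10 minutes. A cooldown per alert type keeps the signal without the noise. A sketch of such a throttle; AlertThrottle is hypothetical, and the store argument is any dict-like mapping:

```python
import time


class AlertThrottle:
    """Allow an alert for a given key at most once per cooldown window."""

    def __init__(self, cooldown_seconds=3600, clock=time.time, store=None):
        self.cooldown = cooldown_seconds
        self.clock = clock
        # Maps alert key -> timestamp of the last alert actually sent
        self.store = {} if store is None else store

    def should_send(self, key):
        now = self.clock()
        last = self.store.get(key)
        if last is not None and now - last < self.cooldown:
            return False  # still inside the cooldown window
        self.store[key] = now
        return True
```

Inside the task you would guard each mail_admins call with if throttle.should_send('high-failure-rate'):. A plain dict lives in one worker process only, and the health check may run in a different process each time, so in practice the timestamps need a shared store. With Django, cache.add(key, 1, timeout=cooldown), which only sets a key that does not already exist and returns whether it did, gives the same behaviour in one call.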
Pros and Cons
Pros:
- ✅ Integrates with Django ORM
- ✅ Query task results with Django queries
- ✅ Build custom dashboards
- ✅ Store task history long-term
Cons:
- ❌ Django-specific (not for Flask, FastAPI, etc.)
- ❌ Requires building alerting yourself
- ❌ Database storage overhead
- ❌ Doesn't monitor Beat scheduler
- ❌ No real-time visibility
Best for: Django teams that want custom dashboards and long-term task history.
Method 4: Celery Signals + Custom Monitoring
Use Celery's built-in signals for custom monitoring integrations.
Available Signals
from celery.signals import (
task_prerun,
task_postrun,
task_failure,
task_retry,
beat_init,
beat_embedded_init
)
Custom Monitoring with Signals
# monitoring.py
import logging
import requests
from celery.signals import task_prerun, task_postrun, task_failure
logger = logging.getLogger(__name__)
MONITOR_BASE_URL = 'https://monitor.example.com'
# These handlers run synchronously in the worker process, so every
# HTTP call adds up to its timeout to each task's execution
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
    """Called before task execution"""
    task_name = task.name
    try:
        requests.post(
            f'{MONITOR_BASE_URL}/ping/{task_name}/start',
            json={'task_id': task_id},
            timeout=3
        )
    except Exception as e:
        logger.warning(f"Failed to report task start: {e}")
@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None,
                         retval=None, state=None, **kwargs):
    """Called after task execution"""
    task_name = task.name
    try:
        requests.post(
            f'{MONITOR_BASE_URL}/ping/{task_name}/complete',
            json={
                'task_id': task_id,
                'state': state
            },
            timeout=3
        )
    except Exception as e:
        logger.warning(f"Failed to report task completion: {e}")
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None,
                         einfo=None, **kwargs):
    """Called when task fails"""
    task_name = sender.name
    try:
        requests.post(
            f'{MONITOR_BASE_URL}/ping/{task_name}/fail',
            json={
                'task_id': task_id,
                'error': str(exception),
                # The traceback argument is a traceback object (str() of it is
                # just a repr); einfo.traceback is the formatted string
                'traceback': einfo.traceback if einfo is not None else None
            },
            timeout=3
        )
    except Exception as e:
        logger.warning(f"Failed to report task failure: {e}")
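The handlers above call requests.post inline, so a slow or unreachable monitoring endpoint adds up to 3 seconds per signal to every task. One way to decouple them is to hand events to a background thread. A minimal sketch using only the standard library; BackgroundReporter and its send callable are hypothetical (send would wrap requests.post in practice):

```python
import logging
import queue
import threading

logger = logging.getLogger(__name__)


class BackgroundReporter:
    """Queue monitoring events and deliver them from a daemon thread,
    so signal handlers return immediately instead of waiting on HTTP."""

    def __init__(self, send, maxsize=1000):
        self._send = send  # callable(event_dict), e.g. a wrapper around requests.post
        self._queue = queue.Queue(maxsize=maxsize)
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def report(self, event):
        """Non-blocking: drops the event (with a warning) if the queue is full."""
        try:
            self._queue.put_nowait(event)
        except queue.Full:
            logger.warning("Monitoring queue full; dropping %s event", event.get("type"))

    def flush(self):
        """Block until every queued event has been passed to send()."""
        self._queue.join()

    def _run(self):
        while True:
            event = self._queue.get()
            try:
                self._send(event)
            except Exception:
                # A failing endpoint must not kill the delivery thread
                logger.warning("Failed to report event", exc_info=True)
            finally:
                self._queue.task_done()
```

With the prefork pool, threads started in the parent process do not survive the fork into pool processes, so a reporter like this would need to be created inside each child, for example from a worker_process_init signal handler. Events still queued when a process exits are lost, since the thread is a daemon.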
Monitoring Beat Scheduler
# Continues monitoring.py above (requests, logger, MONITOR_BASE_URL)
from datetime import datetime, timezone
from celery import shared_task
from celery.signals import beat_init
@beat_init.connect
def beat_started(sender=None, **kwargs):
    """Called when beat scheduler starts"""
    try:
        requests.post(
            f'{MONITOR_BASE_URL}/beat/started',
            json={'timestamp': datetime.now(timezone.utc).isoformat()},
            timeout=3
        )
    except Exception as e:
        logger.warning(f"Failed to report beat start: {e}")
# Periodic beat heartbeat
@shared_task
def beat_heartbeat():
    """Runs every minute. Beat sends it and a worker executes it, so a missing
    heartbeat means beat stopped or the task never ran (broker down, no
    worker consuming the queue, or a backlog)."""
    try:
        requests.post(f'{MONITOR_BASE_URL}/beat/heartbeat', timeout=3)
    except Exception as e:
        logger.warning(f"Failed to send beat heartbeat: {e}")
app.conf.beat_schedule = {
    'beat-heartbeat': {
        'task': 'tasks.beat_heartbeat',
        'schedule': 60.0,  # Every minute
    },
}
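The heartbeat only helps if something notices when it stops. That check runs on the receiving side (your monitoring service, not Celery): for each check, compare the last ping against the expected interval plus a grace period. A sketch of that logic; find_overdue and the tuple layout are hypothetical:

```python
from datetime import datetime, timedelta, timezone


def find_overdue(checks, now=None):
    """checks: {name: (last_seen, interval, grace)} where last_seen is an
    aware datetime (or None if never seen) and interval/grace are timedeltas.

    Returns {name: how_late} for overdue checks; how_late is None for
    checks that have never reported.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    overdue = {}
    for name, (last_seen, interval, grace) in checks.items():
        if last_seen is None:
            overdue[name] = None
            continue
        deadline = last_seen + interval + grace
        if now > deadline:
            overdue[name] = now - deadline
    return overdue
```

A one-minute heartbeat with a two-minute grace period, for example, is flagged once three minutes pass without a ping; the grace period absorbs normal queueing delay without hiding a stopped scheduler for long.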
Tracking Task Duration
import time
from celery.signals import task_postrun, task_prerun
@task_prerun.connect
def track_start_time(sender=None, task_id=None, **kwargs):
    """Store task start time on the task's request context"""
    # monotonic() is unaffected by system clock changes
    sender.request.start_time = time.monotonic()
@task_postrun.connect
def track_duration(sender=None, task_id=None, state=None, **kwargs):
    """Calculate and report task duration"""
    start_time = getattr(sender.request, 'start_time', None)
    if start_time is not None:
        duration = time.monotonic() - start_time
        try:
            requests.post(
                f'{MONITOR_BASE_URL}/metrics/duration',
                json={
                    'task_name': sender.name,
                    'duration_seconds': duration,
                    'task_id': task_id
                },
                timeout=3
            )
        except Exception as e:
            logger.warning(f"Failed to report duration: {e}")
Pros and Cons
Pros:
- ✅ Framework-agnostic (works with Django, Flask, FastAPI)
- ✅ Lifecycle tracking (start/success/fail)
- ✅ Custom integrations
- ✅ Fine-grained control
Cons:
- ❌ Requires building monitoring infrastructure
- ❌ Manual setup for each signal
- ❌ Need to handle HTTP failures gracefully
- ❌ No built-in alerting
Best for: Teams that want custom integrations and full control.
Method 5: Framework-Native Auto-Discovery
Use CronRadar's Celery integration for automatic monitoring.
Installation
pip install cronradar-celery
Configuration
# settings.py or config.py
CRONRADAR_API_KEY = 'your-api-key'
CRONRADAR_ENABLED = True
Enable Monitoring
# celery.py
from celery import Celery
from django.conf import settings
from cronradar.celery import setup_cronradar
app = Celery('myapp')
app.config_from_object('django.conf:settings', namespace='CELERY')
# Auto-discover and monitor all periodic tasks
if settings.CRONRADAR_ENABLED:
    setup_cronradar(app, mode='all')
Automatic Beat Schedule Discovery
All periodic tasks are automatically monitored:
app.conf.beat_schedule = {
    'backup-database': {
        'task': 'tasks.backup_database',
        'schedule': crontab(hour=2, minute=0),
    },
    'process-payments': {
        'task': 'tasks.process_payments',
        'schedule': 300.0,  # Every 5 minutes
    },
    'send-reports': {
        'task': 'tasks.send_weekly_reports',
        'schedule': crontab(day_of_week=1, hour=8, minute=0),
    },
}
# All three tasks automatically monitored with:
# - Schedule detection
# - Lifecycle tracking
# - Grace periods
# - Alerting
Monitoring One-Time Tasks
@shared_task
def send_email(user_id):
    # Task logic
    pass
# Send with monitoring
send_email.apply_async(
    args=[user_id],
    monitor=True,
    monitor_name='send-email'
)
Custom Configuration
# Per-task configuration
setup_cronradar(app, config={
'tasks.backup_database': {
'grace_period': 1800, # 30 minutes
'timeout': 7200, # 2 hours
},
'tasks.quick_task': {
'grace_period': 60, # 1 minute
}
})
Pros and Cons
Pros:
- ✅ Zero-config auto-discovery
- ✅ Automatic beat schedule detection
- ✅ Dead man's switch for periodic tasks
- ✅ Built-in alerting (Slack, Teams, PagerDuty, email)
- ✅ Historical data and trends
- ✅ Team collaboration
- ✅ No infrastructure to maintain
Cons:
- ❌ External service dependency
- ❌ Paid after trial
- ❌ Data sent outside infrastructure
Best for: Teams that want comprehensive monitoring without building infrastructure.
Best Practices for Celery Monitoring
1. Set Task Time Limits
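from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
@shared_task(time_limit=300, soft_time_limit=270)
def long_running_task():
    """Task with 5-minute hard limit, 4.5-minute soft limit"""
    try:
        do_work()
    except SoftTimeLimitExceeded:
        # The soft limit leaves 30 seconds to clean up before the hard kill
        cleanup()
        raise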
2. Use Task Routing
# Route critical tasks to dedicated queue
@shared_task
def process_payment(payment_id):
pass
process_payment.apply_async(
args=[payment_id],
queue='critical'
)
# Start workers for specific queues
# celery -A myapp worker -Q critical --concurrency=10
# celery -A myapp worker -Q default --concurrency=20
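Passing queue='critical' at every call site is easy to forget. Celery can also route tasks by name in configuration through the task_routes setting. A sketch for a Django settings.py read with namespace='CELERY' (as in the celery.py example above); the queue names reuse this guide's 'critical', 'low', and 'default', and the task names are the guide's examples:

```python
# settings.py (read via app.config_from_object(..., namespace='CELERY'))

# Route tasks by name, so callers can use plain .delay() everywhere
CELERY_TASK_ROUTES = {
    'tasks.process_payment': {'queue': 'critical'},
    'tasks.send_weekly_reports': {'queue': 'low'},
}

# Unrouted tasks otherwise go to Celery's built-in default queue, 'celery';
# naming it 'default' matches the "-Q default" worker above.
CELERY_TASK_DEFAULT_QUEUE = 'default'
```

Keeping routes in one place also makes it easier to confirm that every queue you route to has a worker consuming it.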
3. Implement Idempotency
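from celery import shared_task
from django.db import transaction
@shared_task
def process_payment(payment_id):
    """Idempotent payment processing"""
    with transaction.atomic():
        # Lock the row so two workers running the same task (e.g. after a
        # redelivery) can't both pass the status check
        payment = Payment.objects.select_for_update().get(id=payment_id)
        # Check if already processed
        if payment.status == 'completed':
            return {'status': 'already_processed'}
        # Process payment...
        payment.status = 'completed'
        payment.save()
    return {'status': 'success'}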
4. Monitor Task Queues
from celery import current_app
def get_queue_lengths():
    """Get current queue lengths (Redis broker only)"""
    with current_app.connection_or_acquire() as conn:
        lengths = {}
        for queue in ['default', 'critical', 'low']:
            # With the Redis transport, pending messages are kept in a
            # Redis list named after the queue
            lengths[queue] = conn.default_channel.client.llen(queue)
        return lengths
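A single snapshot shows the current backlog, but the failure mode to catch is tasks arriving faster than workers drain them. Comparing two snapshots taken a known interval apart gives the net growth rate. A sketch with hypothetical thresholds (1000 messages, 50 per minute) and a hypothetical queue_trend name:

```python
def queue_trend(previous, current, interval_seconds,
                max_length=1000, max_growth_per_min=50):
    """Compare two {queue_name: length} snapshots taken interval_seconds apart.

    Returns {queue_name: reason} for queues that are too long or growing
    faster than max_growth_per_min. Queues new in `current` count as
    zero growth.
    """
    problems = {}
    for name, length in current.items():
        growth_per_min = (length - previous.get(name, length)) * 60 / interval_seconds
        if length > max_length:
            problems[name] = f"length {length} > {max_length}"
        elif growth_per_min > max_growth_per_min:
            problems[name] = f"growing {growth_per_min:.0f}/min"
    return problems
```

Feed it two results of get_queue_lengths() taken, say, five minutes apart. The growth check can fire before a queue reaches max_length, which gives earlier warning than a length threshold alone.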
5. Graceful Task Shutdown
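import signal
from celery import shared_task
class InterruptedTask(Exception):
    """Raised from the SIGTERM handler to unwind the task"""
@shared_task(bind=True)
def interruptible_task(self):
    """Task that handles shutdown gracefully"""
    def signal_handler(signum, frame):
        raise InterruptedTask()
    # signal.signal() raises ValueError outside the main thread, so this
    # works with the prefork pool but not with the threads pool
    previous = signal.signal(signal.SIGTERM, signal_handler)
    try:
        for item in large_dataset:
            process_item(item)
    except InterruptedTask:
        # Save progress, then re-queue the task
        raise self.retry(countdown=60)
    finally:
        # Restore the worker's own SIGTERM handling
        signal.signal(signal.SIGTERM, previous)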
6. Test Beat Schedules
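# Test crontab schedules
from datetime import timedelta
from celery.schedules import crontab
schedule = crontab(hour=2, minute=0)
# is_due() takes the time of the *last* run, not the current time.
# Here we pretend the task last ran one hour ago.
last_run = schedule.now() - timedelta(hours=1)
is_due, next_check = schedule.is_due(last_run)
print(f"Is due: {is_due}")
print(f"Seconds until beat should check again: {next_check}")
print(f"Time until next run: {schedule.remaining_estimate(last_run)}")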
Troubleshooting Common Issues
Workers Not Processing Tasks
Check worker status:
celery -A myapp inspect active
celery -A myapp inspect stats
Verify broker connection:
from celery import current_app
try:
    # The with-block closes the connection when done
    with current_app.connection() as conn:
        conn.ensure_connection(max_retries=3)
    print("Broker connection OK")
except Exception as e:
    print(f"Broker connection failed: {e}")
Beat Not Scheduling Tasks
Check beat is running:
ps aux | grep 'celery.*beat'
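Verify beat is dispatching tasks by checking its log. At --loglevel=info it logs a line each time it sends a scheduled task, for example:
Scheduler: Sending due task backup-every-night (tasks.backup_database)
Note that celery -A myapp inspect scheduled lists ETA/countdown tasks that workers are holding, not the beat schedule.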
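Check for multiple beat instances:
# Only ONE beat scheduler should run per application;
# two instances dispatch every periodic task twice.
# django-celery-beat's DatabaseScheduler stores the schedule in the
# database (editable at runtime), but it does not stop a second beat
# process from starting, so run beat as a single dedicated process.
# settings.py
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'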
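Another cause of "beat is running but nothing happens" is a schedule entry whose task name does not match any registered task, for example a typo or a changed module path. Beat still sends the message, and the worker logs "Received unregistered task" and discards it. A sketch that cross-checks the schedule against the registered names; unregistered_beat_tasks is hypothetical:

```python
def unregistered_beat_tasks(beat_schedule, registered_names):
    """Return {entry_name: task_name} for beat entries whose task
    is not in registered_names (a set of task name strings)."""
    return {
        entry: spec["task"]
        for entry, spec in beat_schedule.items()
        if spec["task"] not in registered_names
    }
```

The registered names can come from celery -A myapp inspect registered, or from app.tasks once your task modules have been imported; running the check in CI catches a renamed task before it silently stops running in production.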
Tasks Stuck in Queue
Check worker concurrency:
# Increase workers
celery -A myapp worker --concurrency=20
Verify task routing:
# Check if task is routed to non-existent queue
@shared_task
def my_task():
    pass
# Ensure worker is consuming from that queue
# celery -A myapp worker -Q correct_queue
High Memory Usage
Use task result expiry:
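# settings.py
# Delete stored results after 1 hour (database backends rely on beat's
# built-in celery.backend_cleanup task to do the deleting)
CELERY_RESULT_EXPIRES = 3600
# Or per-task: don't store results you never read.
# (Note: expires=... on a task controls when an unstarted task message is
# discarded; it does not affect result storage.)
@shared_task(ignore_result=True)
def temporary_task():
    pass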
Limit task prefetch:
# Prevent workers from prefetching too many tasks
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
Production Checklist
- [ ] Result backend configured
- [ ] Task time limits set
- [ ] Workers running with appropriate concurrency
- [ ] Beat scheduler running (only one instance)
- [ ] Monitoring enabled (Flower/Prometheus/CronRadar)
- [ ] Alerts configured for failures
- [ ] Queue lengths monitored
- [ ] Task retry logic implemented
- [ ] Idempotent tasks where needed
- [ ] Graceful shutdown handling
Conclusion
Celery's distributed architecture requires distributed monitoring. Flower is great for development, but production needs proactive alerting and historical analysis.
Your options:
- Flower - Beautiful dashboard, manual monitoring
- Prometheus + Grafana - Metrics-based, requires infrastructure
- django-celery-results - Django integration, custom dashboards
- Custom signals - Full control, build everything yourself
- Auto-discovery - Zero maintenance, managed service
Start by monitoring your most critical periodic tasks—payment processing, data synchronization, nightly backups. Then expand to async tasks and queue monitoring.
The cost of monitoring is negligible compared to the cost of undetected failures in production.
Monitor all Celery tasks automatically. CronRadar's Celery integration auto-discovers periodic tasks, tracks execution, and alerts your team.


