Celery Monitoring Guide: Track Your Python Background Jobs

Complete guide to monitoring Celery tasks in production: Flower dashboard, Prometheus metrics, django-celery-results, custom monitoring, and framework-native solutions. Real Python code examples and best practices.

CronRadar Team
14 min read

Celery is the de facto standard for background task processing in Python applications—asynchronous task queues, scheduled jobs with Celery Beat, distributed processing across workers. But in production, distributed systems fail in distributed ways.

Workers crash. Brokers (Redis, RabbitMQ) become unavailable. Tasks get stuck. Beat schedules drift. And when Celery tasks fail silently, your application keeps running while critical background processes stop—payments don't process, emails don't send, data synchronization halts.

This guide covers production-grade monitoring strategies for Celery, from Flower dashboards to Prometheus metrics, custom signals, and framework-native monitoring.

How Celery Architecture Works

Before diving into monitoring, let's review Celery's distributed architecture.

Core Components

1. Celery Workers: process tasks from queues. Multiple workers can run in parallel.

# Start a worker
celery -A myapp worker --loglevel=info

2. Message Broker: stores tasks waiting to be processed (Redis, RabbitMQ, etc.).

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'

3. Result Backend: stores task results and state (optional but recommended for monitoring).

CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

4. Celery Beat: scheduler for periodic tasks (like cron).

# Start the beat scheduler
celery -A myapp beat --loglevel=info

Task Types

from celery import shared_task

# Regular async task
@shared_task
def send_email(user_id):
    user = User.objects.get(id=user_id)
    send_email_to(user.email)

# Scheduled periodic task
from celery.schedules import crontab

app.conf.beat_schedule = {
    'backup-every-night': {
        'task': 'tasks.backup_database',
        'schedule': crontab(hour=2, minute=0),
    },
    'process-payments-every-5-minutes': {
        'task': 'tasks.process_payments',
        'schedule': 300.0,  # Every 5 minutes
    },
}

Why Monitoring is Critical

Celery's distributed nature creates failure modes that don't exist in synchronous code:

  1. Worker failures - Process crashes, OOM kills, hardware failures
  2. Broker issues - Redis/RabbitMQ downtime, connection timeouts
  3. Task failures - Exceptions, timeouts, infinite loops
  4. Beat failures - Scheduler stops, schedules drift, duplicate tasks
  5. Queue buildup - Tasks arriving faster than workers process them
  6. Silent failures - Tasks complete with exceptions but no alerts

Without monitoring, these failures are invisible until customers complain.

Method 1: Flower (Real-Time Web Dashboard)

Flower is the most popular Celery monitoring tool—a real-time web dashboard for Celery clusters.

Installation

pip install flower

Starting Flower

# Basic
celery -A myapp flower

# With authentication
celery -A myapp flower --basic_auth=user:password

# Custom port
celery -A myapp flower --port=5555

Access dashboard at http://localhost:5555

Flower Features

Real-time monitoring:

  • Active workers and their status
  • Task execution rates (per second)
  • Task success/failure counts
  • Worker resource usage (CPU, memory)
  • Queue lengths
  • Task history and details

Task management:

  • View running tasks
  • Revoke/terminate tasks
  • Retry failed tasks
  • View task arguments and results

Programmatic Access

Flower provides an HTTP API:

import requests

# Get worker stats
response = requests.get('http://localhost:5555/api/workers')
workers = response.json()

for worker_name, worker_data in workers.items():
    print(f"Worker: {worker_name}")
    print(f"Status: {worker_data.get('status', 'unknown')}")
    print(f"Active tasks: {len(worker_data.get('active', []))}")

# Get task info
task_id = 'abc-123-def-456'
response = requests.get(f'http://localhost:5555/api/task/info/{task_id}')
task_info = response.json()
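
The task listing can be summarized before you decide whether anything needs attention. The following is a minimal sketch, assuming the `/api/tasks` response is a dict keyed by task ID whose values carry `state` and `name` fields (the shape the snippets in this guide rely on). `summarize_states` and `failure_ratio` are hypothetical helpers, and the sample payload is made up for illustration.

```python
from collections import Counter


def summarize_states(tasks):
    """Count tasks per state from a Flower-style /api/tasks payload."""
    return Counter(task.get('state', 'UNKNOWN') for task in tasks.values())


def failure_ratio(tasks):
    """Fraction of finished tasks (SUCCESS or FAILURE) that failed."""
    counts = summarize_states(tasks)
    finished = counts['SUCCESS'] + counts['FAILURE']
    return counts['FAILURE'] / finished if finished else 0.0


# Made-up sample shaped like the API response (an assumption, not real data)
sample = {
    'id-1': {'name': 'tasks.send_email', 'state': 'SUCCESS'},
    'id-2': {'name': 'tasks.send_email', 'state': 'FAILURE'},
    'id-3': {'name': 'tasks.backup_database', 'state': 'SUCCESS'},
    'id-4': {'name': 'tasks.backup_database', 'state': 'STARTED'},
}

print(summarize_states(sample))
print(failure_ratio(sample))
```

Tasks that are still running are left out of the ratio so that a burst of newly started work does not dilute the failure signal.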

Building Alerts with Flower API

import requests
from datetime import datetime, timedelta

class FlowerMonitor:
    def __init__(self, flower_url='http://localhost:5555'):
        self.base_url = flower_url

    def get_failed_tasks(self, hours=24):
        """Get failed tasks in the last N hours"""
        # Flower doesn't have a direct failed tasks endpoint
        # You need to query task history and filter
        response = requests.get(f'{self.base_url}/api/tasks')
        tasks = response.json()

        cutoff = datetime.now() - timedelta(hours=hours)
        failed = []

        for task_id, task_data in tasks.items():
            if task_data.get('state') == 'FAILURE':
                timestamp = task_data.get('timestamp')
                if timestamp and datetime.fromtimestamp(timestamp) > cutoff:
                    failed.append({
                        'id': task_id,
                        'name': task_data.get('name'),
                        'exception': task_data.get('exception'),
                        'timestamp': timestamp
                    })

        return failed

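    def check_worker_health(self):
        """Return the names of workers Flower reports as offline"""
        # status=true asks Flower for a worker name -> online flag mapping
        # (confirm the response shape against your Flower version)
        response = requests.get(
            f'{self.base_url}/api/workers',
            params={'status': 'true'},
            timeout=5
        )
        workers = response.json()

        offline_workers = []
        for worker_name, is_online in workers.items():
            if not is_online:
                offline_workers.append(worker_name)

        return offline_workers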

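    def get_queue_lengths(self):
        """Count active tasks per worker (a rough proxy for load, not broker queue depth)"""
        response = requests.get(f'{self.base_url}/api/workers')
        workers = response.json()

        # Real queue lengths live in the broker and vary by broker type.
        # For Redis, query the broker directly.
        # This simplified example only counts tasks currently executing.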
        queue_stats = {}
        for worker_name, worker_data in workers.items():
            active = len(worker_data.get('active', []))
            queue_stats[worker_name] = active

        return queue_stats

Pros and Cons

Pros:

  • ✅ Beautiful, real-time web UI
  • ✅ Easy to install and use
  • ✅ Task revocation and retry
  • ✅ HTTP API for automation
  • ✅ Worker monitoring

Cons:

  • ❌ No built-in alerting
  • ❌ Requires separate service to run
  • ❌ Limited historical data (depends on result backend)
  • ❌ Doesn't monitor Beat scheduler specifically
  • ❌ No dead man's switch for periodic tasks

Best for: Development, debugging, and manual production monitoring.

Method 2: Prometheus + Grafana

For teams using Prometheus, integrate Celery metrics for comprehensive observability.

Installing Celery Exporter

pip install celery-exporter

Running the Exporter

celery-exporter --broker-url=redis://localhost:6379/0

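This exposes Prometheus metrics at http://localhost:9808/metrics. Several Celery exporter projects exist, and the default port and metric names differ between them and between versions, so treat the metrics listed in this section as illustrative and confirm them against your exporter's own output.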

Available Metrics

# Task metrics
celery_tasks_total{state="SUCCESS"} 1234
celery_tasks_total{state="FAILURE"} 56
celery_tasks_total{state="RETRY"} 12

# Worker metrics
celery_workers{state="online"} 4
celery_workers{state="offline"} 0

# Queue metrics
celery_queue_length{queue_name="default"} 42
celery_queue_length{queue_name="priority"} 5

# Task runtime
celery_task_runtime_seconds{task="tasks.send_email"} 1.234
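
For a quick check from a script, without a Prometheus server, the text format above can be read directly. This is a simplified sketch that handles only plain `name{labels} value` lines: it ignores timestamps and escaped quotes in label values. `parse_metrics` and `queue_lengths` are hypothetical helpers, and the sample text reuses the illustrative metric names from this section. For anything beyond a spot check, the parser that ships with `prometheus_client` is the safer choice.

```python
import re

# name, optional {labels}, then a single value at the end of the line
_SAMPLE = re.compile(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)$')
_LABEL = re.compile(r'(\w+)="([^"]*)"')


def parse_metrics(text):
    """Return a list of (name, labels, value) tuples from exposition text."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue  # skip blanks and comment/HELP/TYPE lines
        match = _SAMPLE.match(line)
        if not match:
            continue  # skip anything this simplified pattern cannot read
        name, raw_labels, value = match.groups()
        labels = dict(_LABEL.findall(raw_labels or ''))
        samples.append((name, labels, float(value)))
    return samples


def queue_lengths(text, metric='celery_queue_length', label='queue_name'):
    """Map queue name -> length for the given metric and label names."""
    return {
        labels[label]: value
        for name, labels, value in parse_metrics(text)
        if name == metric and label in labels
    }


SAMPLE_TEXT = '''
# Queue metrics
celery_queue_length{queue_name="default"} 42
celery_queue_length{queue_name="priority"} 5
celery_workers{state="online"} 4
'''

print(queue_lengths(SAMPLE_TEXT))
```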

Prometheus Configuration

# prometheus.yml
scrape_configs:
  - job_name: 'celery'
    static_configs:
      - targets: ['localhost:9808']

Custom Metrics in Tasks

Add custom instrumentation:

from celery import shared_task
from prometheus_client import Counter, Histogram
import time

# Define metrics
task_counter = Counter(
    'myapp_task_processed',
    'Number of tasks processed',
    ['task_name', 'status']
)

task_duration = Histogram(
    'myapp_task_duration_seconds',
    'Task execution time',
    ['task_name']
)

@shared_task
def process_payment(payment_id):
    start_time = time.time()

    try:
        # Process payment logic
        result = do_payment_processing(payment_id)

        task_counter.labels(task_name='process_payment', status='success').inc()
        return result

    except Exception:
        task_counter.labels(task_name='process_payment', status='failure').inc()
        raise

    finally:
        duration = time.time() - start_time
        task_duration.labels(task_name='process_payment').observe(duration)

Grafana Dashboards

Create dashboards with:

  • Task success/failure rates over time
  • Queue lengths by queue name
  • Worker count and status
  • Task execution time percentiles
  • Error rate by task type

Alerting with Prometheus

# alerts.yml
groups:
  - name: celery_alerts
    rules:
      - alert: CeleryHighFailureRate
        expr: rate(celery_tasks_total{state="FAILURE"}[5m]) > 0.1
        for: 5m
        annotations:
          summary: "High Celery task failure rate"

      - alert: CeleryQueueBuildup
        expr: celery_queue_length > 1000
        for: 10m
        annotations:
          summary: "Celery queue has {{ $value }} pending tasks"

      - alert: CeleryWorkersDown
        expr: celery_workers{state="online"} == 0
        for: 2m
        annotations:
          summary: "No Celery workers are online"
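
Note that `rate()` in the first rule yields failures per second, not a percentage of tasks. The arithmetic can be sketched in plain Python, assuming two scrapes of a monotonically increasing counter. This is a simplification (Prometheus's own `rate()` also extrapolates to the edges of the window), and `per_second_rate` is a hypothetical helper.

```python
def per_second_rate(prev_value, prev_ts, curr_value, curr_ts):
    """Approximate per-second increase of a counter between two scrapes.

    If the counter went down, assume the process restarted and the
    counter reset to zero, so the current value is the whole increase.
    """
    elapsed = curr_ts - prev_ts
    if elapsed <= 0:
        raise ValueError("scrapes must be in increasing time order")
    increase = curr_value - prev_value
    if increase < 0:  # counter reset
        increase = curr_value
    return increase / elapsed


# 30 new failures (56 -> 86) across a 300-second window
rate = per_second_rate(56, 0, 86, 300)
print(rate)
```

Under this reading, a threshold of 0.1 corresponds to roughly six failures per minute sustained over the window, regardless of how many tasks succeeded in the same period.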

Pros and Cons

Pros:

  • ✅ Industry-standard metrics stack
  • ✅ Powerful querying with PromQL
  • ✅ Flexible alerting
  • ✅ Historical data and trends
  • ✅ Integrates with existing observability

Cons:

  • ❌ Requires Prometheus + Grafana infrastructure
  • ❌ Complex setup for small teams
  • ❌ Doesn't monitor Beat scheduler directly
  • ❌ No task-level lifecycle tracking
  • ❌ Manual dashboard/alert configuration

Best for: Teams with existing Prometheus/Grafana who want unified metrics.

Method 3: Django-Celery-Results + Custom Monitoring

For Django applications, use django-celery-results with custom monitoring.

Installation

pip install django-celery-results

Django Configuration

# settings.py
INSTALLED_APPS = [
    ...
    'django_celery_results',
]

# Use Django ORM as result backend
CELERY_RESULT_BACKEND = 'django-db'

# Enable extended task tracking
CELERY_RESULT_EXTENDED = True

Run migrations:

python manage.py migrate django_celery_results

Querying Task Results

from django_celery_results.models import TaskResult
from django.utils import timezone
from datetime import timedelta

class CeleryMonitor:
    @staticmethod
    def get_failed_tasks(hours=24):
        """Get failed tasks in the last N hours"""
        cutoff = timezone.now() - timedelta(hours=hours)

        return TaskResult.objects.filter(
            status='FAILURE',
            date_done__gte=cutoff
        ).values('task_name', 'task_args', 'result', 'date_done')

    @staticmethod
    def get_task_stats(task_name, hours=24):
        """Get success/failure stats for a specific task"""
        cutoff = timezone.now() - timedelta(hours=hours)

        tasks = TaskResult.objects.filter(
            task_name=task_name,
            date_done__gte=cutoff
        )

        total = tasks.count()
        successful = tasks.filter(status='SUCCESS').count()
        failed = tasks.filter(status='FAILURE').count()

        return {
            'task_name': task_name,
            'total': total,
            'successful': successful,
            'failed': failed,
            'success_rate': (successful / total * 100) if total > 0 else 0
        }

    @staticmethod
    def get_slow_tasks(task_name, threshold_seconds=10, hours=24):
        """Find tasks that took longer than threshold"""
        cutoff = timezone.now() - timedelta(hours=hours)

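        # django-celery-results doesn't store duration directly.
        # date_done - date_created is only an approximation: date_created is
        # when the result row was first written, which may be at publish,
        # start, or completion depending on your version and settings.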
        from django.db.models import F, ExpressionWrapper, DurationField

        slow_tasks = TaskResult.objects.filter(
            task_name=task_name,
            status='SUCCESS',
            date_done__gte=cutoff
        ).annotate(
            duration=ExpressionWrapper(
                F('date_done') - F('date_created'),
                output_field=DurationField()
            )
        ).filter(
            duration__gt=timedelta(seconds=threshold_seconds)
        )

        return slow_tasks

Building a Monitoring Dashboard

# views.py
from django.views.generic import TemplateView
from django_celery_results.models import TaskResult

class CeleryMonitorView(TemplateView):
    template_name = 'monitoring/celery.html'

    def get_context_data(self, **kwargs):
        context = super().get_context_data(**kwargs)

        # Recent failures
        context['recent_failures'] = TaskResult.objects.filter(
            status='FAILURE'
        ).order_by('-date_done')[:20]

        # Task stats by name
        from django.db.models import Count
        context['task_stats'] = TaskResult.objects.values(
            'task_name', 'status'
        ).annotate(count=Count('id'))

        return context

Periodic Health Checks

# tasks.py
from celery import shared_task
from django.core.mail import mail_admins

@shared_task
def check_celery_health():
    """Run every 10 minutes to check for issues"""
    monitor = CeleryMonitor()

    # Check for high failure rate
    failed = monitor.get_failed_tasks(hours=1)
    if len(failed) > 10:
        mail_admins(
            'Celery Alert: High Failure Rate',
            f'Found {len(failed)} failed tasks in the last hour'
        )

    # Check for specific critical tasks (skip when nothing ran, because
    # success_rate is reported as 0 for an empty window)
    payment_stats = monitor.get_task_stats('tasks.process_payment', hours=1)
    if payment_stats['total'] > 0 and payment_stats['success_rate'] < 95:
        mail_admins(
            'Celery Alert: Payment Processing Issues',
            f"Payment success rate: {payment_stats['success_rate']:.1f}%"
        )

# Schedule the health check (app is your Celery application instance)
app.conf.beat_schedule = {
    'celery-health-check': {
        'task': 'tasks.check_celery_health',
        'schedule': 600.0,  # Every 10 minutes
    },
}
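
A health check that runs every 10 minutes will send the same email every 10 minutes for as long as the problem lasts. One way to limit that is a cooldown per alert, sketched below with an in-memory store. `AlertThrottle` is a hypothetical helper, and its state is local to one process, so with several workers you would likely keep the timestamps in a shared store such as your cache or database instead.

```python
import time


class AlertThrottle:
    """Suppress repeats of the same alert within a cooldown window."""

    def __init__(self, cooldown_seconds=3600, clock=time.monotonic):
        self.cooldown = cooldown_seconds
        self.clock = clock          # injectable so the logic can be tested
        self._last_sent = {}        # alert key -> time it was last sent

    def should_send(self, alert_key):
        """Return True and record the send time if the alert is not in cooldown."""
        now = self.clock()
        last = self._last_sent.get(alert_key)
        if last is not None and now - last < self.cooldown:
            return False
        self._last_sent[alert_key] = now
        return True


throttle = AlertThrottle(cooldown_seconds=3600)
if throttle.should_send('high-failure-rate'):
    print("would send alert")  # call mail_admins(...) here
if not throttle.should_send('high-failure-rate'):
    print("suppressed repeat within cooldown")
```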

Pros and Cons

Pros:

  • ✅ Integrates with Django ORM
  • ✅ Query task results with Django queries
  • ✅ Build custom dashboards
  • ✅ Store task history long-term

Cons:

  • ❌ Django-specific (not for Flask, FastAPI, etc.)
  • ❌ Requires building alerting yourself
  • ❌ Database storage overhead
  • ❌ Doesn't monitor Beat scheduler
  • ❌ No real-time visibility

Best for: Django teams that want custom dashboards and long-term task history.

Method 4: Celery Signals + Custom Monitoring

Use Celery's built-in signals for custom monitoring integrations.

Available Signals

from celery.signals import (
    task_prerun,
    task_postrun,
    task_failure,
    task_retry,
    beat_init,
    beat_embedded_init
)

Custom Monitoring with Signals

# monitoring.py
import requests
import logging
from celery.signals import task_prerun, task_postrun, task_failure

logger = logging.getLogger(__name__)

MONITOR_BASE_URL = 'https://monitor.example.com'

@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
    """Called before task execution"""
    task_name = task.name

    try:
        requests.post(
            f'{MONITOR_BASE_URL}/ping/{task_name}/start',
            json={'task_id': task_id},
            timeout=3
        )
    except Exception as e:
        logger.warning(f"Failed to report task start: {e}")

@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None,
                         retval=None, state=None, **kwargs):
    """Called after task execution"""
    task_name = task.name

    try:
        requests.post(
            f'{MONITOR_BASE_URL}/ping/{task_name}/complete',
            json={
                'task_id': task_id,
                'state': state
            },
            timeout=3
        )
    except Exception as e:
        logger.warning(f"Failed to report task completion: {e}")

@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None,
                         einfo=None, **kwargs):
    """Called when task fails"""
    task_name = sender.name

    try:
        requests.post(
            f'{MONITOR_BASE_URL}/ping/{task_name}/fail',
            json={
                'task_id': task_id,
                'error': str(exception),
                # einfo renders as the formatted traceback text; str() of the
                # raw traceback object would only give its repr
                'traceback': str(einfo)
            },
            timeout=3
        )
    except Exception as e:
        logger.warning(f"Failed to report task failure: {e}")
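
Each handler above makes a single attempt and logs on failure. If brief monitor outages are common, a bounded retry with backoff is one option. The sketch below is independent of Celery and `requests`: `report_with_retry` is a hypothetical helper that takes any zero-argument callable. Keep the attempt count small, since the retries run inside the worker process and delay the task around them.

```python
import time


def report_with_retry(send, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call send() until it succeeds or attempts run out.

    Returns True on success and False if every attempt raised. The delay
    doubles after each failure (0.5s, 1s, ...). Never raises, so a
    monitoring outage cannot fail the task that triggered the report.
    """
    for attempt in range(attempts):
        try:
            send()
            return True
        except Exception:
            if attempt < attempts - 1:
                sleep(base_delay * (2 ** attempt))
    return False


# Simulated sender that fails twice, then succeeds
calls = []


def flaky_send():
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("monitor unreachable")


delays = []
ok = report_with_retry(flaky_send, attempts=3, base_delay=0.5, sleep=delays.append)
print(ok, len(calls), delays)
```

Inside a signal handler, `send` could be a `lambda` wrapping the `requests.post(...)` call with its `timeout` argument.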

Monitoring Beat Scheduler

from datetime import datetime

import requests
from celery.signals import beat_init

# Reuses logger and MONITOR_BASE_URL from monitoring.py above

@beat_init.connect
def beat_started(sender=None, **kwargs):
    """Called when beat scheduler starts"""
    try:
        requests.post(
            f'{MONITOR_BASE_URL}/beat/started',
            json={'timestamp': datetime.now().isoformat()},
            timeout=3
        )
    except Exception as e:
        logger.warning(f"Failed to report beat start: {e}")

# Periodic beat heartbeat
from celery import shared_task

@shared_task
def beat_heartbeat():
    """Runs every minute to confirm beat is alive"""
    try:
        requests.post(f'{MONITOR_BASE_URL}/beat/heartbeat', timeout=3)
    except Exception as e:
        logger.warning(f"Failed to send beat heartbeat: {e}")

app.conf.beat_schedule = {
    'beat-heartbeat': {
        'task': 'tasks.beat_heartbeat',
        'schedule': 60.0,  # Every minute
    },
}
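
A heartbeat is only useful if something notices when it stops. The receiving side is a dead man's switch: it records the last ping per job and flags any job whose ping is older than its expected interval plus a grace period. The sketch below assumes the monitor stores those timestamps in a dict; `find_overdue` and the sample data are hypothetical.

```python
from datetime import datetime, timedelta


def find_overdue(last_seen, expected_interval, grace, now):
    """Return names of jobs whose last ping is older than interval + grace."""
    overdue = []
    for name, seen_at in last_seen.items():
        deadline = seen_at + expected_interval[name] + grace
        if now > deadline:
            overdue.append(name)
    return sorted(overdue)


# Made-up state: the heartbeat pinged 45 seconds ago, payments 12 minutes ago
now = datetime(2024, 1, 1, 12, 0, 0)
last_seen = {
    'beat-heartbeat': now - timedelta(seconds=45),
    'process-payments': now - timedelta(minutes=12),
}
expected_interval = {
    'beat-heartbeat': timedelta(seconds=60),
    'process-payments': timedelta(minutes=5),
}

print(find_overdue(last_seen, expected_interval, timedelta(seconds=30), now))
```

The grace period absorbs normal jitter such as queue delay or a slow run, so the alert fires on a missed run and not on a late one.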

Tracking Task Duration

import time

@task_prerun.connect
def track_start_time(sender=None, task_id=None, **kwargs):
    """Store task start time"""
    sender.request.start_time = time.time()

@task_postrun.connect
def track_duration(sender=None, task_id=None, state=None, **kwargs):
    """Calculate and report task duration"""
    if hasattr(sender.request, 'start_time'):
        duration = time.time() - sender.request.start_time

        try:
            requests.post(
                f'{MONITOR_BASE_URL}/metrics/duration',
                json={
                    'task_name': sender.name,
                    'duration_seconds': duration,
                    'task_id': task_id
                },
                timeout=3
            )
        except Exception as e:
            logger.warning(f"Failed to report duration: {e}")
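
An alternative to attaching the start time to the request object is to keep start times in a dict keyed by task ID. The sketch below uses a monotonic clock, which is not affected by system clock adjustments. `DurationTracker` is a hypothetical helper; wiring `start()` and `stop()` into the `task_prerun` and `task_postrun` handlers is left out, and the state is local to each worker process.

```python
import time


class DurationTracker:
    """Track task durations by task ID using a monotonic clock."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock     # injectable so the logic can be tested
        self._started = {}      # task_id -> start reading of the clock

    def start(self, task_id):
        self._started[task_id] = self._clock()

    def stop(self, task_id):
        """Return elapsed seconds, or None if start() was never seen."""
        started = self._started.pop(task_id, None)
        if started is None:
            return None
        return self._clock() - started


tracker = DurationTracker()
tracker.start('abc-123')
elapsed = tracker.stop('abc-123')
print(elapsed is not None and elapsed >= 0)
```

Removing the entry in `stop()` keeps the dict from growing as tasks complete.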

Pros and Cons

Pros:

  • ✅ Framework-agnostic (works with Django, Flask, FastAPI)
  • ✅ Lifecycle tracking (start/success/fail)
  • ✅ Custom integrations
  • ✅ Fine-grained control

Cons:

  • ❌ Requires building monitoring infrastructure
  • ❌ Manual setup for each signal
  • ❌ Need to handle HTTP failures gracefully
  • ❌ No built-in alerting

Best for: Teams that want custom integrations and full control.

Method 5: Framework-Native Auto-Discovery

Use CronRadar's Celery integration for automatic monitoring.

Installation

pip install cronradar-celery

Configuration

# settings.py or config.py
CRONRADAR_API_KEY = 'your-api-key'
CRONRADAR_ENABLED = True

Enable Monitoring

# celery.py
from celery import Celery
from django.conf import settings
from cronradar.celery import setup_cronradar

app = Celery('myapp')
app.config_from_object('django.conf:settings', namespace='CELERY')

# Auto-discover and monitor all periodic tasks
if settings.CRONRADAR_ENABLED:
    setup_cronradar(app, mode='all')

Automatic Beat Schedule Discovery

All periodic tasks are automatically monitored:

app.conf.beat_schedule = {
    'backup-database': {
        'task': 'tasks.backup_database',
        'schedule': crontab(hour=2, minute=0),
    },
    'process-payments': {
        'task': 'tasks.process_payments',
        'schedule': 300.0,  # Every 5 minutes
    },
    'send-reports': {
        'task': 'tasks.send_weekly_reports',
        'schedule': crontab(day_of_week=1, hour=8, minute=0),
    },
}

# All three tasks automatically monitored with:
# - Schedule detection
# - Lifecycle tracking
# - Grace periods
# - Alerting

Monitoring One-Time Tasks

@shared_task
def send_email(user_id):
    # Task logic
    pass

# Send with monitoring
send_email.apply_async(
    args=[user_id],
    monitor=True,
    monitor_name='send-email'
)

Custom Configuration

# Per-task configuration
setup_cronradar(app, config={
    'tasks.backup_database': {
        'grace_period': 1800,  # 30 minutes
        'timeout': 7200,       # 2 hours
    },
    'tasks.quick_task': {
        'grace_period': 60,    # 1 minute
    }
})

Pros and Cons

Pros:

  • ✅ Zero-config auto-discovery
  • ✅ Automatic beat schedule detection
  • ✅ Dead man's switch for periodic tasks
  • ✅ Built-in alerting (Slack, Teams, PagerDuty, email)
  • ✅ Historical data and trends
  • ✅ Team collaboration
  • ✅ No infrastructure to maintain

Cons:

  • ❌ External service dependency
  • ❌ Paid after trial
  • ❌ Data sent outside infrastructure

Best for: Teams that want comprehensive monitoring without building infrastructure.

Best Practices for Celery Monitoring

1. Set Task Time Limits

from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded

@shared_task(time_limit=300, soft_time_limit=270)
def long_running_task():
    """Task with 5-minute hard limit, 4.5-minute soft limit"""
    try:
        do_work()
    except SoftTimeLimitExceeded:
        cleanup()
        raise

2. Use Task Routing

# Route critical tasks to dedicated queue
@shared_task
def process_payment(payment_id):
    pass

process_payment.apply_async(
    args=[payment_id],
    queue='critical'
)

# Start workers for specific queues
# celery -A myapp worker -Q critical --concurrency=10
# celery -A myapp worker -Q default --concurrency=20

3. Implement Idempotency

@shared_task
def process_payment(payment_id):
    """Idempotent payment processing"""
    payment = Payment.objects.get(id=payment_id)

    # Check if already processed
    if payment.status == 'completed':
        return {'status': 'already_processed'}

    # Process payment...
    payment.status = 'completed'
    payment.save()

    return {'status': 'success'}
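
A status check followed by a later save can still race if two workers pick up the same payment at once: both may read the old status before either writes. The fix is to make the claim atomic. The sketch below shows the idea in a single process with a lock. It is a stand-in only, since Celery workers are usually separate processes, where a database row lock or a unique constraint would play this role. `OnceRegistry` is a hypothetical helper.

```python
import threading


class OnceRegistry:
    """Run a function at most once per key, even with concurrent callers."""

    def __init__(self):
        self._lock = threading.Lock()
        self._claimed = set()

    def run_once(self, key, func):
        """Return True if this call ran func, False if the key was already claimed."""
        # Claim the key atomically; only the first caller gets past this point.
        with self._lock:
            if key in self._claimed:
                return False
            self._claimed.add(key)
        try:
            func()
        except Exception:
            # Release the claim so a retry can attempt the work again.
            with self._lock:
                self._claimed.discard(key)
            raise
        return True


registry = OnceRegistry()
charges = []
results = []


def worker():
    ran = registry.run_once('payment-42', lambda: charges.append('payment-42'))
    results.append(ran)


threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(charges), sum(results))
```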

4. Monitor Task Queues

from celery import current_app

def get_queue_lengths():
    """Get current queue lengths (Redis broker)"""
    with current_app.connection_or_acquire() as conn:
        lengths = {}
        # Use your own queue names; Celery's default queue is named 'celery'
        for queue in ['default', 'critical', 'low']:
            lengths[queue] = conn.default_channel.client.llen(queue)
        return lengths
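
A single reading says little on its own: a queue of 500 may be draining or growing. Comparing a few consecutive readings is one way to tell the difference. The sketch below assumes you collect lengths at a fixed interval (for example from the function above); `queue_is_backing_up` and its `min_growth` threshold are hypothetical.

```python
def queue_is_backing_up(samples, min_growth=100):
    """True if the queue never shrank across samples and grew by at least min_growth."""
    if len(samples) < 2:
        return False
    never_shrank = all(later >= earlier
                       for earlier, later in zip(samples, samples[1:]))
    return never_shrank and (samples[-1] - samples[0]) >= min_growth


print(queue_is_backing_up([10, 60, 150, 400]))
print(queue_is_backing_up([500, 480, 520]))
```

A large but shrinking queue is left alone, because workers are catching up; steady growth is the signal that tasks are arriving faster than they are processed.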

5. Graceful Task Shutdown

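import signal

from celery import shared_task

class InterruptedTask(Exception):
    """Raised by the SIGTERM handler to unwind the task"""

@shared_task(bind=True)
def interruptible_task(self):
    """Task that handles shutdown gracefully"""

    def signal_handler(signum, frame):
        raise InterruptedTask()

    # Replaces the SIGTERM handler in this worker process; only valid
    # when the task runs in the process's main thread (prefork pool)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        # large_dataset and process_item are placeholders for your own code
        for item in large_dataset:
            process_item(item)
    except InterruptedTask:
        # Save progress, then re-queue; retry() raises, so re-raise its result
        raise self.retry(countdown=60)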

6. Test Beat Schedules

# Test crontab schedules
from celery.schedules import crontab
from datetime import datetime

schedule = crontab(hour=2, minute=0)
# is_due() takes the time the task last ran, not the current time
last_run_at = datetime.now()
is_due, next_run = schedule.is_due(last_run_at)

print(f"Is due: {is_due}")
print(f"Next run in: {next_run} seconds")

Troubleshooting Common Issues

Workers Not Processing Tasks

Check worker status:

celery -A myapp inspect active
celery -A myapp inspect stats

Verify broker connection:

from celery import current_app

try:
    current_app.connection().ensure_connection(max_retries=3)
    print("Broker connection OK")
except Exception as e:
    print(f"Broker connection failed: {e}")

Beat Not Scheduling Tasks

Check beat is running:

ps aux | grep 'celery.*beat'

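Verify beat schedule:

# Run beat with debug logging to see the schedule it loaded and each task it sends
celery -A myapp beat --loglevel=debug

# Note: "inspect scheduled" lists ETA/countdown tasks held by workers,
# not beat's periodic schedule
celery -A myapp inspect scheduled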

Check for multiple beat instances:

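# Only ONE beat scheduler should run per application; a second instance
# sends every periodic task twice
# The database scheduler below comes from the django-celery-beat package;
# it persists the schedule but does not by itself prevent a second beat process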

# settings.py
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

Tasks Stuck in Queue

Check worker concurrency:

# Increase workers
celery -A myapp worker --concurrency=20

Verify task routing:

# Check if task is routed to non-existent queue
@shared_task
def my_task():
    pass

# Ensure worker is consuming from that queue
# celery -A myapp worker -Q correct_queue

High Memory Usage

Use task result expiry:

# settings.py
CELERY_RESULT_EXPIRES = 3600  # 1 hour

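# Or expire the task message itself: skip it if it has not started within 30 minutes
# (this limits stale work in the queue; it does not control result retention)
@shared_task(expires=1800)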
def temporary_task():
    pass

Limit task prefetch:

# Prevent workers from prefetching too many tasks
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

Production Checklist

  • [ ] Result backend configured
  • [ ] Task time limits set
  • [ ] Workers running with appropriate concurrency
  • [ ] Beat scheduler running (only one instance)
  • [ ] Monitoring enabled (Flower/Prometheus/CronRadar)
  • [ ] Alerts configured for failures
  • [ ] Queue lengths monitored
  • [ ] Task retry logic implemented
  • [ ] Idempotent tasks where needed
  • [ ] Graceful shutdown handling

Conclusion

Celery's distributed architecture requires distributed monitoring. Flower is great for development, but production needs proactive alerting and historical analysis.

Your options:

  1. Flower - Beautiful dashboard, manual monitoring
  2. Prometheus + Grafana - Metrics-based, requires infrastructure
  3. django-celery-results - Django integration, custom dashboards
  4. Custom signals - Full control, build everything yourself
  5. Auto-discovery - Zero maintenance, managed service

Start by monitoring your most critical periodic tasks—payment processing, data synchronization, nightly backups. Then expand to async tasks and queue monitoring.

The cost of monitoring is negligible compared to the cost of undetected failures in production.

