Free ServiceCronRadar is completely free for the first half of 2026!

Python APScheduler Monitoring: Track Every Scheduled Task

Cronradar··9 min

Your APScheduler jobs are running. Probably. Maybe. You're not entirely sure because APScheduler doesn't tell you when jobs silently fail, miss their execution window, or crash without logging anything useful.

This is the reality for most Python applications using APScheduler in production. The scheduler runs jobs in background threads, and unless you've built custom monitoring, you're flying blind. A database backup that hasn't run in three days? You'll find out when you need that backup.

This guide shows you how to monitor APScheduler jobs properly—catching failures before they become incidents, tracking execution duration, and getting alerts when something goes wrong.

Why APScheduler Jobs Fail Silently

APScheduler is the most popular in-process scheduler for Python, with good reason. It's flexible, supports multiple job stores, and integrates cleanly with web frameworks. But it has a critical gap: no built-in monitoring or alerting.

The event listener system fires notifications when jobs execute, fail, or miss their window—but those events disappear into the void unless you capture them. There's no execution history, no dashboard, no alerts.

Here's what can go wrong without you knowing:

Jobs That Never Start

The most common production issue is jobs that simply don't run. Your script exits before the scheduler fires:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.start()
scheduler.add_job(backup_database, 'cron', hour=2)
# Script exits here - job never runs

BackgroundScheduler runs in a daemon thread that dies when the main thread exits. Without a blocking call or signal handler, your job never executes.

Missed Executions

When a job can't run at its scheduled time (server restart, high load, thread pool exhausted), APScheduler logs a warning and moves on:

WARNING: Run time of job "backup_job (trigger: cron[hour='2'], 
next run at: 2024-12-05 02:00:00)" was missed by 0:15:32.847291

By default, the job is simply skipped. The misfire_grace_time setting controls how late a job can start, but if you're not watching logs, you won't know jobs are being missed.

Concurrent Execution Limits

APScheduler defaults to allowing only one instance of each job running simultaneously. When a job takes longer than its interval, subsequent runs are silently dropped:

Execution of job "sync_data (trigger: interval[0:05:00])" skipped: 
maximum number of running instances reached (1)

Your 5-minute sync job that suddenly takes 7 minutes? It's now running every 10 minutes and you have no idea.

Multi-Worker Deployments

This catches everyone eventually. You deploy your Flask app with Gunicorn using 4 workers, and suddenly your hourly job runs 4 times per hour—once per worker process.

From the APScheduler FAQ:

"Sharing a persistent job store among two or more processes will lead to incorrect scheduler behavior like duplicate execution or the scheduler missing jobs entirely."

Serialization Failures

Jobs stored in persistent backends (PostgreSQL, Redis, MongoDB) must be serializable. Lambda functions, closures, and bound methods fail silently:

ValueError: This Job cannot be serialized since the reference to its 
callable could not be determined. Consider giving a textual reference 
(module:function name) instead.

Your job gets added but never persists across restarts.

APScheduler's Built-in Event System

APScheduler provides event listeners that fire on job lifecycle events. This is your foundation for monitoring:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import (
    EVENT_JOB_EXECUTED,
    EVENT_JOB_ERROR,
    EVENT_JOB_MISSED,
    EVENT_JOB_MAX_INSTANCES
)

def job_listener(event):
    if event.exception:
        print(f"Job {event.job_id} failed: {event.exception}")
        print(f"Traceback: {event.traceback}")
    elif event.code == EVENT_JOB_MISSED:
        print(f"Job {event.job_id} missed at {event.scheduled_run_time}")
    else:
        print(f"Job {event.job_id} completed successfully")

scheduler = BackgroundScheduler()
scheduler.add_listener(
    job_listener,
    EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_MAX_INSTANCES
)

The event object contains:

  • job_id: The job identifier
  • scheduled_run_time: When the job was supposed to run
  • retval: Return value (on success)
  • exception: Exception object (on failure)
  • traceback: Full traceback string (on failure)

But here's the problem: events fire and disappear. There's no storage, no history, no way to query what happened last week. You need to send these events somewhere useful.

The Heartbeat Monitoring Pattern

The most reliable way to monitor scheduled jobs is the dead man's switch pattern: your job pings an external service on completion. If the ping doesn't arrive within the expected window, you get alerted.

This approach catches failures that internal monitoring misses:

  • Server crashes before job completes
  • Network partitions preventing job execution
  • Container restarts during job runs
  • Out-of-memory kills

Here's a basic implementation:

import requests
import functools
import time
import logging

logger = logging.getLogger(__name__)

def monitored_job(ping_url):
    """Decorator that pings a monitoring endpoint on job completion."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            
            try:
                result = func(*args, **kwargs)
                duration = time.time() - start_time
                
                # Signal success with execution duration
                requests.get(
                    ping_url,
                    params={'duration': int(duration)},
                    timeout=10
                )
                return result
                
            except Exception as e:
                # Signal failure
                requests.post(
                    f"{ping_url}/fail",
                    data=str(e),
                    timeout=10
                )
                raise
                
        return wrapper
    return decorator


# Usage
@monitored_job("https://cronradar.io/ping/abc123")
def nightly_backup():
    """Backs up database to S3."""
    dump_database()
    upload_to_s3()

scheduler = BackgroundScheduler()
scheduler.add_job(nightly_backup, 'cron', hour=2, id='nightly-backup')

When nightly_backup completes successfully, it pings the monitoring URL. If the job fails, crashes, or never runs, the ping never arrives and you get alerted.

Three-Signal Pattern for Complete Visibility

The basic heartbeat catches job failures, but you lose context. Did the job fail after 30 seconds or 30 minutes? Did it even start?

The three-signal pattern provides complete visibility:

  1. Start signal: Job began execution
  2. Success signal: Job completed with duration
  3. Failure signal: Job failed with error details
import requests
import functools
import time
import logging
from typing import Optional

logger = logging.getLogger(__name__)

class JobMonitor:
    """
    Comprehensive job monitoring with start/success/failure signals.
    
    Provides:
    - Accurate duration tracking (from actual start, not scheduled time)
    - Distinguishes between jobs that failed vs never started
    - Error context for debugging
    """
    
    def __init__(
        self,
        job_name: str,
        ping_url: str,
        timeout: int = 10
    ):
        self.job_name = job_name
        self.ping_url = ping_url
        self.timeout = timeout
    
    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            
            # Signal job started
            self._ping(f"{self.ping_url}/start")
            logger.info(f"Job '{self.job_name}' started")
            
            try:
                result = func(*args, **kwargs)
                duration = time.time() - start_time
                
                # Signal success with duration
                self._ping(self.ping_url, params={'duration': int(duration)})
                logger.info(f"Job '{self.job_name}' completed in {duration:.2f}s")
                
                return result
                
            except Exception as e:
                duration = time.time() - start_time
                
                # Signal failure with error details
                self._ping(
                    f"{self.ping_url}/fail",
                    data=f"{type(e).__name__}: {str(e)}"
                )
                logger.error(
                    f"Job '{self.job_name}' failed after {duration:.2f}s: {e}",
                    exc_info=True
                )
                raise
                
        return wrapper
    
    def _ping(self, url: str, params: dict = None, data: str = None):
        try:
            if data:
                requests.post(url, data=data, timeout=self.timeout)
            else:
                requests.get(url, params=params, timeout=self.timeout)
        except requests.RequestException as e:
            # Don't let monitoring failures break the job
            logger.warning(f"Failed to ping monitoring endpoint: {e}")


# Usage
@JobMonitor(job_name="daily-report", ping_url="https://cronradar.io/ping/xyz789")
def generate_daily_report():
    """Generates and emails daily sales report."""
    data = fetch_sales_data()
    report = compile_report(data)
    send_email(report)

scheduler.add_job(
    generate_daily_report,
    'cron',
    hour=6,
    id='daily-report',
    replace_existing=True
)

With this pattern, your monitoring dashboard shows:

  • Job started at 06:00:03
  • Job completed at 06:00:47
  • Duration: 44 seconds
  • Status: Success

Or when things go wrong:

  • Job started at 06:00:02
  • Job failed at 06:02:15
  • Duration: 133 seconds
  • Error: ConnectionError: Database connection refused

Combining Event Listeners with External Monitoring

For comprehensive monitoring, combine APScheduler's event system with external heartbeat monitoring:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import (
    EVENT_JOB_EXECUTED,
    EVENT_JOB_ERROR,
    EVENT_JOB_MISSED,
    EVENT_JOB_MAX_INSTANCES
)
import requests
import logging

logger = logging.getLogger(__name__)

CRONRADAR_BASE_URL = "https://cronradar.io/ping"
JOBS_CONFIG = {
    'nightly-backup': 'abc123',
    'hourly-sync': 'def456',
    'daily-report': 'ghi789',
}

def monitoring_listener(event):
    """Send all job events to external monitoring."""
    job_id = event.job_id
    ping_key = JOBS_CONFIG.get(job_id)
    
    if not ping_key:
        return
    
    base_url = f"{CRONRADAR_BASE_URL}/{ping_key}"
    
    try:
        if event.code == EVENT_JOB_MISSED:
            # Job never ran
            requests.post(
                f"{base_url}/fail",
                data=f"Job missed scheduled time: {event.scheduled_run_time}",
                timeout=10
            )
            logger.warning(f"Job '{job_id}' missed execution window")
            
        elif event.code == EVENT_JOB_MAX_INSTANCES:
            # Job skipped due to overlap
            requests.post(
                f"{base_url}/fail",
                data="Skipped: previous instance still running",
                timeout=10
            )
            logger.warning(f"Job '{job_id}' skipped - max instances reached")
            
        elif event.exception:
            # Job failed with exception
            requests.post(
                f"{base_url}/fail",
                data=f"{type(event.exception).__name__}: {event.exception}",
                timeout=10
            )
            logger.error(f"Job '{job_id}' failed: {event.exception}")
            
        else:
            # Job succeeded
            requests.get(base_url, timeout=10)
            logger.info(f"Job '{job_id}' completed successfully")
            
    except requests.RequestException as e:
        logger.warning(f"Failed to send monitoring event: {e}")


scheduler = BackgroundScheduler(
    job_defaults={
        'coalesce': True,           # Combine missed runs
        'max_instances': 1,         # Prevent overlap
        'misfire_grace_time': 300   # 5 min grace period
    }
)

scheduler.add_listener(
    monitoring_listener,
    EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_MAX_INSTANCES
)

This catches events at the scheduler level, including missed executions and overlap skips that the decorator pattern can't detect.

Production Configuration Best Practices

Prevent Common Failures

Configure APScheduler to handle real-world conditions:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

executors = {
    'default': ThreadPoolExecutor(20),      # More threads for I/O jobs
    'cpu_intensive': ProcessPoolExecutor(4)  # Separate pool for CPU work
}

job_defaults = {
    'coalesce': True,           # If multiple runs missed, only run once
    'max_instances': 1,         # Prevent overlapping executions
    'misfire_grace_time': 300,  # Run if less than 5 min late
}

scheduler = BackgroundScheduler(
    executors=executors,
    job_defaults=job_defaults,
    timezone='UTC'  # Always use UTC to avoid DST issues
)

Keep the Scheduler Running

For BackgroundScheduler, you need something to keep the main thread alive:

import signal
import time
import atexit

def graceful_shutdown(signum=None, frame=None):
    """Shutdown scheduler gracefully, waiting for running jobs."""
    scheduler.shutdown(wait=True)

# Handle termination signals
signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)
atexit.register(lambda: scheduler.shutdown(wait=False))

# Keep main thread alive
scheduler.start()
try:
    while True:
        time.sleep(60)
except (KeyboardInterrupt, SystemExit):
    graceful_shutdown()

Single-Instance Deployment

For multi-worker deployments (Gunicorn, uWSGI), run the scheduler in only one process:

Option 1: Dedicated scheduler process

# Run web app and scheduler separately
gunicorn app:app --workers 4
python scheduler_worker.py  # Separate process

Option 2: Gunicorn preload

# gunicorn.conf.py
preload_app = True
workers = 4

def on_starting(server):
    """Start scheduler only in master process."""
    from myapp.scheduler import scheduler
    scheduler.start()

Option 3: Environment variable flag

import os

if os.environ.get('RUN_SCHEDULER') == 'true':
    scheduler.start()

Then run one worker with the flag:

RUN_SCHEDULER=true gunicorn app:app --workers 1 &
gunicorn app:app --workers 3

Health Check Endpoint

Expose scheduler status for container orchestration:

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/health/scheduler')
def scheduler_health():
    if not scheduler.running:
        return jsonify({
            'status': 'unhealthy',
            'error': 'Scheduler not running'
        }), 503
    
    jobs = scheduler.get_jobs()
    return jsonify({
        'status': 'healthy',
        'scheduler_running': True,
        'job_count': len(jobs),
        'jobs': [
            {
                'id': job.id,
                'next_run': job.next_run_time.isoformat() if job.next_run_time else None,
                'trigger': str(job.trigger)
            }
            for job in jobs
        ]
    })

Troubleshooting Common Issues

Jobs Not Running at All

Check 1: Is the scheduler actually running?

print(f"Scheduler running: {scheduler.running}")
print(f"Jobs registered: {scheduler.get_jobs()}")

Check 2: uWSGI thread issue Add --enable-threads to uWSGI command. uWSGI disables Python threads by default, which silently breaks BackgroundScheduler.

Check 3: Script exits immediately BackgroundScheduler runs in a daemon thread. Add a blocking call or use BlockingScheduler for dedicated scheduler processes.

Jobs Running Multiple Times

Cause: Multiple workers, each running a scheduler instance

Verify by logging process ID:

import os

def my_job():
    print(f"Job running in process {os.getpid()}")

Solution: Use single-instance deployment patterns described above.

Jobs Missed During Restarts

Cause: Memory job store doesn't persist

Solution: Use persistent job store:

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

jobstores = {
    'default': SQLAlchemyJobStore(url='postgresql://user:pass@localhost/jobs')
}

scheduler = BackgroundScheduler(jobstores=jobstores)

But remember: don't share persistent stores across multiple processes.

Database Connection Timeouts

Error:

OperationalError: (2006, 'MySQL server has gone away')

Cause: Database wait_timeout shorter than job interval

Solution: Configure connection pool recycling:

from sqlalchemy import create_engine

engine = create_engine(
    'mysql://user:pass@localhost/jobs',
    pool_recycle=3600,  # Recycle connections hourly
    pool_pre_ping=True  # Verify connections before use
)

Timezone and DST Problems

Jobs scheduled at 2:00 AM can skip during spring DST transition or run twice during fall transition.

Solution: Always use UTC:

scheduler = BackgroundScheduler(timezone='UTC')

# Convert to local time in job logic if needed
from datetime import datetime
import pytz

def my_job():
    local_tz = pytz.timezone('America/New_York')
    local_time = datetime.now(local_tz)
    # Job logic using local_time

Putting It All Together

Here's a production-ready setup with comprehensive monitoring:

import os
import signal
import atexit
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.events import (
    EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, 
    EVENT_JOB_MISSED, EVENT_JOB_MAX_INSTANCES
)
import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
CRONRADAR_BASE = os.environ.get('CRONRADAR_URL', 'https://cronradar.io/ping')
DATABASE_URL = os.environ.get('DATABASE_URL', 'sqlite:///jobs.db')

# Job monitoring configuration
MONITORED_JOBS = {
    'nightly-backup': os.environ.get('BACKUP_PING_KEY'),
    'hourly-sync': os.environ.get('SYNC_PING_KEY'),
}

def send_to_cronradar(job_id: str, status: str, error: str = None):
    """Send job status to CronRadar."""
    ping_key = MONITORED_JOBS.get(job_id)
    if not ping_key:
        return
    
    url = f"{CRONRADAR_BASE}/{ping_key}"
    try:
        if status == 'success':
            requests.get(url, timeout=10)
        elif status == 'failed':
            requests.post(f"{url}/fail", data=error or 'Unknown error', timeout=10)
    except requests.RequestException as e:
        logger.warning(f"Failed to notify CronRadar: {e}")

def job_event_handler(event):
    """Handle all job events."""
    job_id = event.job_id
    
    if event.code == EVENT_JOB_MISSED:
        send_to_cronradar(job_id, 'failed', f'Missed: {event.scheduled_run_time}')
    elif event.code == EVENT_JOB_MAX_INSTANCES:
        send_to_cronradar(job_id, 'failed', 'Skipped: max instances reached')
    elif event.exception:
        send_to_cronradar(job_id, 'failed', str(event.exception))
    else:
        send_to_cronradar(job_id, 'success')

# Scheduler setup
scheduler = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(url=DATABASE_URL)},
    executors={'default': ThreadPoolExecutor(10)},
    job_defaults={
        'coalesce': True,
        'max_instances': 1,
        'misfire_grace_time': 300
    },
    timezone='UTC'
)

scheduler.add_listener(
    job_event_handler,
    EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_MAX_INSTANCES
)

# Graceful shutdown
def shutdown():
    if scheduler.running:
        scheduler.shutdown(wait=True)

signal.signal(signal.SIGTERM, lambda s, f: shutdown())
signal.signal(signal.SIGINT, lambda s, f: shutdown())
atexit.register(shutdown)

# Define and register jobs
def nightly_backup():
    logger.info("Starting nightly backup")
    # Backup logic here
    logger.info("Backup complete")

def hourly_sync():
    logger.info("Starting data sync")
    # Sync logic here
    logger.info("Sync complete")

scheduler.add_job(
    nightly_backup,
    'cron',
    hour=2,
    id='nightly-backup',
    replace_existing=True
)

scheduler.add_job(
    hourly_sync,
    'interval',
    hours=1,
    id='hourly-sync',
    replace_existing=True
)

if __name__ == '__main__':
    scheduler.start()
    logger.info(f"Scheduler started with {len(scheduler.get_jobs())} jobs")
    
    # Keep running
    import time
    while True:
        time.sleep(60)

Next Steps

APScheduler handles the scheduling. You need something to handle the monitoring.

Set up external monitoring for your scheduled jobs:

  1. Create monitors for each critical job with expected schedules
  2. Configure alerts via Slack, email, or PagerDuty
  3. Add the ping URLs to your jobs using the patterns above
  4. Set grace periods matching your misfire_grace_time

Stop wondering if your scheduled jobs are running. Start knowing.

Start monitoring your cron jobs

Get started in minutes. No credit card required.