Python APScheduler Monitoring: Track Every Scheduled Task
Your APScheduler jobs are running. Probably. Maybe. You're not entirely sure because APScheduler doesn't tell you when jobs silently fail, miss their execution window, or crash without logging anything useful.
This is the reality for most Python applications using APScheduler in production. The scheduler runs jobs in background threads, and unless you've built custom monitoring, you're flying blind. A database backup that hasn't run in three days? You'll find out when you need that backup.
This guide shows you how to monitor APScheduler jobs properly—catching failures before they become incidents, tracking execution duration, and getting alerts when something goes wrong.
Why APScheduler Jobs Fail Silently
APScheduler is one of the most popular in-process schedulers for Python, and with good reason. It's flexible, supports multiple job stores, and integrates cleanly with web frameworks. But it has a critical gap: no built-in monitoring or alerting.
The event listener system fires notifications when jobs execute, fail, or miss their window—but those events disappear into the void unless you capture them. There's no execution history, no dashboard, no alerts.
Here's what can go wrong without you knowing:
Jobs That Never Start
The most common production issue is jobs that simply don't run. Your script exits before the scheduler fires:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
scheduler.start()
scheduler.add_job(backup_database, 'cron', hour=2)
# Script exits here - job never runs
BackgroundScheduler runs in a daemon thread that dies when the main thread exits. Without a blocking call or signal handler, your job never executes.
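If the process exists only to run scheduled jobs, the simplest fix is BlockingScheduler, which takes over the main thread instead of running in a daemon thread. A minimal sketch, with backup_database standing in for your real job:
from apscheduler.schedulers.blocking import BlockingScheduler

def backup_database():
    pass  # Placeholder for your actual backup logic

scheduler = BlockingScheduler()
scheduler.add_job(backup_database, 'cron', hour=2)
scheduler.start()  # Blocks here, so the process stays alive and the job fires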
Missed Executions
When a job can't run at its scheduled time (server restart, high load, thread pool exhausted), APScheduler logs a warning and moves on:
WARNING: Run time of job "backup_job (trigger: cron[hour='2'],
next run at: 2024-12-05 02:00:00)" was missed by 0:15:32.847291
By default, the missed run is simply skipped. The misfire_grace_time setting controls how late a job is allowed to start, but if you're not watching logs, you won't know jobs are being missed.
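If a late start is acceptable, set misfire_grace_time on the job so delayed runs still execute instead of being dropped. A sketch, reusing the backup job from above:
# Allow the backup to start up to an hour late instead of being skipped
scheduler.add_job(
    backup_database,
    'cron',
    hour=2,
    id='backup_job',
    misfire_grace_time=3600
)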
Concurrent Execution Limits
APScheduler defaults to allowing only one instance of each job running simultaneously. When a job takes longer than its interval, subsequent runs are silently dropped:
Execution of job "sync_data (trigger: interval[0:05:00])" skipped:
maximum number of running instances reached (1)
Your 5-minute sync job that suddenly takes 7 minutes? It's now effectively running every 10 minutes, and you have no idea.
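If overlapping runs are safe for a given job, raise max_instances when adding it, and pair it with coalesce so a backlog of missed runs collapses into one. A sketch, with sync_data standing in for your own function:
scheduler.add_job(
    sync_data,
    'interval',
    minutes=5,
    id='sync_data',
    max_instances=2,  # Allow one overlapping run instead of silently skipping
    coalesce=True     # Collapse a backlog of missed runs into a single run
)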
Multi-Worker Deployments
This catches everyone eventually. You deploy your Flask app with Gunicorn using 4 workers, and suddenly your hourly job runs 4 times per hour—once per worker process.
From the APScheduler FAQ:
"Sharing a persistent job store among two or more processes will lead to incorrect scheduler behavior like duplicate execution or the scheduler missing jobs entirely."
Serialization Failures
Jobs stored in persistent backends (PostgreSQL, Redis, MongoDB) must be serializable. Lambda functions, closures, and bound methods fail silently:
ValueError: This Job cannot be serialized since the reference to its
callable could not be determined. Consider giving a textual reference
(module:function name) instead.
Your job gets added but never persists across restarts.
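The fix is to register the job as a textual reference to a module-level function, which persistent stores can serialize. A sketch, assuming your function lives at the hypothetical path myapp.tasks.backup_database:
# A string reference serializes cleanly to any persistent job store
scheduler.add_job(
    'myapp.tasks:backup_database',
    'cron',
    hour=2,
    id='nightly-backup'
)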
APScheduler's Built-in Event System
APScheduler provides event listeners that fire on job lifecycle events. This is your foundation for monitoring:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import (
EVENT_JOB_EXECUTED,
EVENT_JOB_ERROR,
EVENT_JOB_MISSED,
EVENT_JOB_MAX_INSTANCES
)
def job_listener(event):
    if event.code == EVENT_JOB_MISSED:
        print(f"Job {event.job_id} missed at {event.scheduled_run_time}")
    elif event.code == EVENT_JOB_MAX_INSTANCES:
        print(f"Job {event.job_id} skipped: max instances reached")
    elif event.code == EVENT_JOB_ERROR:
        print(f"Job {event.job_id} failed: {event.exception}")
        print(f"Traceback: {event.traceback}")
    else:
        print(f"Job {event.job_id} completed successfully")
scheduler = BackgroundScheduler()
scheduler.add_listener(
job_listener,
EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_MAX_INSTANCES
)
The execution event object contains:
- job_id: The job identifier
- scheduled_run_time: When the job was supposed to run
- retval: Return value (on success)
- exception: Exception object (on failure)
- traceback: Full traceback string (on failure)
Note that the max-instances event is a JobSubmissionEvent, which carries scheduled_run_times instead of these fields; that's why the listener above branches on event.code before touching event.exception.
But here's the problem: events fire and disappear. There's no storage, no history, no way to query what happened last week. You need to send these events somewhere useful.
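Before wiring up anything external, you can at least keep a local record. Here's a minimal sketch of a listener that appends every execution event to a SQLite table (the executions schema is just an illustration):
import sqlite3
from datetime import datetime, timezone
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED

# Single shared connection; fine for a low-volume scheduler, not a general pattern
conn = sqlite3.connect('job_history.db', check_same_thread=False)
conn.execute(
    "CREATE TABLE IF NOT EXISTS executions "
    "(job_id TEXT, status TEXT, recorded_at TEXT, detail TEXT)"
)

def history_listener(event):
    if event.code == EVENT_JOB_MISSED:
        status, detail = 'missed', str(event.scheduled_run_time)
    elif event.exception:
        status, detail = 'error', repr(event.exception)
    else:
        status, detail = 'success', ''
    with conn:  # Commits the insert
        conn.execute(
            "INSERT INTO executions VALUES (?, ?, ?, ?)",
            (event.job_id, status, datetime.now(timezone.utc).isoformat(), detail)
        )

scheduler.add_listener(
    history_listener,
    EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED
)
That gives you a history you can query, but still no alerting, which is where external monitoring comes in.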
The Heartbeat Monitoring Pattern
The most reliable way to monitor scheduled jobs is the dead man's switch pattern: your job pings an external service on completion. If the ping doesn't arrive within the expected window, you get alerted.
This approach catches failures that internal monitoring misses:
- Server crashes before job completes
- Network partitions preventing job execution
- Container restarts during job runs
- Out-of-memory kills
Here's a basic implementation:
import requests
import functools
import time
import logging
logger = logging.getLogger(__name__)
def monitored_job(ping_url):
"""Decorator that pings a monitoring endpoint on job completion."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
# Signal success with execution duration
requests.get(
ping_url,
params={'duration': int(duration)},
timeout=10
)
return result
except Exception as e:
# Signal failure
requests.post(
f"{ping_url}/fail",
data=str(e),
timeout=10
)
raise
return wrapper
return decorator
# Usage
@monitored_job("https://cronradar.io/ping/abc123")
def nightly_backup():
"""Backs up database to S3."""
dump_database()
upload_to_s3()
scheduler = BackgroundScheduler()
scheduler.add_job(nightly_backup, 'cron', hour=2, id='nightly-backup')
When nightly_backup completes successfully, it pings the monitoring URL. If the job fails, crashes, or never runs, the ping never arrives and you get alerted.
Three-Signal Pattern for Complete Visibility
The basic heartbeat catches job failures, but you lose context. Did the job fail after 30 seconds or 30 minutes? Did it even start?
The three-signal pattern provides complete visibility:
- Start signal: Job began execution
- Success signal: Job completed with duration
- Failure signal: Job failed with error details
import requests
import functools
import time
import logging
from typing import Optional
logger = logging.getLogger(__name__)
class JobMonitor:
"""
Comprehensive job monitoring with start/success/failure signals.
Provides:
- Accurate duration tracking (from actual start, not scheduled time)
- Distinguishes between jobs that failed vs never started
- Error context for debugging
"""
def __init__(
self,
job_name: str,
ping_url: str,
timeout: int = 10
):
self.job_name = job_name
self.ping_url = ping_url
self.timeout = timeout
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
# Signal job started
self._ping(f"{self.ping_url}/start")
logger.info(f"Job '{self.job_name}' started")
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
# Signal success with duration
self._ping(self.ping_url, params={'duration': int(duration)})
logger.info(f"Job '{self.job_name}' completed in {duration:.2f}s")
return result
except Exception as e:
duration = time.time() - start_time
# Signal failure with error details
self._ping(
f"{self.ping_url}/fail",
data=f"{type(e).__name__}: {str(e)}"
)
logger.error(
f"Job '{self.job_name}' failed after {duration:.2f}s: {e}",
exc_info=True
)
raise
return wrapper
def _ping(self, url: str, params: Optional[dict] = None, data: Optional[str] = None):
try:
if data:
requests.post(url, data=data, timeout=self.timeout)
else:
requests.get(url, params=params, timeout=self.timeout)
except requests.RequestException as e:
# Don't let monitoring failures break the job
logger.warning(f"Failed to ping monitoring endpoint: {e}")
# Usage
@JobMonitor(job_name="daily-report", ping_url="https://cronradar.io/ping/xyz789")
def generate_daily_report():
"""Generates and emails daily sales report."""
data = fetch_sales_data()
report = compile_report(data)
send_email(report)
scheduler.add_job(
generate_daily_report,
'cron',
hour=6,
id='daily-report',
replace_existing=True
)
With this pattern, your monitoring dashboard shows:
- Job started at 06:00:03
- Job completed at 06:00:47
- Duration: 44 seconds
- Status: Success
Or when things go wrong:
- Job started at 06:00:02
- Job failed at 06:02:15
- Duration: 133 seconds
- Error: ConnectionError: Database connection refused
Combining Event Listeners with External Monitoring
For comprehensive monitoring, combine APScheduler's event system with external heartbeat monitoring:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import (
EVENT_JOB_EXECUTED,
EVENT_JOB_ERROR,
EVENT_JOB_MISSED,
EVENT_JOB_MAX_INSTANCES
)
import requests
import logging
logger = logging.getLogger(__name__)
CRONRADAR_BASE_URL = "https://cronradar.io/ping"
JOBS_CONFIG = {
'nightly-backup': 'abc123',
'hourly-sync': 'def456',
'daily-report': 'ghi789',
}
def monitoring_listener(event):
"""Send all job events to external monitoring."""
job_id = event.job_id
ping_key = JOBS_CONFIG.get(job_id)
if not ping_key:
return
base_url = f"{CRONRADAR_BASE_URL}/{ping_key}"
try:
if event.code == EVENT_JOB_MISSED:
# Job never ran
requests.post(
f"{base_url}/fail",
data=f"Job missed scheduled time: {event.scheduled_run_time}",
timeout=10
)
logger.warning(f"Job '{job_id}' missed execution window")
elif event.code == EVENT_JOB_MAX_INSTANCES:
# Job skipped due to overlap
requests.post(
f"{base_url}/fail",
data="Skipped: previous instance still running",
timeout=10
)
logger.warning(f"Job '{job_id}' skipped - max instances reached")
elif event.exception:
# Job failed with exception
requests.post(
f"{base_url}/fail",
data=f"{type(event.exception).__name__}: {event.exception}",
timeout=10
)
logger.error(f"Job '{job_id}' failed: {event.exception}")
else:
# Job succeeded
requests.get(base_url, timeout=10)
logger.info(f"Job '{job_id}' completed successfully")
except requests.RequestException as e:
logger.warning(f"Failed to send monitoring event: {e}")
scheduler = BackgroundScheduler(
job_defaults={
'coalesce': True, # Combine missed runs
'max_instances': 1, # Prevent overlap
'misfire_grace_time': 300 # 5 min grace period
}
)
scheduler.add_listener(
monitoring_listener,
EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_MAX_INSTANCES
)
This catches events at the scheduler level, including missed executions and overlap skips that the decorator pattern can't detect.
Production Configuration Best Practices
Prevent Common Failures
Configure APScheduler to handle real-world conditions:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
executors = {
'default': ThreadPoolExecutor(20), # More threads for I/O jobs
'cpu_intensive': ProcessPoolExecutor(4) # Separate pool for CPU work
}
job_defaults = {
'coalesce': True, # If multiple runs missed, only run once
'max_instances': 1, # Prevent overlapping executions
'misfire_grace_time': 300, # Run if less than 5 min late
}
scheduler = BackgroundScheduler(
executors=executors,
job_defaults=job_defaults,
timezone='UTC' # Always use UTC to avoid DST issues
)
Keep the Scheduler Running
For BackgroundScheduler, you need something to keep the main thread alive:
import signal
import time
import atexit
def graceful_shutdown(signum=None, frame=None):
    """Shut down the scheduler gracefully, waiting for running jobs to finish."""
    if scheduler.running:
        scheduler.shutdown(wait=True)
    if signum is not None:
        raise SystemExit(0)  # Break out of the keep-alive loop on SIGTERM/SIGINT
# Handle termination signals
signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)
atexit.register(graceful_shutdown)
# Keep main thread alive
scheduler.start()
try:
while True:
time.sleep(60)
except (KeyboardInterrupt, SystemExit):
graceful_shutdown()
Single-Instance Deployment
For multi-worker deployments (Gunicorn, uWSGI), run the scheduler in only one process:
Option 1: Dedicated scheduler process
# Run web app and scheduler separately
gunicorn app:app --workers 4
python scheduler_worker.py  # Separate process
Option 2: Gunicorn preload
# gunicorn.conf.py
preload_app = True
workers = 4
def on_starting(server):
"""Start scheduler only in master process."""
from myapp.scheduler import scheduler
scheduler.start()
Option 3: Environment variable flag
import os
if os.environ.get('RUN_SCHEDULER') == 'true':
scheduler.start()
Then run one worker with the flag:
RUN_SCHEDULER=true gunicorn app:app --workers 1 --bind 127.0.0.1:8001 &
gunicorn app:app --workers 3 --bind 127.0.0.1:8000
Health Check Endpoint
Expose scheduler status for container orchestration:
from flask import Flask, jsonify
app = Flask(__name__)
@app.route('/health/scheduler')
def scheduler_health():
if not scheduler.running:
return jsonify({
'status': 'unhealthy',
'error': 'Scheduler not running'
}), 503
jobs = scheduler.get_jobs()
return jsonify({
'status': 'healthy',
'scheduler_running': True,
'job_count': len(jobs),
'jobs': [
{
'id': job.id,
'next_run': job.next_run_time.isoformat() if job.next_run_time else None,
'trigger': str(job.trigger)
}
for job in jobs
]
})
Troubleshooting Common Issues
Jobs Not Running at All
Check 1: Is the scheduler actually running?
print(f"Scheduler running: {scheduler.running}")
print(f"Jobs registered: {scheduler.get_jobs()}")Check 2: uWSGI thread issue Add --enable-threads to uWSGI command. uWSGI disables Python threads by default, which silently breaks BackgroundScheduler.
Check 3: Script exits immediately
BackgroundScheduler runs in a daemon thread. Add a blocking call, or use BlockingScheduler for dedicated scheduler processes.
Jobs Running Multiple Times
Cause: Multiple workers, each running a scheduler instance
Verify by logging process ID:
import os
def my_job():
print(f"Job running in process {os.getpid()}")Solution: Use single-instance deployment patterns described above.
Jobs Missed During Restarts
Cause: The default in-memory job store doesn't persist jobs across restarts
Solution: Use persistent job store:
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
'default': SQLAlchemyJobStore(url='postgresql://user:pass@localhost/jobs')
}
scheduler = BackgroundScheduler(jobstores=jobstores)
But remember: don't share persistent stores across multiple processes.
Database Connection Timeouts
Error:
OperationalError: (2006, 'MySQL server has gone away')
Cause: Database wait_timeout shorter than job interval
Solution: Configure connection pool recycling and hand the engine to the job store:
from sqlalchemy import create_engine
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
engine = create_engine(
    'mysql://user:pass@localhost/jobs',
    pool_recycle=3600,   # Recycle connections hourly
    pool_pre_ping=True   # Verify connections before use
)
jobstores = {'default': SQLAlchemyJobStore(engine=engine)}
scheduler = BackgroundScheduler(jobstores=jobstores)
Timezone and DST Problems
Jobs scheduled at 2:00 AM can skip during spring DST transition or run twice during fall transition.
Solution: Always use UTC:
scheduler = BackgroundScheduler(timezone='UTC')
# Convert to local time in job logic if needed
from datetime import datetime
import pytz
def my_job():
local_tz = pytz.timezone('America/New_York')
local_time = datetime.now(local_tz)
# Job logic using local_time
Putting It All Together
Here's a production-ready setup with comprehensive monitoring:
import os
import signal
import atexit
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.events import (
EVENT_JOB_EXECUTED, EVENT_JOB_ERROR,
EVENT_JOB_MISSED, EVENT_JOB_MAX_INSTANCES
)
import requests
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration
CRONRADAR_BASE = os.environ.get('CRONRADAR_URL', 'https://cronradar.io/ping')
DATABASE_URL = os.environ.get('DATABASE_URL', 'sqlite:///jobs.db')
# Job monitoring configuration
MONITORED_JOBS = {
'nightly-backup': os.environ.get('BACKUP_PING_KEY'),
'hourly-sync': os.environ.get('SYNC_PING_KEY'),
}
def send_to_cronradar(job_id: str, status: str, error: str = None):
"""Send job status to CronRadar."""
ping_key = MONITORED_JOBS.get(job_id)
if not ping_key:
return
url = f"{CRONRADAR_BASE}/{ping_key}"
try:
if status == 'success':
requests.get(url, timeout=10)
elif status == 'failed':
requests.post(f"{url}/fail", data=error or 'Unknown error', timeout=10)
except requests.RequestException as e:
logger.warning(f"Failed to notify CronRadar: {e}")
def job_event_handler(event):
"""Handle all job events."""
job_id = event.job_id
if event.code == EVENT_JOB_MISSED:
send_to_cronradar(job_id, 'failed', f'Missed: {event.scheduled_run_time}')
elif event.code == EVENT_JOB_MAX_INSTANCES:
send_to_cronradar(job_id, 'failed', 'Skipped: max instances reached')
elif event.exception:
send_to_cronradar(job_id, 'failed', str(event.exception))
else:
send_to_cronradar(job_id, 'success')
# Scheduler setup
scheduler = BackgroundScheduler(
jobstores={'default': SQLAlchemyJobStore(url=DATABASE_URL)},
executors={'default': ThreadPoolExecutor(10)},
job_defaults={
'coalesce': True,
'max_instances': 1,
'misfire_grace_time': 300
},
timezone='UTC'
)
scheduler.add_listener(
job_event_handler,
EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_MAX_INSTANCES
)
# Graceful shutdown
def shutdown():
if scheduler.running:
scheduler.shutdown(wait=True)
signal.signal(signal.SIGTERM, lambda s, f: shutdown())
signal.signal(signal.SIGINT, lambda s, f: shutdown())
atexit.register(shutdown)
# Define and register jobs
def nightly_backup():
logger.info("Starting nightly backup")
# Backup logic here
logger.info("Backup complete")
def hourly_sync():
logger.info("Starting data sync")
# Sync logic here
logger.info("Sync complete")
scheduler.add_job(
nightly_backup,
'cron',
hour=2,
id='nightly-backup',
replace_existing=True
)
scheduler.add_job(
hourly_sync,
'interval',
hours=1,
id='hourly-sync',
replace_existing=True
)
if __name__ == '__main__':
scheduler.start()
logger.info(f"Scheduler started with {len(scheduler.get_jobs())} jobs")
# Keep running
import time
while True:
time.sleep(60)
Next Steps
APScheduler handles the scheduling. You need something to handle the monitoring.
Set up external monitoring for your scheduled jobs:
- Create monitors for each critical job with expected schedules
- Configure alerts via Slack, email, or PagerDuty
- Add the ping URLs to your jobs using the patterns above
- Set grace periods matching your misfire_grace_time
Stop wondering if your scheduled jobs are running. Start knowing.