Files
the-other-dude/backend/app/services/backup_scheduler.py
Jason Staack b840047e19 feat: The Other Dude v9.0.1 — full-featured email system
ci: add GitHub Pages deployment workflow for docs site

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 19:30:44 -05:00

198 lines
6.3 KiB
Python

"""Dynamic backup scheduler — reads cron schedules from DB, manages APScheduler jobs."""
import logging
from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from app.database import AdminAsyncSessionLocal
from app.models.config_backup import ConfigBackupSchedule
from app.models.device import Device
from app.services import backup_service
from sqlalchemy import select
logger = logging.getLogger(__name__)
_scheduler: Optional[AsyncIOScheduler] = None
# System default: 2am UTC daily
DEFAULT_CRON = "0 2 * * *"
def _cron_to_trigger(cron_expr: str) -> Optional[CronTrigger]:
"""Parse a 5-field cron expression into an APScheduler CronTrigger.
Returns None if the expression is invalid.
"""
try:
parts = cron_expr.strip().split()
if len(parts) != 5:
return None
minute, hour, day, month, day_of_week = parts
return CronTrigger(
minute=minute, hour=hour, day=day, month=month,
day_of_week=day_of_week, timezone="UTC",
)
except Exception as e:
logger.warning("Invalid cron expression '%s': %s", cron_expr, e)
return None
def build_schedule_map(schedules: list) -> dict[str, list[dict]]:
"""Group device schedules by cron expression.
Returns: {cron_expression: [{device_id, tenant_id}, ...]}
"""
schedule_map: dict[str, list[dict]] = {}
for s in schedules:
if not s.enabled:
continue
cron = s.cron_expression or DEFAULT_CRON
if cron not in schedule_map:
schedule_map[cron] = []
schedule_map[cron].append({
"device_id": str(s.device_id),
"tenant_id": str(s.tenant_id),
})
return schedule_map
async def _run_scheduled_backups(devices: list[dict]) -> None:
"""Run backups for a list of devices. Each failure is isolated."""
success_count = 0
failure_count = 0
for dev_info in devices:
try:
async with AdminAsyncSessionLocal() as session:
await backup_service.run_backup(
device_id=dev_info["device_id"],
tenant_id=dev_info["tenant_id"],
trigger_type="scheduled",
db_session=session,
)
await session.commit()
logger.info("Scheduled backup OK: device %s", dev_info["device_id"])
success_count += 1
except Exception as e:
logger.error(
"Scheduled backup FAILED: device %s: %s",
dev_info["device_id"], e,
)
failure_count += 1
logger.info(
"Backup batch complete — %d succeeded, %d failed",
success_count, failure_count,
)
async def _load_effective_schedules() -> list:
"""Load all effective schedules from DB.
For each device: use device-specific schedule if exists, else tenant default.
Returns flat list of (device_id, tenant_id, cron_expression, enabled) objects.
"""
from types import SimpleNamespace
async with AdminAsyncSessionLocal() as session:
# Get all devices
dev_result = await session.execute(select(Device))
devices = dev_result.scalars().all()
# Get all schedules
sched_result = await session.execute(select(ConfigBackupSchedule))
schedules = sched_result.scalars().all()
# Index: device-specific and tenant defaults
device_schedules = {} # device_id -> schedule
tenant_defaults = {} # tenant_id -> schedule
for s in schedules:
if s.device_id:
device_schedules[str(s.device_id)] = s
else:
tenant_defaults[str(s.tenant_id)] = s
effective = []
for dev in devices:
dev_id = str(dev.id)
tenant_id = str(dev.tenant_id)
if dev_id in device_schedules:
sched = device_schedules[dev_id]
elif tenant_id in tenant_defaults:
sched = tenant_defaults[tenant_id]
else:
# No schedule configured — use system default
sched = None
effective.append(SimpleNamespace(
device_id=dev_id,
tenant_id=tenant_id,
cron_expression=sched.cron_expression if sched else DEFAULT_CRON,
enabled=sched.enabled if sched else True,
))
return effective
async def sync_schedules() -> None:
"""Reload all schedules from DB and reconfigure APScheduler jobs."""
global _scheduler
if not _scheduler:
return
# Remove all existing backup jobs (keep other jobs like firmware check)
for job in _scheduler.get_jobs():
if job.id.startswith("backup_cron_"):
job.remove()
schedules = await _load_effective_schedules()
schedule_map = build_schedule_map(schedules)
for cron_expr, devices in schedule_map.items():
trigger = _cron_to_trigger(cron_expr)
if not trigger:
logger.warning("Skipping invalid cron '%s', using default", cron_expr)
trigger = _cron_to_trigger(DEFAULT_CRON)
job_id = f"backup_cron_{cron_expr.replace(' ', '_')}"
_scheduler.add_job(
_run_scheduled_backups,
trigger=trigger,
args=[devices],
id=job_id,
name=f"Backup: {cron_expr} ({len(devices)} devices)",
max_instances=1,
replace_existing=True,
)
logger.info("Scheduled %d devices with cron '%s'", len(devices), cron_expr)
async def on_schedule_change(tenant_id: str, device_id: str) -> None:
"""Called when a schedule is created/updated via API. Hot-reloads all schedules."""
logger.info("Schedule changed for tenant=%s device=%s, resyncing", tenant_id, device_id)
await sync_schedules()
async def start_backup_scheduler() -> None:
"""Start the APScheduler and load initial schedules from DB."""
global _scheduler
_scheduler = AsyncIOScheduler(timezone="UTC")
_scheduler.start()
await sync_schedules()
logger.info("Backup scheduler started with dynamic schedules")
async def stop_backup_scheduler() -> None:
"""Gracefully shutdown the scheduler."""
global _scheduler
if _scheduler:
_scheduler.shutdown(wait=False)
_scheduler = None
logger.info("Backup scheduler stopped")