diff --git a/backend/app/main.py b/backend/app/main.py index 6cdb641..4640b1b 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -340,11 +340,41 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: except Exception as exc: logger.warning("winbox reconcile loop could not start (non-fatal)", error=str(exc)) + # Start signal trend detection loop (hourly) + trend_task: Optional[asyncio.Task] = None # type: ignore[type-arg] + try: + from app.services.trend_detector import trend_detection_loop + + trend_task = asyncio.create_task(trend_detection_loop()) + except Exception as exc: + logger.warning("trend detection loop could not start (non-fatal)", error=str(exc)) + + # Start site alert evaluation loop (every 5 minutes) + alert_eval_task: Optional[asyncio.Task] = None # type: ignore[type-arg] + try: + from app.services.alert_evaluator_site import alert_evaluation_loop + + alert_eval_task = asyncio.create_task(alert_evaluation_loop()) + except Exception as exc: + logger.warning("alert evaluation loop could not start (non-fatal)", error=str(exc)) + logger.info("startup complete, ready to serve requests") yield # Shutdown logger.info("shutting down TOD API") + if trend_task and not trend_task.done(): + trend_task.cancel() + try: + await trend_task + except asyncio.CancelledError: + pass + if alert_eval_task and not alert_eval_task.done(): + alert_eval_task.cancel() + try: + await alert_eval_task + except asyncio.CancelledError: + pass if winbox_reconcile_task and not winbox_reconcile_task.done(): winbox_reconcile_task.cancel() try: diff --git a/backend/app/services/alert_evaluator_site.py b/backend/app/services/alert_evaluator_site.py new file mode 100644 index 0000000..85a01fb --- /dev/null +++ b/backend/app/services/alert_evaluator_site.py @@ -0,0 +1,233 @@ +"""Site alert rule evaluator -- periodic evaluation of operator-defined alert rules. + +Runs every 5 minutes (configurable via ALERT_EVALUATION_INTERVAL_SECONDS). +Evaluates each enabled site_alert_rule against current data and creates/resolves +site_alert_events with hysteresis (consecutive_hits >= 2 before confirming). + +Supported rule types: + - device_offline_percent: site-scoped, % of devices offline + - device_offline_count: site-scoped, count of offline devices + - sector_signal_avg: sector-scoped, average signal below threshold (dBm) + - sector_client_drop: sector-scoped, client count drop % over 1 hour + +Uses AdminAsyncSessionLocal (bypasses RLS -- evaluation is system-level). +""" + +import asyncio + +import structlog +from sqlalchemy import text + +from app.config import settings +from app.database import AdminAsyncSessionLocal + +logger = structlog.get_logger(__name__) + + +async def _evaluate_condition(session, rule) -> bool: # noqa: ANN001 + """Evaluate a single rule and return True if the alert condition is met.""" + rule_type = rule.rule_type + site_id = str(rule.site_id) + sector_id = str(rule.sector_id) if rule.sector_id else None + threshold = float(rule.threshold_value) + + if rule_type == "device_offline_percent": + total_result = await session.execute( + text("SELECT count(*) AS cnt FROM devices WHERE site_id = :site_id"), + {"site_id": site_id}, + ) + total = total_result.fetchone().cnt + + if total == 0: + return False + + offline_result = await session.execute( + text( + "SELECT count(*) AS cnt FROM devices " + "WHERE site_id = :site_id AND is_online = false" + ), + {"site_id": site_id}, + ) + offline = offline_result.fetchone().cnt + offline_pct = (offline / total) * 100 + return offline_pct > threshold + + elif rule_type == "device_offline_count": + offline_result = await session.execute( + text( + "SELECT count(*) AS cnt FROM devices " + "WHERE site_id = :site_id AND is_online = false" + ), + {"site_id": site_id}, + ) + offline = offline_result.fetchone().cnt + return offline > threshold + + elif rule_type == "sector_signal_avg": + if not sector_id: + return False + avg_result = await session.execute( + text(""" + SELECT avg(wr.signal_strength) AS avg_signal + FROM wireless_registrations wr + JOIN devices d ON d.id = wr.device_id + WHERE d.sector_id = :sector_id + AND wr.collected_at > now() - interval '10 minutes' + """), + {"sector_id": sector_id}, + ) + row = avg_result.fetchone() + if row is None or row.avg_signal is None: + return False + # Threshold is negative dBm (e.g., -75). Condition met when avg is worse (lower). + return float(row.avg_signal) < threshold + + elif rule_type == "sector_client_drop": + if not sector_id: + return False + + # Current client count (last 10 minutes) + current_result = await session.execute( + text(""" + SELECT count(DISTINCT wr.mac_address) AS cnt + FROM wireless_registrations wr + JOIN devices d ON d.id = wr.device_id + WHERE d.sector_id = :sector_id + AND wr.collected_at > now() - interval '10 minutes' + """), + {"sector_id": sector_id}, + ) + current = current_result.fetchone().cnt + + # Previous client count (60-70 minutes ago) + previous_result = await session.execute( + text(""" + SELECT count(DISTINCT wr.mac_address) AS cnt + FROM wireless_registrations wr + JOIN devices d ON d.id = wr.device_id + WHERE d.sector_id = :sector_id + AND wr.collected_at BETWEEN now() - interval '70 minutes' + AND now() - interval '60 minutes' + """), + {"sector_id": sector_id}, + ) + previous = previous_result.fetchone().cnt + + if previous == 0: + return False + + drop_pct = ((previous - current) / previous) * 100 + return drop_pct > threshold + + else: + logger.warning("unknown rule type", rule_type=rule_type, rule_id=str(rule.id)) + return False + + +async def _evaluate_rules() -> None: + """Evaluate all enabled alert rules and create/resolve events with hysteresis.""" + async with AdminAsyncSessionLocal() as session: + # Fetch all enabled rules across all tenants + rules_result = await session.execute( + text("SELECT * FROM site_alert_rules WHERE enabled = true") + ) + rules = rules_result.fetchall() + + rules_checked = 0 + alerts_triggered = 0 + alerts_resolved = 0 + + for rule in rules: + rules_checked += 1 + rule_id = str(rule.id) + condition_met = await _evaluate_condition(session, rule) + + # Check for existing active event for this rule + existing_result = await session.execute( + text(""" + SELECT id, consecutive_hits FROM site_alert_events + WHERE rule_id = :rule_id + AND state = 'active' + ORDER BY triggered_at DESC + LIMIT 1 + """), + {"rule_id": rule_id}, + ) + active_event = existing_result.fetchone() + + if condition_met: + if active_event: + # Already active -- increment consecutive hits + await session.execute( + text(""" + UPDATE site_alert_events + SET consecutive_hits = consecutive_hits + 1 + WHERE id = :event_id + """), + {"event_id": str(active_event.id)}, + ) + else: + # No active event -- create one with consecutive_hits=1. + # Events with consecutive_hits < 2 are considered "pending" + # (not yet confirmed). On next evaluation if still met, + # consecutive_hits increments to 2 (confirmed alert). + severity = "critical" if rule.rule_type in ( + "device_offline_percent", "device_offline_count" + ) else "warning" + + await session.execute( + text(""" + INSERT INTO site_alert_events + (tenant_id, site_id, sector_id, rule_id, rule_type, + severity, message, state, consecutive_hits, triggered_at) + VALUES + (:tenant_id, :site_id, :sector_id, :rule_id, :rule_type, + :severity, :message, 'active', 1, now()) + """), + { + "tenant_id": str(rule.tenant_id), + "site_id": str(rule.site_id), + "sector_id": str(rule.sector_id) if rule.sector_id else None, + "rule_id": rule_id, + "rule_type": rule.rule_type, + "severity": severity, + "message": f"Alert rule '{rule.name}' condition met", + }, + ) + alerts_triggered += 1 + + else: + # Condition not met + if active_event: + # Auto-resolve: condition cleared + await session.execute( + text(""" + UPDATE site_alert_events + SET state = 'resolved', resolved_at = now() + WHERE id = :event_id + """), + {"event_id": str(active_event.id)}, + ) + alerts_resolved += 1 + + await session.commit() + + logger.info( + "alert evaluation complete", + rules_checked=rules_checked, + alerts_triggered=alerts_triggered, + resolved=alerts_resolved, + ) + + +async def alert_evaluation_loop() -> None: + """Run alert rule evaluation on a configurable interval (default: 5 minutes).""" + interval = getattr(settings, "ALERT_EVALUATION_INTERVAL_SECONDS", 300) + while True: + try: + await _evaluate_rules() + except asyncio.CancelledError: + break + except Exception as e: + logger.error("alert evaluation error", error=str(e)) + await asyncio.sleep(interval) diff --git a/backend/app/services/trend_detector.py b/backend/app/services/trend_detector.py new file mode 100644 index 0000000..9758569 --- /dev/null +++ b/backend/app/services/trend_detector.py @@ -0,0 +1,155 @@ +"""Signal trend detection -- hourly scan for signal degradation across active wireless links. + +Compares 7-day rolling average vs 14-day baseline average per active link. +If the recent average has degraded beyond the configured threshold, creates +a site_alert_event with rule_type 'signal_degradation'. Auto-resolves when +the condition clears. + +Runs as an asyncio background task wired into the FastAPI lifespan. +Uses AdminAsyncSessionLocal (bypasses RLS -- trend detection is system-level). +""" + +import asyncio +from datetime import datetime, timezone + +import structlog +from sqlalchemy import text + +from app.config import settings +from app.database import AdminAsyncSessionLocal + +logger = structlog.get_logger(__name__) + + +async def _detect_trends() -> None: + """Scan all active/degraded wireless links for signal degradation trends.""" + async with AdminAsyncSessionLocal() as session: + # Fetch active/degraded links with site_id derived from the AP device + result = await session.execute( + text(""" + SELECT wl.id, wl.tenant_id, d.site_id, wl.client_mac, wl.ap_device_id + FROM wireless_links wl + JOIN devices d ON d.id = wl.ap_device_id + WHERE wl.state IN ('active', 'degraded') + AND d.site_id IS NOT NULL + """) + ) + links = result.fetchall() + + degradations_found = 0 + resolved_count = 0 + + for link in links: + link_id = link.id + tenant_id = link.tenant_id + site_id = link.site_id + mac = link.client_mac + ap_device_id = link.ap_device_id + + # Compute 7-day average signal + avg_7d_result = await session.execute( + text(""" + SELECT avg(signal_strength) AS avg_signal + FROM wireless_registrations + WHERE mac_address = :mac + AND device_id = :ap_device_id + AND collected_at > now() - interval '7 days' + """), + {"mac": mac, "ap_device_id": str(ap_device_id)}, + ) + avg_7d_row = avg_7d_result.fetchone() + avg_7d = avg_7d_row.avg_signal if avg_7d_row else None + + # Compute 14-day average signal + avg_14d_result = await session.execute( + text(""" + SELECT avg(signal_strength) AS avg_signal + FROM wireless_registrations + WHERE mac_address = :mac + AND device_id = :ap_device_id + AND collected_at > now() - interval '14 days' + """), + {"mac": mac, "ap_device_id": str(ap_device_id)}, + ) + avg_14d_row = avg_14d_result.fetchone() + avg_14d = avg_14d_row.avg_signal if avg_14d_row else None + + if avg_7d is None or avg_14d is None: + continue + + # Signal values are negative dBm -- a more negative 7d avg means degradation. + # delta = 14d_avg - 7d_avg: positive delta means 7d is worse (more negative). + delta = float(avg_14d) - float(avg_7d) + threshold = getattr(settings, "SIGNAL_DEGRADATION_THRESHOLD_DB", 5) + condition_met = delta >= threshold + + # Check for existing active event for this link + existing = await session.execute( + text(""" + SELECT id FROM site_alert_events + WHERE link_id = :link_id + AND rule_type = 'signal_degradation' + AND state = 'active' + LIMIT 1 + """), + {"link_id": str(link_id)}, + ) + active_event = existing.fetchone() + + if condition_met and not active_event: + # Create new degradation alert event + msg = ( + f"Signal degraded {delta:.1f}dB over 2 weeks " + f"(from {float(avg_14d):.0f}dBm to {float(avg_7d):.0f}dBm)" + ) + await session.execute( + text(""" + INSERT INTO site_alert_events + (tenant_id, site_id, link_id, rule_type, severity, message, state, + consecutive_hits, triggered_at) + VALUES + (:tenant_id, :site_id, :link_id, 'signal_degradation', 'warning', + :message, 'active', 1, now()) + """), + { + "tenant_id": str(tenant_id), + "site_id": str(site_id), + "link_id": str(link_id), + "message": msg, + }, + ) + degradations_found += 1 + + elif not condition_met and active_event: + # Auto-resolve: condition cleared + await session.execute( + text(""" + UPDATE site_alert_events + SET state = 'resolved', resolved_at = now() + WHERE id = :event_id + """), + {"event_id": str(active_event.id)}, + ) + resolved_count += 1 + + await session.commit() + + logger.info( + "trend detection complete", + links_checked=len(links), + degradations_found=degradations_found, + resolved=resolved_count, + ) + + +async def trend_detection_loop() -> None: + """Run trend detection on a configurable interval (default: hourly).""" + interval = getattr(settings, "TREND_DETECTION_INTERVAL_SECONDS", 3600) + while True: + try: + await _detect_trends() + except asyncio.CancelledError: + break + except Exception as e: + logger.error("trend detection error", error=str(e)) + await asyncio.sleep(interval)