From 3209a7d9bee1e93463fd9810f4512284258a58b0 Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Thu, 19 Mar 2026 06:10:17 -0500 Subject: [PATCH] feat(13-03): add interface and link discovery NATS subscribers - Interface subscriber consumes device.interfaces.> from DEVICE_EVENTS, upserts device_interfaces table - Link discovery subscriber consumes wireless.registrations.> with separate durable consumer - MAC resolution against device_interfaces for AP-CPE link discovery - State machine: active (signal >= -80dBm), degraded (< -80), down (3 missed), stale (24h) - missed_polls resets to 0 on any observation, enabling link revival Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/services/interface_subscriber.py | 201 +++++++++++ .../app/services/link_discovery_subscriber.py | 322 ++++++++++++++++++ 2 files changed, 523 insertions(+) create mode 100644 backend/app/services/interface_subscriber.py create mode 100644 backend/app/services/link_discovery_subscriber.py diff --git a/backend/app/services/interface_subscriber.py b/backend/app/services/interface_subscriber.py new file mode 100644 index 0000000..6460690 --- /dev/null +++ b/backend/app/services/interface_subscriber.py @@ -0,0 +1,201 @@ +"""NATS JetStream subscriber for device interface data. + +Subscribes to device.interfaces.> on the DEVICE_EVENTS stream and upserts +per-interface rows into the device_interfaces table for MAC-to-device +resolution during link discovery. + +Uses AdminAsyncSessionLocal (superuser bypass RLS) since interface data +arrives from the Go poller without tenant context in the DB session. +""" + +import asyncio +import json +import logging +from typing import Optional + +import nats +from nats.js import JetStreamContext +from nats.aio.client import Client as NATSClient +from sqlalchemy import text + +from app.config import settings +from app.database import AdminAsyncSessionLocal + +logger = logging.getLogger(__name__) + +_interface_client: Optional[NATSClient] = None + + +# ============================================================================= +# MAIN MESSAGE HANDLER +# ============================================================================= + + +async def on_device_interfaces(msg) -> None: + """Handle a device.interfaces event published by the Go poller. + + Each message contains interface metadata (name, MAC, type, running) for + a single device. Upserts into device_interfaces using ON CONFLICT so + existing interfaces are updated rather than duplicated. + + On success, acknowledges the message. On error, NAKs so NATS can redeliver. + """ + try: + data = json.loads(msg.data) + device_id = data.get("device_id") + + if not device_id: + logger.warning("device.interfaces event missing 'device_id' -- skipping") + await msg.ack() + return + + tenant_id = data.get("tenant_id") + interfaces = data.get("interfaces") + + if not interfaces: + await msg.ack() + return + + async with AdminAsyncSessionLocal() as session: + for iface in interfaces: + mac_address = iface.get("mac_address", "") + if not mac_address: + continue # Skip interfaces without MAC (loopback, bridge without MAC) + + await session.execute( + text(""" + INSERT INTO device_interfaces + (id, device_id, tenant_id, name, mac_address, type, running, updated_at) + VALUES + (gen_random_uuid(), :device_id, :tenant_id, :name, :mac_address, + :type, :running, NOW()) + ON CONFLICT (device_id, name) + DO UPDATE SET + mac_address = EXCLUDED.mac_address, + type = EXCLUDED.type, + running = EXCLUDED.running, + updated_at = NOW() + """), + { + "device_id": device_id, + "tenant_id": tenant_id, + "name": iface.get("name", ""), + "mac_address": mac_address.lower(), + "type": iface.get("type", ""), + "running": iface.get("running", False), + }, + ) + + await session.commit() + + logger.debug( + "device.interfaces processed", + extra={"device_id": device_id, "count": len(interfaces)}, + ) + await msg.ack() + + except Exception as exc: + logger.error( + "Failed to process device.interfaces event: %s", + exc, + exc_info=True, + ) + try: + await msg.nak() + except Exception: + pass # If NAK also fails, NATS will redeliver after ack_wait + + +# ============================================================================= +# SUBSCRIPTION SETUP +# ============================================================================= + + +async def _subscribe_with_retry(js: JetStreamContext) -> None: + """Subscribe to device.interfaces.> with durable consumer, retrying if stream not ready.""" + max_attempts = 6 # ~30 seconds at 5s intervals + for attempt in range(1, max_attempts + 1): + try: + await js.subscribe( + "device.interfaces.>", + cb=on_device_interfaces, + durable="api-interface-consumer", + stream="DEVICE_EVENTS", + ) + logger.info( + "NATS: subscribed to device.interfaces.> (durable: api-interface-consumer)" + ) + return + except Exception as exc: + if attempt < max_attempts: + logger.warning( + "NATS: stream DEVICE_EVENTS not ready (attempt %d/%d): %s -- retrying in 5s", + attempt, + max_attempts, + exc, + ) + await asyncio.sleep(5) + else: + logger.warning( + "NATS: giving up on device.interfaces.> after %d attempts: %s " + "-- API will run without interface ingestion", + max_attempts, + exc, + ) + return + + +async def start_interface_subscriber() -> Optional[NATSClient]: + """Connect to NATS and start the device.interfaces.> subscription. + + Returns the NATS connection (must be passed to stop_interface_subscriber + on shutdown). + """ + global _interface_client + + logger.info("NATS device interfaces: connecting to %s", settings.NATS_URL) + + nc = await nats.connect( + settings.NATS_URL, + max_reconnect_attempts=-1, + reconnect_time_wait=2, + error_cb=_on_error, + reconnected_cb=_on_reconnected, + disconnected_cb=_on_disconnected, + ) + + logger.info("NATS device interfaces: connected to %s", settings.NATS_URL) + + js = nc.jetstream() + await _subscribe_with_retry(js) + + _interface_client = nc + return nc + + +async def stop_interface_subscriber(nc: Optional[NATSClient]) -> None: + """Drain and close the interface NATS connection gracefully.""" + if nc is None: + return + try: + logger.info("NATS device interfaces: draining connection...") + await nc.drain() + logger.info("NATS device interfaces: connection closed") + except Exception as exc: + logger.warning("NATS device interfaces: error during drain: %s", exc) + try: + await nc.close() + except Exception: + pass + + +async def _on_error(exc: Exception) -> None: + logger.error("NATS device interfaces error: %s", exc) + + +async def _on_reconnected() -> None: + logger.info("NATS device interfaces: reconnected") + + +async def _on_disconnected() -> None: + logger.warning("NATS device interfaces: disconnected") diff --git a/backend/app/services/link_discovery_subscriber.py b/backend/app/services/link_discovery_subscriber.py new file mode 100644 index 0000000..9e55618 --- /dev/null +++ b/backend/app/services/link_discovery_subscriber.py @@ -0,0 +1,322 @@ +"""NATS JetStream subscriber for wireless link discovery. + +Subscribes to wireless.registrations.> on the WIRELESS_REGISTRATIONS stream +using a SEPARATE durable consumer from the wireless_registration_subscriber. +Both consumers independently process the same messages. + +Resolves client MAC addresses against the device_interfaces table to discover +AP-CPE relationships, manages link state transitions (active/degraded/down/stale), +and tracks missed polls for link health. + +Uses AdminAsyncSessionLocal (superuser bypass RLS) since registration data +arrives from the Go poller without tenant context in the DB session. +""" + +import asyncio +import json +import logging +from typing import Optional + +import nats +from nats.js import JetStreamContext +from nats.aio.client import Client as NATSClient +from sqlalchemy import text + +from app.config import settings +from app.database import AdminAsyncSessionLocal + +logger = logging.getLogger(__name__) + +_link_discovery_client: Optional[NATSClient] = None + +# Configurable thresholds for link state transitions +DEGRADED_SIGNAL_THRESHOLD = -80 # dBm — signals weaker than this mark link as degraded +CONSECUTIVE_MISS_THRESHOLD = 3 # Missed polls before marking link as down +STALE_HOURS = 24 # Hours after down before marking link as stale + + +# ============================================================================= +# MAIN MESSAGE HANDLER +# ============================================================================= + + +async def on_wireless_registration_for_links(msg) -> None: + """Handle a wireless.registrations event for link discovery. + + For each client registration: + 1. Resolve the client MAC against device_interfaces to find a CPE device + 2. If found, upsert a wireless_link with state based on signal strength + 3. Increment missed_polls for links from this AP NOT seen in this batch + 4. Mark stale any links in 'down' state older than STALE_HOURS + + On success, acknowledges the message. On error, NAKs so NATS can redeliver. + """ + try: + data = json.loads(msg.data) + device_id = data.get("device_id") # This is the AP + + if not device_id: + logger.warning("wireless.registrations event missing 'device_id' -- skipping") + await msg.ack() + return + + tenant_id = data.get("tenant_id") + registrations = data.get("registrations") + + if not registrations: + await msg.ack() + return + + seen_cpe_ids = [] + + async with AdminAsyncSessionLocal() as session: + for reg in registrations: + client_mac = (reg.get("mac_address") or "").lower() + if not client_mac: + continue + + # Resolve MAC against device_interfaces to find CPE device + result = await session.execute( + text(""" + SELECT device_id FROM device_interfaces + WHERE LOWER(mac_address) = :mac AND tenant_id = :tenant_id + LIMIT 1 + """), + {"mac": client_mac, "tenant_id": tenant_id}, + ) + row = result.fetchone() + + if not row: + # Unresolved MAC -- stays in wireless_registrations for unknown client queries + continue + + cpe_device_id = str(row[0]) + seen_cpe_ids.append(cpe_device_id) + + signal_strength = reg.get("signal_strength") + + # Upsert wireless_link with state based on signal strength + await session.execute( + text(""" + INSERT INTO wireless_links + (id, ap_device_id, cpe_device_id, tenant_id, interface, client_mac, + signal_strength, tx_ccq, tx_rate, rx_rate, state, missed_polls, + discovered_at, last_seen, updated_at) + VALUES + (gen_random_uuid(), :ap_device_id, :cpe_device_id, :tenant_id, + :interface, :client_mac, :signal_strength, :tx_ccq, :tx_rate, + :rx_rate, + CASE WHEN :signal_strength::int IS NULL THEN 'active' + WHEN :signal_strength::int < :degraded_threshold THEN 'degraded' + ELSE 'active' END, + 0, NOW(), NOW(), NOW()) + ON CONFLICT (ap_device_id, cpe_device_id) DO UPDATE SET + interface = EXCLUDED.interface, + client_mac = EXCLUDED.client_mac, + signal_strength = EXCLUDED.signal_strength, + tx_ccq = EXCLUDED.tx_ccq, + tx_rate = EXCLUDED.tx_rate, + rx_rate = EXCLUDED.rx_rate, + state = CASE WHEN EXCLUDED.signal_strength IS NULL THEN 'active' + WHEN EXCLUDED.signal_strength < :degraded_threshold THEN 'degraded' + ELSE 'active' END, + missed_polls = 0, + last_seen = NOW(), + updated_at = NOW() + """), + { + "ap_device_id": device_id, + "cpe_device_id": cpe_device_id, + "tenant_id": tenant_id, + "interface": reg.get("interface"), + "client_mac": client_mac, + "signal_strength": signal_strength, + "tx_ccq": reg.get("tx_ccq"), + "tx_rate": reg.get("tx_rate"), + "rx_rate": reg.get("rx_rate"), + "degraded_threshold": DEGRADED_SIGNAL_THRESHOLD, + }, + ) + + # Increment missed_polls for links from this AP NOT seen in this batch + if seen_cpe_ids: + await session.execute( + text(""" + UPDATE wireless_links + SET missed_polls = missed_polls + 1, + state = CASE + WHEN missed_polls + 1 >= :miss_threshold THEN 'down' + ELSE state + END, + updated_at = NOW() + WHERE ap_device_id = :ap_device_id + AND tenant_id = :tenant_id + AND cpe_device_id NOT IN ( + SELECT unnest(:seen_cpe_ids::uuid[]) + ) + AND state NOT IN ('down', 'stale') + """), + { + "ap_device_id": device_id, + "tenant_id": tenant_id, + "seen_cpe_ids": seen_cpe_ids, + "miss_threshold": CONSECUTIVE_MISS_THRESHOLD, + }, + ) + else: + # No CPEs resolved -- increment all links for this AP + await session.execute( + text(""" + UPDATE wireless_links + SET missed_polls = missed_polls + 1, + state = CASE + WHEN missed_polls + 1 >= :miss_threshold THEN 'down' + ELSE state + END, + updated_at = NOW() + WHERE ap_device_id = :ap_device_id + AND tenant_id = :tenant_id + AND state NOT IN ('down', 'stale') + """), + { + "ap_device_id": device_id, + "tenant_id": tenant_id, + "miss_threshold": CONSECUTIVE_MISS_THRESHOLD, + }, + ) + + # Mark stale: any links in 'down' state where last_seen > STALE_HOURS ago + await session.execute( + text(""" + UPDATE wireless_links + SET state = 'stale', updated_at = NOW() + WHERE ap_device_id = :ap_device_id + AND tenant_id = :tenant_id + AND state = 'down' + AND last_seen < NOW() - INTERVAL ':stale_hours hours' + """.replace(":stale_hours", str(STALE_HOURS))), + { + "ap_device_id": device_id, + "tenant_id": tenant_id, + }, + ) + + await session.commit() + + logger.debug( + "wireless.registrations link discovery processed", + extra={ + "device_id": device_id, + "registrations": len(registrations), + "resolved_cpes": len(seen_cpe_ids), + }, + ) + await msg.ack() + + except Exception as exc: + logger.error( + "Failed to process wireless.registrations for link discovery: %s", + exc, + exc_info=True, + ) + try: + await msg.nak() + except Exception: + pass # If NAK also fails, NATS will redeliver after ack_wait + + +# ============================================================================= +# SUBSCRIPTION SETUP +# ============================================================================= + + +async def _subscribe_with_retry(js: JetStreamContext) -> None: + """Subscribe to wireless.registrations.> with durable consumer, retrying if stream not ready.""" + max_attempts = 6 # ~30 seconds at 5s intervals + for attempt in range(1, max_attempts + 1): + try: + await js.subscribe( + "wireless.registrations.>", + cb=on_wireless_registration_for_links, + durable="api-link-discovery-consumer", + stream="WIRELESS_REGISTRATIONS", + ) + logger.info( + "NATS: subscribed to wireless.registrations.> " + "(durable: api-link-discovery-consumer)" + ) + return + except Exception as exc: + if attempt < max_attempts: + logger.warning( + "NATS: stream WIRELESS_REGISTRATIONS not ready (attempt %d/%d): %s " + "-- retrying in 5s", + attempt, + max_attempts, + exc, + ) + await asyncio.sleep(5) + else: + logger.warning( + "NATS: giving up on wireless.registrations.> link discovery " + "after %d attempts: %s -- API will run without link discovery", + max_attempts, + exc, + ) + return + + +async def start_link_discovery_subscriber() -> Optional[NATSClient]: + """Connect to NATS and start the wireless.registrations.> link discovery subscription. + + Returns the NATS connection (must be passed to stop_link_discovery_subscriber + on shutdown). + """ + global _link_discovery_client + + logger.info("NATS link discovery: connecting to %s", settings.NATS_URL) + + nc = await nats.connect( + settings.NATS_URL, + max_reconnect_attempts=-1, + reconnect_time_wait=2, + error_cb=_on_error, + reconnected_cb=_on_reconnected, + disconnected_cb=_on_disconnected, + ) + + logger.info("NATS link discovery: connected to %s", settings.NATS_URL) + + js = nc.jetstream() + await _subscribe_with_retry(js) + + _link_discovery_client = nc + return nc + + +async def stop_link_discovery_subscriber(nc: Optional[NATSClient]) -> None: + """Drain and close the link discovery NATS connection gracefully.""" + if nc is None: + return + try: + logger.info("NATS link discovery: draining connection...") + await nc.drain() + logger.info("NATS link discovery: connection closed") + except Exception as exc: + logger.warning("NATS link discovery: error during drain: %s", exc) + try: + await nc.close() + except Exception: + pass + + +async def _on_error(exc: Exception) -> None: + logger.error("NATS link discovery error: %s", exc) + + +async def _on_reconnected() -> None: + logger.info("NATS link discovery: reconnected") + + +async def _on_disconnected() -> None: + logger.warning("NATS link discovery: disconnected")