feat(13-03): add interface and link discovery NATS subscribers
- Interface subscriber consumes device.interfaces.> from DEVICE_EVENTS, upserts device_interfaces table - Link discovery subscriber consumes wireless.registrations.> with separate durable consumer - MAC resolution against device_interfaces for AP-CPE link discovery - State machine: active (signal >= -80dBm), degraded (< -80), down (3 missed), stale (24h) - missed_polls resets to 0 on any observation, enabling link revival Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
201
backend/app/services/interface_subscriber.py
Normal file
201
backend/app/services/interface_subscriber.py
Normal file
@@ -0,0 +1,201 @@
|
||||
"""NATS JetStream subscriber for device interface data.
|
||||
|
||||
Subscribes to device.interfaces.> on the DEVICE_EVENTS stream and upserts
|
||||
per-interface rows into the device_interfaces table for MAC-to-device
|
||||
resolution during link discovery.
|
||||
|
||||
Uses AdminAsyncSessionLocal (superuser bypass RLS) since interface data
|
||||
arrives from the Go poller without tenant context in the DB session.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
import nats
|
||||
from nats.js import JetStreamContext
|
||||
from nats.aio.client import Client as NATSClient
|
||||
from sqlalchemy import text
|
||||
|
||||
from app.config import settings
|
||||
from app.database import AdminAsyncSessionLocal
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_interface_client: Optional[NATSClient] = None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# MAIN MESSAGE HANDLER
|
||||
# =============================================================================
|
||||
|
||||
|
||||
async def on_device_interfaces(msg) -> None:
|
||||
"""Handle a device.interfaces event published by the Go poller.
|
||||
|
||||
Each message contains interface metadata (name, MAC, type, running) for
|
||||
a single device. Upserts into device_interfaces using ON CONFLICT so
|
||||
existing interfaces are updated rather than duplicated.
|
||||
|
||||
On success, acknowledges the message. On error, NAKs so NATS can redeliver.
|
||||
"""
|
||||
try:
|
||||
data = json.loads(msg.data)
|
||||
device_id = data.get("device_id")
|
||||
|
||||
if not device_id:
|
||||
logger.warning("device.interfaces event missing 'device_id' -- skipping")
|
||||
await msg.ack()
|
||||
return
|
||||
|
||||
tenant_id = data.get("tenant_id")
|
||||
interfaces = data.get("interfaces")
|
||||
|
||||
if not interfaces:
|
||||
await msg.ack()
|
||||
return
|
||||
|
||||
async with AdminAsyncSessionLocal() as session:
|
||||
for iface in interfaces:
|
||||
mac_address = iface.get("mac_address", "")
|
||||
if not mac_address:
|
||||
continue # Skip interfaces without MAC (loopback, bridge without MAC)
|
||||
|
||||
await session.execute(
|
||||
text("""
|
||||
INSERT INTO device_interfaces
|
||||
(id, device_id, tenant_id, name, mac_address, type, running, updated_at)
|
||||
VALUES
|
||||
(gen_random_uuid(), :device_id, :tenant_id, :name, :mac_address,
|
||||
:type, :running, NOW())
|
||||
ON CONFLICT (device_id, name)
|
||||
DO UPDATE SET
|
||||
mac_address = EXCLUDED.mac_address,
|
||||
type = EXCLUDED.type,
|
||||
running = EXCLUDED.running,
|
||||
updated_at = NOW()
|
||||
"""),
|
||||
{
|
||||
"device_id": device_id,
|
||||
"tenant_id": tenant_id,
|
||||
"name": iface.get("name", ""),
|
||||
"mac_address": mac_address.lower(),
|
||||
"type": iface.get("type", ""),
|
||||
"running": iface.get("running", False),
|
||||
},
|
||||
)
|
||||
|
||||
await session.commit()
|
||||
|
||||
logger.debug(
|
||||
"device.interfaces processed",
|
||||
extra={"device_id": device_id, "count": len(interfaces)},
|
||||
)
|
||||
await msg.ack()
|
||||
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
"Failed to process device.interfaces event: %s",
|
||||
exc,
|
||||
exc_info=True,
|
||||
)
|
||||
try:
|
||||
await msg.nak()
|
||||
except Exception:
|
||||
pass # If NAK also fails, NATS will redeliver after ack_wait
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# SUBSCRIPTION SETUP
|
||||
# =============================================================================
|
||||
|
||||
|
||||
async def _subscribe_with_retry(js: JetStreamContext) -> None:
|
||||
"""Subscribe to device.interfaces.> with durable consumer, retrying if stream not ready."""
|
||||
max_attempts = 6 # ~30 seconds at 5s intervals
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
await js.subscribe(
|
||||
"device.interfaces.>",
|
||||
cb=on_device_interfaces,
|
||||
durable="api-interface-consumer",
|
||||
stream="DEVICE_EVENTS",
|
||||
)
|
||||
logger.info(
|
||||
"NATS: subscribed to device.interfaces.> (durable: api-interface-consumer)"
|
||||
)
|
||||
return
|
||||
except Exception as exc:
|
||||
if attempt < max_attempts:
|
||||
logger.warning(
|
||||
"NATS: stream DEVICE_EVENTS not ready (attempt %d/%d): %s -- retrying in 5s",
|
||||
attempt,
|
||||
max_attempts,
|
||||
exc,
|
||||
)
|
||||
await asyncio.sleep(5)
|
||||
else:
|
||||
logger.warning(
|
||||
"NATS: giving up on device.interfaces.> after %d attempts: %s "
|
||||
"-- API will run without interface ingestion",
|
||||
max_attempts,
|
||||
exc,
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
async def start_interface_subscriber() -> Optional[NATSClient]:
|
||||
"""Connect to NATS and start the device.interfaces.> subscription.
|
||||
|
||||
Returns the NATS connection (must be passed to stop_interface_subscriber
|
||||
on shutdown).
|
||||
"""
|
||||
global _interface_client
|
||||
|
||||
logger.info("NATS device interfaces: connecting to %s", settings.NATS_URL)
|
||||
|
||||
nc = await nats.connect(
|
||||
settings.NATS_URL,
|
||||
max_reconnect_attempts=-1,
|
||||
reconnect_time_wait=2,
|
||||
error_cb=_on_error,
|
||||
reconnected_cb=_on_reconnected,
|
||||
disconnected_cb=_on_disconnected,
|
||||
)
|
||||
|
||||
logger.info("NATS device interfaces: connected to %s", settings.NATS_URL)
|
||||
|
||||
js = nc.jetstream()
|
||||
await _subscribe_with_retry(js)
|
||||
|
||||
_interface_client = nc
|
||||
return nc
|
||||
|
||||
|
||||
async def stop_interface_subscriber(nc: Optional[NATSClient]) -> None:
|
||||
"""Drain and close the interface NATS connection gracefully."""
|
||||
if nc is None:
|
||||
return
|
||||
try:
|
||||
logger.info("NATS device interfaces: draining connection...")
|
||||
await nc.drain()
|
||||
logger.info("NATS device interfaces: connection closed")
|
||||
except Exception as exc:
|
||||
logger.warning("NATS device interfaces: error during drain: %s", exc)
|
||||
try:
|
||||
await nc.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
async def _on_error(exc: Exception) -> None:
|
||||
logger.error("NATS device interfaces error: %s", exc)
|
||||
|
||||
|
||||
async def _on_reconnected() -> None:
|
||||
logger.info("NATS device interfaces: reconnected")
|
||||
|
||||
|
||||
async def _on_disconnected() -> None:
|
||||
logger.warning("NATS device interfaces: disconnected")
|
||||
322
backend/app/services/link_discovery_subscriber.py
Normal file
322
backend/app/services/link_discovery_subscriber.py
Normal file
@@ -0,0 +1,322 @@
|
||||
"""NATS JetStream subscriber for wireless link discovery.
|
||||
|
||||
Subscribes to wireless.registrations.> on the WIRELESS_REGISTRATIONS stream
|
||||
using a SEPARATE durable consumer from the wireless_registration_subscriber.
|
||||
Both consumers independently process the same messages.
|
||||
|
||||
Resolves client MAC addresses against the device_interfaces table to discover
|
||||
AP-CPE relationships, manages link state transitions (active/degraded/down/stale),
|
||||
and tracks missed polls for link health.
|
||||
|
||||
Uses AdminAsyncSessionLocal (superuser bypass RLS) since registration data
|
||||
arrives from the Go poller without tenant context in the DB session.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
import nats
|
||||
from nats.js import JetStreamContext
|
||||
from nats.aio.client import Client as NATSClient
|
||||
from sqlalchemy import text
|
||||
|
||||
from app.config import settings
|
||||
from app.database import AdminAsyncSessionLocal
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_link_discovery_client: Optional[NATSClient] = None
|
||||
|
||||
# Configurable thresholds for link state transitions
|
||||
DEGRADED_SIGNAL_THRESHOLD = -80 # dBm — signals weaker than this mark link as degraded
|
||||
CONSECUTIVE_MISS_THRESHOLD = 3 # Missed polls before marking link as down
|
||||
STALE_HOURS = 24 # Hours after down before marking link as stale
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# MAIN MESSAGE HANDLER
|
||||
# =============================================================================
|
||||
|
||||
|
||||
async def on_wireless_registration_for_links(msg) -> None:
|
||||
"""Handle a wireless.registrations event for link discovery.
|
||||
|
||||
For each client registration:
|
||||
1. Resolve the client MAC against device_interfaces to find a CPE device
|
||||
2. If found, upsert a wireless_link with state based on signal strength
|
||||
3. Increment missed_polls for links from this AP NOT seen in this batch
|
||||
4. Mark stale any links in 'down' state older than STALE_HOURS
|
||||
|
||||
On success, acknowledges the message. On error, NAKs so NATS can redeliver.
|
||||
"""
|
||||
try:
|
||||
data = json.loads(msg.data)
|
||||
device_id = data.get("device_id") # This is the AP
|
||||
|
||||
if not device_id:
|
||||
logger.warning("wireless.registrations event missing 'device_id' -- skipping")
|
||||
await msg.ack()
|
||||
return
|
||||
|
||||
tenant_id = data.get("tenant_id")
|
||||
registrations = data.get("registrations")
|
||||
|
||||
if not registrations:
|
||||
await msg.ack()
|
||||
return
|
||||
|
||||
seen_cpe_ids = []
|
||||
|
||||
async with AdminAsyncSessionLocal() as session:
|
||||
for reg in registrations:
|
||||
client_mac = (reg.get("mac_address") or "").lower()
|
||||
if not client_mac:
|
||||
continue
|
||||
|
||||
# Resolve MAC against device_interfaces to find CPE device
|
||||
result = await session.execute(
|
||||
text("""
|
||||
SELECT device_id FROM device_interfaces
|
||||
WHERE LOWER(mac_address) = :mac AND tenant_id = :tenant_id
|
||||
LIMIT 1
|
||||
"""),
|
||||
{"mac": client_mac, "tenant_id": tenant_id},
|
||||
)
|
||||
row = result.fetchone()
|
||||
|
||||
if not row:
|
||||
# Unresolved MAC -- stays in wireless_registrations for unknown client queries
|
||||
continue
|
||||
|
||||
cpe_device_id = str(row[0])
|
||||
seen_cpe_ids.append(cpe_device_id)
|
||||
|
||||
signal_strength = reg.get("signal_strength")
|
||||
|
||||
# Upsert wireless_link with state based on signal strength
|
||||
await session.execute(
|
||||
text("""
|
||||
INSERT INTO wireless_links
|
||||
(id, ap_device_id, cpe_device_id, tenant_id, interface, client_mac,
|
||||
signal_strength, tx_ccq, tx_rate, rx_rate, state, missed_polls,
|
||||
discovered_at, last_seen, updated_at)
|
||||
VALUES
|
||||
(gen_random_uuid(), :ap_device_id, :cpe_device_id, :tenant_id,
|
||||
:interface, :client_mac, :signal_strength, :tx_ccq, :tx_rate,
|
||||
:rx_rate,
|
||||
CASE WHEN :signal_strength::int IS NULL THEN 'active'
|
||||
WHEN :signal_strength::int < :degraded_threshold THEN 'degraded'
|
||||
ELSE 'active' END,
|
||||
0, NOW(), NOW(), NOW())
|
||||
ON CONFLICT (ap_device_id, cpe_device_id) DO UPDATE SET
|
||||
interface = EXCLUDED.interface,
|
||||
client_mac = EXCLUDED.client_mac,
|
||||
signal_strength = EXCLUDED.signal_strength,
|
||||
tx_ccq = EXCLUDED.tx_ccq,
|
||||
tx_rate = EXCLUDED.tx_rate,
|
||||
rx_rate = EXCLUDED.rx_rate,
|
||||
state = CASE WHEN EXCLUDED.signal_strength IS NULL THEN 'active'
|
||||
WHEN EXCLUDED.signal_strength < :degraded_threshold THEN 'degraded'
|
||||
ELSE 'active' END,
|
||||
missed_polls = 0,
|
||||
last_seen = NOW(),
|
||||
updated_at = NOW()
|
||||
"""),
|
||||
{
|
||||
"ap_device_id": device_id,
|
||||
"cpe_device_id": cpe_device_id,
|
||||
"tenant_id": tenant_id,
|
||||
"interface": reg.get("interface"),
|
||||
"client_mac": client_mac,
|
||||
"signal_strength": signal_strength,
|
||||
"tx_ccq": reg.get("tx_ccq"),
|
||||
"tx_rate": reg.get("tx_rate"),
|
||||
"rx_rate": reg.get("rx_rate"),
|
||||
"degraded_threshold": DEGRADED_SIGNAL_THRESHOLD,
|
||||
},
|
||||
)
|
||||
|
||||
# Increment missed_polls for links from this AP NOT seen in this batch
|
||||
if seen_cpe_ids:
|
||||
await session.execute(
|
||||
text("""
|
||||
UPDATE wireless_links
|
||||
SET missed_polls = missed_polls + 1,
|
||||
state = CASE
|
||||
WHEN missed_polls + 1 >= :miss_threshold THEN 'down'
|
||||
ELSE state
|
||||
END,
|
||||
updated_at = NOW()
|
||||
WHERE ap_device_id = :ap_device_id
|
||||
AND tenant_id = :tenant_id
|
||||
AND cpe_device_id NOT IN (
|
||||
SELECT unnest(:seen_cpe_ids::uuid[])
|
||||
)
|
||||
AND state NOT IN ('down', 'stale')
|
||||
"""),
|
||||
{
|
||||
"ap_device_id": device_id,
|
||||
"tenant_id": tenant_id,
|
||||
"seen_cpe_ids": seen_cpe_ids,
|
||||
"miss_threshold": CONSECUTIVE_MISS_THRESHOLD,
|
||||
},
|
||||
)
|
||||
else:
|
||||
# No CPEs resolved -- increment all links for this AP
|
||||
await session.execute(
|
||||
text("""
|
||||
UPDATE wireless_links
|
||||
SET missed_polls = missed_polls + 1,
|
||||
state = CASE
|
||||
WHEN missed_polls + 1 >= :miss_threshold THEN 'down'
|
||||
ELSE state
|
||||
END,
|
||||
updated_at = NOW()
|
||||
WHERE ap_device_id = :ap_device_id
|
||||
AND tenant_id = :tenant_id
|
||||
AND state NOT IN ('down', 'stale')
|
||||
"""),
|
||||
{
|
||||
"ap_device_id": device_id,
|
||||
"tenant_id": tenant_id,
|
||||
"miss_threshold": CONSECUTIVE_MISS_THRESHOLD,
|
||||
},
|
||||
)
|
||||
|
||||
# Mark stale: any links in 'down' state where last_seen > STALE_HOURS ago
|
||||
await session.execute(
|
||||
text("""
|
||||
UPDATE wireless_links
|
||||
SET state = 'stale', updated_at = NOW()
|
||||
WHERE ap_device_id = :ap_device_id
|
||||
AND tenant_id = :tenant_id
|
||||
AND state = 'down'
|
||||
AND last_seen < NOW() - INTERVAL ':stale_hours hours'
|
||||
""".replace(":stale_hours", str(STALE_HOURS))),
|
||||
{
|
||||
"ap_device_id": device_id,
|
||||
"tenant_id": tenant_id,
|
||||
},
|
||||
)
|
||||
|
||||
await session.commit()
|
||||
|
||||
logger.debug(
|
||||
"wireless.registrations link discovery processed",
|
||||
extra={
|
||||
"device_id": device_id,
|
||||
"registrations": len(registrations),
|
||||
"resolved_cpes": len(seen_cpe_ids),
|
||||
},
|
||||
)
|
||||
await msg.ack()
|
||||
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
"Failed to process wireless.registrations for link discovery: %s",
|
||||
exc,
|
||||
exc_info=True,
|
||||
)
|
||||
try:
|
||||
await msg.nak()
|
||||
except Exception:
|
||||
pass # If NAK also fails, NATS will redeliver after ack_wait
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# SUBSCRIPTION SETUP
|
||||
# =============================================================================
|
||||
|
||||
|
||||
async def _subscribe_with_retry(js: JetStreamContext) -> None:
|
||||
"""Subscribe to wireless.registrations.> with durable consumer, retrying if stream not ready."""
|
||||
max_attempts = 6 # ~30 seconds at 5s intervals
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
await js.subscribe(
|
||||
"wireless.registrations.>",
|
||||
cb=on_wireless_registration_for_links,
|
||||
durable="api-link-discovery-consumer",
|
||||
stream="WIRELESS_REGISTRATIONS",
|
||||
)
|
||||
logger.info(
|
||||
"NATS: subscribed to wireless.registrations.> "
|
||||
"(durable: api-link-discovery-consumer)"
|
||||
)
|
||||
return
|
||||
except Exception as exc:
|
||||
if attempt < max_attempts:
|
||||
logger.warning(
|
||||
"NATS: stream WIRELESS_REGISTRATIONS not ready (attempt %d/%d): %s "
|
||||
"-- retrying in 5s",
|
||||
attempt,
|
||||
max_attempts,
|
||||
exc,
|
||||
)
|
||||
await asyncio.sleep(5)
|
||||
else:
|
||||
logger.warning(
|
||||
"NATS: giving up on wireless.registrations.> link discovery "
|
||||
"after %d attempts: %s -- API will run without link discovery",
|
||||
max_attempts,
|
||||
exc,
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
async def start_link_discovery_subscriber() -> Optional[NATSClient]:
|
||||
"""Connect to NATS and start the wireless.registrations.> link discovery subscription.
|
||||
|
||||
Returns the NATS connection (must be passed to stop_link_discovery_subscriber
|
||||
on shutdown).
|
||||
"""
|
||||
global _link_discovery_client
|
||||
|
||||
logger.info("NATS link discovery: connecting to %s", settings.NATS_URL)
|
||||
|
||||
nc = await nats.connect(
|
||||
settings.NATS_URL,
|
||||
max_reconnect_attempts=-1,
|
||||
reconnect_time_wait=2,
|
||||
error_cb=_on_error,
|
||||
reconnected_cb=_on_reconnected,
|
||||
disconnected_cb=_on_disconnected,
|
||||
)
|
||||
|
||||
logger.info("NATS link discovery: connected to %s", settings.NATS_URL)
|
||||
|
||||
js = nc.jetstream()
|
||||
await _subscribe_with_retry(js)
|
||||
|
||||
_link_discovery_client = nc
|
||||
return nc
|
||||
|
||||
|
||||
async def stop_link_discovery_subscriber(nc: Optional[NATSClient]) -> None:
|
||||
"""Drain and close the link discovery NATS connection gracefully."""
|
||||
if nc is None:
|
||||
return
|
||||
try:
|
||||
logger.info("NATS link discovery: draining connection...")
|
||||
await nc.drain()
|
||||
logger.info("NATS link discovery: connection closed")
|
||||
except Exception as exc:
|
||||
logger.warning("NATS link discovery: error during drain: %s", exc)
|
||||
try:
|
||||
await nc.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
async def _on_error(exc: Exception) -> None:
|
||||
logger.error("NATS link discovery error: %s", exc)
|
||||
|
||||
|
||||
async def _on_reconnected() -> None:
|
||||
logger.info("NATS link discovery: reconnected")
|
||||
|
||||
|
||||
async def _on_disconnected() -> None:
|
||||
logger.warning("NATS link discovery: disconnected")
|
||||
Reference in New Issue
Block a user