200 lines
6.6 KiB
Python
200 lines
6.6 KiB
Python
"""NATS JetStream subscriber for device interface data.
|
|
|
|
Subscribes to device.interfaces.> on the DEVICE_EVENTS stream and upserts
|
|
per-interface rows into the device_interfaces table for MAC-to-device
|
|
resolution during link discovery.
|
|
|
|
Uses AdminAsyncSessionLocal (superuser bypass RLS) since interface data
|
|
arrives from the Go poller without tenant context in the DB session.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Optional
|
|
|
|
import nats
|
|
from nats.js import JetStreamContext
|
|
from nats.aio.client import Client as NATSClient
|
|
from sqlalchemy import text
|
|
|
|
from app.config import settings
|
|
from app.database import AdminAsyncSessionLocal
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_interface_client: Optional[NATSClient] = None
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN MESSAGE HANDLER
|
|
# =============================================================================
|
|
|
|
|
|
async def on_device_interfaces(msg) -> None:
|
|
"""Handle a device.interfaces event published by the Go poller.
|
|
|
|
Each message contains interface metadata (name, MAC, type, running) for
|
|
a single device. Upserts into device_interfaces using ON CONFLICT so
|
|
existing interfaces are updated rather than duplicated.
|
|
|
|
On success, acknowledges the message. On error, NAKs so NATS can redeliver.
|
|
"""
|
|
try:
|
|
data = json.loads(msg.data)
|
|
device_id = data.get("device_id")
|
|
|
|
if not device_id:
|
|
logger.warning("device.interfaces event missing 'device_id' -- skipping")
|
|
await msg.ack()
|
|
return
|
|
|
|
tenant_id = data.get("tenant_id")
|
|
interfaces = data.get("interfaces")
|
|
|
|
if not interfaces:
|
|
await msg.ack()
|
|
return
|
|
|
|
async with AdminAsyncSessionLocal() as session:
|
|
for iface in interfaces:
|
|
mac_address = iface.get("mac_address", "")
|
|
if not mac_address:
|
|
continue # Skip interfaces without MAC (loopback, bridge without MAC)
|
|
|
|
await session.execute(
|
|
text("""
|
|
INSERT INTO device_interfaces
|
|
(id, device_id, tenant_id, name, mac_address, type, running, updated_at)
|
|
VALUES
|
|
(gen_random_uuid(), :device_id, :tenant_id, :name, :mac_address,
|
|
:type, :running, NOW())
|
|
ON CONFLICT (device_id, name)
|
|
DO UPDATE SET
|
|
mac_address = EXCLUDED.mac_address,
|
|
type = EXCLUDED.type,
|
|
running = EXCLUDED.running,
|
|
updated_at = NOW()
|
|
"""),
|
|
{
|
|
"device_id": device_id,
|
|
"tenant_id": tenant_id,
|
|
"name": iface.get("name", ""),
|
|
"mac_address": mac_address.lower(),
|
|
"type": iface.get("type", ""),
|
|
"running": iface.get("running", False),
|
|
},
|
|
)
|
|
|
|
await session.commit()
|
|
|
|
logger.debug(
|
|
"device.interfaces processed",
|
|
extra={"device_id": device_id, "count": len(interfaces)},
|
|
)
|
|
await msg.ack()
|
|
|
|
except Exception as exc:
|
|
logger.error(
|
|
"Failed to process device.interfaces event: %s",
|
|
exc,
|
|
exc_info=True,
|
|
)
|
|
try:
|
|
await msg.nak()
|
|
except Exception:
|
|
pass # If NAK also fails, NATS will redeliver after ack_wait
|
|
|
|
|
|
# =============================================================================
|
|
# SUBSCRIPTION SETUP
|
|
# =============================================================================
|
|
|
|
|
|
async def _subscribe_with_retry(js: JetStreamContext) -> None:
|
|
"""Subscribe to device.interfaces.> with durable consumer, retrying if stream not ready."""
|
|
max_attempts = 6 # ~30 seconds at 5s intervals
|
|
for attempt in range(1, max_attempts + 1):
|
|
try:
|
|
await js.subscribe(
|
|
"device.interfaces.>",
|
|
cb=on_device_interfaces,
|
|
durable="api-interface-consumer",
|
|
stream="DEVICE_EVENTS",
|
|
)
|
|
logger.info("NATS: subscribed to device.interfaces.> (durable: api-interface-consumer)")
|
|
return
|
|
except Exception as exc:
|
|
if attempt < max_attempts:
|
|
logger.warning(
|
|
"NATS: stream DEVICE_EVENTS not ready (attempt %d/%d): %s -- retrying in 5s",
|
|
attempt,
|
|
max_attempts,
|
|
exc,
|
|
)
|
|
await asyncio.sleep(5)
|
|
else:
|
|
logger.warning(
|
|
"NATS: giving up on device.interfaces.> after %d attempts: %s "
|
|
"-- API will run without interface ingestion",
|
|
max_attempts,
|
|
exc,
|
|
)
|
|
return
|
|
|
|
|
|
async def start_interface_subscriber() -> Optional[NATSClient]:
|
|
"""Connect to NATS and start the device.interfaces.> subscription.
|
|
|
|
Returns the NATS connection (must be passed to stop_interface_subscriber
|
|
on shutdown).
|
|
"""
|
|
global _interface_client
|
|
|
|
logger.info("NATS device interfaces: connecting to %s", settings.NATS_URL)
|
|
|
|
nc = await nats.connect(
|
|
settings.NATS_URL,
|
|
max_reconnect_attempts=-1,
|
|
reconnect_time_wait=2,
|
|
error_cb=_on_error,
|
|
reconnected_cb=_on_reconnected,
|
|
disconnected_cb=_on_disconnected,
|
|
)
|
|
|
|
logger.info("NATS device interfaces: connected to %s", settings.NATS_URL)
|
|
|
|
js = nc.jetstream()
|
|
await _subscribe_with_retry(js)
|
|
|
|
_interface_client = nc
|
|
return nc
|
|
|
|
|
|
async def stop_interface_subscriber(nc: Optional[NATSClient]) -> None:
|
|
"""Drain and close the interface NATS connection gracefully."""
|
|
if nc is None:
|
|
return
|
|
try:
|
|
logger.info("NATS device interfaces: draining connection...")
|
|
await nc.drain()
|
|
logger.info("NATS device interfaces: connection closed")
|
|
except Exception as exc:
|
|
logger.warning("NATS device interfaces: error during drain: %s", exc)
|
|
try:
|
|
await nc.close()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
async def _on_error(exc: Exception) -> None:
|
|
logger.error("NATS device interfaces error: %s", exc)
|
|
|
|
|
|
async def _on_reconnected() -> None:
|
|
logger.info("NATS device interfaces: reconnected")
|
|
|
|
|
|
async def _on_disconnected() -> None:
|
|
logger.warning("NATS device interfaces: disconnected")
|