From 390df0531ddc497064867e7f8b5db355bb05c204 Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Sat, 21 Mar 2026 18:51:02 -0500 Subject: [PATCH] feat(17-02): add snmp_custom handler and NAK safety net to metrics subscriber - Add _insert_snmp_custom_metrics handler for custom SNMP OID events - Insert all 9 columns into snmp_metrics hypertable - Change unknown metric types from ACK to NAK for redelivery safety - Prevents permanent data loss during deployment ordering mismatches Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/services/metrics_subscriber.py | 51 +++++++++++++++++++--- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/backend/app/services/metrics_subscriber.py b/backend/app/services/metrics_subscriber.py index 791a130..26b68da 100644 --- a/backend/app/services/metrics_subscriber.py +++ b/backend/app/services/metrics_subscriber.py @@ -4,6 +4,7 @@ Subscribes to device.metrics.> and inserts into TimescaleDB hypertables: - interface_metrics — per-interface rx/tx byte counters - health_metrics — CPU, memory, disk, temperature per device - wireless_metrics — per-wireless-interface aggregated client stats + - snmp_metrics — custom SNMP OID metrics (UPS, vendor, tenant profiles) Also maintains denormalized last_cpu_load and last_memory_used_pct columns on the devices table for efficient fleet table display. @@ -178,6 +179,41 @@ async def _insert_wireless_metrics(session, data: dict) -> None: ) +async def _insert_snmp_custom_metrics(session, data: dict) -> None: + """Insert custom SNMP OID metrics into snmp_metrics hypertable.""" + metrics = data.get("metrics") + if not metrics: + logger.warning("snmp_custom event missing 'metrics' field — skipping") + return + + device_id = data.get("device_id") + tenant_id = data.get("tenant_id") + collected_at = _parse_timestamp(data.get("collected_at")) + + for m in metrics: + await session.execute( + text(""" + INSERT INTO snmp_metrics + (time, device_id, tenant_id, metric_name, metric_group, + value_numeric, value_text, oid, index_value) + VALUES + (:time, :device_id, :tenant_id, :metric_name, :metric_group, + :value_numeric, :value_text, :oid, :index_value) + """), + { + "time": collected_at, + "device_id": device_id, + "tenant_id": tenant_id, + "metric_name": m.get("metric_name"), + "metric_group": m.get("metric_group"), + "value_numeric": m.get("value_numeric"), + "value_text": m.get("value_text"), + "oid": m.get("oid"), + "index_value": m.get("index_value"), + }, + ) + + # ============================================================================= # MAIN MESSAGE HANDLER # ============================================================================= @@ -187,10 +223,13 @@ async def on_device_metrics(msg) -> None: """Handle a device.metrics event published by the Go poller. Dispatches to the appropriate insert handler based on the 'type' field: - - "health" → _insert_health_metrics + update devices - - "interfaces" → _insert_interface_metrics - - "wireless" → _insert_wireless_metrics + - "health" → _insert_health_metrics + update devices + - "interfaces" → _insert_interface_metrics + - "wireless" → _insert_wireless_metrics + - "snmp_custom" → _insert_snmp_custom_metrics (custom SNMP OID data) + Unknown types are NAKed (not ACKed) so NATS can redeliver once the + subscriber is updated -- prevents permanent data loss during deployments. On success, acknowledges the message. On error, NAKs so NATS can redeliver. """ try: @@ -210,9 +249,11 @@ async def on_device_metrics(msg) -> None: await _insert_interface_metrics(session, data) elif metric_type == "wireless": await _insert_wireless_metrics(session, data) + elif metric_type == "snmp_custom": + await _insert_snmp_custom_metrics(session, data) else: - logger.warning("Unknown metric type '%s' — skipping", metric_type) - await msg.ack() + logger.warning("Unknown metric type '%s' — NAKing for redelivery", metric_type) + await msg.nak() return await session.commit()