From caa33ca8d71956dc393c71c15fa3fb2d75a1379a Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Thu, 19 Mar 2026 05:38:14 -0500 Subject: [PATCH] feat(12-01): add RF monitor collector, WIRELESS_REGISTRATIONS stream, wire into poll cycle - RFMonitorStats struct for per-interface RF data (noise floor, channel width, TX power) - CollectRFMonitor with v6/v7 RouterOS version routing - WIRELESS_REGISTRATIONS NATS stream with 30-day retention (separate from DEVICE_EVENTS) - WirelessRegistrationEvent type and PublishWirelessRegistrations method - Poll cycle collects per-client registrations and RF stats, publishes combined event Co-Authored-By: Claude Opus 4.6 (1M context) --- poller/internal/bus/publisher.go | 60 ++++++++++++ poller/internal/device/rf_monitor.go | 107 ++++++++++++++++++++++ poller/internal/device/rf_monitor_test.go | 32 +++++++ poller/internal/poller/worker.go | 38 ++++++++ 4 files changed, 237 insertions(+) create mode 100644 poller/internal/device/rf_monitor.go create mode 100644 poller/internal/device/rf_monitor_test.go diff --git a/poller/internal/bus/publisher.go b/poller/internal/bus/publisher.go index e28f000..1c4039b 100644 --- a/poller/internal/bus/publisher.go +++ b/poller/internal/bus/publisher.go @@ -151,6 +151,26 @@ func NewPublisher(natsURL string) (*Publisher, error) { slog.Info("NATS JetStream DEVICE_EVENTS stream ready") + // Ensure the WIRELESS_REGISTRATIONS stream exists for per-client wireless data. + // Separate stream with 30-day retention (vs 24h for DEVICE_EVENTS) to support + // historical client analytics and hypertable ingestion. + ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel2() + + _, err = js.CreateOrUpdateStream(ctx2, jetstream.StreamConfig{ + Name: "WIRELESS_REGISTRATIONS", + Subjects: []string{"wireless.registrations.>"}, + MaxAge: 30 * 24 * time.Hour, // 30-day retention + MaxBytes: 256 * 1024 * 1024, // 256MB cap + Discard: jetstream.DiscardOld, + }) + if err != nil { + nc.Close() + return nil, fmt.Errorf("ensuring WIRELESS_REGISTRATIONS stream: %w", err) + } + + slog.Info("NATS JetStream WIRELESS_REGISTRATIONS stream ready") + return &Publisher{nc: nc, js: js}, nil } @@ -206,6 +226,34 @@ func (p *Publisher) PublishMetrics(ctx context.Context, event DeviceMetricsEvent return nil } +// PublishWirelessRegistrations publishes per-client wireless registration data +// and RF monitor stats to the WIRELESS_REGISTRATIONS NATS stream. +// +// Events are published to "wireless.registrations.{device_id}" so consumers +// can subscribe to all wireless data or filter by device. +func (p *Publisher) PublishWirelessRegistrations(ctx context.Context, event WirelessRegistrationEvent) error { + data, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("marshalling wireless registration event: %w", err) + } + + subject := fmt.Sprintf("wireless.registrations.%s", event.DeviceID) + + _, err = p.js.Publish(ctx, subject, data) + if err != nil { + return fmt.Errorf("publishing to %s: %w", subject, err) + } + + slog.Debug("published wireless registration event", + "device_id", event.DeviceID, + "registrations", len(event.Registrations), + "rf_stats", len(event.RFStats), + "subject", subject, + ) + + return nil +} + // DeviceFirmwareEvent is the payload published to NATS JetStream when the poller // checks a device's firmware update status (rate-limited to once per day per device). type DeviceFirmwareEvent struct { @@ -350,6 +398,18 @@ func (p *Publisher) PublishPushAlert(ctx context.Context, event PushAlertEvent) return nil } +// WirelessRegistrationEvent is the payload published to the WIRELESS_REGISTRATIONS +// NATS stream when per-client wireless registration data and RF monitor stats +// are collected from a device. This is separate from DEVICE_EVENTS to allow +// independent retention (30 days vs 24 hours) and consumer scaling. +type WirelessRegistrationEvent struct { + DeviceID string `json:"device_id"` + TenantID string `json:"tenant_id"` + CollectedAt string `json:"collected_at"` // RFC3339 + Registrations []device.RegistrationEntry `json:"registrations"` + RFStats []device.RFMonitorStats `json:"rf_stats,omitempty"` +} + // SessionEndEvent is the payload published to NATS JetStream when an SSH // relay session ends. The backend subscribes to audit.session.end.> and // writes an audit log entry with the session duration. diff --git a/poller/internal/device/rf_monitor.go b/poller/internal/device/rf_monitor.go new file mode 100644 index 0000000..03fac95 --- /dev/null +++ b/poller/internal/device/rf_monitor.go @@ -0,0 +1,107 @@ +package device + +import ( + "log/slog" + "strconv" + + routeros "github.com/go-routeros/routeros/v3" +) + +// RFMonitorStats holds per-interface RF monitoring data collected from +// the RouterOS monitor command. These stats describe the RF environment +// rather than individual clients. +type RFMonitorStats struct { + Interface string `json:"interface"` + NoiseFloor int `json:"noise_floor"` // dBm, e.g. -105 + ChannelWidth string `json:"channel_width"` // e.g. "20MHz", "40MHz" + TxPower int `json:"tx_power"` // dBm, e.g. 24 + RegisteredClients int `json:"registered_clients"` // count from monitor +} + +// CollectRFMonitor queries the RouterOS device for per-interface RF statistics +// using the monitor command. +// +// Version routing: +// - majorVersion >= 7: runs /interface/wifi/print to list interfaces, then +// /interface/wifi/monitor for each interface. +// - majorVersion < 7: runs /interface/wireless/print to list interfaces, +// then /interface/wireless/monitor for each interface. +// +// Returns nil, nil when the device has no wireless interfaces. +func CollectRFMonitor(client *routeros.Client, majorVersion int) ([]RFMonitorStats, error) { + var printCmd, monitorCmd string + + if majorVersion >= 7 { + printCmd = "/interface/wifi/print" + monitorCmd = "/interface/wifi/monitor" + } else { + printCmd = "/interface/wireless/print" + monitorCmd = "/interface/wireless/monitor" + } + + // List wireless interface names. + listReply, err := client.Run(printCmd, "=.proplist=name") + if err != nil { + slog.Debug("device has no wireless interfaces for RF monitor", "error", err) + return nil, nil + } + + if len(listReply.Re) == 0 { + return nil, nil + } + + stats := make([]RFMonitorStats, 0, len(listReply.Re)) + for _, s := range listReply.Re { + ifaceName := s.Map["name"] + if ifaceName == "" { + continue + } + + // Run monitor command for this interface. + monReply, monErr := client.Run(monitorCmd, "=numbers="+ifaceName, "=once=") + if monErr != nil { + slog.Debug("RF monitor command failed for interface", + "interface", ifaceName, "command", monitorCmd, "error", monErr) + continue + } + + if len(monReply.Re) == 0 { + continue + } + + m := monReply.Re[0].Map + entry := RFMonitorStats{ + Interface: ifaceName, + } + + // Noise floor. + if nf, parseErr := strconv.Atoi(m["noise-floor"]); parseErr == nil { + entry.NoiseFloor = nf + } + + // Channel width: v6 uses "channel-width", v7 may use "channel" with width embedded. + if majorVersion >= 7 { + entry.ChannelWidth = m["channel"] + } else { + entry.ChannelWidth = m["channel-width"] + } + + // TX power. + if txp, parseErr := strconv.Atoi(m["tx-power"]); parseErr == nil { + entry.TxPower = txp + } + + // Registered clients count. + if rc, parseErr := strconv.Atoi(m["registered-clients"]); parseErr == nil { + entry.RegisteredClients = rc + } + + stats = append(stats, entry) + } + + if len(stats) == 0 { + return nil, nil + } + + return stats, nil +} diff --git a/poller/internal/device/rf_monitor_test.go b/poller/internal/device/rf_monitor_test.go new file mode 100644 index 0000000..0976e4a --- /dev/null +++ b/poller/internal/device/rf_monitor_test.go @@ -0,0 +1,32 @@ +package device + +import ( + "testing" +) + +func TestRFMonitorStatsFields(t *testing.T) { + // Compilation test: ensure RFMonitorStats has all required fields + // with correct types. + stats := RFMonitorStats{ + Interface: "wlan1", + NoiseFloor: -105, + ChannelWidth: "20MHz", + TxPower: 24, + RegisteredClients: 15, + } + if stats.Interface != "wlan1" { + t.Error("Interface field not set correctly") + } + if stats.NoiseFloor != -105 { + t.Error("NoiseFloor field not set correctly") + } + if stats.ChannelWidth != "20MHz" { + t.Error("ChannelWidth field not set correctly") + } + if stats.TxPower != 24 { + t.Error("TxPower field not set correctly") + } + if stats.RegisteredClients != 15 { + t.Error("RegisteredClients field not set correctly") + } +} diff --git a/poller/internal/poller/worker.go b/poller/internal/poller/worker.go index c2b057e..033d96d 100644 --- a/poller/internal/poller/worker.go +++ b/poller/internal/poller/worker.go @@ -372,6 +372,44 @@ func PollDevice( } } + // Per-client wireless registrations (dedicated stream, not DEVICE_EVENTS). + cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout) + registrations, err := withTimeout[[]device.RegistrationEntry](cmdCtx, func() ([]device.RegistrationEntry, error) { + return device.CollectRegistrations(client, info.MajorVersion) + }) + cmdCancel() + if err != nil { + slog.Warn("failed to collect wireless registrations", "device_id", dev.ID, "error", err) + } + + var rfStats []device.RFMonitorStats + if len(registrations) > 0 || len(wireless) > 0 { + // Only collect RF monitor if device has wireless interfaces. + cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout) + rfStats, err = withTimeout[[]device.RFMonitorStats](cmdCtx, func() ([]device.RFMonitorStats, error) { + return device.CollectRFMonitor(client, info.MajorVersion) + }) + cmdCancel() + if err != nil { + slog.Warn("failed to collect RF monitor stats", "device_id", dev.ID, "error", err) + } + } + + if len(registrations) > 0 || len(rfStats) > 0 { + if pubErr := pub.PublishWirelessRegistrations(ctx, bus.WirelessRegistrationEvent{ + DeviceID: dev.ID, + TenantID: dev.TenantID, + CollectedAt: collectedAt, + Registrations: registrations, + RFStats: rfStats, + }); pubErr != nil { + slog.Warn("failed to publish wireless registrations", "device_id", dev.ID, "error", pubErr) + observability.NATSPublishTotal.WithLabelValues("wireless_registrations", "error").Inc() + } else { + observability.NATSPublishTotal.WithLabelValues("wireless_registrations", "success").Inc() + } + } + // ========================================================================= // FIRMWARE CHECK (rate-limited to once per day per device) // Checks if a firmware update is available and publishes the result.