From 397a33abefc13b36a424f61190454da66d40143a Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Thu, 19 Mar 2026 06:05:55 -0500 Subject: [PATCH] feat(13-01): add DeviceInterfaceEvent publisher and wire into PollDevice - DeviceInterfaceEvent type publishes to device.interfaces.{device_id} - PublishDeviceInterfaces method follows existing publisher pattern - DEVICE_EVENTS stream includes device.interfaces.> subject - PollDevice collects interface info after traffic counters, before health - Non-fatal errors with Prometheus metrics for publish success/failure Co-Authored-By: Claude Opus 4.6 (1M context) --- poller/internal/bus/publisher.go | 39 ++++++++++++++++++++++++++++++++ poller/internal/poller/worker.go | 23 +++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/poller/internal/bus/publisher.go b/poller/internal/bus/publisher.go index 1c4039b..f818341 100644 --- a/poller/internal/bus/publisher.go +++ b/poller/internal/bus/publisher.go @@ -132,6 +132,7 @@ func NewPublisher(natsURL string) (*Publisher, error) { Subjects: []string{ "device.status.>", "device.metrics.>", + "device.interfaces.>", "device.firmware.>", "device.credential_changed.>", "config.changed.>", @@ -226,6 +227,44 @@ func (p *Publisher) PublishMetrics(ctx context.Context, event DeviceMetricsEvent return nil } +// DeviceInterfaceEvent is the payload published to the DEVICE_EVENTS NATS stream +// when interface identity data (name, MAC, type, running) is collected from a +// device. The link discovery system uses MAC addresses to resolve which managed +// device owns each end of a wireless link. +type DeviceInterfaceEvent struct { + DeviceID string `json:"device_id"` + TenantID string `json:"tenant_id"` + CollectedAt string `json:"collected_at"` // RFC3339 + Interfaces []device.InterfaceInfo `json:"interfaces"` +} + +// PublishDeviceInterfaces publishes interface identity data to the DEVICE_EVENTS +// NATS stream for link discovery. +// +// Events are published to "device.interfaces.{device_id}" so consumers can +// subscribe to all interface data or filter by device. +func (p *Publisher) PublishDeviceInterfaces(ctx context.Context, event DeviceInterfaceEvent) error { + data, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("marshalling device interface event: %w", err) + } + + subject := fmt.Sprintf("device.interfaces.%s", event.DeviceID) + + _, err = p.js.Publish(ctx, subject, data) + if err != nil { + return fmt.Errorf("publishing to %s: %w", subject, err) + } + + slog.Debug("published device interface event", + "device_id", event.DeviceID, + "interfaces", len(event.Interfaces), + "subject", subject, + ) + + return nil +} + // PublishWirelessRegistrations publishes per-client wireless registration data // and RF monitor stats to the WIRELESS_REGISTRATIONS NATS stream. // diff --git a/poller/internal/poller/worker.go b/poller/internal/poller/worker.go index 033d96d..be933ff 100644 --- a/poller/internal/poller/worker.go +++ b/poller/internal/poller/worker.go @@ -326,6 +326,29 @@ func PollDevice( observability.NATSPublishTotal.WithLabelValues("metrics", "success").Inc() } + // Interface identity data for link discovery (MAC addresses, types). + cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout) + ifaceInfo, err := withTimeout[[]device.InterfaceInfo](cmdCtx, func() ([]device.InterfaceInfo, error) { + return device.CollectInterfaceInfo(client) + }) + cmdCancel() + if err != nil { + slog.Warn("failed to collect interface info", "device_id", dev.ID, "error", err) + } + if len(ifaceInfo) > 0 { + if pubErr := pub.PublishDeviceInterfaces(ctx, bus.DeviceInterfaceEvent{ + DeviceID: dev.ID, + TenantID: dev.TenantID, + CollectedAt: collectedAt, + Interfaces: ifaceInfo, + }); pubErr != nil { + slog.Warn("failed to publish device interfaces", "device_id", dev.ID, "error", pubErr) + observability.NATSPublishTotal.WithLabelValues("interfaces_info", "error").Inc() + } else { + observability.NATSPublishTotal.WithLabelValues("interfaces_info", "success").Inc() + } + } + // System health (CPU, memory, disk, temperature). cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout) health, err := withTimeout[device.HealthMetrics](cmdCtx, func() (device.HealthMetrics, error) {