feat(13-01): add DeviceInterfaceEvent publisher and wire into PollDevice

- DeviceInterfaceEvent type publishes to device.interfaces.{device_id}
- PublishDeviceInterfaces method follows existing publisher pattern
- DEVICE_EVENTS stream includes device.interfaces.> subject
- PollDevice collects interface info after traffic counters, before health
- Non-fatal errors with Prometheus metrics for publish success/failure

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jason Staack
2026-03-19 06:05:55 -05:00
parent 6939584428
commit 397a33abef
2 changed files with 62 additions and 0 deletions

View File

@@ -132,6 +132,7 @@ func NewPublisher(natsURL string) (*Publisher, error) {
Subjects: []string{ Subjects: []string{
"device.status.>", "device.status.>",
"device.metrics.>", "device.metrics.>",
"device.interfaces.>",
"device.firmware.>", "device.firmware.>",
"device.credential_changed.>", "device.credential_changed.>",
"config.changed.>", "config.changed.>",
@@ -226,6 +227,44 @@ func (p *Publisher) PublishMetrics(ctx context.Context, event DeviceMetricsEvent
return nil return nil
} }
// DeviceInterfaceEvent is the payload published to the DEVICE_EVENTS NATS stream
// when interface identity data (name, MAC, type, running) is collected from a
// device. The link discovery system uses MAC addresses to resolve which managed
// device owns each end of a wireless link.
type DeviceInterfaceEvent struct {
DeviceID string `json:"device_id"`
TenantID string `json:"tenant_id"`
CollectedAt string `json:"collected_at"` // RFC3339
Interfaces []device.InterfaceInfo `json:"interfaces"`
}
// PublishDeviceInterfaces publishes interface identity data to the DEVICE_EVENTS
// NATS stream for link discovery.
//
// Events are published to "device.interfaces.{device_id}" so consumers can
// subscribe to all interface data or filter by device.
func (p *Publisher) PublishDeviceInterfaces(ctx context.Context, event DeviceInterfaceEvent) error {
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshalling device interface event: %w", err)
}
subject := fmt.Sprintf("device.interfaces.%s", event.DeviceID)
_, err = p.js.Publish(ctx, subject, data)
if err != nil {
return fmt.Errorf("publishing to %s: %w", subject, err)
}
slog.Debug("published device interface event",
"device_id", event.DeviceID,
"interfaces", len(event.Interfaces),
"subject", subject,
)
return nil
}
// PublishWirelessRegistrations publishes per-client wireless registration data // PublishWirelessRegistrations publishes per-client wireless registration data
// and RF monitor stats to the WIRELESS_REGISTRATIONS NATS stream. // and RF monitor stats to the WIRELESS_REGISTRATIONS NATS stream.
// //

View File

@@ -326,6 +326,29 @@ func PollDevice(
observability.NATSPublishTotal.WithLabelValues("metrics", "success").Inc() observability.NATSPublishTotal.WithLabelValues("metrics", "success").Inc()
} }
// Interface identity data for link discovery (MAC addresses, types).
cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout)
ifaceInfo, err := withTimeout[[]device.InterfaceInfo](cmdCtx, func() ([]device.InterfaceInfo, error) {
return device.CollectInterfaceInfo(client)
})
cmdCancel()
if err != nil {
slog.Warn("failed to collect interface info", "device_id", dev.ID, "error", err)
}
if len(ifaceInfo) > 0 {
if pubErr := pub.PublishDeviceInterfaces(ctx, bus.DeviceInterfaceEvent{
DeviceID: dev.ID,
TenantID: dev.TenantID,
CollectedAt: collectedAt,
Interfaces: ifaceInfo,
}); pubErr != nil {
slog.Warn("failed to publish device interfaces", "device_id", dev.ID, "error", pubErr)
observability.NATSPublishTotal.WithLabelValues("interfaces_info", "error").Inc()
} else {
observability.NATSPublishTotal.WithLabelValues("interfaces_info", "success").Inc()
}
}
// System health (CPU, memory, disk, temperature). // System health (CPU, memory, disk, temperature).
cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout) cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout)
health, err := withTimeout[device.HealthMetrics](cmdCtx, func() (device.HealthMetrics, error) { health, err := withTimeout[device.HealthMetrics](cmdCtx, func() (device.HealthMetrics, error) {