Files
the-other-dude/poller/internal/poller/worker.go
Jason Staack 397a33abef feat(13-01): add DeviceInterfaceEvent publisher and wire into PollDevice
- DeviceInterfaceEvent type publishes to device.interfaces.{device_id}
- PublishDeviceInterfaces method follows existing publisher pattern
- DEVICE_EVENTS stream includes device.interfaces.> subject
- PollDevice collects interface info after traffic counters, before health
- Non-fatal errors with Prometheus metrics for publish success/failure

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 06:05:55 -05:00

498 lines
19 KiB
Go

// Package poller implements the polling logic for individual devices.
package poller
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"time"
"github.com/bsm/redislock"
"github.com/redis/go-redis/v9"
"github.com/staack/the-other-dude/poller/internal/bus"
"github.com/staack/the-other-dude/poller/internal/device"
"github.com/staack/the-other-dude/poller/internal/observability"
"github.com/staack/the-other-dude/poller/internal/store"
"github.com/staack/the-other-dude/poller/internal/vault"
)
// ErrDeviceOffline is returned by PollDevice when a device cannot be reached.
// The scheduler uses this to drive the circuit breaker — consecutive offline
// events trigger exponential backoff without logging as a hard error.
var ErrDeviceOffline = errors.New("device offline")
// redisClientForFirmware is a module-level Redis client reference used
// for firmware check rate limiting. Set by the scheduler before starting polls.
var redisClientForFirmware *redis.Client
// SetRedisClient sets the Redis client used for firmware rate limiting.
func SetRedisClient(c *redis.Client) {
redisClientForFirmware = c
}
// withTimeout runs fn in a goroutine and returns its result, or a timeout error
// if ctx expires first. This wraps RouterOS API calls that don't accept a context
// parameter, enforcing per-command timeouts to prevent indefinite blocking.
func withTimeout[T any](ctx context.Context, fn func() (T, error)) (T, error) {
type result struct {
val T
err error
}
ch := make(chan result, 1)
go func() {
v, e := fn()
ch <- result{v, e}
}()
select {
case r := <-ch:
return r.val, r.err
case <-ctx.Done():
var zero T
return zero, fmt.Errorf("command timed out: %w", ctx.Err())
}
}
// PollDevice performs a single poll cycle for one device:
// 1. Acquire distributed Redis lock to prevent duplicate polls across pods.
// 2. Decrypt device credentials.
// 3. Attempt TLS connection to the RouterOS binary API.
// 4. On failure: publish offline event, return ErrDeviceOffline.
// 5. On success: run /system/resource/print, publish online event with metadata.
// 6. Collect interface, health, and wireless metrics; publish as separate events.
// 7. Release lock and close connection via deferred calls.
//
// lockTTL should be longer than the expected poll duration to prevent the lock
// from expiring while the poll is still in progress.
//
// cmdTimeout is the per-command timeout for individual RouterOS API calls.
func PollDevice(
ctx context.Context,
dev store.Device,
locker *redislock.Client,
pub *bus.Publisher,
credentialCache *vault.CredentialCache,
connTimeout time.Duration,
cmdTimeout time.Duration,
lockTTL time.Duration,
) error {
startTime := time.Now()
pollStatus := "success"
lockKey := fmt.Sprintf("poll:device:%s", dev.ID)
// Acquire per-device lock. If another pod already holds the lock, skip this cycle.
lock, err := locker.Obtain(ctx, lockKey, lockTTL, nil)
if err == redislock.ErrNotObtained {
slog.Debug("skipping poll — lock held by another pod", "device_id", dev.ID)
observability.PollTotal.WithLabelValues("skipped").Inc()
observability.RedisLockTotal.WithLabelValues("not_obtained").Inc()
return nil
}
if err != nil {
observability.RedisLockTotal.WithLabelValues("error").Inc()
return fmt.Errorf("obtaining Redis lock for device %s: %w", dev.ID, err)
}
observability.RedisLockTotal.WithLabelValues("obtained").Inc()
defer func() {
if releaseErr := lock.Release(ctx); releaseErr != nil && releaseErr != redislock.ErrLockNotHeld {
slog.Warn("failed to release Redis lock", "device_id", dev.ID, "error", releaseErr)
}
}()
// Deferred metric recording — captures poll duration and status at exit.
defer func() {
observability.PollDuration.Observe(time.Since(startTime).Seconds())
observability.PollTotal.WithLabelValues(pollStatus).Inc()
}()
// Decrypt device credentials via credential cache (Transit preferred, legacy fallback).
username, password, err := credentialCache.GetCredentials(
dev.ID,
dev.TenantID,
dev.EncryptedCredentialsTransit,
dev.EncryptedCredentials,
)
if err != nil {
pollStatus = "error"
return fmt.Errorf("decrypting credentials for device %s: %w", dev.ID, err)
}
// Prepare CA cert PEM for TLS verification (only populated for portal_ca devices).
var caCertPEM []byte
if dev.CACertPEM != nil {
caCertPEM = []byte(*dev.CACertPEM)
}
// Attempt connection. On failure, publish offline event and return ErrDeviceOffline.
client, err := device.ConnectDevice(dev.IPAddress, dev.APISSLPort, dev.APIPort, username, password, connTimeout, caCertPEM, dev.TLSMode)
if err != nil {
slog.Info("device offline", "device_id", dev.ID, "ip", dev.IPAddress, "error", err)
observability.DeviceConnectionErrors.Inc()
offlineEvent := bus.DeviceStatusEvent{
DeviceID: dev.ID,
TenantID: dev.TenantID,
Status: "offline",
LastSeen: time.Now().UTC().Format(time.RFC3339),
}
if pubErr := pub.PublishStatus(ctx, offlineEvent); pubErr != nil {
slog.Warn("failed to publish offline event", "device_id", dev.ID, "error", pubErr)
observability.NATSPublishTotal.WithLabelValues("status", "error").Inc()
} else {
observability.NATSPublishTotal.WithLabelValues("status", "success").Inc()
}
// Write device status to Redis so the backup scheduler can check
// if a device is online before attempting a backup.
if redisClientForFirmware != nil {
statusKey := fmt.Sprintf("device:%s:status", dev.ID)
if err := redisClientForFirmware.Set(ctx, statusKey, "offline", 10*time.Minute).Err(); err != nil {
slog.Warn("Redis SET failed", "key", statusKey, "error", err)
}
}
// Check for recent config push — trigger rollback or alert if device
// went offline shortly after a push (Redis key set by push_tracker).
if redisClientForFirmware != nil {
pushKey := fmt.Sprintf("push:recent:%s", dev.ID)
pushData, pushErr := redisClientForFirmware.Get(ctx, pushKey).Result()
if pushErr == nil && pushData != "" {
var pushInfo struct {
DeviceID string `json:"device_id"`
TenantID string `json:"tenant_id"`
PushType string `json:"push_type"`
PushOperationID string `json:"push_operation_id"`
PrePushCommitSHA string `json:"pre_push_commit_sha"`
}
if unmarshalErr := json.Unmarshal([]byte(pushData), &pushInfo); unmarshalErr == nil {
slog.Warn("device went offline after recent config push",
"device_id", dev.ID,
"push_type", pushInfo.PushType,
)
if pushInfo.PushType == "template" || pushInfo.PushType == "restore" {
// Auto-rollback for template/restore pushes
if rollbackErr := pub.PublishPushRollback(ctx, bus.PushRollbackEvent{
DeviceID: pushInfo.DeviceID,
TenantID: pushInfo.TenantID,
PushOperationID: pushInfo.PushOperationID,
PrePushCommitSHA: pushInfo.PrePushCommitSHA,
}); rollbackErr != nil {
slog.Error("failed to publish push rollback event", "device_id", dev.ID, "error", rollbackErr)
}
} else {
// Alert only for editor pushes (one-click rollback in UI)
if alertErr := pub.PublishPushAlert(ctx, bus.PushAlertEvent{
DeviceID: pushInfo.DeviceID,
TenantID: pushInfo.TenantID,
PushType: pushInfo.PushType,
}); alertErr != nil {
slog.Error("failed to publish push alert event", "device_id", dev.ID, "error", alertErr)
}
}
}
}
}
return ErrDeviceOffline
}
defer device.CloseDevice(client)
// Query device resources (version, uptime, CPU, memory) with per-command timeout.
cmdCtx, cmdCancel := context.WithTimeout(ctx, cmdTimeout)
info, err := withTimeout[device.DeviceInfo](cmdCtx, func() (device.DeviceInfo, error) {
return device.DetectVersion(client)
})
cmdCancel()
if err != nil {
slog.Warn("failed to detect version", "device_id", dev.ID, "error", err)
// Fall back to DB-cached version so we don't publish an empty version string.
if dev.RouterOSVersion != nil {
info.Version = *dev.RouterOSVersion
}
}
onlineEvent := bus.DeviceStatusEvent{
DeviceID: dev.ID,
TenantID: dev.TenantID,
Status: "online",
RouterOSVersion: info.Version,
MajorVersion: info.MajorVersion,
BoardName: info.BoardName,
Architecture: info.Architecture,
Uptime: info.Uptime,
CPULoad: info.CPULoad,
FreeMemory: info.FreeMemory,
TotalMemory: info.TotalMemory,
SerialNumber: info.SerialNumber,
FirmwareVersion: info.FirmwareVersion,
LastSeen: time.Now().UTC().Format(time.RFC3339),
}
if pubErr := pub.PublishStatus(ctx, onlineEvent); pubErr != nil {
observability.NATSPublishTotal.WithLabelValues("status", "error").Inc()
pollStatus = "error"
return fmt.Errorf("publishing online event for device %s: %w", dev.ID, pubErr)
}
observability.NATSPublishTotal.WithLabelValues("status", "success").Inc()
// =========================================================================
// CONFIG CHANGE DETECTION
// Compare last-config-change from /system/resource/print against the
// previous value stored in Redis. If it changed (and we have a previous
// value — skip first poll), publish a ConfigChangedEvent so the backend
// can trigger an event-driven backup.
// =========================================================================
if info.LastConfigChange != "" && redisClientForFirmware != nil {
redisKey := fmt.Sprintf("device:%s:last_config_change", dev.ID)
prev, redisErr := redisClientForFirmware.Get(ctx, redisKey).Result()
if redisErr != nil && redisErr != redis.Nil {
slog.Warn("Redis GET last_config_change error", "device_id", dev.ID, "error", redisErr)
}
if prev != info.LastConfigChange {
if prev != "" { // Skip first poll — no previous value to compare
slog.Info("config change detected on device",
"device_id", dev.ID,
"old_timestamp", prev,
"new_timestamp", info.LastConfigChange,
)
if pubErr := pub.PublishConfigChanged(ctx, bus.ConfigChangedEvent{
DeviceID: dev.ID,
TenantID: dev.TenantID,
OldTimestamp: prev,
NewTimestamp: info.LastConfigChange,
}); pubErr != nil {
slog.Warn("failed to publish config.changed", "device_id", dev.ID, "error", pubErr)
observability.NATSPublishTotal.WithLabelValues("config_changed", "error").Inc()
} else {
observability.NATSPublishTotal.WithLabelValues("config_changed", "success").Inc()
}
}
// Update Redis with current value (24h TTL)
if err := redisClientForFirmware.Set(ctx, redisKey, info.LastConfigChange, 24*time.Hour).Err(); err != nil {
slog.Warn("Redis SET failed", "key", redisKey, "error", err)
}
}
}
slog.Info("device polled successfully",
"device_id", dev.ID,
"ip", dev.IPAddress,
"status", "online",
"version", info.Version,
)
// Write device status to Redis so the backup scheduler can check
// if a device is online before attempting a backup.
if redisClientForFirmware != nil {
statusKey := fmt.Sprintf("device:%s:status", dev.ID)
if err := redisClientForFirmware.Set(ctx, statusKey, "online", 10*time.Minute).Err(); err != nil {
slog.Warn("Redis SET failed", "key", statusKey, "error", err)
}
}
// =========================================================================
// METRICS COLLECTION
// Errors are non-fatal — a metric collection failure should not fail the
// poll cycle. Publish failures are also non-fatal for the same reason.
// Each collection call is wrapped with a per-command timeout.
// =========================================================================
collectedAt := time.Now().UTC().Format(time.RFC3339)
// Interface traffic counters.
cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout)
interfaces, err := withTimeout[[]device.InterfaceStats](cmdCtx, func() ([]device.InterfaceStats, error) {
return device.CollectInterfaces(client)
})
cmdCancel()
if err != nil {
slog.Warn("failed to collect interface metrics", "device_id", dev.ID, "error", err)
}
if pubErr := pub.PublishMetrics(ctx, bus.DeviceMetricsEvent{
DeviceID: dev.ID,
TenantID: dev.TenantID,
CollectedAt: collectedAt,
Type: "interfaces",
Interfaces: interfaces,
}); pubErr != nil {
slog.Warn("failed to publish interface metrics", "device_id", dev.ID, "error", pubErr)
observability.NATSPublishTotal.WithLabelValues("metrics", "error").Inc()
} else {
observability.NATSPublishTotal.WithLabelValues("metrics", "success").Inc()
}
// Interface identity data for link discovery (MAC addresses, types).
cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout)
ifaceInfo, err := withTimeout[[]device.InterfaceInfo](cmdCtx, func() ([]device.InterfaceInfo, error) {
return device.CollectInterfaceInfo(client)
})
cmdCancel()
if err != nil {
slog.Warn("failed to collect interface info", "device_id", dev.ID, "error", err)
}
if len(ifaceInfo) > 0 {
if pubErr := pub.PublishDeviceInterfaces(ctx, bus.DeviceInterfaceEvent{
DeviceID: dev.ID,
TenantID: dev.TenantID,
CollectedAt: collectedAt,
Interfaces: ifaceInfo,
}); pubErr != nil {
slog.Warn("failed to publish device interfaces", "device_id", dev.ID, "error", pubErr)
observability.NATSPublishTotal.WithLabelValues("interfaces_info", "error").Inc()
} else {
observability.NATSPublishTotal.WithLabelValues("interfaces_info", "success").Inc()
}
}
// System health (CPU, memory, disk, temperature).
cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout)
health, err := withTimeout[device.HealthMetrics](cmdCtx, func() (device.HealthMetrics, error) {
return device.CollectHealth(client, info)
})
cmdCancel()
if err != nil {
slog.Warn("failed to collect health metrics", "device_id", dev.ID, "error", err)
}
if pubErr := pub.PublishMetrics(ctx, bus.DeviceMetricsEvent{
DeviceID: dev.ID,
TenantID: dev.TenantID,
CollectedAt: collectedAt,
Type: "health",
Health: &health,
}); pubErr != nil {
slog.Warn("failed to publish health metrics", "device_id", dev.ID, "error", pubErr)
observability.NATSPublishTotal.WithLabelValues("metrics", "error").Inc()
} else {
observability.NATSPublishTotal.WithLabelValues("metrics", "success").Inc()
}
// Wireless client stats (only publish if the device has wireless interfaces).
cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout)
wireless, err := withTimeout[[]device.WirelessStats](cmdCtx, func() ([]device.WirelessStats, error) {
return device.CollectWireless(client, info.MajorVersion)
})
cmdCancel()
if err != nil {
slog.Warn("failed to collect wireless metrics", "device_id", dev.ID, "error", err)
}
if len(wireless) > 0 {
if pubErr := pub.PublishMetrics(ctx, bus.DeviceMetricsEvent{
DeviceID: dev.ID,
TenantID: dev.TenantID,
CollectedAt: collectedAt,
Type: "wireless",
Wireless: wireless,
}); pubErr != nil {
slog.Warn("failed to publish wireless metrics", "device_id", dev.ID, "error", pubErr)
observability.NATSPublishTotal.WithLabelValues("metrics", "error").Inc()
} else {
observability.NATSPublishTotal.WithLabelValues("metrics", "success").Inc()
}
}
// Per-client wireless registrations (dedicated stream, not DEVICE_EVENTS).
cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout)
registrations, err := withTimeout[[]device.RegistrationEntry](cmdCtx, func() ([]device.RegistrationEntry, error) {
return device.CollectRegistrations(client, info.MajorVersion)
})
cmdCancel()
if err != nil {
slog.Warn("failed to collect wireless registrations", "device_id", dev.ID, "error", err)
}
var rfStats []device.RFMonitorStats
if len(registrations) > 0 || len(wireless) > 0 {
// Only collect RF monitor if device has wireless interfaces.
cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout)
rfStats, err = withTimeout[[]device.RFMonitorStats](cmdCtx, func() ([]device.RFMonitorStats, error) {
return device.CollectRFMonitor(client, info.MajorVersion)
})
cmdCancel()
if err != nil {
slog.Warn("failed to collect RF monitor stats", "device_id", dev.ID, "error", err)
}
}
if len(registrations) > 0 || len(rfStats) > 0 {
if pubErr := pub.PublishWirelessRegistrations(ctx, bus.WirelessRegistrationEvent{
DeviceID: dev.ID,
TenantID: dev.TenantID,
CollectedAt: collectedAt,
Registrations: registrations,
RFStats: rfStats,
}); pubErr != nil {
slog.Warn("failed to publish wireless registrations", "device_id", dev.ID, "error", pubErr)
observability.NATSPublishTotal.WithLabelValues("wireless_registrations", "error").Inc()
} else {
observability.NATSPublishTotal.WithLabelValues("wireless_registrations", "success").Inc()
}
}
// =========================================================================
// FIRMWARE CHECK (rate-limited to once per day per device)
// Checks if a firmware update is available and publishes the result.
// Uses a Redis key with 24h TTL to ensure we don't hammer devices every 60s.
// =========================================================================
if redisClientForFirmware != nil {
fwCacheKey := fmt.Sprintf("firmware:checked:%s", dev.ID)
exists, _ := redisClientForFirmware.Exists(ctx, fwCacheKey).Result()
if exists == 0 {
cmdCtx, cmdCancel = context.WithTimeout(ctx, cmdTimeout)
fwInfo, fwErr := withTimeout[device.FirmwareInfo](cmdCtx, func() (device.FirmwareInfo, error) {
return device.CheckFirmwareUpdate(client)
})
cmdCancel()
if fwErr != nil {
slog.Warn("firmware check failed", "device_id", dev.ID, "error", fwErr)
// Set cooldown on failure too, but shorter (6h) so we retry sooner than success (24h).
// Prevents hammering devices that can't reach MikroTik update servers every poll cycle.
// Also set the main checked key to prevent the success path from re-checking.
if err := redisClientForFirmware.Set(ctx, fwCacheKey, "1", 6*time.Hour).Err(); err != nil {
slog.Warn("Redis SET failed", "key", fwCacheKey, "error", err)
}
} else {
fwEvent := bus.DeviceFirmwareEvent{
DeviceID: dev.ID,
TenantID: dev.TenantID,
InstalledVersion: fwInfo.InstalledVersion,
LatestVersion: fwInfo.LatestVersion,
Channel: fwInfo.Channel,
Status: fwInfo.Status,
Architecture: fwInfo.Architecture,
}
if pubErr := pub.PublishFirmware(ctx, fwEvent); pubErr != nil {
slog.Warn("failed to publish firmware event", "device_id", dev.ID, "error", pubErr)
observability.NATSPublishTotal.WithLabelValues("firmware", "error").Inc()
} else {
observability.NATSPublishTotal.WithLabelValues("firmware", "success").Inc()
// Set Redis key with 24h TTL — firmware checked for today.
// If the check succeeded but status is "check-failed",
// use shorter cooldown since the device couldn't reach update servers.
if fwInfo.Status == "check-failed" {
if err := redisClientForFirmware.Set(ctx, fwCacheKey, "1", 6*time.Hour).Err(); err != nil {
slog.Warn("Redis SET failed", "key", fwCacheKey, "error", err)
}
} else {
if err := redisClientForFirmware.Set(ctx, fwCacheKey, "1", 24*time.Hour).Err(); err != nil {
slog.Warn("Redis SET failed", "key", fwCacheKey, "error", err)
}
}
slog.Info("firmware check published",
"device_id", dev.ID,
"installed", fwInfo.InstalledVersion,
"latest", fwInfo.LatestVersion,
"channel", fwInfo.Channel,
)
}
}
}
}
return nil
}