From ad75a19f5d00b1d1e4310b1da2ede127debfa305 Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Sat, 21 Mar 2026 18:34:27 -0500 Subject: [PATCH] feat(16-04): update Scheduler to dispatch by device_type via collectors - Add collectors map[string]Collector field to Scheduler struct - Register RouterOSCollector for "routeros" inside NewScheduler - Replace direct PollDevice call with collector dispatch by dev.DeviceType - Default empty DeviceType to "routeros" for backward compatibility - Log error and exit device loop for unknown device types - Circuit breaker logic unchanged Co-Authored-By: Claude Opus 4.6 (1M context) --- poller/internal/poller/scheduler.go | 39 +++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/poller/internal/poller/scheduler.go b/poller/internal/poller/scheduler.go index 7b04323..38edaf0 100644 --- a/poller/internal/poller/scheduler.go +++ b/poller/internal/poller/scheduler.go @@ -45,6 +45,10 @@ type Scheduler struct { baseBackoff time.Duration maxBackoff time.Duration + // collectors maps device type name to its Collector implementation. + // "routeros" -> RouterOSCollector, "snmp" -> SNMPCollector (future). + collectors map[string]Collector + // activeDevices maps device ID to per-device state. mu sync.Mutex activeDevices map[string]*deviceState @@ -64,7 +68,10 @@ func NewScheduler( baseBackoff time.Duration, maxBackoff time.Duration, ) *Scheduler { - return &Scheduler{ + // lockTTL gives the poll cycle time to complete: interval + connection timeout + 15s margin. + lockTTL := pollInterval + connTimeout + 15*time.Second + + s := &Scheduler{ store: store, locker: locker, publisher: publisher, @@ -76,8 +83,14 @@ func NewScheduler( maxFailures: maxFailures, baseBackoff: baseBackoff, maxBackoff: maxBackoff, + collectors: make(map[string]Collector), activeDevices: make(map[string]*deviceState), } + + // Register built-in collectors. Future device types (SNMP) register here. + s.collectors["routeros"] = NewRouterOSCollector(locker, credentialCache, connTimeout, cmdTimeout, lockTTL) + + return s } // Run is the main scheduler loop. It: @@ -175,16 +188,13 @@ func (s *Scheduler) reconcileDevices(ctx context.Context, wg *sync.WaitGroup) er } // runDeviceLoop is the per-device polling loop. It ticks at pollInterval and -// calls PollDevice synchronously on each tick (not in a sub-goroutine, to avoid -// unbounded goroutine growth if polls are slow). +// dispatches to the appropriate Collector synchronously on each tick (not in a +// sub-goroutine, to avoid unbounded goroutine growth if polls are slow). // // Circuit breaker: when consecutive failures exceed maxFailures, the device enters // exponential backoff. Poll ticks during backoff are skipped. On success, the // circuit breaker resets. func (s *Scheduler) runDeviceLoop(ctx context.Context, dev store.Device, ds *deviceState) { - // lockTTL gives the poll cycle time to complete: interval + connection timeout + 15s margin. - lockTTL := s.pollInterval + s.connTimeout + 15*time.Second - ticker := time.NewTicker(s.pollInterval) defer ticker.Stop() @@ -208,7 +218,22 @@ func (s *Scheduler) runDeviceLoop(ctx context.Context, dev store.Device, ds *dev continue } - err := PollDevice(ctx, dev, s.locker, s.publisher, s.credentialCache, s.connTimeout, s.cmdTimeout, lockTTL) + // Look up collector for this device type. + deviceType := dev.DeviceType + if deviceType == "" { + deviceType = "routeros" // backward compat default + } + + collector, ok := s.collectors[deviceType] + if !ok { + slog.Error("no collector registered for device type", + "device_id", dev.ID, + "device_type", deviceType, + ) + return // skip this device -- no collector available + } + + err := collector.Collect(ctx, dev, s.publisher) if err != nil { ds.consecutiveFailures++