From cec645a109a0899d3644487c8016fb9840251378 Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Sat, 21 Mar 2026 19:43:06 -0500 Subject: [PATCH] fix(18): lift Redis distributed lock into scheduler for all collector types The SNMP collector was missing the per-device Redis lock that prevents duplicate polls across pods. Rather than adding the lock to each collector individually, lift it into runDeviceLoop so ALL collector types (RouterOS and SNMP) are protected uniformly. Co-Authored-By: Claude Opus 4.6 (1M context) --- poller/internal/poller/scheduler.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/poller/internal/poller/scheduler.go b/poller/internal/poller/scheduler.go index 5356688..fa26a8c 100644 --- a/poller/internal/poller/scheduler.go +++ b/poller/internal/poller/scheduler.go @@ -2,6 +2,7 @@ package poller import ( "context" + "fmt" "log/slog" "sync" "time" @@ -240,8 +241,31 @@ func (s *Scheduler) runDeviceLoop(ctx context.Context, dev store.Device, ds *dev return // skip this device -- no collector available } + // Acquire per-device distributed lock. Prevents duplicate polls across pods. + // This protects ALL collector types (RouterOS and SNMP) uniformly. + lockKey := fmt.Sprintf("poll:device:%s", dev.ID) + lockTTL := s.pollInterval + s.connTimeout + 15*time.Second + lock, lockErr := s.locker.Obtain(ctx, lockKey, lockTTL, nil) + if lockErr == redislock.ErrNotObtained { + slog.Debug("skipping poll — lock held by another pod", "device_id", dev.ID) + observability.PollTotal.WithLabelValues("skipped").Inc() + observability.RedisLockTotal.WithLabelValues("not_obtained").Inc() + continue + } + if lockErr != nil { + slog.Error("failed to obtain Redis lock", "device_id", dev.ID, "error", lockErr) + observability.RedisLockTotal.WithLabelValues("error").Inc() + continue + } + observability.RedisLockTotal.WithLabelValues("obtained").Inc() + err := collector.Collect(ctx, dev, s.publisher) + // Release lock after collection completes. + if releaseErr := lock.Release(ctx); releaseErr != nil && releaseErr != redislock.ErrLockNotHeld { + slog.Warn("failed to release Redis lock", "device_id", dev.ID, "error", releaseErr) + } + if err != nil { ds.consecutiveFailures++