diff --git a/poller/internal/poller/scheduler.go b/poller/internal/poller/scheduler.go
index 5356688..fa26a8c 100644
--- a/poller/internal/poller/scheduler.go
+++ b/poller/internal/poller/scheduler.go
@@ -2,6 +2,8 @@ package poller
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"log/slog"
 	"sync"
 	"time"
@@ -240,8 +241,35 @@ func (s *Scheduler) runDeviceLoop(ctx context.Context, dev store.Device, ds *dev
 		return // skip this device -- no collector available
 	}
 
+	// Acquire per-device distributed lock. Prevents duplicate polls across pods.
+	// This protects ALL collector types (RouterOS and SNMP) uniformly.
+	lockKey := fmt.Sprintf("poll:device:%s", dev.ID)
+	// TTL must outlive the longest plausible poll so the lock cannot expire
+	// mid-collection; pad interval + connect timeout with a safety margin.
+	lockTTL := s.pollInterval + s.connTimeout + 15*time.Second
+	lock, lockErr := s.locker.Obtain(ctx, lockKey, lockTTL, nil)
+	if errors.Is(lockErr, redislock.ErrNotObtained) {
+		slog.Debug("skipping poll — lock held by another pod", "device_id", dev.ID)
+		observability.PollTotal.WithLabelValues("skipped").Inc()
+		observability.RedisLockTotal.WithLabelValues("not_obtained").Inc()
+		continue
+	}
+	if lockErr != nil {
+		slog.Error("failed to obtain Redis lock", "device_id", dev.ID, "error", lockErr)
+		observability.RedisLockTotal.WithLabelValues("error").Inc()
+		continue
+	}
+	observability.RedisLockTotal.WithLabelValues("obtained").Inc()
+
 	err := collector.Collect(ctx, dev, s.publisher)
+	// Release lock after collection completes.
+	// NOTE(review): Release reuses the poll ctx — if ctx is already cancelled
+	// (e.g. shutdown), the release fails and the lock lingers until TTL expiry.
+	// Confirm whether a short detached context is wanted here.
+	if releaseErr := lock.Release(ctx); releaseErr != nil && !errors.Is(releaseErr, redislock.ErrLockNotHeld) {
+		slog.Warn("failed to release Redis lock", "device_id", dev.ID, "error", releaseErr)
+	}
+
 	if err != nil {
 		ds.consecutiveFailures++