fix(18): lift Redis distributed lock into scheduler for all collector types
The SNMP collector was missing the per-device Redis lock that prevents duplicate polls across pods. Rather than adding the lock to each collector individually, lift it into runDeviceLoop so that all collector types (RouterOS and SNMP) are protected uniformly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -2,6 +2,7 @@ package poller
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -240,8 +241,31 @@ func (s *Scheduler) runDeviceLoop(ctx context.Context, dev store.Device, ds *dev
|
|||||||
return // skip this device -- no collector available
|
return // skip this device -- no collector available
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Acquire per-device distributed lock. Prevents duplicate polls across pods.
|
||||||
|
// This protects ALL collector types (RouterOS and SNMP) uniformly.
|
||||||
|
lockKey := fmt.Sprintf("poll:device:%s", dev.ID)
|
||||||
|
lockTTL := s.pollInterval + s.connTimeout + 15*time.Second
|
||||||
|
lock, lockErr := s.locker.Obtain(ctx, lockKey, lockTTL, nil)
|
||||||
|
if lockErr == redislock.ErrNotObtained {
|
||||||
|
slog.Debug("skipping poll — lock held by another pod", "device_id", dev.ID)
|
||||||
|
observability.PollTotal.WithLabelValues("skipped").Inc()
|
||||||
|
observability.RedisLockTotal.WithLabelValues("not_obtained").Inc()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if lockErr != nil {
|
||||||
|
slog.Error("failed to obtain Redis lock", "device_id", dev.ID, "error", lockErr)
|
||||||
|
observability.RedisLockTotal.WithLabelValues("error").Inc()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
observability.RedisLockTotal.WithLabelValues("obtained").Inc()
|
||||||
|
|
||||||
err := collector.Collect(ctx, dev, s.publisher)
|
err := collector.Collect(ctx, dev, s.publisher)
|
||||||
|
|
||||||
|
// Release lock after collection completes.
|
||||||
|
if releaseErr := lock.Release(ctx); releaseErr != nil && releaseErr != redislock.ErrLockNotHeld {
|
||||||
|
slog.Warn("failed to release Redis lock", "device_id", dev.ID, "error", releaseErr)
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ds.consecutiveFailures++
|
ds.consecutiveFailures++
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user