From 0697563a1344a1e35bbea037e6c0d261cb3ce105 Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Sat, 21 Mar 2026 19:21:25 -0500 Subject: [PATCH] feat(18-01): add counter cache with Redis delta computation and SNMPMetricsEvent - Implement computeCounterDelta for Counter32/Counter64 with wraparound handling - Sanity threshold discards deltas > 90% of max value (device reset detection) - CounterCache uses a single Redis MGET for reads and pipelined SET (with TTL) for writes for efficient state persistence - Counter keys use "snmp:counter:{device_id}:{oid}" format with 600s TTL - Add SNMPMetricsEvent and SNMPMetricEntry structs to bus package - Add PublishSNMPMetrics publishing to "device.metrics.snmp_custom.{device_id}" - Full test coverage: 10 counter tests including miniredis integration Co-Authored-By: Claude Opus 4.6 (1M context) --- poller/internal/bus/publisher.go | 48 +++++++++ poller/internal/snmp/counter.go | 149 +++++++++++++++++++++++++++ poller/internal/snmp/counter_test.go | 140 +++++++++++++++++++++++++ 3 files changed, 337 insertions(+) create mode 100644 poller/internal/snmp/counter.go create mode 100644 poller/internal/snmp/counter_test.go diff --git a/poller/internal/bus/publisher.go b/poller/internal/bus/publisher.go index f818341..2da8ecd 100644 --- a/poller/internal/bus/publisher.go +++ b/poller/internal/bus/publisher.go @@ -486,6 +486,54 @@ func (p *Publisher) PublishSessionEnd(ctx context.Context, event SessionEndEvent return nil } +// SNMPMetricsEvent is the payload published to NATS JetStream when custom SNMP +// metrics are collected from a device. The backend subscribes to +// "device.metrics.snmp_custom.>" to ingest these into the snmp_metrics hypertable. +type SNMPMetricsEvent struct { + DeviceID string `json:"device_id"` + TenantID string `json:"tenant_id"` + CollectedAt string `json:"collected_at"` // RFC3339 + Type string `json:"type"` // always "snmp_custom" + Metrics []SNMPMetricEntry `json:"metrics"` +} + +// SNMPMetricEntry is a single metric within an SNMPMetricsEvent. 
Numeric and +// text values are mutually exclusive; IndexValue is populated for table metrics. +type SNMPMetricEntry struct { + MetricName string `json:"metric_name"` + MetricGroup string `json:"metric_group"` + OID string `json:"oid"` + ValueNum *float64 `json:"value_numeric,omitempty"` + ValueText *string `json:"value_text,omitempty"` + IndexValue *string `json:"index_value,omitempty"` +} + +// PublishSNMPMetrics publishes custom SNMP metrics to NATS JetStream. +// +// Events are published to "device.metrics.snmp_custom.{device_id}" so the +// Python metrics_subscriber can ingest them into the snmp_metrics hypertable. +func (p *Publisher) PublishSNMPMetrics(ctx context.Context, event SNMPMetricsEvent) error { + data, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("marshalling SNMP metrics event: %w", err) + } + + subject := fmt.Sprintf("device.metrics.snmp_custom.%s", event.DeviceID) + + _, err = p.js.Publish(ctx, subject, data) + if err != nil { + return fmt.Errorf("publishing to %s: %w", subject, err) + } + + slog.Debug("published SNMP metrics event", + "device_id", event.DeviceID, + "metrics_count", len(event.Metrics), + "subject", subject, + ) + + return nil +} + // Conn returns the raw NATS connection for use by other components // (e.g., CmdResponder for request-reply subscriptions). func (p *Publisher) Conn() *nats.Conn { diff --git a/poller/internal/snmp/counter.go b/poller/internal/snmp/counter.go new file mode 100644 index 0000000..dc94806 --- /dev/null +++ b/poller/internal/snmp/counter.go @@ -0,0 +1,149 @@ +package snmp + +import ( + "context" + "encoding/json" + "fmt" + "math" + "time" + + "github.com/redis/go-redis/v9" +) + +// CounterInput represents a single counter value to compute a delta for. +type CounterInput struct { + Value uint64 + Bits int // 32 or 64 +} + +// CounterState stores the previous value and timestamp for delta computation. 
+type CounterState struct { + Value uint64 `json:"value"` + Timestamp int64 `json:"ts"` +} + +// CounterResult holds the computed delta and rate for a single counter OID. +type CounterResult struct { + OID string + Delta uint64 + Rate float64 + ElapsedSeconds float64 +} + +// CounterCache provides Redis-backed counter delta computation. +type CounterCache struct { + rdb redis.Cmdable +} + +// NewCounterCache creates a CounterCache backed by the given Redis client. +func NewCounterCache(rdb redis.Cmdable) *CounterCache { + return &CounterCache{rdb: rdb} +} + +// counterKey returns the Redis key for a device+OID counter state. +func counterKey(deviceID, oid string) string { + return "snmp:counter:" + deviceID + ":" + oid +} + +// ComputeDeltas fetches previous counter states from Redis, computes deltas +// and rates, then stores the new values. First poll returns empty results. +func (c *CounterCache) ComputeDeltas(ctx context.Context, deviceID string, counters map[string]CounterInput) ([]CounterResult, error) { + if len(counters) == 0 { + return nil, nil + } + + // Build keys and ordered OID list. + oids := make([]string, 0, len(counters)) + keys := make([]string, 0, len(counters)) + for oid := range counters { + oids = append(oids, oid) + keys = append(keys, counterKey(deviceID, oid)) + } + + // MGET all previous states in one round-trip. + vals, err := c.rdb.MGet(ctx, keys...).Result() + if err != nil { + return nil, fmt.Errorf("redis MGET counter states: %w", err) + } + + now := time.Now().Unix() + var results []CounterResult + pipe := c.rdb.Pipeline() + + for i, oid := range oids { + input := counters[oid] + newState := CounterState{Value: input.Value, Timestamp: now} + stateJSON, _ := json.Marshal(newState) + + pipe.Set(ctx, keys[i], stateJSON, 600*time.Second) + + // If no previous value, skip (first poll). 
+ if vals[i] == nil { + continue + } + raw, ok := vals[i].(string) + if !ok { + continue + } + + var prev CounterState + if err := json.Unmarshal([]byte(raw), &prev); err != nil { + continue + } + + delta, deltaOK := computeCounterDelta(prev.Value, input.Value, input.Bits) + if !deltaOK { + continue + } + + elapsed := float64(now - prev.Timestamp) + if elapsed <= 0 { + continue + } + + results = append(results, CounterResult{ + OID: oid, + Delta: delta, + Rate: float64(delta) / elapsed, + ElapsedSeconds: elapsed, + }) + } + + if _, err := pipe.Exec(ctx); err != nil { + return nil, fmt.Errorf("redis pipeline exec: %w", err) + } + + return results, nil +} + +// computeCounterDelta computes the delta between two counter values, +// handling wraparound for 32-bit and 64-bit counters. Returns ok=false +// if the delta appears to be a device reset (> 90% of max value). +func computeCounterDelta(prev, curr uint64, counterBits int) (delta uint64, ok bool) { + var maxVal uint64 + if counterBits == 32 { + maxVal = math.MaxUint32 + } else { + maxVal = math.MaxUint64 + } + + if curr >= prev { + delta = curr - prev + } else { + // Wraparound: (max - prev) + curr + 1 + if counterBits == 64 { + // For 64-bit, curr < prev keeps this sum <= MaxUint64 (no overflow); same formula as the 32-bit branch. + delta = (maxVal - prev) + curr + 1 + } else { + delta = (maxVal - prev) + curr + 1 + } + } + + // Sanity check: if delta > 90% of max, likely a device reset. 
+ threshold := maxVal / 10 * 9 + if delta > threshold { + return 0, false + } + + return delta, true +} diff --git a/poller/internal/snmp/counter_test.go b/poller/internal/snmp/counter_test.go new file mode 100644 index 0000000..516266b --- /dev/null +++ b/poller/internal/snmp/counter_test.go @@ -0,0 +1,140 @@ +package snmp + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// --- computeCounterDelta unit tests --- + +func TestCounterDelta_NormalIncrement32(t *testing.T) { + delta, ok := computeCounterDelta(100, 200, 32) + assert.True(t, ok) + assert.Equal(t, uint64(100), delta) +} + +func TestCounterDelta_Counter32Wraparound(t *testing.T) { + // 4294967290 -> 10 wraps around MaxUint32 (4294967295) + // delta = (4294967295 - 4294967290) + 10 + 1 = 16 + delta, ok := computeCounterDelta(4294967290, 10, 32) + assert.True(t, ok) + assert.Equal(t, uint64(16), delta) +} + +func TestCounterDelta_Counter32Reset(t *testing.T) { + // 100 -> 50: curr < prev, wrap delta would be huge (> 90% of MaxUint32) + // This indicates a device reset, not a legitimate wrap. + _, ok := computeCounterDelta(100, 50, 32) + assert.False(t, ok, "should discard: delta > 90%% of MaxUint32 indicates device reset") +} + +func TestCounterDelta_Counter64NormalIncrement(t *testing.T) { + delta, ok := computeCounterDelta(1000000, 2000000, 64) + assert.True(t, ok) + assert.Equal(t, uint64(1000000), delta) +} + +func TestCounterDelta_Counter64Reset(t *testing.T) { + // prev=1000000000000 -> curr=50: wrap delta = (MaxUint64 - 1e12) + 50 + 1 + // which is > 90% of MaxUint64, indicating a device reset (not a wrap). 
+ _, ok := computeCounterDelta(1000000000000, 50, 64) + assert.False(t, ok, "should discard: moderate prev to tiny curr on 64-bit indicates device reset") +} + +func TestCounterDelta_ZeroDelta(t *testing.T) { + delta, ok := computeCounterDelta(500, 500, 32) + assert.True(t, ok) + assert.Equal(t, uint64(0), delta) +} + +// --- CounterCache integration tests --- + +func TestCounterCache_FirstPollReturnsEmpty(t *testing.T) { + mr := miniredis.RunT(t) + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer rdb.Close() + + cc := NewCounterCache(rdb) + ctx := context.Background() + + counters := map[string]CounterInput{ + ".1.3.6.1.2.1.2.2.1.10.1": {Value: 1000, Bits: 32}, + ".1.3.6.1.2.1.2.2.1.16.1": {Value: 500, Bits: 32}, + } + + results, err := cc.ComputeDeltas(ctx, "dev-001", counters) + require.NoError(t, err) + assert.Empty(t, results, "first poll should produce no results (no previous values)") +} + +func TestCounterCache_SecondPollComputesRate(t *testing.T) { + mr := miniredis.RunT(t) + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer rdb.Close() + + cc := NewCounterCache(rdb) + ctx := context.Background() + + // First poll: seed the cache + counters1 := map[string]CounterInput{ + ".1.3.6.1.2.1.2.2.1.10.1": {Value: 1000, Bits: 32}, + } + _, err := cc.ComputeDeltas(ctx, "dev-001", counters1) + require.NoError(t, err) + + // Advance time in miniredis by 10 seconds + mr.FastForward(10 * time.Second) + + // Manually adjust the stored timestamp to be 10 seconds ago. + // We do this because ComputeDeltas uses time.Now() internally, + // and miniredis FastForward doesn't affect Go's time.Now(). 
+ key := counterKey("dev-001", ".1.3.6.1.2.1.2.2.1.10.1") + nowUnix := time.Now().Unix() + rdb.Set(ctx, key, fmt.Sprintf(`{"value":1000,"ts":%d}`, nowUnix-10), 600*time.Second) + + // Second poll: should compute delta and rate + counters2 := map[string]CounterInput{ + ".1.3.6.1.2.1.2.2.1.10.1": {Value: 2000, Bits: 32}, + } + results, err := cc.ComputeDeltas(ctx, "dev-001", counters2) + require.NoError(t, err) + require.Len(t, results, 1) + + assert.Equal(t, ".1.3.6.1.2.1.2.2.1.10.1", results[0].OID) + assert.Equal(t, uint64(1000), results[0].Delta) + assert.InDelta(t, 100.0, results[0].Rate, 1.0, "rate should be ~100 (1000 delta / 10s)") + assert.InDelta(t, 10.0, results[0].ElapsedSeconds, 1.0) +} + +func TestCounterCache_RedisKeyFormat(t *testing.T) { + key := counterKey("device-abc", ".1.3.6.1.2.1.2.2.1.10.1") + assert.Equal(t, "snmp:counter:device-abc:.1.3.6.1.2.1.2.2.1.10.1", key) +} + +func TestCounterCache_StoresWithTTL(t *testing.T) { + mr := miniredis.RunT(t) + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer rdb.Close() + + cc := NewCounterCache(rdb) + ctx := context.Background() + + counters := map[string]CounterInput{ + ".1.3.6.1.2.1.2.2.1.10.1": {Value: 1000, Bits: 32}, + } + + _, err := cc.ComputeDeltas(ctx, "dev-001", counters) + require.NoError(t, err) + + // Verify the key exists in Redis with a TTL + key := counterKey("dev-001", ".1.3.6.1.2.1.2.2.1.10.1") + ttl := mr.TTL(key) + assert.True(t, ttl > 0 && ttl <= 600*time.Second, "TTL should be set to ~600s, got %v", ttl) +}