feat(18-01): add counter cache with Redis delta computation and SNMPMetricsEvent
- Implement computeCounterDelta for Counter32/Counter64 with wraparound handling
- Sanity threshold discards deltas > 90% of max value (device reset detection)
- CounterCache uses Redis MGET/MSET pipelining for efficient state persistence
- Counter keys use "snmp:counter:{device_id}:{oid}" format with 600s TTL
- Add SNMPMetricsEvent and SNMPMetricEntry structs to bus package
- Add PublishSNMPMetrics publishing to "device.metrics.snmp_custom.{device_id}"
- Full test coverage: 10 counter tests including miniredis integration
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -486,6 +486,54 @@ func (p *Publisher) PublishSessionEnd(ctx context.Context, event SessionEndEvent
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SNMPMetricsEvent is the payload published to NATS JetStream when custom SNMP
|
||||||
|
// metrics are collected from a device. The backend subscribes to
|
||||||
|
// "device.metrics.snmp_custom.>" to ingest these into the snmp_metrics hypertable.
|
||||||
|
type SNMPMetricsEvent struct {
|
||||||
|
DeviceID string `json:"device_id"`
|
||||||
|
TenantID string `json:"tenant_id"`
|
||||||
|
CollectedAt string `json:"collected_at"` // RFC3339
|
||||||
|
Type string `json:"type"` // always "snmp_custom"
|
||||||
|
Metrics []SNMPMetricEntry `json:"metrics"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// SNMPMetricEntry is a single metric within an SNMPMetricsEvent. Numeric and
|
||||||
|
// text values are mutually exclusive; IndexValue is populated for table metrics.
|
||||||
|
type SNMPMetricEntry struct {
|
||||||
|
MetricName string `json:"metric_name"`
|
||||||
|
MetricGroup string `json:"metric_group"`
|
||||||
|
OID string `json:"oid"`
|
||||||
|
ValueNum *float64 `json:"value_numeric,omitempty"`
|
||||||
|
ValueText *string `json:"value_text,omitempty"`
|
||||||
|
IndexValue *string `json:"index_value,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// PublishSNMPMetrics publishes custom SNMP metrics to NATS JetStream.
|
||||||
|
//
|
||||||
|
// Events are published to "device.metrics.snmp_custom.{device_id}" so the
|
||||||
|
// Python metrics_subscriber can ingest them into the snmp_metrics hypertable.
|
||||||
|
func (p *Publisher) PublishSNMPMetrics(ctx context.Context, event SNMPMetricsEvent) error {
|
||||||
|
data, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshalling SNMP metrics event: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
subject := fmt.Sprintf("device.metrics.snmp_custom.%s", event.DeviceID)
|
||||||
|
|
||||||
|
_, err = p.js.Publish(ctx, subject, data)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("publishing to %s: %w", subject, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Debug("published SNMP metrics event",
|
||||||
|
"device_id", event.DeviceID,
|
||||||
|
"metrics_count", len(event.Metrics),
|
||||||
|
"subject", subject,
|
||||||
|
)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Conn returns the raw NATS connection for use by other components
|
// Conn returns the raw NATS connection for use by other components
|
||||||
// (e.g., CmdResponder for request-reply subscriptions).
|
// (e.g., CmdResponder for request-reply subscriptions).
|
||||||
func (p *Publisher) Conn() *nats.Conn {
|
func (p *Publisher) Conn() *nats.Conn {
|
||||||
|
|||||||
149
poller/internal/snmp/counter.go
Normal file
149
poller/internal/snmp/counter.go
Normal file
@@ -0,0 +1,149 @@
|
|||||||
|
package snmp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CounterInput represents a single counter value to compute a delta for.
|
||||||
|
type CounterInput struct {
|
||||||
|
Value uint64
|
||||||
|
Bits int // 32 or 64
|
||||||
|
}
|
||||||
|
|
||||||
|
// CounterState stores the previous value and timestamp for delta computation.
|
||||||
|
type CounterState struct {
|
||||||
|
Value uint64 `json:"value"`
|
||||||
|
Timestamp int64 `json:"ts"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// CounterResult holds the computed delta and rate for a single counter OID.
|
||||||
|
type CounterResult struct {
|
||||||
|
OID string
|
||||||
|
Delta uint64
|
||||||
|
Rate float64
|
||||||
|
ElapsedSeconds float64
|
||||||
|
}
|
||||||
|
|
||||||
|
// CounterCache provides Redis-backed counter delta computation.
|
||||||
|
type CounterCache struct {
|
||||||
|
rdb redis.Cmdable
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCounterCache creates a CounterCache backed by the given Redis client.
|
||||||
|
func NewCounterCache(rdb redis.Cmdable) *CounterCache {
|
||||||
|
return &CounterCache{rdb: rdb}
|
||||||
|
}
|
||||||
|
|
||||||
|
// counterKey returns the Redis key for a device+OID counter state.
|
||||||
|
func counterKey(deviceID, oid string) string {
|
||||||
|
return "snmp:counter:" + deviceID + ":" + oid
|
||||||
|
}
|
||||||
|
|
||||||
|
// ComputeDeltas fetches previous counter states from Redis, computes deltas
|
||||||
|
// and rates, then stores the new values. First poll returns empty results.
|
||||||
|
func (c *CounterCache) ComputeDeltas(ctx context.Context, deviceID string, counters map[string]CounterInput) ([]CounterResult, error) {
|
||||||
|
if len(counters) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build keys and ordered OID list.
|
||||||
|
oids := make([]string, 0, len(counters))
|
||||||
|
keys := make([]string, 0, len(counters))
|
||||||
|
for oid := range counters {
|
||||||
|
oids = append(oids, oid)
|
||||||
|
keys = append(keys, counterKey(deviceID, oid))
|
||||||
|
}
|
||||||
|
|
||||||
|
// MGET all previous states in one round-trip.
|
||||||
|
vals, err := c.rdb.MGet(ctx, keys...).Result()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("redis MGET counter states: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now().Unix()
|
||||||
|
var results []CounterResult
|
||||||
|
pipe := c.rdb.Pipeline()
|
||||||
|
|
||||||
|
for i, oid := range oids {
|
||||||
|
input := counters[oid]
|
||||||
|
newState := CounterState{Value: input.Value, Timestamp: now}
|
||||||
|
stateJSON, _ := json.Marshal(newState)
|
||||||
|
|
||||||
|
pipe.Set(ctx, keys[i], stateJSON, 600*time.Second)
|
||||||
|
|
||||||
|
// If no previous value, skip (first poll).
|
||||||
|
if vals[i] == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
raw, ok := vals[i].(string)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var prev CounterState
|
||||||
|
if err := json.Unmarshal([]byte(raw), &prev); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
delta, deltaOK := computeCounterDelta(prev.Value, input.Value, input.Bits)
|
||||||
|
if !deltaOK {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
elapsed := float64(now - prev.Timestamp)
|
||||||
|
if elapsed <= 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
results = append(results, CounterResult{
|
||||||
|
OID: oid,
|
||||||
|
Delta: delta,
|
||||||
|
Rate: float64(delta) / elapsed,
|
||||||
|
ElapsedSeconds: elapsed,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := pipe.Exec(ctx); err != nil {
|
||||||
|
return nil, fmt.Errorf("redis pipeline exec: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return results, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// computeCounterDelta computes the delta between two counter values,
|
||||||
|
// handling wraparound for 32-bit and 64-bit counters. Returns ok=false
|
||||||
|
// if the delta appears to be a device reset (> 90% of max value).
|
||||||
|
func computeCounterDelta(prev, curr uint64, counterBits int) (delta uint64, ok bool) {
|
||||||
|
var maxVal uint64
|
||||||
|
if counterBits == 32 {
|
||||||
|
maxVal = math.MaxUint32
|
||||||
|
} else {
|
||||||
|
maxVal = math.MaxUint64
|
||||||
|
}
|
||||||
|
|
||||||
|
if curr >= prev {
|
||||||
|
delta = curr - prev
|
||||||
|
} else {
|
||||||
|
// Wraparound: (max - prev) + curr + 1
|
||||||
|
if counterBits == 64 {
|
||||||
|
// For 64-bit, overflow in the addition is the wraparound itself.
|
||||||
|
delta = (maxVal - prev) + curr + 1
|
||||||
|
} else {
|
||||||
|
delta = (maxVal - prev) + curr + 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sanity check: if delta > 90% of max, likely a device reset.
|
||||||
|
threshold := maxVal / 10 * 9
|
||||||
|
if delta > threshold {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
return delta, true
|
||||||
|
}
|
||||||
140
poller/internal/snmp/counter_test.go
Normal file
140
poller/internal/snmp/counter_test.go
Normal file
@@ -0,0 +1,140 @@
|
|||||||
|
package snmp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/alicebob/miniredis/v2"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// --- computeCounterDelta unit tests ---
|
||||||
|
|
||||||
|
func TestCounterDelta_NormalIncrement32(t *testing.T) {
|
||||||
|
delta, ok := computeCounterDelta(100, 200, 32)
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, uint64(100), delta)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCounterDelta_Counter32Wraparound(t *testing.T) {
|
||||||
|
// 4294967290 -> 10 wraps around MaxUint32 (4294967295)
|
||||||
|
// delta = (4294967295 - 4294967290) + 10 + 1 = 16
|
||||||
|
delta, ok := computeCounterDelta(4294967290, 10, 32)
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, uint64(16), delta)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCounterDelta_Counter32Reset(t *testing.T) {
|
||||||
|
// 100 -> 50: curr < prev, wrap delta would be huge (> 90% of MaxUint32)
|
||||||
|
// This indicates a device reset, not a legitimate wrap.
|
||||||
|
_, ok := computeCounterDelta(100, 50, 32)
|
||||||
|
assert.False(t, ok, "should discard: delta > 90%% of MaxUint32 indicates device reset")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCounterDelta_Counter64NormalIncrement(t *testing.T) {
|
||||||
|
delta, ok := computeCounterDelta(1000000, 2000000, 64)
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, uint64(1000000), delta)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCounterDelta_Counter64Reset(t *testing.T) {
|
||||||
|
// prev=1000000000000 -> curr=50: wrap delta = (MaxUint64 - 1e12) + 50 + 1
|
||||||
|
// which is > 90% of MaxUint64, indicating a device reset (not a wrap).
|
||||||
|
_, ok := computeCounterDelta(1000000000000, 50, 64)
|
||||||
|
assert.False(t, ok, "should discard: moderate prev to tiny curr on 64-bit indicates device reset")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCounterDelta_ZeroDelta(t *testing.T) {
|
||||||
|
delta, ok := computeCounterDelta(500, 500, 32)
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, uint64(0), delta)
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- CounterCache integration tests ---
|
||||||
|
|
||||||
|
func TestCounterCache_FirstPollReturnsEmpty(t *testing.T) {
|
||||||
|
mr := miniredis.RunT(t)
|
||||||
|
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
|
||||||
|
defer rdb.Close()
|
||||||
|
|
||||||
|
cc := NewCounterCache(rdb)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
counters := map[string]CounterInput{
|
||||||
|
".1.3.6.1.2.1.2.2.1.10.1": {Value: 1000, Bits: 32},
|
||||||
|
".1.3.6.1.2.1.2.2.1.16.1": {Value: 500, Bits: 32},
|
||||||
|
}
|
||||||
|
|
||||||
|
results, err := cc.ComputeDeltas(ctx, "dev-001", counters)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Empty(t, results, "first poll should produce no results (no previous values)")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCounterCache_SecondPollComputesRate(t *testing.T) {
|
||||||
|
mr := miniredis.RunT(t)
|
||||||
|
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
|
||||||
|
defer rdb.Close()
|
||||||
|
|
||||||
|
cc := NewCounterCache(rdb)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// First poll: seed the cache
|
||||||
|
counters1 := map[string]CounterInput{
|
||||||
|
".1.3.6.1.2.1.2.2.1.10.1": {Value: 1000, Bits: 32},
|
||||||
|
}
|
||||||
|
_, err := cc.ComputeDeltas(ctx, "dev-001", counters1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Advance time in miniredis by 10 seconds
|
||||||
|
mr.FastForward(10 * time.Second)
|
||||||
|
|
||||||
|
// Manually adjust the stored timestamp to be 10 seconds ago.
|
||||||
|
// We do this because computeDeltas uses time.Now() internally,
|
||||||
|
// and miniredis FastForward doesn't affect Go's time.Now().
|
||||||
|
key := counterKey("dev-001", ".1.3.6.1.2.1.2.2.1.10.1")
|
||||||
|
nowUnix := time.Now().Unix()
|
||||||
|
rdb.Set(ctx, key, fmt.Sprintf(`{"value":1000,"ts":%d}`, nowUnix-10), 600*time.Second)
|
||||||
|
|
||||||
|
// Second poll: should compute delta and rate
|
||||||
|
counters2 := map[string]CounterInput{
|
||||||
|
".1.3.6.1.2.1.2.2.1.10.1": {Value: 2000, Bits: 32},
|
||||||
|
}
|
||||||
|
results, err := cc.ComputeDeltas(ctx, "dev-001", counters2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, results, 1)
|
||||||
|
|
||||||
|
assert.Equal(t, ".1.3.6.1.2.1.2.2.1.10.1", results[0].OID)
|
||||||
|
assert.Equal(t, uint64(1000), results[0].Delta)
|
||||||
|
assert.InDelta(t, 100.0, results[0].Rate, 1.0, "rate should be ~100 (1000 delta / 10s)")
|
||||||
|
assert.InDelta(t, 10.0, results[0].ElapsedSeconds, 1.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCounterCache_RedisKeyFormat(t *testing.T) {
|
||||||
|
key := counterKey("device-abc", ".1.3.6.1.2.1.2.2.1.10.1")
|
||||||
|
assert.Equal(t, "snmp:counter:device-abc:.1.3.6.1.2.1.2.2.1.10.1", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCounterCache_StoresWithTTL(t *testing.T) {
|
||||||
|
mr := miniredis.RunT(t)
|
||||||
|
rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
|
||||||
|
defer rdb.Close()
|
||||||
|
|
||||||
|
cc := NewCounterCache(rdb)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
counters := map[string]CounterInput{
|
||||||
|
".1.3.6.1.2.1.2.2.1.10.1": {Value: 1000, Bits: 32},
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := cc.ComputeDeltas(ctx, "dev-001", counters)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify the key exists in Redis with a TTL
|
||||||
|
key := counterKey("dev-001", ".1.3.6.1.2.1.2.2.1.10.1")
|
||||||
|
ttl := mr.TTL(key)
|
||||||
|
assert.True(t, ttl > 0 && ttl <= 600*time.Second, "TTL should be set to ~600s, got %v", ttl)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user