docs(02): create phase plan for poller config collection
Two plans covering SSH executor, config normalization, NATS publishing, backup scheduler, and main.go wiring for periodic RouterOS config backup.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
New file: .planning/phases/02-poller-config-collection/02-02-PLAN.md (394 lines)
---
phase: 02-poller-config-collection
plan: 02
type: execute
wave: 2
depends_on: ["02-01"]
files_modified:
  - poller/internal/poller/backup_scheduler.go
  - poller/internal/poller/backup_scheduler_test.go
  - poller/internal/poller/interfaces.go
  - poller/cmd/poller/main.go
autonomous: true
requirements: [COLL-01, COLL-03, COLL-05, COLL-06]

must_haves:
  truths:
    - "Poller runs /export show-sensitive via SSH on each online RouterOS device at a configurable interval (default 6h)"
    - "Poller publishes normalized config snapshot to NATS config.snapshot.create with device_id, tenant_id, sha256_hash, config_text"
    - "Unreachable devices log a warning and are retried on the next interval without blocking other devices"
    - "Backup interval is configurable via CONFIG_BACKUP_INTERVAL environment variable"
    - "First backup runs with randomized jitter (30-300s) after device discovery"
    - "Global concurrency is limited via CONFIG_BACKUP_MAX_CONCURRENT semaphore"
    - "Auth failures and host key mismatches block retries until resolved"
  artifacts:
    - path: "poller/internal/poller/backup_scheduler.go"
      provides: "BackupScheduler managing per-device backup goroutines with concurrency, retry, and NATS publishing"
      exports: ["BackupScheduler", "NewBackupScheduler"]
      min_lines: 200
    - path: "poller/internal/poller/backup_scheduler_test.go"
      provides: "Unit tests for backup scheduling, jitter, concurrency, error handling"
    - path: "poller/internal/poller/interfaces.go"
      provides: "SSHHostKeyUpdater interface for device store dependency"
    - path: "poller/cmd/poller/main.go"
      provides: "BackupScheduler initialization and lifecycle wiring"
  key_links:
    - from: "poller/internal/poller/backup_scheduler.go"
      to: "poller/internal/device/ssh_executor.go"
      via: "Calls device.RunCommand to execute /export show-sensitive"
      pattern: "device\\.RunCommand"
    - from: "poller/internal/poller/backup_scheduler.go"
      to: "poller/internal/device/normalize.go"
      via: "Calls device.NormalizeConfig and device.HashConfig on SSH output"
      pattern: "device\\.NormalizeConfig|device\\.HashConfig"
    - from: "poller/internal/poller/backup_scheduler.go"
      to: "poller/internal/bus/publisher.go"
      via: "Calls publisher.PublishConfigSnapshot with ConfigSnapshotEvent"
      pattern: "publisher\\.PublishConfigSnapshot|bus\\.ConfigSnapshotEvent"
    - from: "poller/internal/poller/backup_scheduler.go"
      to: "poller/internal/store/devices.go"
      via: "Calls store.UpdateSSHHostKey for TOFU fingerprint storage"
      pattern: "UpdateSSHHostKey"
    - from: "poller/cmd/poller/main.go"
      to: "poller/internal/poller/backup_scheduler.go"
      via: "Creates and starts BackupScheduler in main goroutine lifecycle"
      pattern: "NewBackupScheduler|backupScheduler\\.Run"
---

<objective>
Build the backup scheduler that orchestrates periodic SSH config collection from RouterOS devices, normalizes the output, and publishes it to NATS. Wire it into the poller's main lifecycle.

Purpose: This is the core orchestration that ties together the SSH executor, normalizer, and NATS publisher from Plan 01 into a running backup collection system with proper scheduling, concurrency control, error handling, and retry logic.

Output: BackupScheduler module fully integrated into the poller's main.go lifecycle.
</objective>

<execution_context>
@/Users/jasonstaack/.claude/get-shit-done/workflows/execute-plan.md
@/Users/jasonstaack/.claude/get-shit-done/templates/summary.md
</execution_context>

<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/02-poller-config-collection/02-CONTEXT.md
@.planning/phases/02-poller-config-collection/02-01-SUMMARY.md

@poller/internal/poller/scheduler.go
@poller/internal/poller/worker.go
@poller/internal/poller/interfaces.go
@poller/cmd/poller/main.go
@poller/internal/device/ssh_executor.go
@poller/internal/device/normalize.go
@poller/internal/bus/publisher.go
@poller/internal/config/config.go
@poller/internal/store/devices.go
@poller/internal/observability/metrics.go
<interfaces>
<!-- From Plan 01 outputs (executor and normalizer) -->

From poller/internal/device/ssh_executor.go (created in Plan 01):
```go
type SSHErrorKind string

const (
    ErrAuthFailed        SSHErrorKind = "auth_failed"
    ErrHostKeyMismatch   SSHErrorKind = "host_key_mismatch"
    ErrTimeout           SSHErrorKind = "timeout"
    ErrTruncatedOutput   SSHErrorKind = "truncated_output"
    ErrConnectionRefused SSHErrorKind = "connection_refused"
    ErrUnknown           SSHErrorKind = "unknown"
)

type SSHError struct { Kind SSHErrorKind; Err error; Message string }
type CommandResult struct { Stdout string; Stderr string; ExitCode int; Duration time.Duration }

func RunCommand(ctx context.Context, ip string, port int, username, password string, timeout time.Duration, knownFingerprint string, command string) (*CommandResult, string, error)
```

From poller/internal/device/normalize.go (created in Plan 01):
```go
func NormalizeConfig(raw string) string
func HashConfig(normalized string) string
const NormalizationVersion = 1
```

From poller/internal/bus/publisher.go (modified in Plan 01):
```go
type ConfigSnapshotEvent struct {
    DeviceID             string `json:"device_id"`
    TenantID             string `json:"tenant_id"`
    RouterOSVersion      string `json:"routeros_version,omitempty"`
    CollectedAt          string `json:"collected_at"`
    SHA256Hash           string `json:"sha256_hash"`
    ConfigText           string `json:"config_text"`
    NormalizationVersion int    `json:"normalization_version"`
}

func (p *Publisher) PublishConfigSnapshot(ctx context.Context, event ConfigSnapshotEvent) error
```

From poller/internal/store/devices.go (modified in Plan 01):
```go
type Device struct {
    // ... existing fields ...
    SSHPort               int
    SSHHostKeyFingerprint *string
}

func (s *DeviceStore) UpdateSSHHostKey(ctx context.Context, deviceID string, fingerprint string) error
```

From poller/internal/config/config.go (modified in Plan 01):
```go
type Config struct {
    // ... existing fields ...
    ConfigBackupIntervalSeconds       int
    ConfigBackupMaxConcurrent         int
    ConfigBackupCommandTimeoutSeconds int
}
```

From poller/internal/observability/metrics.go (modified in Plan 01):
```go
var ConfigBackupTotal *prometheus.CounterVec // labels: ["status"]
var ConfigBackupDuration prometheus.Histogram
var ConfigBackupActive prometheus.Gauge
```

<!-- Existing patterns to follow -->

From poller/internal/poller/scheduler.go:
```go
type Scheduler struct { ... }
func NewScheduler(...) *Scheduler
func (s *Scheduler) Run(ctx context.Context) error
func (s *Scheduler) reconcileDevices(ctx context.Context, wg *sync.WaitGroup) error
func (s *Scheduler) runDeviceLoop(ctx context.Context, dev store.Device, ds *deviceState) // per-device goroutine with ticker
```

From poller/internal/poller/interfaces.go:
```go
type DeviceFetcher interface {
    FetchDevices(ctx context.Context) ([]store.Device, error)
}
```
</interfaces>
</context>

<tasks>

<task type="auto" tdd="true">
<name>Task 1: BackupScheduler with per-device goroutines, concurrency control, and retry logic</name>
<files>
poller/internal/poller/backup_scheduler.go,
poller/internal/poller/backup_scheduler_test.go,
poller/internal/poller/interfaces.go
</files>
<behavior>
- Test jitter generation: randomJitter(30, 300) returns a value in the [30s, 300s] range
- Test backoff sequence: given consecutive failures, backoff returns 5m, 15m, then caps at 1h
- Test auth failure blocking: when the last error is ErrAuthFailed, shouldRetry returns false
- Test host key mismatch blocking: when the last error is ErrHostKeyMismatch, shouldRetry returns false
- Test online-only gating: backup is skipped for devices not currently marked online
- Test concurrency semaphore: when the semaphore is full, backup waits (does not drop)
</behavior>
<action>
**1. Update interfaces.go:**
Add `SSHHostKeyUpdater` interface (consumer-side, Go best practice):
```go
type SSHHostKeyUpdater interface {
    UpdateSSHHostKey(ctx context.Context, deviceID string, fingerprint string) error
}
```

**2. Create backup_scheduler.go:**

Define `backupDeviceState` struct tracking per-device backup state:
- `cancel context.CancelFunc`
- `lastAttemptAt time.Time`
- `lastSuccessAt time.Time`
- `lastStatus string` — "success", "error", "skipped_offline", "auth_blocked", "hostkey_blocked"
- `lastError string`
- `consecutiveFailures int`
- `backoffUntil time.Time`
- `lastErrorKind device.SSHErrorKind` — tracks whether the error is auth/hostkey (blocks retry)

Define `BackupScheduler` struct:
- `store DeviceFetcher` — reuse existing interface for FetchDevices
- `hostKeyStore SSHHostKeyUpdater` — for UpdateSSHHostKey
- `locker *redislock.Client` — per-device distributed lock
- `publisher *bus.Publisher` — for NATS publishing
- `credentialCache *vault.CredentialCache` — for decrypting device SSH creds
- `redisClient *redis.Client` — for tracking device online status
- `backupInterval time.Duration`
- `commandTimeout time.Duration`
- `refreshPeriod time.Duration` — how often to reconcile devices (reuse from the existing scheduler, e.g., 60s)
- `semaphore chan struct{}` — buffered channel of size maxConcurrent
- `mu sync.Mutex`
- `activeDevices map[string]*backupDeviceState`

`NewBackupScheduler(...)` constructor — accept all dependencies, create the semaphore as `make(chan struct{}, maxConcurrent)`.

`Run(ctx context.Context) error` — mirrors the existing Scheduler.Run pattern:
- defer shutdown: cancel all device goroutines, wait for the WaitGroup
- Loop: reconcileBackupDevices(ctx, &wg), then select on ctx.Done or time.After(refreshPeriod)

`reconcileBackupDevices(ctx, wg)` — mirrors reconcileDevices:
- FetchDevices from store
- Start backup goroutines for new devices
- Stop goroutines for removed devices

`runBackupLoop(ctx, dev, state)` — per-device backup goroutine:
- On first run: sleep for randomJitter(30, 300) seconds, then do the initial backup
- After the initial backup: ticker at backupInterval
- On each tick:
  a. Check if the device is online via Redis key `device:{id}:status` (set by the status poll). If not online, log debug "skipped_offline", update state, increment ConfigBackupTotal("skipped_offline"), continue
  b. Check if lastErrorKind is ErrAuthFailed — skip with "skipped_auth_blocked", log a warning with guidance to update credentials
  c. Check if lastErrorKind is ErrHostKeyMismatch — skip with "skipped_hostkey_blocked", log a warning with guidance to reset the host key
  d. Check backoff: if time.Now().Before(state.backoffUntil), skip
  e. Acquire the semaphore (blocks if at max concurrency, does not drop)
  f. Acquire Redis lock `backup:device:{id}` with TTL = commandTimeout + 30s
  g. Call `collectAndPublish(ctx, dev, state)`
  h. Release the semaphore
  i. Update state based on the result

`collectAndPublish(ctx, dev, state) error`:
- Increment the ConfigBackupActive gauge
- Defer decrementing the ConfigBackupActive gauge
- Start a timer for ConfigBackupDuration
- Decrypt credentials via credentialCache.GetCredentials
- Call `device.RunCommand(ctx, dev.IPAddress, dev.SSHPort, username, password, commandTimeout, knownFingerprint, "/export show-sensitive")`
- On error: classify the error kind, update state, apply backoff (transient: 5m/15m/1h exponential; auth/hostkey: block), return
- If a new fingerprint is returned (TOFU first connect): call hostKeyStore.UpdateSSHHostKey
- Validate the output is non-empty and looks like RouterOS config (basic sanity: contains "/")
- Call `device.NormalizeConfig(result.Stdout)`
- Call `device.HashConfig(normalized)`
- Build `bus.ConfigSnapshotEvent` with device_id, tenant_id, routeros_version (from device or Redis), collected_at (RFC3339 now), sha256_hash, config_text, normalization_version
- Call `publisher.PublishConfigSnapshot(ctx, event)`
- On success: reset consecutiveFailures, update lastSuccessAt, increment ConfigBackupTotal("success")
- Record ConfigBackupDuration

`randomJitter(minSeconds, maxSeconds int) time.Duration` — uses math/rand for a uniform distribution

Backoff for transient errors: `calculateBackupBackoff(failures int) time.Duration`:
- 1 failure: 5 min
- 2 failures: 15 min
- 3+ failures: 1 hour (cap)

Device online check via Redis: check if key `device:{id}:status` equals "online". This key is set by the existing status poll publisher flow. If the key doesn't exist, assume the device might be online (the first poll hasn't happened yet) — allow the backup attempt.

RouterOS version: read from the Device struct's RouterOSVersion field (populated by the store query). If nil, use an empty string in the event.

**Important implementation notes:**
- Use `log/slog` for all logging (structured JSON, matching the existing pattern)
- Use the existing `redislock` pattern from worker.go for per-device locking
- Semaphore pattern: `s.semaphore <- struct{}{}` to acquire, `<-s.semaphore` to release
- Do NOT share circuit breaker state with the status poll scheduler — these are independent
- Partial/truncated output (SSHError with Kind ErrTruncatedOutput) is treated as a transient error — never publish, apply backoff
</action>
<verify>
<automated>cd /Volumes/ssd01/v9/the-other-dude/poller && go test ./internal/poller/ -run "TestBackup|TestJitter|TestBackoff|TestShouldRetry" -v -count=1</automated>
</verify>
<done>
- BackupScheduler manages per-device backup goroutines independently of the status poll scheduler
- First backup uses a 30-300s random jitter delay
- Concurrency limited by a buffered channel semaphore (default 10)
- Per-device Redis lock prevents duplicate backups across pods
- Auth failures and host key mismatches block retries with clear log messages
- Transient errors use 5m/15m/1h exponential backoff
- Offline devices are skipped without error
- Successful backups normalize the config, compute SHA256, and publish to NATS
- TOFU fingerprint stored on first successful connection
- All unit tests pass
</done>
</task>

<task type="auto">
<name>Task 2: Wire BackupScheduler into main.go lifecycle</name>
<files>poller/cmd/poller/main.go</files>
<action>
Add BackupScheduler initialization and startup to main.go, following the existing pattern of scheduler initialization (lines 250-278).

After the existing scheduler creation (around line 270), add a new section:

```go
// -----------------------------------------------------------------------
// Start the config backup scheduler
// -----------------------------------------------------------------------
```

1. Convert config values to durations:
```go
backupInterval := time.Duration(cfg.ConfigBackupIntervalSeconds) * time.Second
backupCmdTimeout := time.Duration(cfg.ConfigBackupCommandTimeoutSeconds) * time.Second
```

2. Create the BackupScheduler:
```go
backupScheduler := poller.NewBackupScheduler(
    deviceStore,
    deviceStore, // SSHHostKeyUpdater (DeviceStore satisfies this interface)
    locker,
    publisher,
    credentialCache,
    redisClient,
    backupInterval,
    backupCmdTimeout,
    refreshPeriod, // reuse the existing device refresh period
    cfg.ConfigBackupMaxConcurrent,
)
```

3. Start it in a goroutine (runs in parallel with the main status poll scheduler):
```go
go func() {
    slog.Info("starting config backup scheduler",
        "interval", backupInterval,
        "max_concurrent", cfg.ConfigBackupMaxConcurrent,
        "command_timeout", backupCmdTimeout,
    )
    if err := backupScheduler.Run(ctx); err != nil {
        slog.Error("backup scheduler exited with error", "error", err)
    }
}()
```

The BackupScheduler shares the same ctx as everything else, so SIGINT/SIGTERM will trigger its shutdown via context cancellation. No additional shutdown logic is needed — Run() returns when ctx is cancelled.

Log the startup with the same pattern as the existing scheduler startup log (lines 273-276).
</action>
<verify>
<automated>cd /Volumes/ssd01/v9/the-other-dude/poller && go build ./cmd/poller/ && echo "build successful"</automated>
</verify>
<done>
- BackupScheduler created in main.go with all dependencies injected
- Runs as a goroutine in parallel with the status poll scheduler
- Shares the same context for graceful shutdown
- Startup logged with interval, max_concurrent, and command_timeout
- Poller binary compiles successfully with the new scheduler wired in
</done>
</task>

</tasks>

<verification>
1. `cd poller && go build ./cmd/poller/` — binary compiles with the backup scheduler wired in
2. `cd poller && go vet ./...` — no static analysis issues
3. `cd poller && go test ./internal/poller/ -v -count=1` — all poller tests pass (existing + new backup tests)
4. `cd poller && go test ./... -count=1` — full test suite passes
</verification>

<success_criteria>
- BackupScheduler runs independently of the status poll scheduler with its own per-device goroutines
- Devices get their first backup 30-300s after discovery, then every CONFIG_BACKUP_INTERVAL
- SSH command execution uses TOFU host key verification and stores fingerprints on first connect
- Config output is normalized, hashed, and published to NATS config.snapshot.create
- Concurrency limited to CONFIG_BACKUP_MAX_CONCURRENT parallel SSH sessions
- Auth/hostkey errors block retries; transient errors use exponential backoff (5m/15m/1h)
- Offline devices are skipped gracefully
- BackupScheduler is wired into main.go and starts/stops with the poller lifecycle
- All tests pass, project compiles clean
</success_criteria>

<output>
After completion, create `.planning/phases/02-poller-config-collection/02-02-SUMMARY.md`
</output>