---
phase: 02-poller-config-collection
plan: 02
type: execute
wave: 2
depends_on: ["02-01"]
files_modified:
  - poller/internal/poller/backup_scheduler.go
  - poller/internal/poller/backup_scheduler_test.go
  - poller/internal/poller/interfaces.go
  - poller/cmd/poller/main.go
autonomous: true
requirements: [COLL-01, COLL-03, COLL-05, COLL-06]
must_haves:
  truths:
    - "Poller runs /export show-sensitive via SSH on each online RouterOS device at a configurable interval (default 6h)"
    - "Poller publishes normalized config snapshot to NATS config.snapshot.create with device_id, tenant_id, sha256_hash, config_text"
    - "Unreachable devices log a warning and are retried on the next interval without blocking other devices"
    - "Backup interval is configurable via CONFIG_BACKUP_INTERVAL environment variable"
    - "First backup runs with randomized jitter (30-300s) after device discovery"
    - "Global concurrency is limited via CONFIG_BACKUP_MAX_CONCURRENT semaphore"
    - "Auth failures and host key mismatches block retries until resolved"
  artifacts:
    - path: "poller/internal/poller/backup_scheduler.go"
      provides: "BackupScheduler managing per-device backup goroutines with concurrency, retry, and NATS publishing"
      exports: ["BackupScheduler", "NewBackupScheduler"]
      min_lines: 200
    - path: "poller/internal/poller/backup_scheduler_test.go"
      provides: "Unit tests for backup scheduling, jitter, concurrency, error handling"
    - path: "poller/internal/poller/interfaces.go"
      provides: "SSHHostKeyUpdater interface for device store dependency"
    - path: "poller/cmd/poller/main.go"
      provides: "BackupScheduler initialization and lifecycle wiring"
  key_links:
    - from: "poller/internal/poller/backup_scheduler.go"
      to: "poller/internal/device/ssh_executor.go"
      via: "Calls device.RunCommand to execute /export show-sensitive"
      pattern: "device\\.RunCommand"
    - from: "poller/internal/poller/backup_scheduler.go"
      to: "poller/internal/device/normalize.go"
      via: "Calls device.NormalizeConfig and device.HashConfig on SSH output"
      pattern: "device\\.NormalizeConfig|device\\.HashConfig"
    - from: "poller/internal/poller/backup_scheduler.go"
      to: "poller/internal/bus/publisher.go"
      via: "Calls publisher.PublishConfigSnapshot with ConfigSnapshotEvent"
      pattern: "publisher\\.PublishConfigSnapshot|bus\\.ConfigSnapshotEvent"
    - from: "poller/internal/poller/backup_scheduler.go"
      to: "poller/internal/store/devices.go"
      via: "Calls store.UpdateSSHHostKey for TOFU fingerprint storage"
      pattern: "UpdateSSHHostKey"
    - from: "poller/cmd/poller/main.go"
      to: "poller/internal/poller/backup_scheduler.go"
      via: "Creates and starts BackupScheduler in main goroutine lifecycle"
      pattern: "NewBackupScheduler|backupScheduler\\.Run"
---

Build the backup scheduler that orchestrates periodic SSH config collection from RouterOS devices, normalizes the output, and publishes it to NATS. Wire it into the poller's main lifecycle.

Purpose: This is the core orchestration that ties together the SSH executor, normalizer, and NATS publisher from Plan 01 into a running backup collection system with proper scheduling, concurrency control, error handling, and retry logic.

Output: BackupScheduler module fully integrated into the poller's main.go lifecycle.
@/Users/jasonstaack/.claude/get-shit-done/workflows/execute-plan.md
@/Users/jasonstaack/.claude/get-shit-done/templates/summary.md
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/02-poller-config-collection/02-CONTEXT.md
@.planning/phases/02-poller-config-collection/02-01-SUMMARY.md
@poller/internal/poller/scheduler.go
@poller/internal/poller/worker.go
@poller/internal/poller/interfaces.go
@poller/cmd/poller/main.go
@poller/internal/device/ssh_executor.go
@poller/internal/device/normalize.go
@poller/internal/bus/publisher.go
@poller/internal/config/config.go
@poller/internal/store/devices.go
@poller/internal/observability/metrics.go

From poller/internal/device/ssh_executor.go (created in Plan 01):

```go
type SSHErrorKind string

const (
	ErrAuthFailed        SSHErrorKind = "auth_failed"
	ErrHostKeyMismatch   SSHErrorKind = "host_key_mismatch"
	ErrTimeout           SSHErrorKind = "timeout"
	ErrTruncatedOutput   SSHErrorKind = "truncated_output"
	ErrConnectionRefused SSHErrorKind = "connection_refused"
	ErrUnknown           SSHErrorKind = "unknown"
)

type SSHError struct {
	Kind    SSHErrorKind
	Err     error
	Message string
}

type CommandResult struct {
	Stdout   string
	Stderr   string
	ExitCode int
	Duration time.Duration
}

func RunCommand(ctx context.Context, ip string, port int, username, password string, timeout time.Duration, knownFingerprint string, command string) (*CommandResult, string, error)
```

From poller/internal/device/normalize.go (created in Plan 01):

```go
func NormalizeConfig(raw string) string
func HashConfig(normalized string) string

const NormalizationVersion = 1
```

From poller/internal/bus/publisher.go (modified in Plan 01):

```go
type ConfigSnapshotEvent struct {
	DeviceID             string `json:"device_id"`
	TenantID             string `json:"tenant_id"`
	RouterOSVersion      string `json:"routeros_version,omitempty"`
	CollectedAt          string `json:"collected_at"`
	SHA256Hash           string `json:"sha256_hash"`
	ConfigText           string `json:"config_text"`
	NormalizationVersion int    `json:"normalization_version"`
}

func (p *Publisher) PublishConfigSnapshot(ctx context.Context, event ConfigSnapshotEvent) error
```

From poller/internal/store/devices.go (modified in Plan 01):

```go
type Device struct {
	// ... existing fields ...
	SSHPort               int
	SSHHostKeyFingerprint *string
}

func (s *DeviceStore) UpdateSSHHostKey(ctx context.Context, deviceID string, fingerprint string) error
```

From poller/internal/config/config.go (modified in Plan 01):

```go
type Config struct {
	// ... existing fields ...
	ConfigBackupIntervalSeconds       int
	ConfigBackupMaxConcurrent         int
	ConfigBackupCommandTimeoutSeconds int
}
```

From poller/internal/observability/metrics.go (modified in Plan 01):

```go
var ConfigBackupTotal *prometheus.CounterVec // labels: ["status"]
var ConfigBackupDuration prometheus.Histogram
var ConfigBackupActive prometheus.Gauge
```

From poller/internal/poller/scheduler.go:

```go
type Scheduler struct { ... }

func NewScheduler(...) *Scheduler
func (s *Scheduler) Run(ctx context.Context) error
func (s *Scheduler) reconcileDevices(ctx context.Context, wg *sync.WaitGroup) error
func (s *Scheduler) runDeviceLoop(ctx context.Context, dev store.Device, ds *deviceState) // per-device goroutine with ticker
```

From poller/internal/poller/interfaces.go:

```go
type DeviceFetcher interface {
	FetchDevices(ctx context.Context) ([]store.Device, error)
}
```

Task 1: BackupScheduler with per-device goroutines, concurrency control, and retry logic

poller/internal/poller/backup_scheduler.go, poller/internal/poller/backup_scheduler_test.go, poller/internal/poller/interfaces.go

- Test jitter generation: randomJitter(30, 300) returns a value in the [30s, 300s] range
- Test backoff sequence: given consecutive failures, backoff returns 5m, 15m, 1h, then caps at 1h
- Test auth failure blocking: when the last error is ErrAuthFailed, shouldRetry returns false
- Test host key mismatch blocking: when the last error is ErrHostKeyMismatch, shouldRetry returns false
- Test online-only gating: backup is
skipped for devices not currently marked online
- Test concurrency semaphore: when the semaphore is full, the backup waits (does not drop)

**1. Update interfaces.go:**

Add an `SSHHostKeyUpdater` interface (consumer-side, Go best practice):

```go
type SSHHostKeyUpdater interface {
	UpdateSSHHostKey(ctx context.Context, deviceID string, fingerprint string) error
}
```

**2. Create backup_scheduler.go:**

Define a `backupDeviceState` struct tracking per-device backup state:
- `cancel context.CancelFunc`
- `lastAttemptAt time.Time`
- `lastSuccessAt time.Time`
- `lastStatus string` — "success", "error", "skipped_offline", "auth_blocked", "hostkey_blocked"
- `lastError string`
- `consecutiveFailures int`
- `backoffUntil time.Time`
- `lastErrorKind device.SSHErrorKind` — tracks whether the error is auth/hostkey (blocks retry)

Define the `BackupScheduler` struct:
- `store DeviceFetcher` — reuse the existing interface for FetchDevices
- `hostKeyStore SSHHostKeyUpdater` — for UpdateSSHHostKey
- `locker *redislock.Client` — per-device distributed lock
- `publisher *bus.Publisher` — for NATS publishing
- `credentialCache *vault.CredentialCache` — for decrypting device SSH creds
- `redisClient *redis.Client` — for tracking device online status
- `backupInterval time.Duration`
- `commandTimeout time.Duration`
- `refreshPeriod time.Duration` — how often to reconcile devices (reuse from the existing scheduler, e.g., 60s)
- `semaphore chan struct{}` — buffered channel of size maxConcurrent
- `mu sync.Mutex`
- `activeDevices map[string]*backupDeviceState`

`NewBackupScheduler(...)` constructor — accept all dependencies and create the semaphore as `make(chan struct{}, maxConcurrent)`.
`Run(ctx context.Context) error` — mirrors the existing Scheduler.Run pattern:
- defer shutdown: cancel all device goroutines, wait for the WaitGroup
- Loop: reconcileBackupDevices(ctx, &wg), then select on ctx.Done or time.After(refreshPeriod)

`reconcileBackupDevices(ctx, wg)` — mirrors reconcileDevices:
- FetchDevices from the store
- Start backup goroutines for new devices
- Stop goroutines for removed devices

`runBackupLoop(ctx, dev, state)` — per-device backup goroutine:
- On the first run: sleep for randomJitter(30, 300) seconds, then do the initial backup
- After the initial backup: ticker at backupInterval
- On each tick:
  a. Check whether the device is online via the Redis key `device:{id}:status` (set by the status poll). If not online, log debug "skipped_offline", update state, increment ConfigBackupTotal("skipped_offline"), continue
  b. Check whether lastErrorKind is ErrAuthFailed — skip with "skipped_auth_blocked", log a warning with guidance to update credentials
  c. Check whether lastErrorKind is ErrHostKeyMismatch — skip with "skipped_hostkey_blocked", log a warning with guidance to reset the host key
  d. Check backoff: if time.Now().Before(state.backoffUntil), skip
  e. Acquire the semaphore (blocks if at max concurrency, does not drop)
  f. Acquire the Redis lock `backup:device:{id}` with TTL = commandTimeout + 30s
  g. Call `collectAndPublish(ctx, dev, state)`
  h. Release the semaphore
  i. Update state based on the result

`collectAndPublish(ctx, dev, state) error`:
- Increment the ConfigBackupActive gauge; defer the decrement
- Start a timer for ConfigBackupDuration
- Decrypt credentials via credentialCache.GetCredentials
- Call `device.RunCommand(ctx, dev.IPAddress, dev.SSHPort, username, password, commandTimeout, knownFingerprint, "/export show-sensitive")`
- On error: classify the error kind, update state, apply backoff (transient: 5m/15m/1h exponential; auth/hostkey: block), return
- If a new fingerprint is returned (TOFU first connect): call hostKeyStore.UpdateSSHHostKey
- Validate that the output is non-empty and looks like RouterOS config (basic sanity: contains "/")
- Call `device.NormalizeConfig(result.Stdout)`
- Call `device.HashConfig(normalized)`
- Build `bus.ConfigSnapshotEvent` with device_id, tenant_id, routeros_version (from the device or Redis), collected_at (RFC3339 now), sha256_hash, config_text, normalization_version
- Call `publisher.PublishConfigSnapshot(ctx, event)`
- On success: reset consecutiveFailures, update lastSuccessAt, increment ConfigBackupTotal("success")
- Record ConfigBackupDuration

`randomJitter(minSeconds, maxSeconds int) time.Duration` — uses math/rand for a uniform distribution.

Backoff for transient errors, `calculateBackupBackoff(failures int) time.Duration`:
- 1 failure: 5 min
- 2 failures: 15 min
- 3+ failures: 1 hour (cap)

Device online check via Redis: check whether the key `device:{id}:status` equals "online". This key is set by the existing status poll publisher flow. If the key doesn't exist, assume the device might be online (the first poll hasn't happened yet) — allow the backup attempt.

RouterOS version: read from the Device struct's RouterOSVersion field (populated by the store query). If nil, use an empty string in the event.
**Important implementation notes:**
- Use `log/slog` for all logging (structured JSON, matching the existing pattern)
- Use the existing `redislock` pattern from worker.go for per-device locking
- Semaphore pattern: `s.semaphore <- struct{}{}` to acquire, `<-s.semaphore` to release
- Do NOT share circuit breaker state with the status poll scheduler — these are independent
- Partial/truncated output (SSHError with Kind ErrTruncatedOutput) is treated as a transient error — never publish, apply backoff

```
cd /Volumes/ssd01/v9/the-other-dude/poller && go test ./internal/poller/ -run "TestBackup|TestJitter|TestBackoff|TestShouldRetry" -v -count=1
```

- BackupScheduler manages per-device backup goroutines independently from the status poll scheduler
- First backup uses a 30-300s random jitter delay
- Concurrency limited by a buffered channel semaphore (default 10)
- Per-device Redis lock prevents duplicate backups across pods
- Auth failures and host key mismatches block retries with clear log messages
- Transient errors use 5m/15m/1h exponential backoff
- Offline devices are skipped without error
- Successful backups normalize the config, compute SHA256, and publish to NATS
- TOFU fingerprint stored on first successful connection
- All unit tests pass

Task 2: Wire BackupScheduler into main.go lifecycle

poller/cmd/poller/main.go

Add BackupScheduler initialization and startup to main.go, following the existing pattern of scheduler initialization (lines 250-278). After the existing scheduler creation (around line 270), add a new section:

```go
// -----------------------------------------------------------------------
// Start the config backup scheduler
// -----------------------------------------------------------------------
```

1. Convert config values to durations:

```go
backupInterval := time.Duration(cfg.ConfigBackupIntervalSeconds) * time.Second
backupCmdTimeout := time.Duration(cfg.ConfigBackupCommandTimeoutSeconds) * time.Second
```

2. Create the BackupScheduler:

```go
backupScheduler := poller.NewBackupScheduler(
	deviceStore,
	deviceStore, // SSHHostKeyUpdater (DeviceStore satisfies this interface)
	locker,
	publisher,
	credentialCache,
	redisClient,
	backupInterval,
	backupCmdTimeout,
	refreshPeriod, // reuse the existing device refresh period
	cfg.ConfigBackupMaxConcurrent,
)
```

3. Start it in a goroutine (runs in parallel with the main status poll scheduler):

```go
go func() {
	slog.Info("starting config backup scheduler",
		"interval", backupInterval,
		"max_concurrent", cfg.ConfigBackupMaxConcurrent,
		"command_timeout", backupCmdTimeout,
	)
	if err := backupScheduler.Run(ctx); err != nil {
		slog.Error("backup scheduler exited with error", "error", err)
	}
}()
```

The BackupScheduler shares the same ctx as everything else, so SIGINT/SIGTERM triggers its shutdown via context cancellation. No additional shutdown logic is needed — Run() returns when ctx is cancelled. Log the startup with the same pattern as the existing scheduler startup log (lines 273-276).

```
cd /Volumes/ssd01/v9/the-other-dude/poller && go build ./cmd/poller/ && echo "build successful"
```

- BackupScheduler created in main.go with all dependencies injected
- Runs as a goroutine parallel to the status poll scheduler
- Shares the same context for graceful shutdown
- Startup logged with interval, max_concurrent, and command_timeout
- Poller binary compiles successfully with the new scheduler wired in

1. `cd poller && go build ./cmd/poller/` — binary compiles with the backup scheduler wired in
2. `cd poller && go vet ./...` — no static analysis issues
3. `cd poller && go test ./internal/poller/ -v -count=1` — all poller tests pass (existing + new backup tests)
4. `cd poller && go test ./... -count=1` — full test suite passes

- BackupScheduler runs independently from the status poll scheduler with its own per-device goroutines
- Devices get their first backup 30-300s after discovery, then every CONFIG_BACKUP_INTERVAL
- SSH command execution uses TOFU host key verification and stores fingerprints on first connect
- Config output is normalized, hashed, and published to NATS config.snapshot.create
- Concurrency limited to CONFIG_BACKUP_MAX_CONCURRENT parallel SSH sessions
- Auth/hostkey errors block retries; transient errors use exponential backoff (5m/15m/1h)
- Offline devices are skipped gracefully
- BackupScheduler is wired into main.go and starts/stops with the poller lifecycle
- All tests pass, the project compiles clean

After completion, create `.planning/phases/02-poller-config-collection/02-02-SUMMARY.md`