From 0851eced360bacf9a39c457e779f0d07125b9d52 Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Thu, 12 Mar 2026 22:07:35 -0500 Subject: [PATCH] feat(04-01): implement BackupResponder with extracted CollectAndPublish - Create BackupResponder for NATS request-reply on config.backup.trigger - Extract public CollectAndPublish from BackupScheduler returning sha256 hash - Define BackupExecutor/BackupLocker/DeviceGetter interfaces for testability - Create RedisBackupLocker adapter wrapping redislock.Client - Wire BackupResponder into main.go lifecycle - All 6 tests pass with in-process NATS server Co-Authored-By: Claude Opus 4.6 --- poller/cmd/poller/main.go | 17 ++ poller/go.mod | 16 +- poller/go.sum | 21 ++ poller/internal/bus/backup_responder.go | 214 +++++++++++++++++++ poller/internal/bus/backup_responder_test.go | 48 +++-- poller/internal/bus/redis_locker.go | 41 ++++ poller/internal/poller/backup_scheduler.go | 25 ++- 7 files changed, 346 insertions(+), 36 deletions(-) create mode 100644 poller/internal/bus/backup_responder.go create mode 100644 poller/internal/bus/redis_locker.go diff --git a/poller/cmd/poller/main.go b/poller/cmd/poller/main.go index 82090e7..f7f9596 100644 --- a/poller/cmd/poller/main.go +++ b/poller/cmd/poller/main.go @@ -294,6 +294,23 @@ func main() { cfg.ConfigBackupMaxConcurrent, ) + // ----------------------------------------------------------------------- + // Initialize NATS backup responder for manual config backup triggers + // ----------------------------------------------------------------------- + backupResponder := bus.NewBackupResponder( + publisher.Conn(), + deviceStore, + backupScheduler, + bus.NewRedisBackupLocker(locker), + backupCmdTimeout, + ) + if err := backupResponder.Subscribe(); err != nil { + slog.Error("failed to start backup responder", "error", err) + os.Exit(1) + } + defer backupResponder.Stop() + slog.Info("NATS backup responder started (config.backup.trigger)") + go func() { slog.Info("starting config backup scheduler", "interval", backupInterval, diff --git a/poller/go.mod b/poller/go.mod index b9241b6..cb061f9 100644 --- a/poller/go.mod +++ b/poller/go.mod @@ -1,6 +1,6 @@ module github.com/mikrotik-portal/poller -go 1.24.0 +go 1.25.0 require ( github.com/alicebob/miniredis/v2 v2.37.0 @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jackc/pgx/v5 v5.7.4 - github.com/nats-io/nats.go v1.38.0 + github.com/nats-io/nats.go v1.49.0 github.com/pkg/sftp v1.13.10 github.com/prometheus/client_golang v1.23.2 github.com/redis/go-redis/v9 v9.7.3 @@ -26,6 +26,7 @@ require ( dario.cat/mergo v1.0.2 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -45,15 +46,17 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect + github.com/google/go-tpm v0.9.8 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/kr/fs v0.1.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.10 // indirect github.com/mdelapenya/tlscert v0.2.0 // indirect + github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/go-archive v0.1.0 // indirect github.com/moby/patternmatcher v0.6.0 // indirect @@ -63,7 +66,9 @@ require ( github.com/moby/term v0.5.0 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/nats-io/nkeys v0.4.9 // indirect + github.com/nats-io/jwt/v2 v2.8.0 // indirect + github.com/nats-io/nats-server/v2 v2.12.5 // indirect + github.com/nats-io/nkeys v0.4.15 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect @@ -87,8 +92,9 @@ require ( go.opentelemetry.io/otel/trace v1.39.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/sync v0.19.0 // indirect - golang.org/x/sys v0.41.0 // indirect + golang.org/x/sys v0.42.0 // indirect golang.org/x/text v0.34.0 // indirect + golang.org/x/time v0.15.0 // indirect google.golang.org/grpc v1.79.1 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/poller/go.sum b/poller/go.sum index faec068..7992d24 100644 --- a/poller/go.sum +++ b/poller/go.sum @@ -8,6 +8,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/alicebob/miniredis/v2 v2.37.0 h1:RheObYW32G1aiJIj81XVt78ZHJpHonHLHW7OLIshq68= github.com/alicebob/miniredis/v2 v2.37.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM= +github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op h1:kpBdlEPbRvff0mDD1gk7o9BhI16b9p5yYAXRlidpqJE= +github.com/antithesishq/antithesis-sdk-go v0.6.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= @@ -61,6 +63,8 @@ github.com/go-routeros/routeros/v3 v3.0.0/go.mod h1:j4mq65czXfKtHsdLkgVv8w7sNzyh github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo= +github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= @@ -77,6 +81,8 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -93,6 +99,8 @@ github.com/magiconair/properties v1.8.10 h1:s31yESBquKXCV9a/ScB3ESkOjUYYv+X0rg8S github.com/magiconair/properties v1.8.10/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mdelapenya/tlscert v0.2.0 h1:7H81W6Z/4weDvZBNOfQte5GpIMo0lGYEeWbkGp5LJHI= github.com/mdelapenya/tlscert v0.2.0/go.mod h1:O4njj3ELLnJjGdkN7M/vIVCpZ+Cf0L6muqOG4tLSl8o= +github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk= +github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/go-archive v0.1.0 h1:Kk/5rdW/g+H8NHdJW2gsXyZ7UnzvJNOy6VKJqueWdcQ= @@ -113,10 +121,18 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= +github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= +github.com/nats-io/nats-server/v2 v2.12.5 h1:EOHLbsLJgUHUwzkj9gBTOlubkX+dmSs0EYWMdBiHivU= +github.com/nats-io/nats-server/v2 v2.12.5/go.mod h1:JQDAKcwdXs0NRhvYO31dzsXkzCyDkOBS7SKU3Nozu14= github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= +github.com/nats-io/nats.go v1.49.0 h1:yh/WvY59gXqYpgl33ZI+XoVPKyut/IcEaqtsiuTJpoE= +github.com/nats-io/nats.go v1.49.0/go.mod h1:fDCn3mN5cY8HooHwE2ukiLb4p4G4ImmzvXyJt+tGwdw= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= +github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4= +github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -204,14 +220,19 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg= golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM= golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= +golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1:JLQynH/LBHfCTSbDWl+py8C+Rg/k1OVH3xfcaiANuF0= google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:kSJwQxqmFXeo79zOmbrALdflXQeAYcUbgS7PbpMknCY= diff --git a/poller/internal/bus/backup_responder.go b/poller/internal/bus/backup_responder.go new file mode 100644 index 0000000..15039ce --- /dev/null +++ b/poller/internal/bus/backup_responder.go @@ -0,0 +1,214 @@ +// Package bus provides NATS messaging for the poller service. +// +// backup_responder.go implements a NATS request-reply handler for manual +// config backup triggers. The Python backend sends a trigger request to +// "config.backup.trigger" and receives a synchronous response with the +// backup result (success/failure/locked + sha256 hash). + +package bus + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/nats-io/nats.go" + + "github.com/mikrotik-portal/poller/internal/store" +) + +// ErrLockNotObtained is returned when a backup lock cannot be acquired +// because another backup is already in progress for the device. +var ErrLockNotObtained = errors.New("lock not obtained") + +// BackupTriggerRequest is the JSON payload for a config.backup.trigger NATS request. +type BackupTriggerRequest struct { + DeviceID string `json:"device_id"` + TenantID string `json:"tenant_id"` +} + +// BackupTriggerResponse is the JSON reply for a config.backup.trigger NATS request. +type BackupTriggerResponse struct { + Status string `json:"status"` // "success", "failed", "locked" + SHA256Hash string `json:"sha256_hash,omitempty"` + Message string `json:"message,omitempty"` + Error string `json:"error,omitempty"` +} + +// DeviceGetter is the subset of store.DeviceStore needed by BackupResponder. +type DeviceGetter interface { + GetDevice(ctx context.Context, deviceID string) (store.Device, error) +} + +// BackupExecutor abstracts the backup collection logic so BackupResponder +// can call it without depending directly on the BackupScheduler struct. +type BackupExecutor interface { + CollectAndPublish(ctx context.Context, dev store.Device) (string, error) +} + +// BackupLockHandle represents a held distributed lock that can be released. +type BackupLockHandle interface { + Release(ctx context.Context) error +} + +// BackupLocker abstracts distributed lock acquisition for testing. +type BackupLocker interface { + ObtainLock(ctx context.Context, key string, ttl time.Duration) (BackupLockHandle, error) +} + +// BackupResponder handles NATS request-reply for manual config backup triggers. +type BackupResponder struct { + nc *nats.Conn + sub *nats.Subscription + deviceStore DeviceGetter + executor BackupExecutor + locker BackupLocker + commandTimeout time.Duration +} + +// NewBackupResponder creates a BackupResponder with the given dependencies. +func NewBackupResponder( + nc *nats.Conn, + deviceStore DeviceGetter, + executor BackupExecutor, + locker BackupLocker, + commandTimeout time.Duration, +) *BackupResponder { + return &BackupResponder{ + nc: nc, + deviceStore: deviceStore, + executor: executor, + locker: locker, + commandTimeout: commandTimeout, + } +} + +// Subscribe registers the NATS handler for config.backup.trigger requests. +// Uses core NATS (not JetStream) for request-reply, matching the pattern +// used by CmdResponder and TunnelResponder. +func (br *BackupResponder) Subscribe() error { + sub, err := br.nc.Subscribe("config.backup.trigger", br.handleTrigger) + if err != nil { + return fmt.Errorf("subscribing to config.backup.trigger: %w", err) + } + br.sub = sub + slog.Info("backup responder subscribed", "subject", "config.backup.trigger") + return nil +} + +// Stop unsubscribes from NATS. +func (br *BackupResponder) Stop() { + if br.sub != nil { + if err := br.sub.Unsubscribe(); err != nil { + slog.Warn("error unsubscribing backup responder", "error", err) + } + } +} + +// handleTrigger processes a config.backup.trigger request. +func (br *BackupResponder) handleTrigger(msg *nats.Msg) { + var req BackupTriggerRequest + if err := json.Unmarshal(msg.Data, &req); err != nil { + br.respond(msg, BackupTriggerResponse{ + Status: "failed", + Error: fmt.Sprintf("invalid request JSON: %s", err), + }) + return + } + + slog.Info("manual backup trigger received", + "device_id", req.DeviceID, + "tenant_id", req.TenantID, + ) + + // Look up device. + lookupCtx, lookupCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer lookupCancel() + + dev, err := br.deviceStore.GetDevice(lookupCtx, req.DeviceID) + if err != nil { + slog.Warn("backup trigger: device lookup failed", + "device_id", req.DeviceID, + "error", err, + ) + br.respond(msg, BackupTriggerResponse{ + Status: "failed", + Error: fmt.Sprintf("device lookup failed: %s", err), + }) + return + } + + // Try to obtain per-device Redis lock. + lockTTL := br.commandTimeout + 30*time.Second + lockKey := fmt.Sprintf("backup:device:%s", dev.ID) + + lockCtx, lockCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer lockCancel() + + lock, err := br.locker.ObtainLock(lockCtx, lockKey, lockTTL) + if errors.Is(err, ErrLockNotObtained) { + slog.Info("backup trigger: lock held, backup already in progress", + "device_id", dev.ID, + ) + br.respond(msg, BackupTriggerResponse{ + Status: "locked", + Message: "backup already in progress", + }) + return + } + if err != nil { + br.respond(msg, BackupTriggerResponse{ + Status: "failed", + Error: fmt.Sprintf("failed to acquire lock: %s", err), + }) + return + } + + // Release lock when done. + execCtx, execCancel := context.WithTimeout(context.Background(), br.commandTimeout) + defer execCancel() + defer func() { + if releaseErr := lock.Release(execCtx); releaseErr != nil { + slog.Warn("backup trigger: failed to release lock", + "device_id", dev.ID, + "error", releaseErr, + ) + } + }() + + // Execute the backup. + hash, err := br.executor.CollectAndPublish(execCtx, dev) + if err != nil { + slog.Error("backup trigger: backup failed", + "device_id", dev.ID, + "error", err, + ) + br.respond(msg, BackupTriggerResponse{ + Status: "failed", + Error: err.Error(), + }) + return + } + + slog.Info("backup trigger: backup completed successfully", + "device_id", dev.ID, + "sha256_hash", hash, + ) + + br.respond(msg, BackupTriggerResponse{ + Status: "success", + SHA256Hash: hash, + Message: "Config snapshot collected", + }) +} + +// respond sends a JSON response to a NATS request. +func (br *BackupResponder) respond(msg *nats.Msg, resp BackupTriggerResponse) { + data, _ := json.Marshal(resp) + if err := msg.Respond(data); err != nil { + slog.Error("backup trigger: failed to respond", "error", err) + } +} diff --git a/poller/internal/bus/backup_responder_test.go b/poller/internal/bus/backup_responder_test.go index 0fed38e..8fdd8a9 100644 --- a/poller/internal/bus/backup_responder_test.go +++ b/poller/internal/bus/backup_responder_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + natsserver "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/mikrotik-portal/poller/internal/store" @@ -49,22 +50,7 @@ type mockLockHandle struct{} func (h *mockLockHandle) Release(_ context.Context) error { return nil } -// testMsg builds a nats.Msg with the given payload and a reply inbox. -func testMsg(t *testing.T, payload any) *nats.Msg { - t.Helper() - data, err := json.Marshal(payload) - if err != nil { - t.Fatalf("marshal payload: %v", err) - } - return &nats.Msg{ - Subject: "config.backup.trigger", - Data: data, - Reply: "test.reply", - } -} - func TestBackupResponder_Subscribe(t *testing.T) { - // Start embedded NATS for subscribe test nc, cleanup := startTestNATS(t) defer cleanup() @@ -252,16 +238,32 @@ func TestBackupResponder_DeviceNotFound_ReturnsError(t *testing.T) { } } -// startTestNATS starts an embedded NATS server for testing and returns -// a connected client and cleanup function. +// startTestNATS starts an in-process NATS server and returns a connected client +// and cleanup function. func startTestNATS(t *testing.T) (*nats.Conn, func()) { t.Helper() - // Use nats-server test helper: start a temporary NATS server - s, err := nats.Connect(nats.DefaultURL) - if err != nil { - // Fall back to in-process test server - t.Skip("NATS server not available for testing, skipping") + opts := &natsserver.Options{ + Host: "127.0.0.1", + Port: -1, // random port + } + s, err := natsserver.NewServer(opts) + if err != nil { + t.Fatalf("failed to create test NATS server: %v", err) + } + s.Start() + if !s.ReadyForConnections(5 * time.Second) { + t.Fatal("NATS server not ready in time") + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + s.Shutdown() + t.Fatalf("failed to connect to test NATS: %v", err) + } + + return nc, func() { + nc.Close() + s.Shutdown() } - return s, func() { s.Close() } } diff --git a/poller/internal/bus/redis_locker.go b/poller/internal/bus/redis_locker.go new file mode 100644 index 0000000..ee43bb0 --- /dev/null +++ b/poller/internal/bus/redis_locker.go @@ -0,0 +1,41 @@ +package bus + +import ( + "context" + "time" + + "github.com/bsm/redislock" +) + +// RedisBackupLocker adapts *redislock.Client to the BackupLocker interface. +type RedisBackupLocker struct { + client *redislock.Client +} + +// NewRedisBackupLocker wraps a redislock.Client for use by BackupResponder. +func NewRedisBackupLocker(client *redislock.Client) *RedisBackupLocker { + return &RedisBackupLocker{client: client} +} + +// ObtainLock attempts to acquire a Redis distributed lock. +// Returns ErrLockNotObtained if the lock is already held. +func (l *RedisBackupLocker) ObtainLock(ctx context.Context, key string, ttl time.Duration) (BackupLockHandle, error) { + lock, err := l.client.Obtain(ctx, key, ttl, nil) + if err == redislock.ErrNotObtained { + return nil, ErrLockNotObtained + } + if err != nil { + return nil, err + } + return &redisLockHandle{lock: lock}, nil +} + +// redisLockHandle wraps *redislock.Lock to implement BackupLockHandle. +type redisLockHandle struct { + lock *redislock.Lock +} + +// Release releases the held Redis lock. +func (h *redisLockHandle) Release(ctx context.Context) error { + return h.lock.Release(ctx) +} diff --git a/poller/internal/poller/backup_scheduler.go b/poller/internal/poller/backup_scheduler.go index 941cacb..5bbd3c1 100644 --- a/poller/internal/poller/backup_scheduler.go +++ b/poller/internal/poller/backup_scheduler.go @@ -271,7 +271,7 @@ func (bs *BackupScheduler) executeBackupTick(ctx context.Context, dev store.Devi // Execute the backup. state.lastAttemptAt = time.Now() - if err := bs.collectAndPublish(ctx, dev, state); err != nil { + if _, err := bs.collectAndPublish(ctx, dev, state); err != nil { slog.Error("config backup failed", "device_id", dev.ID, "ip", dev.IPAddress, @@ -327,8 +327,17 @@ func (bs *BackupScheduler) executeBackupTick(ctx context.Context, dev store.Devi } } +// CollectAndPublish performs the config backup pipeline (SSH, normalize, hash, publish) +// for a single device and returns the SHA256 hash of the collected config. +// This is the public entry point used by BackupResponder for manual triggers. +func (bs *BackupScheduler) CollectAndPublish(ctx context.Context, dev store.Device) (string, error) { + return bs.collectAndPublish(ctx, dev, nil) +} + // collectAndPublish performs the actual config backup: SSH command, normalize, hash, publish. -func (bs *BackupScheduler) collectAndPublish(ctx context.Context, dev store.Device, state *backupDeviceState) error { +// If state is nil (manual trigger), observability metrics are still recorded but +// state tracking is skipped. +func (bs *BackupScheduler) collectAndPublish(ctx context.Context, dev store.Device, state *backupDeviceState) (string, error) { observability.ConfigBackupActive.Inc() defer observability.ConfigBackupActive.Dec() @@ -345,7 +354,7 @@ func (bs *BackupScheduler) collectAndPublish(ctx context.Context, dev store.Devi dev.EncryptedCredentials, ) if err != nil { - return fmt.Errorf("decrypting credentials for device %s: %w", dev.ID, err) + return "", fmt.Errorf("decrypting credentials for device %s: %w", dev.ID, err) } // Build known fingerprint for TOFU verification. @@ -369,7 +378,7 @@ func (bs *BackupScheduler) collectAndPublish(ctx context.Context, dev store.Devi "/export show-sensitive", ) if err != nil { - return err + return "", err } // TOFU: store fingerprint on first connection. @@ -386,10 +395,10 @@ func (bs *BackupScheduler) collectAndPublish(ctx context.Context, dev store.Devi // Validate output: non-empty and looks like RouterOS config. if result.Stdout == "" { - return fmt.Errorf("empty config output from device %s", dev.ID) + return "", fmt.Errorf("empty config output from device %s", dev.ID) } if !strings.Contains(result.Stdout, "/") { - return fmt.Errorf("config output from device %s does not look like RouterOS config", dev.ID) + return "", fmt.Errorf("config output from device %s does not look like RouterOS config", dev.ID) } // Normalize and hash. @@ -414,7 +423,7 @@ func (bs *BackupScheduler) collectAndPublish(ctx context.Context, dev store.Devi } if err := bs.publisher.PublishConfigSnapshot(ctx, event); err != nil { - return fmt.Errorf("publishing config snapshot for device %s: %w", dev.ID, err) + return "", fmt.Errorf("publishing config snapshot for device %s: %w", dev.ID, err) } slog.Info("config backup published", @@ -422,7 +431,7 @@ func (bs *BackupScheduler) collectAndPublish(ctx context.Context, dev store.Devi "sha256_hash", hash, ) - return nil + return hash, nil } // isDeviceOnline checks if a device is online via Redis status key.