"""Add SSH host key columns to devices table.

Adds columns for SSH config backup support:
- ssh_port: SSH port override (default 22)
- ssh_host_key_fingerprint: TOFU host key fingerprint (SHA256:base64)
- ssh_host_key_first_seen: when the host key was first observed
- ssh_host_key_last_verified: when the host key was last verified

Grants UPDATE on the three host key columns (not ssh_port) to poller_user
for TOFU persistence.

Revision ID: 028
Revises: 027
Create Date: 2026-03-13
"""

from alembic import op
import sqlalchemy as sa  # noqa: F401  (kept so the module's import surface is unchanged)

revision = "028"
down_revision = "027"
branch_labels = None
depends_on = None

# Columns poller_user may UPDATE. Shared by upgrade() and downgrade() so the
# GRANT and the REVOKE can never drift apart.
_POLLER_WRITABLE_COLUMNS = (
    "ssh_host_key_fingerprint, "
    "ssh_host_key_first_seen, "
    "ssh_host_key_last_verified"
)


def upgrade() -> None:
    """Add the SSH columns to ``devices`` and grant poller_user column UPDATE.

    Statements go through ``op.execute`` rather than ``op.get_bind().execute``
    so the migration also renders in offline mode (``alembic upgrade --sql``),
    where no live connection is available.
    """
    op.execute("ALTER TABLE devices ADD COLUMN ssh_port INTEGER DEFAULT 22")
    op.execute("ALTER TABLE devices ADD COLUMN ssh_host_key_fingerprint TEXT")
    op.execute(
        "ALTER TABLE devices ADD COLUMN ssh_host_key_first_seen TIMESTAMPTZ"
    )
    op.execute(
        "ALTER TABLE devices ADD COLUMN ssh_host_key_last_verified TIMESTAMPTZ"
    )

    # Grant poller_user UPDATE on SSH columns for TOFU host key persistence.
    op.execute(
        f"GRANT UPDATE ({_POLLER_WRITABLE_COLUMNS}) ON devices TO poller_user"
    )


def downgrade() -> None:
    """Revoke the column grant, then drop the columns in reverse creation order."""
    op.execute(
        f"REVOKE UPDATE ({_POLLER_WRITABLE_COLUMNS}) ON devices FROM poller_user"
    )
    op.execute("ALTER TABLE devices DROP COLUMN ssh_host_key_last_verified")
    op.execute("ALTER TABLE devices DROP COLUMN ssh_host_key_first_seen")
    op.execute("ALTER TABLE devices DROP COLUMN ssh_host_key_fingerprint")
    op.execute("ALTER TABLE devices DROP COLUMN ssh_port")
+func (p *Publisher) PublishConfigSnapshot(ctx context.Context, event ConfigSnapshotEvent) error { + data, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("marshalling config snapshot event: %w", err) + } + + subject := fmt.Sprintf("config.snapshot.create.%s", event.DeviceID) + + _, err = p.js.Publish(ctx, subject, data) + if err != nil { + return fmt.Errorf("publishing to %s: %w", subject, err) + } + + slog.Debug("published config snapshot event", + "device_id", event.DeviceID, + "tenant_id", event.TenantID, + "sha256_hash", event.SHA256Hash, + "subject", subject, + ) + + return nil +} + // PublishPushRollback publishes a push rollback event when a device goes offline // after a template or restore config push, triggering automatic rollback. func (p *Publisher) PublishPushRollback(ctx context.Context, event PushRollbackEvent) error { diff --git a/poller/internal/config/config.go b/poller/internal/config/config.go index bade04f..12032b3 100644 --- a/poller/internal/config/config.go +++ b/poller/internal/config/config.go @@ -90,6 +90,15 @@ type Config struct { // SSHMaxPerDevice is the maximum number of concurrent SSH relay sessions per device. SSHMaxPerDevice int + + // ConfigBackupIntervalSeconds is how often config backups are collected per device (default 6h = 21600s). + ConfigBackupIntervalSeconds int + + // ConfigBackupMaxConcurrent is the max number of concurrent config backup jobs. + ConfigBackupMaxConcurrent int + + // ConfigBackupCommandTimeoutSeconds is the per-command timeout for SSH config export. + ConfigBackupCommandTimeoutSeconds int } // knownInsecureEncryptionKey is the base64-encoded dev default encryption key. 
@@ -118,8 +127,11 @@ func Load() (*Config, error) { SSHRelayPort: getEnv("SSH_RELAY_PORT", "8080"), SSHIdleTimeout: getEnvInt("SSH_IDLE_TIMEOUT", 900), SSHMaxSessions: getEnvInt("SSH_MAX_SESSIONS", 200), - SSHMaxPerUser: getEnvInt("SSH_MAX_PER_USER", 10), - SSHMaxPerDevice: getEnvInt("SSH_MAX_PER_DEVICE", 20), + SSHMaxPerUser: getEnvInt("SSH_MAX_PER_USER", 10), + SSHMaxPerDevice: getEnvInt("SSH_MAX_PER_DEVICE", 20), + ConfigBackupIntervalSeconds: getEnvInt("CONFIG_BACKUP_INTERVAL", 21600), + ConfigBackupMaxConcurrent: getEnvInt("CONFIG_BACKUP_MAX_CONCURRENT", 10), + ConfigBackupCommandTimeoutSeconds: getEnvInt("CONFIG_BACKUP_COMMAND_TIMEOUT", 60), } if cfg.DatabaseURL == "" { diff --git a/poller/internal/observability/metrics.go b/poller/internal/observability/metrics.go index 71eaa15..8334a10 100644 --- a/poller/internal/observability/metrics.go +++ b/poller/internal/observability/metrics.go @@ -58,3 +58,23 @@ var CircuitBreakerResets = promauto.NewCounter(prometheus.CounterOpts{ Name: "mikrotik_circuit_breaker_resets_total", Help: "Total circuit breaker resets when a device recovers.", }) + +// ConfigBackupTotal counts config backup operations by status. +// Status labels: "success", "error", "skipped_offline", "skipped_auth_blocked", "skipped_hostkey_blocked". +var ConfigBackupTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mikrotik_config_backup_total", + Help: "Total config backup operations.", +}, []string{"status"}) + +// ConfigBackupDuration tracks the duration of individual config backup operations. +var ConfigBackupDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "mikrotik_config_backup_duration_seconds", + Help: "Duration of a single config backup operation in seconds.", + Buckets: []float64{1, 5, 10, 30, 60, 120, 300}, +}) + +// ConfigBackupActive tracks the number of concurrent config backup jobs running. 
+var ConfigBackupActive = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "mikrotik_config_backup_active", + Help: "Number of concurrent config backup jobs running.", +}) diff --git a/poller/internal/store/devices.go b/poller/internal/store/devices.go index e93b16a..aea42f6 100644 --- a/poller/internal/store/devices.go +++ b/poller/internal/store/devices.go @@ -22,6 +22,8 @@ type Device struct { MajorVersion *int TLSMode string // "insecure" or "portal_ca" CACertPEM *string // PEM-encoded CA cert (only populated when TLSMode = "portal_ca") + SSHPort int // SSH port for config backup (default 22) + SSHHostKeyFingerprint *string // TOFU SSH host key fingerprint (SHA256:base64) } // DeviceStore manages PostgreSQL connections for device data access. @@ -65,7 +67,9 @@ func (s *DeviceStore) FetchDevices(ctx context.Context) ([]Device, error) { d.routeros_version, d.routeros_major_version, d.tls_mode, - ca.cert_pem + ca.cert_pem, + COALESCE(d.ssh_port, 22), + d.ssh_host_key_fingerprint FROM devices d LEFT JOIN certificate_authorities ca ON d.tenant_id = ca.tenant_id @@ -95,6 +99,8 @@ func (s *DeviceStore) FetchDevices(ctx context.Context) ([]Device, error) { &d.MajorVersion, &d.TLSMode, &d.CACertPEM, + &d.SSHPort, + &d.SSHHostKeyFingerprint, ); err != nil { return nil, fmt.Errorf("scanning device row: %w", err) } @@ -122,7 +128,9 @@ func (s *DeviceStore) GetDevice(ctx context.Context, deviceID string) (Device, e d.routeros_version, d.routeros_major_version, d.tls_mode, - ca.cert_pem + ca.cert_pem, + COALESCE(d.ssh_port, 22), + d.ssh_host_key_fingerprint FROM devices d LEFT JOIN certificate_authorities ca ON d.tenant_id = ca.tenant_id @@ -142,6 +150,8 @@ func (s *DeviceStore) GetDevice(ctx context.Context, deviceID string) (Device, e &d.MajorVersion, &d.TLSMode, &d.CACertPEM, + &d.SSHPort, + &d.SSHHostKeyFingerprint, ) if err != nil { return Device{}, fmt.Errorf("querying device %s: %w", deviceID, err) @@ -149,6 +159,17 @@ func (s *DeviceStore) GetDevice(ctx 
context.Context, deviceID string) (Device, e return d, nil } +// UpdateSSHHostKey stores the SSH host key fingerprint for TOFU verification. +// Called after a successful first-connect to persist the observed fingerprint. +func (s *DeviceStore) UpdateSSHHostKey(ctx context.Context, deviceID string, fingerprint string) error { + const query = `UPDATE devices SET ssh_host_key_fingerprint = $1, ssh_host_key_first_seen = COALESCE(ssh_host_key_first_seen, NOW()), ssh_host_key_last_verified = NOW() WHERE id = $2` + _, err := s.pool.Exec(ctx, query, fingerprint, deviceID) + if err != nil { + return fmt.Errorf("updating SSH host key for device %s: %w", deviceID, err) + } + return nil +} + // Pool returns the underlying pgxpool.Pool for shared use by other subsystems // (e.g., credential cache key_access_log inserts). func (s *DeviceStore) Pool() *pgxpool.Pool {