feat(02-01): add config backup env vars, NATS event, device SSH fields, migration, metrics
- Config: CONFIG_BACKUP_INTERVAL (21600s), CONFIG_BACKUP_MAX_CONCURRENT (10), CONFIG_BACKUP_COMMAND_TIMEOUT (60s)
- NATS: ConfigSnapshotEvent type, PublishConfigSnapshot method, config.snapshot.> stream subject
- Device: SSHPort/SSHHostKeyFingerprint fields, UpdateSSHHostKey method, updated queries/scans
- Migration 028: ssh_port, ssh_host_key_fingerprint, timestamp columns with poller_user grants
- Metrics: ConfigBackupTotal (counter), ConfigBackupDuration (histogram), ConfigBackupActive (gauge)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
64
backend/alembic/versions/028_device_ssh_host_key.py
Normal file
64
backend/alembic/versions/028_device_ssh_host_key.py
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
"""Add SSH host key columns to devices table.
|
||||||
|
|
||||||
|
Adds columns for SSH config backup support:
|
||||||
|
- ssh_port: SSH port override (default 22)
|
||||||
|
- ssh_host_key_fingerprint: TOFU host key fingerprint (SHA256:base64)
|
||||||
|
- ssh_host_key_first_seen: when the host key was first observed
|
||||||
|
- ssh_host_key_last_verified: when the host key was last verified
|
||||||
|
|
||||||
|
Grants UPDATE on SSH columns to poller_user for TOFU persistence.
|
||||||
|
|
||||||
|
Revision ID: 028
|
||||||
|
Revises: 027
|
||||||
|
Create Date: 2026-03-13
|
||||||
|
"""
|
||||||
|
|
||||||
|
revision = "028"
|
||||||
|
down_revision = "027"
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
    """Add the SSH port and host key columns to ``devices`` and grant poller access.

    The statements run in order on the migration's bound connection: four
    ``ALTER TABLE ... ADD COLUMN`` statements, then one column-level
    ``GRANT UPDATE`` for ``poller_user``.
    """
    bind = op.get_bind()

    statements = (
        "ALTER TABLE devices ADD COLUMN ssh_port INTEGER DEFAULT 22",
        "ALTER TABLE devices ADD COLUMN ssh_host_key_fingerprint TEXT",
        "ALTER TABLE devices ADD COLUMN ssh_host_key_first_seen TIMESTAMPTZ",
        "ALTER TABLE devices ADD COLUMN ssh_host_key_last_verified TIMESTAMPTZ",
        # poller_user gets UPDATE on the three host key columns only (not
        # ssh_port) so it can persist the TOFU fingerprint it observes.
        "GRANT UPDATE (ssh_host_key_fingerprint, ssh_host_key_first_seen, ssh_host_key_last_verified) ON devices TO poller_user",
    )
    for statement in statements:
        bind.execute(sa.text(statement))
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
    """Undo the upgrade: revoke the poller grant, then drop the four SSH columns.

    The grant is revoked first, and the columns are dropped in the reverse of
    the order in which the upgrade added them.
    """
    bind = op.get_bind()

    statements = (
        "REVOKE UPDATE (ssh_host_key_fingerprint, ssh_host_key_first_seen, ssh_host_key_last_verified) ON devices FROM poller_user",
        "ALTER TABLE devices DROP COLUMN ssh_host_key_last_verified",
        "ALTER TABLE devices DROP COLUMN ssh_host_key_first_seen",
        "ALTER TABLE devices DROP COLUMN ssh_host_key_fingerprint",
        "ALTER TABLE devices DROP COLUMN ssh_port",
    )
    for statement in statements:
        bind.execute(sa.text(statement))
|
||||||
@@ -65,6 +65,19 @@ type PushRollbackEvent struct {
|
|||||||
PrePushCommitSHA string `json:"pre_push_commit_sha"`
|
PrePushCommitSHA string `json:"pre_push_commit_sha"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConfigSnapshotEvent describes one successfully collected device config
// backup. It is the JSON payload published to NATS JetStream; the backend
// subscribes to "config.snapshot.>" to store snapshots and compute diffs.
//
// RouterOSVersion is omitted from the JSON when empty. CollectedAt carries an
// RFC3339 timestamp as a string. Field order is significant for the encoded
// output and must not be rearranged.
type ConfigSnapshotEvent struct {
	DeviceID             string `json:"device_id"`
	TenantID             string `json:"tenant_id"`
	RouterOSVersion      string `json:"routeros_version,omitempty"`
	CollectedAt          string `json:"collected_at"` // RFC3339
	SHA256Hash           string `json:"sha256_hash"`
	ConfigText           string `json:"config_text"`
	NormalizationVersion int    `json:"normalization_version"`
}
|
||||||
|
|
||||||
// PushAlertEvent triggers an alert for editor pushes (one-click rollback).
|
// PushAlertEvent triggers an alert for editor pushes (one-click rollback).
|
||||||
type PushAlertEvent struct {
|
type PushAlertEvent struct {
|
||||||
DeviceID string `json:"device_id"`
|
DeviceID string `json:"device_id"`
|
||||||
@@ -122,6 +135,7 @@ func NewPublisher(natsURL string) (*Publisher, error) {
|
|||||||
"device.firmware.>",
|
"device.firmware.>",
|
||||||
"device.credential_changed.>",
|
"device.credential_changed.>",
|
||||||
"config.changed.>",
|
"config.changed.>",
|
||||||
|
"config.snapshot.>",
|
||||||
"config.push.rollback.>",
|
"config.push.rollback.>",
|
||||||
"config.push.alert.>",
|
"config.push.alert.>",
|
||||||
"audit.session.end.>",
|
"audit.session.end.>",
|
||||||
@@ -257,6 +271,33 @@ func (p *Publisher) PublishConfigChanged(ctx context.Context, event ConfigChange
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PublishConfigSnapshot publishes a config snapshot event to NATS JetStream.
|
||||||
|
//
|
||||||
|
// Events are published to "config.snapshot.create.{DeviceID}" so the Python
|
||||||
|
// backend can store the snapshot and compute diffs against the previous one.
|
||||||
|
func (p *Publisher) PublishConfigSnapshot(ctx context.Context, event ConfigSnapshotEvent) error {
|
||||||
|
data, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshalling config snapshot event: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
subject := fmt.Sprintf("config.snapshot.create.%s", event.DeviceID)
|
||||||
|
|
||||||
|
_, err = p.js.Publish(ctx, subject, data)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("publishing to %s: %w", subject, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Debug("published config snapshot event",
|
||||||
|
"device_id", event.DeviceID,
|
||||||
|
"tenant_id", event.TenantID,
|
||||||
|
"sha256_hash", event.SHA256Hash,
|
||||||
|
"subject", subject,
|
||||||
|
)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// PublishPushRollback publishes a push rollback event when a device goes offline
|
// PublishPushRollback publishes a push rollback event when a device goes offline
|
||||||
// after a template or restore config push, triggering automatic rollback.
|
// after a template or restore config push, triggering automatic rollback.
|
||||||
func (p *Publisher) PublishPushRollback(ctx context.Context, event PushRollbackEvent) error {
|
func (p *Publisher) PublishPushRollback(ctx context.Context, event PushRollbackEvent) error {
|
||||||
|
|||||||
@@ -90,6 +90,15 @@ type Config struct {
|
|||||||
|
|
||||||
// SSHMaxPerDevice is the maximum number of concurrent SSH relay sessions per device.
|
// SSHMaxPerDevice is the maximum number of concurrent SSH relay sessions per device.
|
||||||
SSHMaxPerDevice int
|
SSHMaxPerDevice int
|
||||||
|
|
||||||
|
// ConfigBackupIntervalSeconds is how often config backups are collected per device (default 6h = 21600s).
|
||||||
|
ConfigBackupIntervalSeconds int
|
||||||
|
|
||||||
|
// ConfigBackupMaxConcurrent is the max number of concurrent config backup jobs.
|
||||||
|
ConfigBackupMaxConcurrent int
|
||||||
|
|
||||||
|
// ConfigBackupCommandTimeoutSeconds is the per-command timeout for SSH config export.
|
||||||
|
ConfigBackupCommandTimeoutSeconds int
|
||||||
}
|
}
|
||||||
|
|
||||||
// knownInsecureEncryptionKey is the base64-encoded dev default encryption key.
|
// knownInsecureEncryptionKey is the base64-encoded dev default encryption key.
|
||||||
@@ -120,6 +129,9 @@ func Load() (*Config, error) {
|
|||||||
SSHMaxSessions: getEnvInt("SSH_MAX_SESSIONS", 200),
|
SSHMaxSessions: getEnvInt("SSH_MAX_SESSIONS", 200),
|
||||||
SSHMaxPerUser: getEnvInt("SSH_MAX_PER_USER", 10),
|
SSHMaxPerUser: getEnvInt("SSH_MAX_PER_USER", 10),
|
||||||
SSHMaxPerDevice: getEnvInt("SSH_MAX_PER_DEVICE", 20),
|
SSHMaxPerDevice: getEnvInt("SSH_MAX_PER_DEVICE", 20),
|
||||||
|
ConfigBackupIntervalSeconds: getEnvInt("CONFIG_BACKUP_INTERVAL", 21600),
|
||||||
|
ConfigBackupMaxConcurrent: getEnvInt("CONFIG_BACKUP_MAX_CONCURRENT", 10),
|
||||||
|
ConfigBackupCommandTimeoutSeconds: getEnvInt("CONFIG_BACKUP_COMMAND_TIMEOUT", 60),
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.DatabaseURL == "" {
|
if cfg.DatabaseURL == "" {
|
||||||
|
|||||||
@@ -58,3 +58,23 @@ var CircuitBreakerResets = promauto.NewCounter(prometheus.CounterOpts{
|
|||||||
Name: "mikrotik_circuit_breaker_resets_total",
|
Name: "mikrotik_circuit_breaker_resets_total",
|
||||||
Help: "Total circuit breaker resets when a device recovers.",
|
Help: "Total circuit breaker resets when a device recovers.",
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// ConfigBackupTotal counts config backup operations by status.
|
||||||
|
// Status labels: "success", "error", "skipped_offline", "skipped_auth_blocked", "skipped_hostkey_blocked".
|
||||||
|
var ConfigBackupTotal = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Name: "mikrotik_config_backup_total",
|
||||||
|
Help: "Total config backup operations.",
|
||||||
|
}, []string{"status"})
|
||||||
|
|
||||||
|
// ConfigBackupDuration tracks the duration of individual config backup operations.
|
||||||
|
var ConfigBackupDuration = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||||
|
Name: "mikrotik_config_backup_duration_seconds",
|
||||||
|
Help: "Duration of a single config backup operation in seconds.",
|
||||||
|
Buckets: []float64{1, 5, 10, 30, 60, 120, 300},
|
||||||
|
})
|
||||||
|
|
||||||
|
// ConfigBackupActive tracks the number of concurrent config backup jobs running.
|
||||||
|
var ConfigBackupActive = promauto.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Name: "mikrotik_config_backup_active",
|
||||||
|
Help: "Number of concurrent config backup jobs running.",
|
||||||
|
})
|
||||||
|
|||||||
@@ -22,6 +22,8 @@ type Device struct {
|
|||||||
MajorVersion *int
|
MajorVersion *int
|
||||||
TLSMode string // "insecure" or "portal_ca"
|
TLSMode string // "insecure" or "portal_ca"
|
||||||
CACertPEM *string // PEM-encoded CA cert (only populated when TLSMode = "portal_ca")
|
CACertPEM *string // PEM-encoded CA cert (only populated when TLSMode = "portal_ca")
|
||||||
|
SSHPort int // SSH port for config backup (default 22)
|
||||||
|
SSHHostKeyFingerprint *string // TOFU SSH host key fingerprint (SHA256:base64)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeviceStore manages PostgreSQL connections for device data access.
|
// DeviceStore manages PostgreSQL connections for device data access.
|
||||||
@@ -65,7 +67,9 @@ func (s *DeviceStore) FetchDevices(ctx context.Context) ([]Device, error) {
|
|||||||
d.routeros_version,
|
d.routeros_version,
|
||||||
d.routeros_major_version,
|
d.routeros_major_version,
|
||||||
d.tls_mode,
|
d.tls_mode,
|
||||||
ca.cert_pem
|
ca.cert_pem,
|
||||||
|
COALESCE(d.ssh_port, 22),
|
||||||
|
d.ssh_host_key_fingerprint
|
||||||
FROM devices d
|
FROM devices d
|
||||||
LEFT JOIN certificate_authorities ca
|
LEFT JOIN certificate_authorities ca
|
||||||
ON d.tenant_id = ca.tenant_id
|
ON d.tenant_id = ca.tenant_id
|
||||||
@@ -95,6 +99,8 @@ func (s *DeviceStore) FetchDevices(ctx context.Context) ([]Device, error) {
|
|||||||
&d.MajorVersion,
|
&d.MajorVersion,
|
||||||
&d.TLSMode,
|
&d.TLSMode,
|
||||||
&d.CACertPEM,
|
&d.CACertPEM,
|
||||||
|
&d.SSHPort,
|
||||||
|
&d.SSHHostKeyFingerprint,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return nil, fmt.Errorf("scanning device row: %w", err)
|
return nil, fmt.Errorf("scanning device row: %w", err)
|
||||||
}
|
}
|
||||||
@@ -122,7 +128,9 @@ func (s *DeviceStore) GetDevice(ctx context.Context, deviceID string) (Device, e
|
|||||||
d.routeros_version,
|
d.routeros_version,
|
||||||
d.routeros_major_version,
|
d.routeros_major_version,
|
||||||
d.tls_mode,
|
d.tls_mode,
|
||||||
ca.cert_pem
|
ca.cert_pem,
|
||||||
|
COALESCE(d.ssh_port, 22),
|
||||||
|
d.ssh_host_key_fingerprint
|
||||||
FROM devices d
|
FROM devices d
|
||||||
LEFT JOIN certificate_authorities ca
|
LEFT JOIN certificate_authorities ca
|
||||||
ON d.tenant_id = ca.tenant_id
|
ON d.tenant_id = ca.tenant_id
|
||||||
@@ -142,6 +150,8 @@ func (s *DeviceStore) GetDevice(ctx context.Context, deviceID string) (Device, e
|
|||||||
&d.MajorVersion,
|
&d.MajorVersion,
|
||||||
&d.TLSMode,
|
&d.TLSMode,
|
||||||
&d.CACertPEM,
|
&d.CACertPEM,
|
||||||
|
&d.SSHPort,
|
||||||
|
&d.SSHHostKeyFingerprint,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Device{}, fmt.Errorf("querying device %s: %w", deviceID, err)
|
return Device{}, fmt.Errorf("querying device %s: %w", deviceID, err)
|
||||||
@@ -149,6 +159,17 @@ func (s *DeviceStore) GetDevice(ctx context.Context, deviceID string) (Device, e
|
|||||||
return d, nil
|
return d, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateSSHHostKey stores the SSH host key fingerprint for TOFU verification.
|
||||||
|
// Called after a successful first-connect to persist the observed fingerprint.
|
||||||
|
func (s *DeviceStore) UpdateSSHHostKey(ctx context.Context, deviceID string, fingerprint string) error {
|
||||||
|
const query = `UPDATE devices SET ssh_host_key_fingerprint = $1, ssh_host_key_first_seen = COALESCE(ssh_host_key_first_seen, NOW()), ssh_host_key_last_verified = NOW() WHERE id = $2`
|
||||||
|
_, err := s.pool.Exec(ctx, query, fingerprint, deviceID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("updating SSH host key for device %s: %w", deviceID, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Pool returns the underlying pgxpool.Pool for shared use by other subsystems
|
// Pool returns the underlying pgxpool.Pool for shared use by other subsystems
|
||||||
// (e.g., credential cache key_access_log inserts).
|
// (e.g., credential cache key_access_log inserts).
|
||||||
func (s *DeviceStore) Pool() *pgxpool.Pool {
|
func (s *DeviceStore) Pool() *pgxpool.Pool {
|
||||||
|
|||||||
Reference in New Issue
Block a user