From cb427272ed842c478243540bca0f111dc8201ba0 Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Thu, 12 Mar 2026 15:35:55 -0500 Subject: [PATCH] feat(poller): wire tunnel manager and SSH relay into main Add TunnelManager, TunnelResponder, SSH relay server, and SSH relay HTTP server to the poller startup sequence with env-configurable port ranges, idle timeouts, and session limits. Extends graceful shutdown to cover the SSH relay HTTP server (explicit Shutdown bounded by a 5s context) and, via defer, the tunnel manager and SSH relay server. Co-Authored-By: Claude Opus 4.6 --- poller/cmd/poller/main.go | 60 ++++++++++++++++++++++++++++++++ poller/internal/config/config.go | 34 ++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/poller/cmd/poller/main.go b/poller/cmd/poller/main.go index f1d212b..13fa705 100644 --- a/poller/cmd/poller/main.go +++ b/poller/cmd/poller/main.go @@ -9,6 +9,7 @@ package main import ( "context" "log/slog" + "net/http" "os" "os/signal" "syscall" @@ -21,7 +22,9 @@ import ( "github.com/mikrotik-portal/poller/internal/config" "github.com/mikrotik-portal/poller/internal/observability" "github.com/mikrotik-portal/poller/internal/poller" + "github.com/mikrotik-portal/poller/internal/sshrelay" "github.com/mikrotik-portal/poller/internal/store" + "github.com/mikrotik-portal/poller/internal/tunnel" "github.com/mikrotik-portal/poller/internal/vault" ) @@ -186,6 +189,56 @@ func main() { defer credentialSub.Stop() slog.Info("NATS credential subscriber started (device.credential_changed.>)") + // ----------------------------------------------------------------------- + // Initialize WinBox tunnel manager + // ----------------------------------------------------------------------- + tunnelMgr := tunnel.NewManager( + cfg.TunnelPortMin, + cfg.TunnelPortMax, + time.Duration(cfg.TunnelIdleTimeout)*time.Second, + deviceStore, + credentialCache, + ) + defer tunnelMgr.Shutdown() + slog.Info("tunnel manager initialized", + "port_min", cfg.TunnelPortMin, + "port_max", cfg.TunnelPortMax, + 
"idle_timeout_s", cfg.TunnelIdleTimeout, + ) + + // ----------------------------------------------------------------------- + // Subscribe NATS tunnel responder + // ----------------------------------------------------------------------- + tunnelResp := bus.NewTunnelResponder(publisher.Conn(), tunnelMgr, deviceStore, credentialCache) + if err := tunnelResp.Subscribe(); err != nil { + slog.Error("failed to subscribe tunnel responder", "error", err) + os.Exit(1) + } + defer tunnelResp.Stop() + slog.Info("NATS tunnel responder started (tunnel.*)") + + // ----------------------------------------------------------------------- + // Initialize SSH relay server and HTTP listener + // ----------------------------------------------------------------------- + sshServer := sshrelay.NewServer(redisClient, credentialCache, deviceStore, sshrelay.Config{ + IdleTimeout: time.Duration(cfg.SSHIdleTimeout) * time.Second, + MaxSessions: cfg.SSHMaxSessions, + MaxPerUser: cfg.SSHMaxPerUser, + MaxPerDevice: cfg.SSHMaxPerDevice, + }) + defer sshServer.Shutdown() + + httpServer := &http.Server{ + Addr: ":" + cfg.SSHRelayPort, + Handler: sshServer.Handler(), + } + go func() { + slog.Info("SSH relay HTTP server starting", "port", cfg.SSHRelayPort) + if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + slog.Error("SSH relay HTTP server error", "error", err) + } + }() + // ----------------------------------------------------------------------- // Start observability HTTP server (Prometheus metrics + health endpoint) // ----------------------------------------------------------------------- @@ -227,5 +280,12 @@ func main() { os.Exit(1) } + // Gracefully shut down the SSH relay HTTP server. 
+ shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() + if err := httpServer.Shutdown(shutdownCtx); err != nil { + slog.Warn("SSH relay HTTP server shutdown error", "error", err) + } + slog.Info("poller shutdown complete") } diff --git a/poller/internal/config/config.go b/poller/internal/config/config.go index 3f99603..bade04f 100644 --- a/poller/internal/config/config.go +++ b/poller/internal/config/config.go @@ -64,6 +64,32 @@ type Config struct { // Each API call (DetectVersion, CollectInterfaces, etc.) is wrapped with // this timeout to prevent indefinite blocking on unresponsive devices. CommandTimeoutSeconds int + + // TunnelPortMin is the lower bound of the local TCP port pool for WinBox tunnels. + TunnelPortMin int + + // TunnelPortMax is the upper bound of the local TCP port pool for WinBox tunnels. + TunnelPortMax int + + // TunnelIdleTimeout is the number of seconds a WinBox tunnel may remain idle + // with no active connections before it is automatically closed. + TunnelIdleTimeout int + + // SSHRelayPort is the TCP port on which the SSH relay HTTP server listens. + SSHRelayPort string + + // SSHIdleTimeout is the number of seconds an SSH relay session may remain + // idle before it is automatically terminated. + SSHIdleTimeout int + + // SSHMaxSessions is the maximum total number of concurrent SSH relay sessions. + SSHMaxSessions int + + // SSHMaxPerUser is the maximum number of concurrent SSH relay sessions per user. + SSHMaxPerUser int + + // SSHMaxPerDevice is the maximum number of concurrent SSH relay sessions per device. + SSHMaxPerDevice int } // knownInsecureEncryptionKey is the base64-encoded dev default encryption key. 
@@ -86,6 +112,14 @@ func Load() (*Config, error) { CircuitBreakerBaseBackoffSeconds: getEnvInt("CIRCUIT_BREAKER_BASE_BACKOFF_SECONDS", 30), CircuitBreakerMaxBackoffSeconds: getEnvInt("CIRCUIT_BREAKER_MAX_BACKOFF_SECONDS", 900), CommandTimeoutSeconds: getEnvInt("COMMAND_TIMEOUT_SECONDS", 30), + TunnelPortMin: getEnvInt("TUNNEL_PORT_MIN", 49000), + TunnelPortMax: getEnvInt("TUNNEL_PORT_MAX", 49100), + TunnelIdleTimeout: getEnvInt("TUNNEL_IDLE_TIMEOUT", 300), + SSHRelayPort: getEnv("SSH_RELAY_PORT", "8080"), + SSHIdleTimeout: getEnvInt("SSH_IDLE_TIMEOUT", 900), + SSHMaxSessions: getEnvInt("SSH_MAX_SESSIONS", 200), + SSHMaxPerUser: getEnvInt("SSH_MAX_PER_USER", 10), + SSHMaxPerDevice: getEnvInt("SSH_MAX_PER_DEVICE", 20), } if cfg.DatabaseURL == "" {