feat(poller): wire tunnel manager and SSH relay into main

Add TunnelManager, TunnelResponder, SSH relay server, and SSH relay HTTP
server to the poller startup sequence with env-configurable port ranges,
idle timeouts, and session limits. Extends graceful shutdown to cover the
HTTP server (5s context), tunnel manager, and SSH relay server via defer.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jason Staack
2026-03-12 15:35:55 -05:00
parent c73466c5e0
commit cb427272ed
2 changed files with 94 additions and 0 deletions

View File

@@ -9,6 +9,7 @@ package main
import (
"context"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
@@ -21,7 +22,9 @@ import (
"github.com/mikrotik-portal/poller/internal/config"
"github.com/mikrotik-portal/poller/internal/observability"
"github.com/mikrotik-portal/poller/internal/poller"
"github.com/mikrotik-portal/poller/internal/sshrelay"
"github.com/mikrotik-portal/poller/internal/store"
"github.com/mikrotik-portal/poller/internal/tunnel"
"github.com/mikrotik-portal/poller/internal/vault"
)
@@ -186,6 +189,56 @@ func main() {
defer credentialSub.Stop()
slog.Info("NATS credential subscriber started (device.credential_changed.>)")
// -----------------------------------------------------------------------
// Initialize WinBox tunnel manager
// -----------------------------------------------------------------------
tunnelMgr := tunnel.NewManager(
cfg.TunnelPortMin,
cfg.TunnelPortMax,
time.Duration(cfg.TunnelIdleTimeout)*time.Second,
deviceStore,
credentialCache,
)
defer tunnelMgr.Shutdown()
slog.Info("tunnel manager initialized",
"port_min", cfg.TunnelPortMin,
"port_max", cfg.TunnelPortMax,
"idle_timeout_s", cfg.TunnelIdleTimeout,
)
// -----------------------------------------------------------------------
// Subscribe NATS tunnel responder
// -----------------------------------------------------------------------
tunnelResp := bus.NewTunnelResponder(publisher.Conn(), tunnelMgr, deviceStore, credentialCache)
if err := tunnelResp.Subscribe(); err != nil {
slog.Error("failed to subscribe tunnel responder", "error", err)
os.Exit(1)
}
defer tunnelResp.Stop()
slog.Info("NATS tunnel responder started (tunnel.*)")
// -----------------------------------------------------------------------
// Initialize SSH relay server and HTTP listener
// -----------------------------------------------------------------------
sshServer := sshrelay.NewServer(redisClient, credentialCache, deviceStore, sshrelay.Config{
IdleTimeout: time.Duration(cfg.SSHIdleTimeout) * time.Second,
MaxSessions: cfg.SSHMaxSessions,
MaxPerUser: cfg.SSHMaxPerUser,
MaxPerDevice: cfg.SSHMaxPerDevice,
})
defer sshServer.Shutdown()
httpServer := &http.Server{
Addr: ":" + cfg.SSHRelayPort,
Handler: sshServer.Handler(),
}
go func() {
slog.Info("SSH relay HTTP server starting", "port", cfg.SSHRelayPort)
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
slog.Error("SSH relay HTTP server error", "error", err)
}
}()
// -----------------------------------------------------------------------
// Start observability HTTP server (Prometheus metrics + health endpoint)
// -----------------------------------------------------------------------
@@ -227,5 +280,12 @@ func main() {
os.Exit(1)
}
// Gracefully shut down the SSH relay HTTP server.
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if err := httpServer.Shutdown(shutdownCtx); err != nil {
slog.Warn("SSH relay HTTP server shutdown error", "error", err)
}
slog.Info("poller shutdown complete")
}

View File

@@ -64,6 +64,32 @@ type Config struct {
// Each API call (DetectVersion, CollectInterfaces, etc.) is wrapped with
// this timeout to prevent indefinite blocking on unresponsive devices.
CommandTimeoutSeconds int
// TunnelPortMin is the lower bound of the local TCP port pool for WinBox tunnels.
TunnelPortMin int
// TunnelPortMax is the upper bound of the local TCP port pool for WinBox tunnels.
TunnelPortMax int
// TunnelIdleTimeout is the number of seconds a WinBox tunnel may remain idle
// with no active connections before it is automatically closed.
TunnelIdleTimeout int
// SSHRelayPort is the TCP port on which the SSH relay HTTP server listens.
SSHRelayPort string
// SSHIdleTimeout is the number of seconds an SSH relay session may remain
// idle before it is automatically terminated.
SSHIdleTimeout int
// SSHMaxSessions is the maximum total number of concurrent SSH relay sessions.
SSHMaxSessions int
// SSHMaxPerUser is the maximum number of concurrent SSH relay sessions per user.
SSHMaxPerUser int
// SSHMaxPerDevice is the maximum number of concurrent SSH relay sessions per device.
SSHMaxPerDevice int
}
// knownInsecureEncryptionKey is the base64-encoded dev default encryption key.
@@ -86,6 +112,14 @@ func Load() (*Config, error) {
CircuitBreakerBaseBackoffSeconds: getEnvInt("CIRCUIT_BREAKER_BASE_BACKOFF_SECONDS", 30),
CircuitBreakerMaxBackoffSeconds: getEnvInt("CIRCUIT_BREAKER_MAX_BACKOFF_SECONDS", 900),
CommandTimeoutSeconds: getEnvInt("COMMAND_TIMEOUT_SECONDS", 30),
TunnelPortMin: getEnvInt("TUNNEL_PORT_MIN", 49000),
TunnelPortMax: getEnvInt("TUNNEL_PORT_MAX", 49100),
TunnelIdleTimeout: getEnvInt("TUNNEL_IDLE_TIMEOUT", 300),
SSHRelayPort: getEnv("SSH_RELAY_PORT", "8080"),
SSHIdleTimeout: getEnvInt("SSH_IDLE_TIMEOUT", 900),
SSHMaxSessions: getEnvInt("SSH_MAX_SESSIONS", 200),
SSHMaxPerUser: getEnvInt("SSH_MAX_PER_USER", 10),
SSHMaxPerDevice: getEnvInt("SSH_MAX_PER_DEVICE", 20),
}
if cfg.DatabaseURL == "" {