From 7a6ebdca89f3f9abb79b836add86ce248ab57666 Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Thu, 12 Mar 2026 15:28:56 -0500 Subject: [PATCH] feat(poller): add tunnel manager with idle cleanup and status tracking Implements Manager which orchestrates WinBox tunnel lifecycle: open, close, idle cleanup, and status queries. Uses PortPool and Tunnel from Tasks 1.2/1.3. DeviceStore and CredentialCache wired in for Task 1.5. Co-Authored-By: Claude Opus 4.6 --- poller/internal/tunnel/manager.go | 198 +++++++++++++++++++++++++ poller/internal/tunnel/manager_test.go | 85 +++++++++++ 2 files changed, 283 insertions(+) create mode 100644 poller/internal/tunnel/manager.go create mode 100644 poller/internal/tunnel/manager_test.go diff --git a/poller/internal/tunnel/manager.go b/poller/internal/tunnel/manager.go new file mode 100644 index 0000000..ec28e8b --- /dev/null +++ b/poller/internal/tunnel/manager.go @@ -0,0 +1,198 @@ +package tunnel + +import ( + "context" + "fmt" + "log/slog" + "net" + "sync" + "time" + + "github.com/google/uuid" + "github.com/mikrotik-portal/poller/internal/store" + "github.com/mikrotik-portal/poller/internal/vault" +) + +// OpenTunnelResponse is returned by Manager.OpenTunnel. +type OpenTunnelResponse struct { + TunnelID string `json:"tunnel_id"` + LocalPort int `json:"local_port"` +} + +// TunnelStatus is a snapshot of a tunnel's runtime state. +type TunnelStatus struct { + TunnelID string `json:"tunnel_id"` + DeviceID string `json:"device_id"` + LocalPort int `json:"local_port"` + ActiveConns int64 `json:"active_conns"` + IdleSeconds int `json:"idle_seconds"` + CreatedAt string `json:"created_at"` +} + +// Manager orchestrates the lifecycle of WinBox tunnels: open, close, idle +// cleanup, and status queries. +type Manager struct { + mu sync.Mutex + tunnels map[string]*Tunnel + portPool *PortPool + idleTime time.Duration + deviceStore *store.DeviceStore + credCache *vault.CredentialCache + cancel context.CancelFunc +} + +// NewManager creates a Manager with ports in [portMin, portMax] and an idle +// timeout of idleTime. deviceStore and credCache may be nil for tests. +func NewManager(portMin, portMax int, idleTime time.Duration, ds *store.DeviceStore, cc *vault.CredentialCache) *Manager { + ctx, cancel := context.WithCancel(context.Background()) + m := &Manager{ + tunnels: make(map[string]*Tunnel), + portPool: NewPortPool(portMin, portMax), + idleTime: idleTime, + deviceStore: ds, + credCache: cc, + cancel: cancel, + } + go m.idleLoop(ctx) + return m +} + +// OpenTunnel allocates a local port, starts a TCP listener, and begins +// proxying connections to remoteAddr. +func (m *Manager) OpenTunnel(deviceID, tenantID, userID, remoteAddr string) (*OpenTunnelResponse, error) { + port, err := m.portPool.Allocate() + if err != nil { + return nil, err + } + + ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if err != nil { + m.portPool.Release(port) + return nil, fmt.Errorf("failed to listen on port %d: %w", port, err) + } + + ctx, cancel := context.WithCancel(context.Background()) + tun := &Tunnel{ + ID: uuid.New().String(), + DeviceID: deviceID, + TenantID: tenantID, + UserID: userID, + LocalPort: port, + RemoteAddr: remoteAddr, + CreatedAt: time.Now(), + LastActive: time.Now().UnixNano(), + listener: ln, + ctx: ctx, + cancel: cancel, + } + + m.mu.Lock() + m.tunnels[tun.ID] = tun + m.mu.Unlock() + + go tun.accept() + + slog.Info("tunnel opened", + "tunnel_id", tun.ID, + "device_id", deviceID, + "tenant_id", tenantID, + "port", port, + "remote", remoteAddr, + ) + + return &OpenTunnelResponse{TunnelID: tun.ID, LocalPort: port}, nil +} + +// CloseTunnel stops the tunnel identified by tunnelID and releases its port. +func (m *Manager) CloseTunnel(tunnelID string) error { + m.mu.Lock() + tun, ok := m.tunnels[tunnelID] + if !ok { + m.mu.Unlock() + return fmt.Errorf("tunnel not found: %s", tunnelID) + } + delete(m.tunnels, tunnelID) + m.mu.Unlock() + + tun.Close() + m.portPool.Release(tun.LocalPort) + return nil +} + +// GetTunnel returns the status of a single tunnel by ID. +func (m *Manager) GetTunnel(tunnelID string) (*TunnelStatus, error) { + m.mu.Lock() + tun, ok := m.tunnels[tunnelID] + m.mu.Unlock() + if !ok { + return nil, fmt.Errorf("tunnel not found: %s", tunnelID) + } + return tunnelStatusFrom(tun), nil +} + +// ListTunnels returns the status of all tunnels for a given deviceID. +func (m *Manager) ListTunnels(deviceID string) []TunnelStatus { + m.mu.Lock() + defer m.mu.Unlock() + var out []TunnelStatus + for _, tun := range m.tunnels { + if tun.DeviceID == deviceID { + out = append(out, *tunnelStatusFrom(tun)) + } + } + return out +} + +// Shutdown closes all tunnels and stops the idle cleanup loop. +func (m *Manager) Shutdown() { + m.cancel() + m.mu.Lock() + ids := make([]string, 0, len(m.tunnels)) + for id := range m.tunnels { + ids = append(ids, id) + } + m.mu.Unlock() + for _, id := range ids { + m.CloseTunnel(id) //nolint:errcheck + } +} + +func (m *Manager) idleLoop(ctx context.Context) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m.cleanupIdle() + } + } +} + +func (m *Manager) cleanupIdle() { + m.mu.Lock() + var toClose []string + for id, tun := range m.tunnels { + if tun.IdleDuration() > m.idleTime && tun.ActiveConns() == 0 { + toClose = append(toClose, id) + } + } + m.mu.Unlock() + + for _, id := range toClose { + slog.Info("tunnel idle timeout", "tunnel_id", id) + m.CloseTunnel(id) //nolint:errcheck + } +} + +func tunnelStatusFrom(tun *Tunnel) *TunnelStatus { + return &TunnelStatus{ + TunnelID: tun.ID, + DeviceID: tun.DeviceID, + LocalPort: tun.LocalPort, + ActiveConns: tun.ActiveConns(), + IdleSeconds: int(tun.IdleDuration().Seconds()), + CreatedAt: tun.CreatedAt.Format(time.RFC3339), + } +} diff --git a/poller/internal/tunnel/manager_test.go b/poller/internal/tunnel/manager_test.go new file mode 100644 index 0000000..ac016ed --- /dev/null +++ b/poller/internal/tunnel/manager_test.go @@ -0,0 +1,85 @@ +package tunnel + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestManager_OpenTunnel(t *testing.T) { + routerAddr, cleanup := mockRouter(t) + defer cleanup() + + mgr := NewManager(49000, 49010, 5*time.Minute, nil, nil) + defer mgr.Shutdown() + + resp, err := mgr.OpenTunnel("dev-1", "ten-1", "usr-1", routerAddr) + require.NoError(t, err) + assert.NotEmpty(t, resp.TunnelID) + assert.GreaterOrEqual(t, resp.LocalPort, 49000) + assert.LessOrEqual(t, resp.LocalPort, 49010) +} + +func TestManager_CloseTunnel(t *testing.T) { + routerAddr, cleanup := mockRouter(t) + defer cleanup() + + mgr := NewManager(49000, 49010, 5*time.Minute, nil, nil) + defer mgr.Shutdown() + + resp, _ := mgr.OpenTunnel("dev-1", "ten-1", "usr-1", routerAddr) + err := mgr.CloseTunnel(resp.TunnelID) + assert.NoError(t, err) + + // Port should be released + resp2, err := mgr.OpenTunnel("dev-2", "ten-1", "usr-1", routerAddr) + require.NoError(t, err) + assert.Equal(t, resp.LocalPort, resp2.LocalPort) // reused +} + +func TestManager_PortExhaustion(t *testing.T) { + routerAddr, cleanup := mockRouter(t) + defer cleanup() + + mgr := NewManager(49000, 49001, 5*time.Minute, nil, nil) // 2 ports + defer mgr.Shutdown() + + _, err := mgr.OpenTunnel("dev-1", "ten-1", "usr-1", routerAddr) + require.NoError(t, err) + _, err = mgr.OpenTunnel("dev-2", "ten-1", "usr-1", routerAddr) + require.NoError(t, err) + _, err = mgr.OpenTunnel("dev-3", "ten-1", "usr-1", routerAddr) + assert.Error(t, err) +} + +func TestManager_IdleCleanup(t *testing.T) { + routerAddr, cleanup := mockRouter(t) + defer cleanup() + + mgr := NewManager(49000, 49010, 100*time.Millisecond, nil, nil) // very short idle + defer mgr.Shutdown() + + resp, _ := mgr.OpenTunnel("dev-1", "ten-1", "usr-1", routerAddr) + time.Sleep(500 * time.Millisecond) + mgr.cleanupIdle() // manually trigger + + _, err := mgr.GetTunnel(resp.TunnelID) + assert.Error(t, err) // should be gone +} + +func TestManager_StatusList(t *testing.T) { + routerAddr, cleanup := mockRouter(t) + defer cleanup() + + mgr := NewManager(49000, 49010, 5*time.Minute, nil, nil) + defer mgr.Shutdown() + + mgr.OpenTunnel("dev-1", "ten-1", "usr-1", routerAddr) + mgr.OpenTunnel("dev-1", "ten-1", "usr-2", routerAddr) + mgr.OpenTunnel("dev-2", "ten-1", "usr-1", routerAddr) + + list := mgr.ListTunnels("dev-1") + assert.Len(t, list, 2) +}