feat(poller): add tunnel manager with idle cleanup and status tracking
Implements Manager which orchestrates WinBox tunnel lifecycle: open, close, idle cleanup, and status queries. Uses PortPool and Tunnel from Tasks 1.2/1.3. DeviceStore and CredentialCache wired in for Task 1.5. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
198
poller/internal/tunnel/manager.go
Normal file
198
poller/internal/tunnel/manager.go
Normal file
@@ -0,0 +1,198 @@
|
|||||||
|
package tunnel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/mikrotik-portal/poller/internal/store"
|
||||||
|
"github.com/mikrotik-portal/poller/internal/vault"
|
||||||
|
)
|
||||||
|
|
||||||
|
// OpenTunnelResponse is returned by Manager.OpenTunnel.
|
||||||
|
type OpenTunnelResponse struct {
|
||||||
|
TunnelID string `json:"tunnel_id"`
|
||||||
|
LocalPort int `json:"local_port"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TunnelStatus is a snapshot of a tunnel's runtime state.
|
||||||
|
type TunnelStatus struct {
|
||||||
|
TunnelID string `json:"tunnel_id"`
|
||||||
|
DeviceID string `json:"device_id"`
|
||||||
|
LocalPort int `json:"local_port"`
|
||||||
|
ActiveConns int64 `json:"active_conns"`
|
||||||
|
IdleSeconds int `json:"idle_seconds"`
|
||||||
|
CreatedAt string `json:"created_at"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manager orchestrates the lifecycle of WinBox tunnels: open, close, idle
|
||||||
|
// cleanup, and status queries.
|
||||||
|
type Manager struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
tunnels map[string]*Tunnel
|
||||||
|
portPool *PortPool
|
||||||
|
idleTime time.Duration
|
||||||
|
deviceStore *store.DeviceStore
|
||||||
|
credCache *vault.CredentialCache
|
||||||
|
cancel context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewManager creates a Manager with ports in [portMin, portMax] and an idle
|
||||||
|
// timeout of idleTime. deviceStore and credCache may be nil for tests.
|
||||||
|
func NewManager(portMin, portMax int, idleTime time.Duration, ds *store.DeviceStore, cc *vault.CredentialCache) *Manager {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
m := &Manager{
|
||||||
|
tunnels: make(map[string]*Tunnel),
|
||||||
|
portPool: NewPortPool(portMin, portMax),
|
||||||
|
idleTime: idleTime,
|
||||||
|
deviceStore: ds,
|
||||||
|
credCache: cc,
|
||||||
|
cancel: cancel,
|
||||||
|
}
|
||||||
|
go m.idleLoop(ctx)
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
// OpenTunnel allocates a local port, starts a TCP listener, and begins
|
||||||
|
// proxying connections to remoteAddr.
|
||||||
|
func (m *Manager) OpenTunnel(deviceID, tenantID, userID, remoteAddr string) (*OpenTunnelResponse, error) {
|
||||||
|
port, err := m.portPool.Allocate()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
|
||||||
|
if err != nil {
|
||||||
|
m.portPool.Release(port)
|
||||||
|
return nil, fmt.Errorf("failed to listen on port %d: %w", port, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
tun := &Tunnel{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
DeviceID: deviceID,
|
||||||
|
TenantID: tenantID,
|
||||||
|
UserID: userID,
|
||||||
|
LocalPort: port,
|
||||||
|
RemoteAddr: remoteAddr,
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
LastActive: time.Now().UnixNano(),
|
||||||
|
listener: ln,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
}
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
m.tunnels[tun.ID] = tun
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
go tun.accept()
|
||||||
|
|
||||||
|
slog.Info("tunnel opened",
|
||||||
|
"tunnel_id", tun.ID,
|
||||||
|
"device_id", deviceID,
|
||||||
|
"tenant_id", tenantID,
|
||||||
|
"port", port,
|
||||||
|
"remote", remoteAddr,
|
||||||
|
)
|
||||||
|
|
||||||
|
return &OpenTunnelResponse{TunnelID: tun.ID, LocalPort: port}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CloseTunnel stops the tunnel identified by tunnelID and releases its port.
|
||||||
|
func (m *Manager) CloseTunnel(tunnelID string) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
tun, ok := m.tunnels[tunnelID]
|
||||||
|
if !ok {
|
||||||
|
m.mu.Unlock()
|
||||||
|
return fmt.Errorf("tunnel not found: %s", tunnelID)
|
||||||
|
}
|
||||||
|
delete(m.tunnels, tunnelID)
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
tun.Close()
|
||||||
|
m.portPool.Release(tun.LocalPort)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTunnel returns the status of a single tunnel by ID.
|
||||||
|
func (m *Manager) GetTunnel(tunnelID string) (*TunnelStatus, error) {
|
||||||
|
m.mu.Lock()
|
||||||
|
tun, ok := m.tunnels[tunnelID]
|
||||||
|
m.mu.Unlock()
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("tunnel not found: %s", tunnelID)
|
||||||
|
}
|
||||||
|
return tunnelStatusFrom(tun), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListTunnels returns the status of all tunnels for a given deviceID.
|
||||||
|
func (m *Manager) ListTunnels(deviceID string) []TunnelStatus {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
var out []TunnelStatus
|
||||||
|
for _, tun := range m.tunnels {
|
||||||
|
if tun.DeviceID == deviceID {
|
||||||
|
out = append(out, *tunnelStatusFrom(tun))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown closes all tunnels and stops the idle cleanup loop.
|
||||||
|
func (m *Manager) Shutdown() {
|
||||||
|
m.cancel()
|
||||||
|
m.mu.Lock()
|
||||||
|
ids := make([]string, 0, len(m.tunnels))
|
||||||
|
for id := range m.tunnels {
|
||||||
|
ids = append(ids, id)
|
||||||
|
}
|
||||||
|
m.mu.Unlock()
|
||||||
|
for _, id := range ids {
|
||||||
|
m.CloseTunnel(id) //nolint:errcheck
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) idleLoop(ctx context.Context) {
|
||||||
|
ticker := time.NewTicker(30 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
m.cleanupIdle()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) cleanupIdle() {
|
||||||
|
m.mu.Lock()
|
||||||
|
var toClose []string
|
||||||
|
for id, tun := range m.tunnels {
|
||||||
|
if tun.IdleDuration() > m.idleTime && tun.ActiveConns() == 0 {
|
||||||
|
toClose = append(toClose, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
for _, id := range toClose {
|
||||||
|
slog.Info("tunnel idle timeout", "tunnel_id", id)
|
||||||
|
m.CloseTunnel(id) //nolint:errcheck
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func tunnelStatusFrom(tun *Tunnel) *TunnelStatus {
|
||||||
|
return &TunnelStatus{
|
||||||
|
TunnelID: tun.ID,
|
||||||
|
DeviceID: tun.DeviceID,
|
||||||
|
LocalPort: tun.LocalPort,
|
||||||
|
ActiveConns: tun.ActiveConns(),
|
||||||
|
IdleSeconds: int(tun.IdleDuration().Seconds()),
|
||||||
|
CreatedAt: tun.CreatedAt.Format(time.RFC3339),
|
||||||
|
}
|
||||||
|
}
|
||||||
85
poller/internal/tunnel/manager_test.go
Normal file
85
poller/internal/tunnel/manager_test.go
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
package tunnel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestManager_OpenTunnel(t *testing.T) {
|
||||||
|
routerAddr, cleanup := mockRouter(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
mgr := NewManager(49000, 49010, 5*time.Minute, nil, nil)
|
||||||
|
defer mgr.Shutdown()
|
||||||
|
|
||||||
|
resp, err := mgr.OpenTunnel("dev-1", "ten-1", "usr-1", routerAddr)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotEmpty(t, resp.TunnelID)
|
||||||
|
assert.GreaterOrEqual(t, resp.LocalPort, 49000)
|
||||||
|
assert.LessOrEqual(t, resp.LocalPort, 49010)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestManager_CloseTunnel(t *testing.T) {
|
||||||
|
routerAddr, cleanup := mockRouter(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
mgr := NewManager(49000, 49010, 5*time.Minute, nil, nil)
|
||||||
|
defer mgr.Shutdown()
|
||||||
|
|
||||||
|
resp, _ := mgr.OpenTunnel("dev-1", "ten-1", "usr-1", routerAddr)
|
||||||
|
err := mgr.CloseTunnel(resp.TunnelID)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Port should be released
|
||||||
|
resp2, err := mgr.OpenTunnel("dev-2", "ten-1", "usr-1", routerAddr)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, resp.LocalPort, resp2.LocalPort) // reused
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestManager_PortExhaustion(t *testing.T) {
|
||||||
|
routerAddr, cleanup := mockRouter(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
mgr := NewManager(49000, 49001, 5*time.Minute, nil, nil) // 2 ports
|
||||||
|
defer mgr.Shutdown()
|
||||||
|
|
||||||
|
_, err := mgr.OpenTunnel("dev-1", "ten-1", "usr-1", routerAddr)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = mgr.OpenTunnel("dev-2", "ten-1", "usr-1", routerAddr)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = mgr.OpenTunnel("dev-3", "ten-1", "usr-1", routerAddr)
|
||||||
|
assert.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestManager_IdleCleanup(t *testing.T) {
|
||||||
|
routerAddr, cleanup := mockRouter(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
mgr := NewManager(49000, 49010, 100*time.Millisecond, nil, nil) // very short idle
|
||||||
|
defer mgr.Shutdown()
|
||||||
|
|
||||||
|
resp, _ := mgr.OpenTunnel("dev-1", "ten-1", "usr-1", routerAddr)
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
mgr.cleanupIdle() // manually trigger
|
||||||
|
|
||||||
|
_, err := mgr.GetTunnel(resp.TunnelID)
|
||||||
|
assert.Error(t, err) // should be gone
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestManager_StatusList(t *testing.T) {
|
||||||
|
routerAddr, cleanup := mockRouter(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
mgr := NewManager(49000, 49010, 5*time.Minute, nil, nil)
|
||||||
|
defer mgr.Shutdown()
|
||||||
|
|
||||||
|
mgr.OpenTunnel("dev-1", "ten-1", "usr-1", routerAddr)
|
||||||
|
mgr.OpenTunnel("dev-1", "ten-1", "usr-2", routerAddr)
|
||||||
|
mgr.OpenTunnel("dev-2", "ten-1", "usr-1", routerAddr)
|
||||||
|
|
||||||
|
list := mgr.ListTunnels("dev-1")
|
||||||
|
assert.Len(t, list, 2)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user