Files

376 lines
7.8 KiB
Go

package session
import (
"context"
"fmt"
"log/slog"
"os"
"os/exec"
"path/filepath"
"sync"
"syscall"
"time"
"github.com/google/uuid"
)
// Config holds the tunables a Manager is constructed with.
type Config struct {
	// MaxSessions caps how many sessions the manager tracks at once.
	MaxSessions int

	// DisplayMin and DisplayMax bound the pool of X display numbers.
	DisplayMin, DisplayMax int

	// WSPortMin and WSPortMax bound the pool of Xpra websocket ports.
	WSPortMin, WSPortMax int

	// IdleTimeout and MaxLifetime are defaults, in seconds, applied when a
	// CreateRequest does not supply its own value.
	IdleTimeout, MaxLifetime int

	// WinBoxPath is forwarded to StartXpra via XpraConfig.WinBoxPath.
	WinBoxPath string

	// BindAddr is the address passed to Xpra and probed for readiness.
	BindAddr string
}
// Manager tracks every session on this worker together with the display and
// websocket-port pools they draw from. mu guards the sessions map; each
// Session carries its own lock for its individual fields.
type Manager struct {
	mu       sync.Mutex
	sessions map[string]*Session // keyed by worker session ID

	displays, wsPorts *Pool

	cfg Config
}
// NewManager returns a Manager with an empty session table and fresh display
// and websocket-port pools built from the ranges in cfg.
func NewManager(cfg Config) *Manager {
	mgr := &Manager{cfg: cfg}
	mgr.sessions = make(map[string]*Session)
	mgr.displays = NewPool(cfg.DisplayMin, cfg.DisplayMax)
	mgr.wsPorts = NewPool(cfg.WSPortMin, cfg.WSPortMax)
	return mgr
}
// HasCapacity reports whether the number of tracked sessions is still below
// Config.MaxSessions.
func (m *Manager) HasCapacity() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.cfg.MaxSessions > len(m.sessions)
}
// SessionCount returns how many sessions are currently registered, including
// ones still being created or in the middle of teardown.
func (m *Manager) SessionCount() int {
	m.mu.Lock()
	n := len(m.sessions)
	m.mu.Unlock()
	return n
}
// CreateSession provisions a new Xpra session and returns once it accepts
// websocket connections.
//
// Steps: reserve a display number and websocket port, register the session in
// StateCreating, create its temp dir, start Xpra, and wait up to 10s for it to
// become ready. If a step after registration fails, the session is torn down
// via terminateSession so its display and port return to the pools.
//
// req.SessionID, when non-empty, becomes the session ID and must not already
// be in use; otherwise a random UUID is generated. Non-positive
// req.IdleTimeoutSec / req.MaxLifetimeSec fall back to the Config defaults.
//
// Errors: "capacity" when MaxSessions is reached; otherwise a wrapped error
// naming the step that failed. If the session is terminated concurrently
// while it is being created, an error is returned and any resources this call
// acquired after teardown ran are released here.
func (m *Manager) CreateSession(req CreateRequest) (*CreateResponse, error) {
	workerID := req.SessionID
	if workerID == "" {
		workerID = uuid.New().String()
	}

	// Zero or negative means "use the default". A negative duration would
	// otherwise make checkTimeouts expire the session on its first sweep.
	idleTimeout := time.Duration(req.IdleTimeoutSec) * time.Second
	if idleTimeout <= 0 {
		idleTimeout = time.Duration(m.cfg.IdleTimeout) * time.Second
	}
	maxLifetime := time.Duration(req.MaxLifetimeSec) * time.Second
	if maxLifetime <= 0 {
		maxLifetime = time.Duration(m.cfg.MaxLifetime) * time.Second
	}

	m.mu.Lock()
	if len(m.sessions) >= m.cfg.MaxSessions {
		m.mu.Unlock()
		return nil, fmt.Errorf("capacity")
	}
	// Overwriting an existing entry would orphan that session's Xpra process,
	// display and port, so a caller-supplied duplicate ID is rejected.
	if _, exists := m.sessions[workerID]; exists {
		m.mu.Unlock()
		return nil, fmt.Errorf("session %q already exists", workerID)
	}
	display, err := m.displays.Allocate()
	if err != nil {
		m.mu.Unlock()
		return nil, fmt.Errorf("no displays available: %w", err)
	}
	wsPort, err := m.wsPorts.Allocate()
	if err != nil {
		m.displays.Release(display)
		m.mu.Unlock()
		return nil, fmt.Errorf("no ws ports available: %w", err)
	}
	sess := &Session{
		ID:          workerID,
		TunnelHost:  req.TunnelHost,
		TunnelPort:  req.TunnelPort,
		Display:     display,
		WSPort:      wsPort,
		State:       StateCreating,
		CreatedAt:   time.Now(),
		IdleTimeout: idleTimeout,
		MaxLifetime: maxLifetime,
	}
	m.sessions[workerID] = sess
	m.mu.Unlock()

	tmpDir, err := CreateSessionTmpDir(workerID)
	if err != nil {
		m.terminateSession(workerID, "tmpdir creation failed")
		return nil, fmt.Errorf("create tmpdir: %w", err)
	}
	// Record the dir only if nobody terminated the session meanwhile; a
	// teardown that already ran saw no TmpDir, so clean it up ourselves.
	sess.mu.Lock()
	aborted := sess.State != StateCreating
	if !aborted {
		sess.TmpDir = tmpDir
	}
	sess.mu.Unlock()
	if aborted {
		if err := CleanupTmpDir(tmpDir); err != nil {
			slog.Warn("tmpdir cleanup failed", "id", workerID, "err", err)
		}
		return nil, fmt.Errorf("session %q terminated during creation", workerID)
	}

	xpraCfg := XpraConfig{
		Display:    display,
		WSPort:     wsPort,
		BindAddr:   m.cfg.BindAddr,
		TunnelHost: req.TunnelHost,
		TunnelPort: req.TunnelPort,
		Username:   req.Username,
		Password:   req.Password,
		TmpDir:     tmpDir,
		WinBoxPath: m.cfg.WinBoxPath,
	}
	proc, err := StartXpra(xpraCfg)
	// Drop this function's references to the credentials once Xpra has them.
	// Go strings are immutable, so this does not scrub the bytes from memory.
	// (/proc and exec args remain a known v1 limitation.)
	xpraCfg.Username = ""
	xpraCfg.Password = ""
	req.Username = ""
	req.Password = ""
	if err != nil {
		m.terminateSession(workerID, "xpra start failed")
		return nil, fmt.Errorf("xpra start: %w", err)
	}

	// As above: if teardown already ran it saw no PID, so the process we just
	// started would be orphaned unless we kill it here.
	sess.mu.Lock()
	aborted = sess.State != StateCreating
	if !aborted {
		sess.XpraPID = proc.Pid
	}
	sess.mu.Unlock()
	if aborted {
		KillXpraSession(proc.Pid)
		return nil, fmt.Errorf("session %q terminated during creation", workerID)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := WaitForXpraReady(ctx, m.cfg.BindAddr, wsPort, 10*time.Second); err != nil {
		m.terminateSession(workerID, "xpra not ready")
		return nil, fmt.Errorf("xpra ready: %w", err)
	}

	sess.mu.Lock()
	if sess.State != StateCreating {
		// A concurrent terminateSession owns teardown (the PID was already
		// recorded, so it kills Xpra); do not resurrect the session.
		sess.mu.Unlock()
		return nil, fmt.Errorf("session %q terminated during creation", workerID)
	}
	sess.State = StateActive
	createdAt := sess.CreatedAt
	sess.mu.Unlock()

	return &CreateResponse{
		WorkerSessionID: workerID,
		Status:          StateActive,
		XpraWSPort:      wsPort,
		ExpiresAt:       createdAt.Add(idleTimeout),
		MaxExpiresAt:    createdAt.Add(maxLifetime),
	}, nil
}
// TerminateSession tears down the session with the given ID at a caller's
// request. Unknown IDs and sessions already shutting down are not errors.
func (m *Manager) TerminateSession(workerID string) error {
	const why = "requested"
	return m.terminateSession(workerID, why)
}
// terminateSession tears down the session registered under workerID. It kills
// the recorded Xpra process (if any), removes the temp dir, returns the
// display and websocket port to their pools, marks the session Terminated and
// finally drops it from the manager. reason is used only for logging.
//
// Unknown IDs and sessions already Terminating/Terminated are no-ops, so
// repeated or concurrent calls for the same session tear it down only once.
// The returned error is currently always nil.
func (m *Manager) terminateSession(workerID string, reason string) error {
	m.mu.Lock()
	sess, ok := m.sessions[workerID]
	m.mu.Unlock()
	if !ok {
		return nil
	}

	// Claim teardown under the session lock so only one caller proceeds.
	sess.mu.Lock()
	if sess.State == StateTerminating || sess.State == StateTerminated {
		sess.mu.Unlock()
		return nil
	}
	sess.State = StateTerminating
	pid := sess.XpraPID
	tmpDir := sess.TmpDir
	display := sess.Display
	wsPort := sess.WSPort
	sess.mu.Unlock()

	slog.Info("terminating session", "id", workerID, "reason", reason)
	if pid > 0 {
		KillXpraSession(pid)
	}
	if tmpDir != "" {
		if err := CleanupTmpDir(tmpDir); err != nil {
			slog.Warn("tmpdir cleanup failed", "id", workerID, "err", err)
		}
	}
	m.displays.Release(display)
	m.wsPorts.Release(wsPort)

	sess.mu.Lock()
	sess.State = StateTerminated
	sess.mu.Unlock()

	// Remove the entry only if it still refers to this session. The ID may
	// have been re-registered by a new session while teardown was running,
	// and that session must not be dropped here.
	m.mu.Lock()
	if m.sessions[workerID] == sess {
		delete(m.sessions, workerID)
	}
	m.mu.Unlock()
	return nil
}
// GetSession returns a point-in-time status for one session, including the
// idle time QueryIdleTime reports for its display. It returns an error
// ("not found") when no session is registered under workerID.
func (m *Manager) GetSession(workerID string) (*StatusResponse, error) {
	m.mu.Lock()
	s, found := m.sessions[workerID]
	m.mu.Unlock()
	if !found {
		return nil, fmt.Errorf("not found")
	}

	// Copy the fields under the session lock; query idle time afterwards so
	// QueryIdleTime runs without any lock held.
	s.mu.Lock()
	status := &StatusResponse{
		WorkerSessionID: s.ID,
		Status:          s.State,
		Display:         s.Display,
		WSPort:          s.WSPort,
		CreatedAt:       s.CreatedAt,
	}
	disp := s.Display
	s.mu.Unlock()

	status.IdleSeconds = QueryIdleTime(disp)
	return status, nil
}
// ListSessions returns a status entry for every registered session, in map
// iteration order (unspecified). Fields are copied under the locks and idle
// time is filled in afterwards, so QueryIdleTime runs without any lock held.
func (m *Manager) ListSessions() []StatusResponse {
	m.mu.Lock()
	out := make([]StatusResponse, 0, len(m.sessions))
	displays := make([]int, 0, len(m.sessions))
	for _, s := range m.sessions {
		s.mu.Lock()
		out = append(out, StatusResponse{
			WorkerSessionID: s.ID,
			Status:          s.State,
			Display:         s.Display,
			WSPort:          s.WSPort,
			CreatedAt:       s.CreatedAt,
		})
		displays = append(displays, s.Display)
		s.mu.Unlock()
	}
	m.mu.Unlock()

	for i, d := range displays {
		out[i].IdleSeconds = QueryIdleTime(d)
	}
	return out
}
// RunCleanupLoop runs checkTimeouts every 30 seconds until ctx is cancelled.
// It blocks, so callers normally run it in its own goroutine.
func (m *Manager) RunCleanupLoop(ctx context.Context) {
	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			m.checkTimeouts()
		case <-done:
			return
		}
	}
}
// checkTimeouts performs one sweep of the cleanup loop. It only considers
// sessions in StateActive or StateGrace. For each one, it checks in order and
// terminates at the first match:
//   - the session has outlived MaxLifetime;
//   - its recorded Xpra process no longer accepts signal 0;
//   - its display has been idle longer than IdleTimeout.
func (m *Manager) checkTimeouts() {
	m.mu.Lock()
	pending := make([]string, 0, len(m.sessions))
	for sid := range m.sessions {
		pending = append(pending, sid)
	}
	m.mu.Unlock()

	now := time.Now()
	for _, sid := range pending {
		// Re-resolve each ID: it may have been removed since the snapshot.
		m.mu.Lock()
		s, found := m.sessions[sid]
		m.mu.Unlock()
		if !found {
			continue
		}

		s.mu.Lock()
		st, born := s.State, s.CreatedAt
		lifetime, idleLimit := s.MaxLifetime, s.IdleTimeout
		disp, xpraPID := s.Display, s.XpraPID
		s.mu.Unlock()

		if !(st == StateActive || st == StateGrace) {
			continue
		}

		if now.Sub(born) > lifetime {
			slog.Info("session max lifetime exceeded", "id", sid)
			m.terminateSession(sid, "max_lifetime")
			continue
		}

		if xpraPID > 0 {
			// Signal 0 delivers nothing; it only checks that the process can
			// be signalled.
			p, findErr := os.FindProcess(xpraPID)
			if findErr != nil || p.Signal(syscall.Signal(0)) != nil {
				slog.Info("xpra process dead", "id", sid)
				m.terminateSession(sid, "worker_failure")
				continue
			}
		}

		// Negative values from QueryIdleTime (presumably an error sentinel)
		// never trigger an idle timeout.
		if idle := QueryIdleTime(disp); idle >= 0 && time.Duration(idle)*time.Second > idleLimit {
			slog.Info("session idle timeout", "id", sid, "idle_seconds", idle)
			m.terminateSession(sid, "idle_timeout")
		}
	}
}
// CleanupOrphans removes leftovers from a previous run. It does the following:
//   - deletes every directory under /tmp/winbox-sessions;
//   - runs `xpra stop --all`;
//   - resets the display and websocket-port pools.
//
// If the base directory does not exist or cannot be read, it returns early
// without stopping Xpra or resetting the pools.
//
// This is best effort: failures are logged, never returned. NOTE(review): it
// ignores live sessions entirely, so it presumably must only run before any
// session is created (e.g. at startup); confirm with callers.
func (m *Manager) CleanupOrphans() {
	baseDir := "/tmp/winbox-sessions"
	entries, err := os.ReadDir(baseDir)
	if err != nil {
		if !os.IsNotExist(err) {
			slog.Warn("orphan scan: cannot read dir", "err", err)
		}
		return
	}

	count := 0
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		path := filepath.Join(baseDir, entry.Name())
		slog.Info("cleaning orphan session dir", "path", path)
		if err := os.RemoveAll(path); err != nil {
			// Previously ignored and counted as cleaned; report it instead.
			slog.Warn("orphan scan: cannot remove dir", "path", path, "err", err)
			continue
		}
		count++
	}

	// Run returns *exec.ExitError for a non-zero exit. xpra presumably exits
	// non-zero when there is nothing to stop, so only a failure to run the
	// command at all is reported as a warning.
	if err := exec.Command("xpra", "stop", "--all").Run(); err != nil {
		if _, exited := err.(*exec.ExitError); exited {
			slog.Debug("orphan scan: xpra stop exited non-zero", "err", err)
		} else {
			slog.Warn("orphan scan: cannot run xpra stop", "err", err)
		}
	}

	m.displays.ResetAll()
	m.wsPorts.ResetAll()
	if count > 0 {
		slog.Info("orphan cleanup complete", "cleaned", count)
	}
}