From 4860fad643771bb63482ff4424b489db54297cc5 Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Thu, 12 Mar 2026 15:39:24 -0500 Subject: [PATCH] feat(api): add remote access endpoints for WinBox tunnels and SSH sessions Implements four operator-gated endpoints under /api/tenants/{tenant_id}/devices/{device_id}/: - POST /winbox-session: opens a WinBox tunnel via NATS request-reply to poller - POST /ssh-session: mints a single-use Redis token (120s TTL) for WebSocket SSH relay - DELETE /winbox-session/{tunnel_id}: idempotently closes a WinBox tunnel - GET /sessions: lists active WinBox tunnels via NATS tunnel.status.list Co-Authored-By: Claude Opus 4.6 --- backend/app/main.py | 2 + backend/app/routers/remote_access.py | 316 +++++++++++++++++++++++++++ 2 files changed, 318 insertions(+) create mode 100644 backend/app/routers/remote_access.py diff --git a/backend/app/main.py b/backend/app/main.py index c5c26a3..5ed1e91 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -276,6 +276,7 @@ def create_app() -> FastAPI: from app.routers.certificates import router as certificates_router from app.routers.transparency import router as transparency_router from app.routers.settings import router as settings_router + from app.routers.remote_access import router as remote_access_router app.include_router(auth_router, prefix="/api") app.include_router(tenants_router, prefix="/api") @@ -302,6 +303,7 @@ def create_app() -> FastAPI: app.include_router(certificates_router, prefix="/api/certificates", tags=["certificates"]) app.include_router(transparency_router, prefix="/api") app.include_router(settings_router, prefix="/api") + app.include_router(remote_access_router, prefix="/api") # Health check endpoints @app.get("/health", tags=["health"]) diff --git a/backend/app/routers/remote_access.py b/backend/app/routers/remote_access.py new file mode 100644 index 0000000..578ac5d --- /dev/null +++ b/backend/app/routers/remote_access.py @@ -0,0 +1,316 @@ +""" +Remote access endpoints for WinBox tunnels and SSH terminal sessions. + +All routes are tenant-scoped under /api/tenants/{tenant_id}/devices/{device_id}. +RBAC: operator+ required for all endpoints. +""" + +import json +import logging +import secrets +import time +import uuid +from typing import Optional + +import nats +import redis.asyncio as aioredis +from fastapi import APIRouter, Depends, HTTPException, Request, status +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import settings +from app.database import get_db +from app.middleware.rbac import require_operator_or_above +from app.middleware.tenant_context import CurrentUser, get_current_user +from app.models.device import Device +from app.schemas.remote_access import ( + ActiveSessionsResponse, + SSHSessionRequest, + SSHSessionResponse, + TunnelStatusItem, + WinboxSessionResponse, +) +from app.services.audit_service import log_action +from sqlalchemy import select + +logger = logging.getLogger(__name__) + +router = APIRouter(tags=["remote-access"]) + +# --------------------------------------------------------------------------- +# Lazy NATS and Redis clients +# --------------------------------------------------------------------------- + +_nc: Optional[nats.aio.client.Client] = None +_redis: Optional[aioredis.Redis] = None + + +async def _get_nats() -> nats.aio.client.Client: + """Get or create a shared NATS client.""" + global _nc + if _nc is None or _nc.is_closed: + _nc = await nats.connect(settings.NATS_URL) + return _nc + + +async def _get_redis() -> aioredis.Redis: + """Get or create a shared Redis client.""" + global _redis + if _redis is None: + _redis = aioredis.from_url(settings.REDIS_URL, decode_responses=True) + return _redis + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _source_ip(request: Request) -> Optional[str]: + return request.headers.get("x-real-ip") or (request.client.host if request.client else None) + + +async def _get_device(db: AsyncSession, tenant_id: uuid.UUID, device_id: uuid.UUID) -> Device: + result = await db.execute( + select(Device).where(Device.id == device_id, Device.tenant_id == tenant_id) + ) + device = result.scalar_one_or_none() + if not device: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Device not found") + return device + + +async def _check_tenant_access(current_user: CurrentUser, tenant_id: uuid.UUID, db: AsyncSession) -> None: + if current_user.is_super_admin: + from app.database import set_tenant_context + await set_tenant_context(db, str(tenant_id)) + return + if current_user.tenant_id != tenant_id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Access denied: you do not belong to this tenant.", + ) + + +# --------------------------------------------------------------------------- +# POST /winbox-session — Open a WinBox tunnel via NATS request-reply +# --------------------------------------------------------------------------- + + +@router.post( + "/tenants/{tenant_id}/devices/{device_id}/winbox-session", + response_model=WinboxSessionResponse, + summary="Open a WinBox tunnel to the device", + dependencies=[Depends(require_operator_or_above)], +) +async def open_winbox_session( + tenant_id: uuid.UUID, + device_id: uuid.UUID, + request: Request, + current_user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> WinboxSessionResponse: + """ + Requests the poller to open a local TCP tunnel to device port 8291 (WinBox). + + Returns a tunnel_id, local host/port, and a winbox:// URI. + Requires operator role or above. + """ + await _check_tenant_access(current_user, tenant_id, db) + await _get_device(db, tenant_id, device_id) + source_ip = _source_ip(request) + + try: + await log_action( + db, tenant_id, current_user.user_id, "winbox_tunnel_open", + resource_type="device", resource_id=str(device_id), + device_id=device_id, + details={"source_ip": source_ip}, + ip_address=source_ip, + ) + except Exception: + pass + + payload = json.dumps({ + "device_id": str(device_id), + "tenant_id": str(tenant_id), + "user_id": str(current_user.user_id), + "target_port": 8291, + }).encode() + + try: + nc = await _get_nats() + msg = await nc.request("tunnel.open", payload, timeout=10) + except Exception as exc: + logger.error("NATS tunnel.open failed: %s", exc) + raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Tunnel service unavailable") + + try: + data = json.loads(msg.data) + except Exception: + raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Invalid response from tunnel service") + + if "error" in data: + raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=data["error"]) + + port = data.get("local_port") + tunnel_id = data.get("tunnel_id", "") + if not isinstance(port, int) or not (49000 <= port <= 49100): + raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Invalid port allocation from tunnel service") + + return WinboxSessionResponse( + tunnel_id=tunnel_id, + host="127.0.0.1", + port=port, + winbox_uri=f"winbox://127.0.0.1:{port}", + ) + + +# --------------------------------------------------------------------------- +# POST /ssh-session — Create a single-use Redis token for SSH WebSocket +# --------------------------------------------------------------------------- + + +@router.post( + "/tenants/{tenant_id}/devices/{device_id}/ssh-session", + response_model=SSHSessionResponse, + summary="Create a single-use SSH WebSocket session token", + dependencies=[Depends(require_operator_or_above)], +) +async def open_ssh_session( + tenant_id: uuid.UUID, + device_id: uuid.UUID, + request: Request, + body: SSHSessionRequest, + current_user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> SSHSessionResponse: + """ + Generates a single-use token (120s TTL) stored in Redis that authorises + the WebSocket SSH relay to accept a connection for this device. + + Requires operator role or above. + """ + await _check_tenant_access(current_user, tenant_id, db) + await _get_device(db, tenant_id, device_id) + source_ip = _source_ip(request) + + try: + await log_action( + db, tenant_id, current_user.user_id, "ssh_session_open", + resource_type="device", resource_id=str(device_id), + device_id=device_id, + details={"source_ip": source_ip, "cols": body.cols, "rows": body.rows}, + ip_address=source_ip, + ) + except Exception: + pass + + token = secrets.token_urlsafe(32) + token_payload = json.dumps({ + "device_id": str(device_id), + "tenant_id": str(tenant_id), + "user_id": str(current_user.user_id), + "source_ip": source_ip, + "cols": body.cols, + "rows": body.rows, + "created_at": int(time.time()), + }) + + try: + rd = await _get_redis() + await rd.setex(f"ssh:token:{token}", 120, token_payload) + except Exception as exc: + logger.error("Redis setex failed for SSH token: %s", exc) + raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Session store unavailable") + + return SSHSessionResponse( + token=token, + websocket_url=f"/ws/ssh?token={token}", + ) + + +# --------------------------------------------------------------------------- +# DELETE /winbox-session/{tunnel_id} — Close a WinBox tunnel (idempotent) +# --------------------------------------------------------------------------- + + +@router.delete( + "/tenants/{tenant_id}/devices/{device_id}/winbox-session/{tunnel_id}", + summary="Close a WinBox tunnel", + dependencies=[Depends(require_operator_or_above)], +) +async def close_winbox_session( + tenant_id: uuid.UUID, + device_id: uuid.UUID, + tunnel_id: str, + request: Request, + current_user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> dict: + """ + Instructs the poller to close the given WinBox tunnel. + Idempotent — does not error if the tunnel is already closed. + Requires operator role or above. + """ + await _check_tenant_access(current_user, tenant_id, db) + source_ip = _source_ip(request) + + try: + await log_action( + db, tenant_id, current_user.user_id, "winbox_tunnel_close", + resource_type="device", resource_id=str(device_id), + device_id=device_id, + details={"tunnel_id": tunnel_id, "source_ip": source_ip}, + ip_address=source_ip, + ) + except Exception: + pass + + try: + nc = await _get_nats() + payload = json.dumps({"tunnel_id": tunnel_id}).encode() + await nc.request("tunnel.close", payload, timeout=10) + except Exception: + # Idempotent — tunnel may already be closed or poller unavailable + pass + + return {"status": "closed"} + + +# --------------------------------------------------------------------------- +# GET /sessions — List active WinBox tunnels and SSH sessions +# --------------------------------------------------------------------------- + + +@router.get( + "/tenants/{tenant_id}/devices/{device_id}/sessions", + response_model=ActiveSessionsResponse, + summary="List active WinBox tunnels and SSH sessions for a device", + dependencies=[Depends(require_operator_or_above)], +) +async def list_sessions( + tenant_id: uuid.UUID, + device_id: uuid.UUID, + current_user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> ActiveSessionsResponse: + """ + Queries the poller via NATS for active WinBox tunnels for this device. + SSH sessions are not tracked server-side (token-based, single-use). + Requires operator role or above. + """ + await _check_tenant_access(current_user, tenant_id, db) + + tunnels: list[TunnelStatusItem] = [] + try: + nc = await _get_nats() + payload = json.dumps({"device_id": str(device_id), "tenant_id": str(tenant_id)}).encode() + msg = await nc.request("tunnel.status.list", payload, timeout=10) + raw = json.loads(msg.data) + if isinstance(raw, list): + tunnels = [TunnelStatusItem(**item) for item in raw] + except Exception as exc: + logger.warning("tunnel.status.list NATS request failed: %s", exc) + # Return empty list rather than error — poller may be unavailable + + return ActiveSessionsResponse(winbox_tunnels=tunnels, ssh_sessions=[])