Files
remotelink-docker/agent/agent.py
monoadmin e16a2fa978 Add Python agent, WebSocket relay, real viewer, enrollment tokens
- WebSocket relay service (FastAPI) bridges agents and viewers
- Python agent with screen capture (mss), input control (pynput),
  script execution, and auto-reconnect
- Windows service wrapper, PyInstaller spec, NSIS installer for
  silent mass deployment (RemoteLink-Setup.exe /S /SERVER= /ENROLL=)
- Enrollment token system: admin generates tokens, agents self-register
- Real WebSocket viewer replaces simulated canvas
- Linux agent binary served from /downloads/remotelink-agent-linux
- DB migration 0002: viewer_token on sessions, enrollment_tokens table
- Sign-up pages cleaned up (invite-only redirect)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 16:25:10 -07:00

439 lines
16 KiB
Python

"""
RemoteLink Agent
Connects to the RemoteLink server, streams the screen, and handles remote input.
Usage:
First run (self-register):
python agent.py --server https://myserver.com --enroll <enrollment-token>
Subsequent runs (use saved config):
python agent.py
Run-once mode (no config saved, exits when session ends):
python agent.py --server https://myserver.com --enroll <token> --run-once
"""
import asyncio
import json
import logging
import os
import platform
import subprocess
import sys
import time
from io import BytesIO
from pathlib import Path
from typing import Optional
import argparse
import signal
# Third-party — installed via requirements.txt / bundled by PyInstaller
import httpx
import websockets
import websockets.exceptions
from mss import mss
from PIL import Image
log = logging.getLogger("remotelink-agent")
# ── Config ────────────────────────────────────────────────────────────────────
def config_dir() -> Path:
"""Platform-appropriate config directory."""
if platform.system() == "Windows":
base = os.environ.get("PROGRAMDATA", "C:\\ProgramData")
return Path(base) / "RemoteLink"
elif platform.system() == "Darwin":
return Path.home() / "Library" / "Application Support" / "RemoteLink"
else:
return Path("/etc/remotelink")
CONFIG_FILE = config_dir() / "agent.json"
AGENT_VERSION = "1.0.0"
def load_config() -> Optional[dict]:
if CONFIG_FILE.exists():
try:
return json.loads(CONFIG_FILE.read_text())
except Exception:
pass
return None
def save_config(data: dict):
config_dir().mkdir(parents=True, exist_ok=True)
CONFIG_FILE.write_text(json.dumps(data, indent=2))
log.info(f"Config saved to {CONFIG_FILE}")
# ── Registration ──────────────────────────────────────────────────────────────
async def register(server_url: str, enrollment_token: str) -> dict:
"""Self-register with the server using an enrollment token."""
hostname = platform.node()
os_name = platform.system()
os_version = platform.version()
url = f"{server_url.rstrip('/')}/api/agent/register"
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(url, json={
"enrollmentToken": enrollment_token,
"name": hostname,
"hostname": hostname,
"os": os_name,
"osVersion": os_version,
"agentVersion": AGENT_VERSION,
"ipAddress": None,
})
if resp.status_code != 200:
raise RuntimeError(f"Registration failed: {resp.status_code} {resp.text}")
data = resp.json()
log.info(f"Registered as machine {data['machineId']}")
return data
async def heartbeat(server_url: str, access_key: str) -> Optional[dict]:
"""Send heartbeat, returns pending connection info if any."""
url = f"{server_url.rstrip('/')}/api/agent/heartbeat"
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json={"accessKey": access_key})
if resp.status_code == 200:
return resp.json()
except Exception as e:
log.warning(f"Heartbeat failed: {e}")
return None
async def get_session_code(server_url: str, access_key: str) -> Optional[str]:
"""Request a new session code from the server."""
url = f"{server_url.rstrip('/')}/api/agent/session-code"
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json={"accessKey": access_key})
if resp.status_code == 200:
data = resp.json()
return data.get("code")
return None
# ── Screen capture ────────────────────────────────────────────────────────────
class ScreenCapture:
def __init__(self, fps: int = 15, quality: int = 60):
self.fps = fps
self.quality = quality
self._frame_delay = 1.0 / fps
self._sct = None
def __enter__(self):
self._sct = mss()
return self
def __exit__(self, *args):
if self._sct:
self._sct.close()
def capture(self) -> bytes:
monitor = self._sct.monitors[1] # Primary monitor
img = self._sct.grab(monitor)
pil = Image.frombytes("RGB", img.size, img.bgra, "raw", "BGRX")
buf = BytesIO()
pil.save(buf, format="JPEG", quality=self.quality, optimize=False)
return buf.getvalue()
@property
def frame_delay(self) -> float:
return self._frame_delay
# ── Input control ─────────────────────────────────────────────────────────────
class InputController:
"""Replay mouse and keyboard events on the local machine."""
def __init__(self):
self._mouse = None
self._keyboard = None
self._available = False
try:
from pynput.mouse import Button, Controller as MouseController
from pynput.keyboard import Key, Controller as KeyboardController
self._mouse = MouseController()
self._keyboard = KeyboardController()
self._Button = Button
self._Key = Key
self._available = True
except Exception as e:
log.warning(f"Input control unavailable: {e}")
def handle(self, event: dict):
if not self._available:
return
try:
t = event.get("type")
if t == "mouse_move":
self._mouse.position = (event["x"], event["y"])
elif t == "mouse_click":
self._mouse.position = (event["x"], event["y"])
btn = self._Button.right if event.get("button") == "right" else self._Button.left
if event.get("double"):
self._mouse.click(btn, 2)
else:
self._mouse.click(btn, 1)
elif t == "mouse_scroll":
self._mouse.scroll(event.get("dx", 0), event.get("dy", 0))
elif t == "key_press":
key_str = event.get("key", "")
self._keyboard.type(key_str)
elif t == "key_special":
# Special keys like Enter, Tab, Escape, etc.
key_name = event.get("key", "")
try:
key = getattr(self._Key, key_name)
self._keyboard.press(key)
self._keyboard.release(key)
except AttributeError:
pass
except Exception as e:
log.debug(f"Input error: {e}")
# ── Script execution ──────────────────────────────────────────────────────────
async def exec_script(script: str, shell: str, session_id: str, ws) -> None:
"""Execute a script and stream output back through the WebSocket."""
exec_id = str(time.time())
if platform.system() == "Windows":
if shell == "powershell":
cmd = ["powershell", "-NonInteractive", "-Command", script]
else:
cmd = ["cmd", "/c", script]
else:
cmd = ["bash", "-c", script]
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
async for line in proc.stdout:
await ws.send(json.dumps({
"type": "script_output",
"session_id": session_id,
"id": exec_id,
"output": line.decode(errors="replace"),
"done": False,
}))
await proc.wait()
await ws.send(json.dumps({
"type": "script_output",
"session_id": session_id,
"id": exec_id,
"output": "",
"done": True,
"exit_code": proc.returncode,
}))
except Exception as e:
await ws.send(json.dumps({
"type": "script_output",
"session_id": session_id,
"id": exec_id,
"output": f"Error: {e}\n",
"done": True,
"exit_code": -1,
}))
# ── Main agent loop ───────────────────────────────────────────────────────────
class Agent:
def __init__(self, server_url: str, machine_id: str, access_key: str, relay_url: str):
self.server_url = server_url
self.machine_id = machine_id
self.access_key = access_key
self.relay_url = relay_url.rstrip("/")
self._streaming = False
self._active_session: Optional[str] = None
self._input = InputController()
self._stop_event = asyncio.Event()
async def run(self):
log.info(f"Agent starting. Machine ID: {self.machine_id}")
log.info(f"Connecting to relay: {self.relay_url}")
# Heartbeat loop in background
asyncio.create_task(self._heartbeat_loop())
# Connect to relay (with reconnect)
while not self._stop_event.is_set():
try:
await self._connect()
except Exception as e:
log.warning(f"Relay disconnected: {e}. Reconnecting in 5s…")
await asyncio.sleep(5)
async def _heartbeat_loop(self):
while not self._stop_event.is_set():
await heartbeat(self.server_url, self.access_key)
await asyncio.sleep(30)
async def _connect(self):
ws_url = (
f"{self.relay_url}/ws/agent"
f"?machine_id={self.machine_id}&access_key={self.access_key}"
)
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=10) as ws:
log.info("Connected to relay")
await self._message_loop(ws)
async def _message_loop(self, ws):
with ScreenCapture(fps=15, quality=60) as screen:
stream_task: Optional[asyncio.Task] = None
async def stream_frames():
while self._streaming and not self._stop_event.is_set():
t0 = time.monotonic()
try:
frame = screen.capture()
await ws.send(frame)
except Exception:
break
elapsed = time.monotonic() - t0
delay = max(0, screen.frame_delay - elapsed)
await asyncio.sleep(delay)
try:
async for raw_msg in ws:
if isinstance(raw_msg, bytes):
continue # agents don't receive binary
try:
msg = json.loads(raw_msg)
except json.JSONDecodeError:
continue
msg_type = msg.get("type")
if msg_type == "start_stream":
session_id = msg.get("session_id")
log.info(f"Viewer connected — session {session_id}")
self._streaming = True
self._active_session = session_id
if stream_task and not stream_task.done():
stream_task.cancel()
stream_task = asyncio.create_task(stream_frames())
elif msg_type == "stop_stream":
log.info("Viewer disconnected — stopping stream")
self._streaming = False
self._active_session = None
if stream_task and not stream_task.done():
stream_task.cancel()
elif msg_type in ("mouse_move", "mouse_click", "mouse_scroll",
"key_press", "key_special"):
self._input.handle(msg)
elif msg_type == "exec_script":
asyncio.create_task(exec_script(
msg.get("script", ""),
msg.get("shell", "bash"),
msg.get("session_id", ""),
ws,
))
elif msg_type == "ping":
await ws.send(json.dumps({"type": "pong"}))
finally:
self._streaming = False
if stream_task and not stream_task.done():
stream_task.cancel()
def stop(self):
self._stop_event.set()
# ── Entry point ───────────────────────────────────────────────────────────────
def setup_logging(verbose: bool):
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()],
)
# Also log to file
log_dir = config_dir()
log_dir.mkdir(parents=True, exist_ok=True)
fh = logging.FileHandler(log_dir / "agent.log")
fh.setLevel(level)
fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
logging.getLogger().addHandler(fh)
async def main():
parser = argparse.ArgumentParser(description="RemoteLink Agent")
parser.add_argument("--server", help="Server URL (e.g. https://remotelink.example.com)")
parser.add_argument("--relay", help="Relay WebSocket URL (e.g. ws://remotelink.example.com:8765)")
parser.add_argument("--enroll", metavar="TOKEN", help="Enrollment token for first-time registration")
parser.add_argument("--run-once", action="store_true", help="Exit after first session ends")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
setup_logging(args.verbose)
config = load_config()
# ── First-time registration ──────────────────────────────────────────────
if args.enroll:
if not args.server:
log.error("--server is required with --enroll")
sys.exit(1)
log.info(f"Enrolling with server {args.server}")
reg = await register(args.server, args.enroll)
relay_url = args.relay or args.server.replace("https://", "ws://").replace("http://", "ws://") + ":8765"
config = {
"server_url": args.server,
"relay_url": relay_url,
"machine_id": reg["machineId"],
"access_key": reg["accessKey"],
}
if not args.run_once:
save_config(config)
elif not config:
log.error(
f"No config found at {CONFIG_FILE}.\n"
"Run with --server <url> --enroll <token> to register this machine."
)
sys.exit(1)
server_url = config["server_url"]
relay_url = config.get("relay_url") or (
server_url.replace("https://", "ws://").replace("http://", "ws://") + ":8765"
)
machine_id = config["machine_id"]
access_key = config["access_key"]
agent = Agent(server_url, machine_id, access_key, relay_url)
# Handle Ctrl+C / SIGTERM gracefully
loop = asyncio.get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(sig, agent.stop)
except NotImplementedError:
pass # Windows
log.info("RemoteLink Agent running. Press Ctrl+C to stop.")
await agent.run()
if __name__ == "__main__":
asyncio.run(main())