Files
remotelink-docker/agent/agent.py
monoadmin dcf88c7863 Add open enrollment + update Windows installer
Open enrollment (OPEN_ENROLLMENT=true in .env):
- Agents can register with just --server <url>, no token needed
- Machines assigned to OPEN_ENROLLMENT_USER_EMAIL, first admin, or first user
- Falls back gracefully if env var not set
- agent.py register() now takes optional token; --server alone triggers registration

Agent CLI changes:
- --server without --enroll triggers open enrollment registration on first run
- --enroll still works for token-based or re-enrollment
- Error message updated to reflect new syntax

NSIS installer changes:
- Interactive mode: custom page prompts for server URL + optional token
- Silent mode: /SERVER= alone works with open enrollment, /ENROLL= still supported
- Cleans up config on uninstall

agent.spec: add pyperclip, base64, struct, uuid to hidden imports

docker-compose + .env: OPEN_ENROLLMENT and OPEN_ENROLLMENT_USER_EMAIL vars

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 00:02:05 -07:00

878 lines
34 KiB
Python

"""
RemoteLink Agent
Connects to the RemoteLink server, streams the screen, and handles remote input.
Usage:
First run (self-register):
python agent.py --server https://myserver.com --enroll <enrollment-token>
Subsequent runs (use saved config):
python agent.py
Run-once mode (no config saved, exits when session ends):
python agent.py --server https://myserver.com --enroll <token> --run-once
"""
import asyncio
import base64
import json
import logging
import os
import platform
import subprocess
import sys
import time
import uuid as uuid_lib
from io import BytesIO
from pathlib import Path
from typing import Optional
import argparse
import signal
# ── Display detection (Linux) — must run before mss/pynput imports ────────────
def _ensure_display():
"""Auto-detect and set $DISPLAY on Linux if not already set."""
if platform.system() != "Linux":
return
if os.environ.get("DISPLAY"):
return
import glob
# 1. Look for X11 Unix sockets in /tmp/.X11-unix/
sockets = sorted(glob.glob("/tmp/.X11-unix/X*"))
if sockets:
num = sockets[0].replace("/tmp/.X11-unix/X", "")
os.environ["DISPLAY"] = f":{num}"
log.info(f"Auto-detected DISPLAY=:{num}")
return
# 2. Ask 'w' which displays logged-in users are on
try:
out = subprocess.check_output(
["bash", "-c", "w -h 2>/dev/null | awk '{print $3}' | grep '^:' | head -1"],
timeout=2, text=True
).strip()
if out.startswith(":"):
os.environ["DISPLAY"] = out
log.info(f"Auto-detected DISPLAY={out} from w")
return
except Exception:
pass
# 3. Fallback — :0 is correct on most single-user desktop systems
os.environ["DISPLAY"] = ":0"
log.info("No display found via socket scan — falling back to DISPLAY=:0")
# Run before importing mss / pynput (they read DISPLAY at import time on Linux)
_ensure_display()
# Third-party — installed via requirements.txt / bundled by PyInstaller
import httpx
import websockets
import websockets.exceptions
from mss import mss
from PIL import Image
log = logging.getLogger("remotelink-agent")
# ── Config ────────────────────────────────────────────────────────────────────
def config_dir() -> Path:
"""Platform-appropriate config directory."""
if platform.system() == "Windows":
base = os.environ.get("PROGRAMDATA", "C:\\ProgramData")
return Path(base) / "RemoteLink"
elif platform.system() == "Darwin":
return Path.home() / "Library" / "Application Support" / "RemoteLink"
else:
return Path("/etc/remotelink")
CONFIG_FILE = config_dir() / "agent.json"
AGENT_VERSION = "1.0.0"
def load_config() -> Optional[dict]:
if CONFIG_FILE.exists():
try:
return json.loads(CONFIG_FILE.read_text())
except Exception:
pass
return None
def save_config(data: dict):
config_dir().mkdir(parents=True, exist_ok=True)
CONFIG_FILE.write_text(json.dumps(data, indent=2))
log.info(f"Config saved to {CONFIG_FILE}")
# ── Registration ──────────────────────────────────────────────────────────────
async def register(server_url: str, enrollment_token: Optional[str] = None) -> dict:
"""Self-register with the server. Token is optional when open enrollment is enabled."""
hostname = platform.node()
os_name = platform.system()
os_version = platform.version()
payload: dict = {
"name": hostname,
"hostname": hostname,
"os": os_name,
"osVersion": os_version,
"agentVersion": AGENT_VERSION,
"macAddress": get_mac_address(),
}
if enrollment_token:
payload["enrollmentToken"] = enrollment_token
url = f"{server_url.rstrip('/')}/api/agent/register"
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(url, json=payload)
if resp.status_code != 200:
raise RuntimeError(f"Registration failed: {resp.status_code} {resp.text}")
data = resp.json()
log.info(f"Registered as machine {data['machineId']}")
return data
async def heartbeat(server_url: str, access_key: str) -> Optional[dict]:
"""Send heartbeat, returns server response including update info."""
url = f"{server_url.rstrip('/')}/api/agent/heartbeat"
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json={"accessKey": access_key, "agentVersion": AGENT_VERSION})
if resp.status_code == 200:
data = resp.json()
if data.get("updateAvailable"):
asyncio.create_task(_auto_update(data["updateAvailable"]))
return data
except Exception as e:
log.warning(f"Heartbeat failed: {e}")
return None
async def _auto_update(update_info: dict) -> None:
"""Download new agent binary and replace self, then restart."""
download_url = update_info.get("downloadUrl")
if not download_url:
return
log.info(f"Update available: {update_info.get('version')}. Downloading from {download_url}")
try:
async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
resp = await client.get(download_url)
if resp.status_code != 200:
log.warning(f"Update download failed: {resp.status_code}")
return
# Write to a temp file alongside the current binary
current = Path(sys.executable)
tmp = current.with_suffix(".new")
tmp.write_bytes(resp.content)
tmp.chmod(0o755)
# Atomically replace (Unix only — Windows needs a separate updater process)
if platform.system() != "Windows":
tmp.replace(current)
log.info(f"Agent updated to {update_info.get('version')}. Restarting…")
os.execv(str(current), sys.argv)
else:
log.info("Update downloaded. Restart the service to apply on Windows.")
except Exception as e:
log.warning(f"Auto-update failed: {e}")
async def get_session_code(server_url: str, access_key: str) -> Optional[str]:
"""Request a new session code from the server."""
url = f"{server_url.rstrip('/')}/api/agent/session-code"
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json={"accessKey": access_key})
if resp.status_code == 200:
data = resp.json()
return data.get("code")
return None
# ── Screen capture ────────────────────────────────────────────────────────────
class ScreenCapture:
def __init__(self, fps: int = 15, quality: int = 60):
self.fps = fps
self.quality = quality
self._frame_delay = 1.0 / fps
self._sct = None
self._monitors: list = [] # mss monitors (excluding index 0 = all)
self._monitor_idx: int = 0 # 0-based into _monitors
def __enter__(self):
try:
self._sct = mss()
self._monitors = self._sct.monitors[1:] # skip [0] = virtual all-screens
except Exception as e:
log.warning(f"Screen capture unavailable: {e}")
self._sct = None
self._monitors = []
return self
def __exit__(self, *args):
if self._sct:
self._sct.close()
@property
def available(self) -> bool:
return self._sct is not None
def set_monitor(self, idx: int):
if 0 <= idx < len(self._monitors):
self._monitor_idx = idx
def set_quality(self, quality: int):
self.quality = max(10, min(95, quality))
fps_map = {"high": 15, "medium": 10, "low": 5}
# quality levels sent from viewer as strings
if isinstance(quality, str):
self.fps = fps_map.get(quality, 15)
self.quality = {"high": 70, "medium": 55, "low": 35}.get(quality, 70)
else:
self.quality = max(10, min(95, quality))
self._frame_delay = 1.0 / self.fps
def get_monitor_list(self) -> list:
return [
{
"index": i,
"width": m["width"],
"height": m["height"],
"left": m["left"],
"top": m["top"],
"primary": i == 0,
}
for i, m in enumerate(self._monitors)
]
def capture(self) -> Optional[bytes]:
if not self._sct or not self._monitors:
return None
try:
monitor = self._monitors[self._monitor_idx]
img = self._sct.grab(monitor)
pil = Image.frombytes("RGB", img.size, img.bgra, "raw", "BGRX")
buf = BytesIO()
pil.save(buf, format="JPEG", quality=self.quality, optimize=False)
return buf.getvalue()
except Exception as e:
log.warning(f"Frame capture failed: {e}")
return None
@property
def frame_delay(self) -> float:
return self._frame_delay
# ── Input control ─────────────────────────────────────────────────────────────
class InputController:
"""Replay mouse and keyboard events on the local machine."""
def __init__(self):
self._mouse = None
self._keyboard = None
self._available = False
try:
from pynput.mouse import Button, Controller as MouseController
from pynput.keyboard import Key, Controller as KeyboardController
self._mouse = MouseController()
self._keyboard = KeyboardController()
self._Button = Button
self._Key = Key
self._available = True
except Exception as e:
log.warning(f"Input control unavailable: {e}")
def handle(self, event: dict):
if not self._available:
return
try:
t = event.get("type")
if t == "mouse_move":
self._mouse.position = (event["x"], event["y"])
elif t == "mouse_click":
self._mouse.position = (event["x"], event["y"])
btn = self._Button.right if event.get("button") == "right" else self._Button.left
if event.get("double"):
self._mouse.click(btn, 2)
else:
self._mouse.click(btn, 1)
elif t == "mouse_scroll":
self._mouse.scroll(event.get("dx", 0), event.get("dy", 0))
elif t == "key_press":
self._keyboard.type(event.get("key", ""))
elif t == "key_special":
key_name = event.get("key", "")
try:
key = getattr(self._Key, key_name)
self._keyboard.press(key)
self._keyboard.release(key)
except AttributeError:
pass
elif t == "exec_key_combo":
# e.g. {"type":"exec_key_combo","keys":["ctrl_l","alt_l","delete"]}
keys = event.get("keys", [])
pressed = []
for k in keys:
try:
key_obj = getattr(self._Key, k)
self._keyboard.press(key_obj)
pressed.append(key_obj)
except AttributeError:
pass
for key_obj in reversed(pressed):
self._keyboard.release(key_obj)
except Exception as e:
log.debug(f"Input error: {e}")
# ── Script execution ──────────────────────────────────────────────────────────
async def exec_script(script: str, shell: str, session_id: str, ws) -> None:
"""Execute a script and stream output back through the WebSocket."""
exec_id = str(time.time())
if platform.system() == "Windows":
if shell == "powershell":
cmd = ["powershell", "-NonInteractive", "-Command", script]
else:
cmd = ["cmd", "/c", script]
else:
cmd = ["bash", "-c", script]
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
async for line in proc.stdout:
await ws.send(json.dumps({
"type": "script_output",
"session_id": session_id,
"id": exec_id,
"output": line.decode(errors="replace"),
"done": False,
}))
await proc.wait()
await ws.send(json.dumps({
"type": "script_output",
"session_id": session_id,
"id": exec_id,
"output": "",
"done": True,
"exit_code": proc.returncode,
}))
except Exception as e:
await ws.send(json.dumps({
"type": "script_output",
"session_id": session_id,
"id": exec_id,
"output": f"Error: {e}\n",
"done": True,
"exit_code": -1,
}))
# ── Clipboard helpers ─────────────────────────────────────────────────────────
def _get_clipboard() -> str:
try:
import pyperclip
return pyperclip.paste() or ""
except Exception:
return ""
def _set_clipboard(text: str):
try:
import pyperclip
pyperclip.copy(text)
except Exception:
pass
# ── File transfer helpers ─────────────────────────────────────────────────────
CHUNK_SIZE = 65536 # 64 KB per chunk
async def send_file(path_str: str, session_id: str, ws) -> None:
"""Read a file and send it to the viewer as base64 chunks."""
try:
path = Path(path_str).expanduser().resolve()
if not path.is_file():
await ws.send(json.dumps({
"type": "file_chunk", "session_id": session_id,
"error": f"File not found: {path_str}", "done": True,
}))
return
total = path.stat().st_size
filename = path.name
seq = 0
with open(path, "rb") as f:
while True:
chunk = f.read(CHUNK_SIZE)
if not chunk:
break
await ws.send(json.dumps({
"type": "file_chunk",
"session_id": session_id,
"filename": filename,
"size": total,
"seq": seq,
"chunk": base64.b64encode(chunk).decode(),
"done": False,
}))
seq += 1
await ws.send(json.dumps({
"type": "file_chunk",
"session_id": session_id,
"filename": filename,
"size": total,
"seq": seq,
"chunk": "",
"done": True,
}))
log.info(f"Sent file {filename} ({total} bytes) to viewer")
except Exception as e:
await ws.send(json.dumps({
"type": "file_chunk", "session_id": session_id,
"error": str(e), "done": True,
}))
async def list_files(path_str: str, session_id: str, ws) -> None:
"""Send a directory listing to the viewer."""
try:
path = Path(path_str or "~").expanduser().resolve()
if not path.is_dir():
path = path.parent
entries = []
for entry in sorted(path.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())):
try:
stat = entry.stat()
entries.append({
"name": entry.name,
"is_dir": entry.is_dir(),
"size": stat.st_size if entry.is_file() else 0,
"path": str(entry),
})
except PermissionError:
pass
await ws.send(json.dumps({
"type": "file_list",
"session_id": session_id,
"path": str(path),
"parent": str(path.parent) if path.parent != path else None,
"entries": entries,
}))
except Exception as e:
await ws.send(json.dumps({
"type": "file_list", "session_id": session_id,
"path": path_str, "entries": [], "error": str(e),
}))
async def _prompt_user_consent(timeout: int = 30) -> bool:
"""Show a local dialog asking the user to accept the incoming connection.
Returns True if accepted within timeout, False otherwise."""
try:
if platform.system() == "Windows":
# Use PowerShell MessageBox
script = (
"[System.Reflection.Assembly]::LoadWithPartialName('System.Windows.Forms') | Out-Null;"
"$r = [System.Windows.Forms.MessageBox]::Show("
"'A technician is requesting remote access to this computer. Allow?',"
"'RemoteLink — Incoming Connection',"
"[System.Windows.Forms.MessageBoxButtons]::YesNo,"
"[System.Windows.Forms.MessageBoxIcon]::Question);"
"exit ($r -eq 'Yes' ? 0 : 1)"
)
proc = await asyncio.create_subprocess_exec(
"powershell", "-NonInteractive", "-Command", script,
)
try:
await asyncio.wait_for(proc.wait(), timeout=timeout)
return proc.returncode == 0
except asyncio.TimeoutError:
proc.kill()
return False
elif platform.system() == "Linux":
# Try zenity (GNOME), then kdialog (KDE), then xterm fallback
for cmd in [
["zenity", "--question", "--title=RemoteLink", f"--text=A technician is requesting remote access. Allow?", f"--timeout={timeout}"],
["kdialog", "--yesno", "A technician is requesting remote access. Allow?", "--title", "RemoteLink"],
]:
try:
proc = await asyncio.create_subprocess_exec(*cmd)
await asyncio.wait_for(proc.wait(), timeout=timeout + 5)
return proc.returncode == 0
except (FileNotFoundError, asyncio.TimeoutError):
continue
# No GUI available — log and deny
log.warning("Attended mode: no GUI dialog available, denying connection")
return False
else:
return True # macOS: always allow for now
except Exception as e:
log.warning(f"Consent dialog error: {e}")
return False
def get_mac_address() -> str:
mac = uuid_lib.getnode()
return ":".join(f"{(mac >> (8 * i)) & 0xff:02X}" for i in reversed(range(6)))
# ── Main agent loop ───────────────────────────────────────────────────────────
class Agent:
def __init__(self, server_url: str, machine_id: str, access_key: str, relay_url: str):
self.server_url = server_url
self.machine_id = machine_id
self.access_key = access_key
self.relay_url = relay_url.rstrip("/")
self._streaming = False
self._active_session: Optional[str] = None
self._input = InputController()
self._stop_event = asyncio.Event()
self._screen: Optional[ScreenCapture] = None
self._attended_mode: bool = False # set via --attended flag
# file upload buffers: transfer_id → {filename, dest, chunks[]}
self._uploads: dict = {}
async def run(self):
log.info(f"Agent starting. Machine ID: {self.machine_id}")
log.info(f"Connecting to relay: {self.relay_url}")
# Heartbeat loop in background
asyncio.create_task(self._heartbeat_loop())
# Connect to relay (with reconnect)
while not self._stop_event.is_set():
try:
await self._connect()
except Exception as e:
log.warning(f"Relay disconnected: {e}. Reconnecting in 5s…")
await asyncio.sleep(5)
async def _heartbeat_loop(self):
while not self._stop_event.is_set():
await heartbeat(self.server_url, self.access_key)
await asyncio.sleep(30)
async def _connect(self):
ws_url = (
f"{self.relay_url}/ws/agent"
f"?machine_id={self.machine_id}&access_key={self.access_key}"
)
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=10) as ws:
log.info("Connected to relay")
await self._message_loop(ws)
async def _clipboard_poll(self, ws):
"""Poll local clipboard every 2s; send changes to active viewer."""
last = _get_clipboard()
while not self._stop_event.is_set():
await asyncio.sleep(2)
if not self._active_session:
continue
current = _get_clipboard()
if current != last and current:
last = current
try:
await ws.send(json.dumps({
"type": "clipboard_content",
"session_id": self._active_session,
"content": current,
}))
except Exception:
break
async def _message_loop(self, ws):
with ScreenCapture(fps=15, quality=60) as screen:
self._screen = screen
if not screen.available:
log.warning(
"Screen capture unavailable — agent will stay connected "
"but cannot stream. Check that $DISPLAY is set."
)
stream_task: Optional[asyncio.Task] = None
clipboard_task: Optional[asyncio.Task] = None
async def stream_frames():
while self._streaming and not self._stop_event.is_set():
t0 = time.monotonic()
frame = screen.capture()
if frame:
try:
await ws.send(frame)
except Exception:
break
elapsed = time.monotonic() - t0
delay = max(0, screen.frame_delay - elapsed)
await asyncio.sleep(delay)
try:
async for raw_msg in ws:
if isinstance(raw_msg, bytes):
continue # agents don't receive binary
try:
msg = json.loads(raw_msg)
except json.JSONDecodeError:
continue
msg_type = msg.get("type")
if msg_type == "start_stream":
session_id = msg.get("session_id")
log.info(f"Viewer connected — session {session_id}")
# Attended mode: ask local user for consent
if self._attended_mode:
accepted = await _prompt_user_consent()
if not accepted:
await ws.send(json.dumps({
"type": "error",
"session_id": session_id,
"message": "Remote session denied by local user.",
}))
continue
self._streaming = True
self._active_session = session_id
if not screen.available:
await ws.send(json.dumps({
"type": "error",
"message": "Screen capture unavailable (no display). Set $DISPLAY and restart the agent.",
}))
# Send monitor list to viewer
await ws.send(json.dumps({
"type": "monitor_list",
"session_id": session_id,
"monitors": screen.get_monitor_list(),
}))
if stream_task and not stream_task.done():
stream_task.cancel()
stream_task = asyncio.create_task(stream_frames())
# Start clipboard polling
if clipboard_task and not clipboard_task.done():
clipboard_task.cancel()
clipboard_task = asyncio.create_task(self._clipboard_poll(ws))
elif msg_type == "stop_stream":
log.info("Viewer disconnected — stopping stream")
self._streaming = False
self._active_session = None
if stream_task and not stream_task.done():
stream_task.cancel()
if clipboard_task and not clipboard_task.done():
clipboard_task.cancel()
elif msg_type == "set_monitor":
screen.set_monitor(msg.get("index", 0))
log.info(f"Switched to monitor {msg.get('index', 0)}")
elif msg_type == "set_quality":
q = msg.get("quality", "high")
quality_map = {"high": 70, "medium": 55, "low": 35}
fps_map = {"high": 15, "medium": 10, "low": 5}
screen.quality = quality_map.get(q, 70)
screen.fps = fps_map.get(q, 15)
screen._frame_delay = 1.0 / screen.fps
elif msg_type in ("mouse_move", "mouse_click", "mouse_scroll",
"key_press", "key_special", "exec_key_combo"):
self._input.handle(msg)
elif msg_type == "clipboard_paste":
_set_clipboard(msg.get("content", ""))
elif msg_type == "exec_script":
asyncio.create_task(exec_script(
msg.get("script", ""),
msg.get("shell", "bash"),
msg.get("session_id", ""),
ws,
))
elif msg_type == "file_download":
asyncio.create_task(send_file(
msg.get("path", ""),
msg.get("session_id", self._active_session or ""),
ws,
))
elif msg_type == "list_files":
asyncio.create_task(list_files(
msg.get("path", "~"),
msg.get("session_id", self._active_session or ""),
ws,
))
elif msg_type == "file_upload_start":
tid = msg.get("transfer_id", "")
filename = Path(msg.get("filename", "upload")).name
dest = Path(msg.get("dest_path", "~")).expanduser() / filename
self._uploads[tid] = {"dest": dest, "chunks": []}
log.info(f"File upload starting: {filename}{dest}")
elif msg_type == "file_upload_chunk":
tid = msg.get("transfer_id", "")
if tid in self._uploads:
chunk = base64.b64decode(msg.get("chunk", ""))
self._uploads[tid]["chunks"].append(chunk)
elif msg_type == "file_upload_end":
tid = msg.get("transfer_id", "")
if tid in self._uploads:
info = self._uploads.pop(tid)
dest: Path = info["dest"]
dest.parent.mkdir(parents=True, exist_ok=True)
with open(dest, "wb") as f:
for chunk in info["chunks"]:
f.write(chunk)
log.info(f"File upload complete: {dest}")
await ws.send(json.dumps({
"type": "file_chunk",
"session_id": self._active_session or "",
"transfer_id": tid,
"done": True,
"upload_complete": True,
"path": str(dest),
}))
elif msg_type == "ping":
await ws.send(json.dumps({"type": "pong"}))
finally:
self._streaming = False
self._screen = None
if stream_task and not stream_task.done():
stream_task.cancel()
if clipboard_task and not clipboard_task.done():
clipboard_task.cancel()
def stop(self):
self._stop_event.set()
# ── Helpers ───────────────────────────────────────────────────────────────────
def _default_relay_url(server_url: str) -> str:
"""Derive the relay WebSocket URL from a server HTTP URL.
Strips any existing port, swaps the scheme, and appends :8765.
e.g. http://10.10.20.70:3000 → ws://10.10.20.70:8765
https://remotelink.example.com → wss://remotelink.example.com:8765
"""
from urllib.parse import urlparse, urlunparse
parsed = urlparse(server_url)
ws_scheme = "wss" if parsed.scheme == "https" else "ws"
# netloc may include a port — strip it, use only the hostname
host = parsed.hostname
return f"{ws_scheme}://{host}:8765"
# ── Entry point ───────────────────────────────────────────────────────────────
def setup_logging(verbose: bool):
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()],
)
# Also log to file
log_dir = config_dir()
log_dir.mkdir(parents=True, exist_ok=True)
fh = logging.FileHandler(log_dir / "agent.log")
fh.setLevel(level)
fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
logging.getLogger().addHandler(fh)
async def main():
parser = argparse.ArgumentParser(description="RemoteLink Agent")
parser.add_argument("--server", help="Server URL (e.g. https://remotelink.example.com)")
parser.add_argument("--relay", help="Relay WebSocket URL (e.g. ws://remotelink.example.com:8765)")
parser.add_argument("--enroll", metavar="TOKEN", help="Enrollment token for first-time registration")
parser.add_argument("--run-once", action="store_true", help="Exit after first session ends")
parser.add_argument("--attended", action="store_true", help="Prompt local user to accept each incoming connection")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
setup_logging(args.verbose)
config = load_config()
# ── First-time registration ──────────────────────────────────────────────
if args.server and not config:
# Register with enrollment token (if provided) or open enrollment
log.info(f"Registering with server {args.server}")
reg = await register(args.server, args.enroll or None)
relay_url = args.relay or _default_relay_url(args.server)
config = {
"server_url": args.server,
"relay_url": relay_url,
"machine_id": reg["machineId"],
"access_key": reg["accessKey"],
}
if not args.run_once:
save_config(config)
elif args.enroll:
# Re-enroll existing machine with a new token (replaces config)
if not args.server:
log.error("--server is required with --enroll")
sys.exit(1)
log.info(f"Re-enrolling with server {args.server}")
reg = await register(args.server, args.enroll)
relay_url = args.relay or _default_relay_url(args.server)
config = {
"server_url": args.server,
"relay_url": relay_url,
"machine_id": reg["machineId"],
"access_key": reg["accessKey"],
}
if not args.run_once:
save_config(config)
elif not config:
log.error(
f"No config found at {CONFIG_FILE}.\n"
"Run with --server <url> to register (or --server <url> --enroll <token> if enrollment tokens are required)."
)
sys.exit(1)
server_url = config["server_url"]
relay_url = config.get("relay_url") or _default_relay_url(server_url)
machine_id = config["machine_id"]
access_key = config["access_key"]
agent = Agent(server_url, machine_id, access_key, relay_url)
agent._attended_mode = args.attended
# Handle Ctrl+C / SIGTERM gracefully
loop = asyncio.get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(sig, agent.stop)
except NotImplementedError:
pass # Windows
log.info("RemoteLink Agent running. Press Ctrl+C to stop.")
await agent.run()
if __name__ == "__main__":
asyncio.run(main())