chore: add demo data seeder and config editor smoke test

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jason Staack
2026-03-16 18:37:28 -05:00
parent 133e6e50e4
commit 1a463cfa84
2 changed files with 963 additions and 0 deletions

412
scripts/seed_demo_data.py Normal file
View File

@@ -0,0 +1,412 @@
#!/usr/bin/env python3
"""Seed TOD database with 90s movie-themed demo data for screenshots.
Creates multiple tenants with realistic MikroTik devices, health metrics,
interface traffic, alert rules, and triggered alerts. All data is movie-themed
with The Big Lebowski as the primary tenant.
Usage:
docker exec tod_postgres psql -U postgres -d tod_screenshots < seed_demo.sql
-- OR --
python3 seed_demo_data.py | docker exec -i tod_postgres psql -U postgres -d tod_screenshots
This script generates SQL. Pipe it into psql.
"""
import random
import uuid
from datetime import datetime, timedelta, timezone
now = datetime.now(timezone.utc)
# ── Tenants ──────────────────────────────────────────────────────────────────
tenants = [
{"id": str(uuid.uuid4()), "name": "Lebowski Lanes", "desc": "The Dude's bowling alley network — the rug really ties the room together"},
{"id": str(uuid.uuid4()), "name": "The Stranger's Ranch", "desc": "Sam Elliott's cowboy operation — sometimes there's a man"},
{"id": str(uuid.uuid4()), "name": "Maude's Gallery", "desc": "Maude Lebowski's art studio and exhibition network"},
{"id": str(uuid.uuid4()), "name": "Jackie Treehorn Productions", "desc": "Jackie Treehorn's Malibu beach house production network"},
{"id": str(uuid.uuid4()), "name": "Sobchak Security", "desc": "Walter's security consulting firm — am I wrong?"},
]
# ── MikroTik Models ──────────────────────────────────────────────────────────
models = {
"core": [
("CCR2004-1G-12S+2XS", "tile", "7.16.2", "arm64"),
("CCR2116-12G-4S+", "tile", "7.16.2", "tile"),
("CCR1036-8G-2S+", "tile", "7.15.3", "tile"),
("RB5009UG+S+IN", "arm64", "7.16.2", "arm64"),
],
"switch": [
("CRS326-24G-2S+RM", "arm", "7.16.2", "arm"),
("CRS328-24P-4S+RM", "arm", "7.15.3", "arm"),
("CRS312-4C+8XG-RM", "arm", "7.16.2", "arm"),
("CRS354-48G-4S+2Q+RM", "arm", "7.16.1", "arm"),
],
"ap": [
("cAP ax", "arm64", "7.16.2", "arm64"),
("hAP ax3", "arm64", "7.16.2", "arm64"),
("hAP ax2", "arm", "7.16.2", "arm"),
("cAP ac", "arm", "7.15.3", "arm"),
("wAP ac", "arm", "7.16.2", "arm"),
],
"router": [
("hEX S", "mmips", "7.16.2", "mipsbe"),
("hEX", "mmips", "7.15.3", "mipsbe"),
("RB4011iGS+5HacQ2HnD-IN", "arm64", "7.16.2", "arm64"),
("RB3011UiAS-RM", "arm", "7.16.1", "arm"),
],
"outdoor": [
("SXTsq 5 ac", "arm", "7.16.2", "arm"),
("LHG XL 5 ac", "arm", "7.15.3", "arm"),
("NetMetal ac2", "arm", "7.16.2", "arm"),
],
}
# ── Device Names per Tenant ──────────────────────────────────────────────────
device_templates = {
"Lebowski Lanes": {
"core": ["dude-core-01", "dude-core-02"],
"switch": ["lanes-sw-{:02d}".format(i) for i in range(1, 7)],
"ap": ["bowling-ap-{:02d}".format(i) for i in range(1, 13)] + ["bar-ap-{:02d}".format(i) for i in range(1, 5)],
"router": ["dude-gw-01", "dude-gw-02", "donny-edge-01", "walter-edge-01", "maude-edge-01"],
"outdoor": ["parking-ptp-01", "parking-ptp-02", "roof-backhaul-01"],
},
"The Stranger's Ranch": {
"core": ["stranger-core-01"],
"switch": ["ranch-sw-{:02d}".format(i) for i in range(1, 5)],
"ap": ["barn-ap-{:02d}".format(i) for i in range(1, 7)] + ["lodge-ap-{:02d}".format(i) for i in range(1, 4)],
"router": ["stranger-gw-01", "ranch-edge-01", "trail-edge-01"],
"outdoor": ["pasture-ptp-01", "pasture-ptp-02", "hilltop-backhaul-01", "creek-ptp-01"],
},
"Maude's Gallery": {
"core": ["maude-core-01"],
"switch": ["gallery-sw-{:02d}".format(i) for i in range(1, 4)],
"ap": ["exhibit-ap-{:02d}".format(i) for i in range(1, 9)] + ["studio-ap-{:02d}".format(i) for i in range(1, 3)],
"router": ["maude-gw-01", "gallery-edge-01"],
"outdoor": ["sculpture-garden-ptp-01"],
},
"Jackie Treehorn Productions": {
"core": ["treehorn-core-01", "treehorn-core-02"],
"switch": ["malibu-sw-{:02d}".format(i) for i in range(1, 6)],
"ap": ["beach-ap-{:02d}".format(i) for i in range(1, 11)] + ["studio-ap-{:02d}".format(i) for i in range(1, 6)],
"router": ["treehorn-gw-01", "treehorn-gw-02", "cabana-edge-01", "pool-edge-01"],
"outdoor": ["beach-ptp-01", "beach-ptp-02", "cliff-backhaul-01"],
},
"Sobchak Security": {
"core": ["sobchak-core-01"],
"switch": ["hq-sw-{:02d}".format(i) for i in range(1, 4)],
"ap": ["office-ap-{:02d}".format(i) for i in range(1, 7)] + ["warehouse-ap-{:02d}".format(i) for i in range(1, 4)],
"router": ["walter-gw-01", "smokey-edge-01", "vietnam-edge-01"],
"outdoor": ["yard-ptp-01", "perimeter-ptp-01"],
},
}
# ── Coordinate clusters for geographic map ────────────────────────────────────
geo_clusters = {
"Lebowski Lanes": (34.0195, -118.4912), # LA / Venice Beach area
"The Stranger's Ranch": (35.3733, -118.9742), # Bakersfield area
"Maude's Gallery": (34.0522, -118.2437), # Downtown LA
"Jackie Treehorn Productions": (34.0259, -118.7798), # Malibu
"Sobchak Security": (34.1808, -118.3090), # Burbank
}
# ── SQL Generation ───────────────────────────────────────────────────────────
sql_lines = []
def q(s):
"""SQL-escape a string."""
return s.replace("'", "''")
def emit(line):
sql_lines.append(line)
emit("-- TOD Demo Data Seed — 90s Movie Themed")
emit("-- Generated for screenshot purposes")
emit("BEGIN;")
emit("")
# Clean existing demo data (keep system tenant)
emit("-- Clean slate (preserve system tenant and existing users)")
emit("DELETE FROM health_metrics;")
emit("DELETE FROM interface_metrics;")
emit("DELETE FROM wireless_metrics;")
emit("DELETE FROM alert_events;")
emit("DELETE FROM alert_rule_channels;")
emit("DELETE FROM alert_rules;")
emit("DELETE FROM notification_channels;")
emit("DELETE FROM config_templates;")
emit("DELETE FROM config_template_tags;")
emit("DELETE FROM device_tag_assignments;")
emit("DELETE FROM device_tags;")
emit("DELETE FROM device_group_memberships;")
emit("DELETE FROM device_groups;")
emit("DELETE FROM vpn_peers;")
emit("DELETE FROM vpn_config;")
emit("DELETE FROM devices WHERE tenant_id NOT IN (SELECT id FROM tenants WHERE name = 'System (Internal)');")
emit("DO $$ BEGIN DELETE FROM user_tenants WHERE tenant_id NOT IN (SELECT id FROM tenants WHERE name = 'System (Internal)'); EXCEPTION WHEN undefined_table THEN NULL; END $$;")
emit("DELETE FROM tenants WHERE name != 'System (Internal)';")
emit("")
# Create tenants
emit("-- Tenants")
for t in tenants:
emit(f"INSERT INTO tenants (id, name, description) VALUES ('{t['id']}', '{q(t['name'])}', '{q(t['desc'])}');")
emit("")
# Generate devices
all_devices = [] # (device_dict, tenant_id)
emit("-- Devices")
for t in tenants:
tname = t["name"]
tid = t["id"]
templates = device_templates[tname]
base_lat, base_lon = geo_clusters[tname]
subnet_base = random.randint(1, 200)
for role, hostnames in templates.items():
model_list = models[role]
for hostname in hostnames:
dev_id = str(uuid.uuid4())
model_info = random.choice(model_list)
model_name, arch, fw_ver, _ = model_info
# Generate realistic IP
ip = f"10.{subnet_base}.{random.randint(1,254)}.{random.randint(2,254)}"
# Status: 85% online, 10% degraded, 5% offline
r = random.random()
if r < 0.85:
status = "online"
elif r < 0.95:
status = "degraded"
else:
status = "offline"
# Uptime: 1 hour to 90 days
uptime = random.randint(3600, 7776000)
# Last seen: online=recent, degraded=minutes ago, offline=hours ago
if status == "online":
last_seen = now - timedelta(seconds=random.randint(5, 60))
elif status == "degraded":
last_seen = now - timedelta(minutes=random.randint(2, 15))
else:
last_seen = now - timedelta(hours=random.randint(1, 48))
# CPU/memory snapshots
cpu = random.randint(5, 45) if status == "online" else (random.randint(60, 95) if status == "degraded" else 0)
mem_pct = random.randint(20, 65) if status == "online" else (random.randint(70, 95) if status == "degraded" else 0)
# Geo: scatter around cluster center
lat = base_lat + random.uniform(-0.02, 0.02)
lon = base_lon + random.uniform(-0.02, 0.02)
# RouterOS major version
ros_major = int(fw_ver.split(".")[0])
dev = {
"id": dev_id, "tenant_id": tid, "hostname": hostname,
"ip": ip, "model": model_name, "firmware": fw_ver,
"arch": arch, "status": status, "uptime": uptime,
"last_seen": last_seen, "cpu": cpu, "mem_pct": mem_pct,
"lat": lat, "lon": lon, "ros_major": ros_major, "role": role,
}
all_devices.append(dev)
emit(f"INSERT INTO devices (id, tenant_id, hostname, ip_address, model, firmware_version, routeros_version, architecture, status, uptime_seconds, last_seen, last_cpu_load, last_memory_used_pct, latitude, longitude, routeros_major_version) VALUES ('{dev_id}', '{tid}', '{q(hostname)}', '{ip}', '{q(model_name)}', '{fw_ver}', '{fw_ver}', '{arch}', '{status}', {uptime}, '{last_seen.isoformat()}', {cpu}, {mem_pct}, {lat:.6f}, {lon:.6f}, {ros_major});")
emit("")
emit(f"-- Total devices: {len(all_devices)}")
emit("")
# Health metrics — 48 hours of data, every 5 minutes per device
emit("-- Health metrics (48h, 5-minute intervals)")
for dev in all_devices:
if dev["status"] == "offline":
# Only generate data up to when device went offline
hours_offline = random.randint(1, 48)
data_end = now - timedelta(hours=hours_offline)
else:
data_end = now
# Generate every 5 minutes for 48 hours = 576 points per device
# That's too much SQL — do every 15 minutes = 192 points
t_cursor = data_end - timedelta(hours=48)
base_cpu = random.randint(8, 30)
base_mem_free_pct = random.uniform(0.35, 0.70)
if dev["model"].startswith("CCR"):
total_mem = 4 * 1024 * 1024 * 1024 # 4GB
total_disk = 512 * 1024 * 1024 # 512MB
elif dev["model"].startswith("RB5009"):
total_mem = 1024 * 1024 * 1024 # 1GB
total_disk = 1024 * 1024 * 1024 # 1GB
elif dev["model"].startswith("CRS"):
total_mem = 512 * 1024 * 1024 # 512MB
total_disk = 128 * 1024 * 1024
else:
total_mem = 256 * 1024 * 1024 # 256MB
total_disk = 128 * 1024 * 1024
while t_cursor <= data_end:
# Add some daily pattern: higher CPU during business hours
hour = t_cursor.hour
if 9 <= hour <= 17:
cpu_bump = random.randint(5, 20)
elif 18 <= hour <= 22:
cpu_bump = random.randint(2, 10)
else:
cpu_bump = 0
cpu = min(95, base_cpu + cpu_bump + random.randint(-5, 5))
free_mem = int(total_mem * (base_mem_free_pct + random.uniform(-0.05, 0.05)))
free_disk = int(total_disk * random.uniform(0.3, 0.7))
temp = random.randint(38, 58)
emit(f"INSERT INTO health_metrics (time, device_id, tenant_id, cpu_load, free_memory, total_memory, free_disk, total_disk, temperature) VALUES ('{t_cursor.isoformat()}', '{dev['id']}', '{dev['tenant_id']}', {cpu}, {free_mem}, {total_mem}, {free_disk}, {total_disk}, {temp});")
t_cursor += timedelta(minutes=15)
emit("")
# Interface metrics — 48 hours, every 15 minutes
emit("-- Interface metrics (48h, 15-minute intervals)")
interfaces_by_role = {
"core": ["ether1", "ether2", "sfp-sfpplus1", "sfp-sfpplus2"],
"switch": ["ether1", "ether2", "ether3", "sfp-sfpplus1"],
"ap": ["ether1", "wlan1", "wlan2"],
"router": ["ether1", "ether2", "ether3"],
"outdoor": ["ether1", "wlan1"],
}
for dev in all_devices:
if dev["status"] == "offline":
hours_offline = random.randint(1, 48)
data_end = now - timedelta(hours=hours_offline)
else:
data_end = now
ifaces = interfaces_by_role.get(dev["role"], ["ether1"])
t_cursor = data_end - timedelta(hours=48)
while t_cursor <= data_end:
hour = t_cursor.hour
# Traffic pattern: higher during business hours
if 9 <= hour <= 17:
traffic_mult = random.uniform(0.6, 1.0)
elif 18 <= hour <= 22:
traffic_mult = random.uniform(0.3, 0.7)
else:
traffic_mult = random.uniform(0.05, 0.3)
for iface in ifaces:
if iface.startswith("wlan"):
base_bps = 50_000_000 # 50Mbps base for wireless
elif iface.startswith("sfp"):
base_bps = 500_000_000 # 500Mbps base for SFP
else:
base_bps = 100_000_000 # 100Mbps base for ethernet
rx_bps = int(base_bps * traffic_mult * random.uniform(0.3, 1.0))
tx_bps = int(base_bps * traffic_mult * random.uniform(0.2, 0.8))
rx_bytes = rx_bps * 900 # 15 min in seconds
tx_bytes = tx_bps * 900
emit(f"INSERT INTO interface_metrics (time, device_id, tenant_id, interface, rx_bytes, tx_bytes, rx_bps, tx_bps) VALUES ('{t_cursor.isoformat()}', '{dev['id']}', '{dev['tenant_id']}', '{iface}', {rx_bytes}, {tx_bytes}, {rx_bps}, {tx_bps});")
t_cursor += timedelta(minutes=15)
emit("")
# Alert rules and events
emit("-- Alert rules")
for t in tenants:
tid = t["id"]
rules = [
(str(uuid.uuid4()), "CPU High", "cpu_load", ">", 80, "warning"),
(str(uuid.uuid4()), "CPU Critical", "cpu_load", ">", 95, "critical"),
(str(uuid.uuid4()), "Memory Low", "memory_used_pct", ">", 90, "warning"),
(str(uuid.uuid4()), "Device Offline", "status", "=", 0, "critical"),
(str(uuid.uuid4()), "High Temperature", "temperature", ">", 65, "warning"),
]
for rid, name, metric, op, thresh, sev in rules:
emit(f"INSERT INTO alert_rules (id, tenant_id, name, metric, operator, threshold, severity, enabled) VALUES ('{rid}', '{tid}', '{q(name)}', '{metric}', '{op}', {thresh}, '{sev}', true);")
# Generate some fired alerts for degraded/offline devices in this tenant
tenant_devices = [d for d in all_devices if d["tenant_id"] == tid]
for dev in tenant_devices:
hostname_escaped = q(dev["hostname"])
dev_id = dev["id"]
if dev["status"] == "degraded":
rule = rules[0]
cpu_val = random.randint(81, 94)
fired_at = (now - timedelta(minutes=random.randint(5, 120))).isoformat()
msg = f"CPU load exceeded 80% on {hostname_escaped}"
emit(f"INSERT INTO alert_events (device_id, tenant_id, rule_id, status, severity, metric, value, threshold, message, fired_at) VALUES ('{dev_id}', '{tid}', '{rule[0]}', 'firing', 'warning', 'cpu_load', {cpu_val}, 80, '{msg}', '{fired_at}');")
elif dev["status"] == "offline":
rule = rules[3]
fired_at = (now - timedelta(hours=random.randint(1, 24))).isoformat()
msg = f"{hostname_escaped} is offline"
emit(f"INSERT INTO alert_events (device_id, tenant_id, rule_id, status, severity, metric, value, threshold, message, fired_at) VALUES ('{dev_id}', '{tid}', '{rule[0]}', 'firing', 'critical', 'status', 0, 0, '{msg}', '{fired_at}');")
# A few resolved alerts
for _ in range(random.randint(3, 8)):
dev = random.choice(tenant_devices)
rule = random.choice(rules[:3])
fired = now - timedelta(hours=random.randint(6, 72))
resolved = fired + timedelta(minutes=random.randint(5, 120))
hostname_escaped = q(dev["hostname"])
rule_name = q(rule[1])
val = random.randint(int(rule[4])+1, int(rule[4])+20)
msg = f"{rule_name} on {hostname_escaped}"
emit(f"INSERT INTO alert_events (device_id, tenant_id, rule_id, status, severity, metric, value, threshold, message, fired_at, resolved_at) VALUES ('{dev['id']}', '{tid}', '{rule[0]}', 'resolved', '{rule[5]}', '{rule[2]}', {val}, {rule[4]}, '{msg}', '{fired.isoformat()}', '{resolved.isoformat()}');")
emit("")
# Device tags
emit("-- Device tags")
tag_names = ["core", "distribution", "access", "outdoor", "wireless", "managed", "critical", "monitoring"]
for t in tenants:
for tag in tag_names:
tag_id = str(uuid.uuid4())
emit(f"INSERT INTO device_tags (id, tenant_id, name) VALUES ('{tag_id}', '{t['id']}', '{tag}');")
emit("")
# Notification channels
emit("-- Notification channels")
for t in tenants:
ch_id = str(uuid.uuid4())
emit(f"INSERT INTO notification_channels (id, tenant_id, name, channel_type, webhook_url) VALUES ('{ch_id}', '{t['id']}', 'Slack Alerts', 'webhook', 'https://hooks.slack.com/services/DEMO/DEMO/demo');")
ch_id2 = str(uuid.uuid4())
noc_domain = t["name"].lower().replace(" ", "").replace("'", "")
emit(f"INSERT INTO notification_channels (id, tenant_id, name, channel_type, to_address, from_address) VALUES ('{ch_id2}', '{t['id']}', 'Email NOC', 'email', 'noc@{noc_domain}.net', 'alerts@theotherdude.net');")
emit("")
# Config templates
emit("-- Config templates")
template_data = [
("Base Security Hardening", "/ip firewall filter\nadd chain=input action=drop protocol=tcp dst-port=23 comment=\"Block Telnet\"\nadd chain=input action=drop protocol=tcp dst-port=20-21 comment=\"Block FTP\"\n/ip service\nset telnet disabled=yes\nset ftp disabled=yes\n"),
("SNMP Monitoring Setup", "/snmp\nset enabled=yes contact=\"{{ contact_email }}\" location=\"{{ location }}\"\n/snmp community\nadd name={{ community }} addresses={{ allowed_network }} security=none\n"),
("NTP Configuration", "/system ntp client\nset enabled=yes\n/system ntp client servers\nadd address=time.google.com\nadd address=time.cloudflare.com\n"),
]
for t in tenants:
for tpl_name, tpl_content in template_data:
tpl_id = str(uuid.uuid4())
emit(f"INSERT INTO config_templates (id, tenant_id, name, content, created_at) VALUES ('{tpl_id}', '{t['id']}', '{q(tpl_name)}', '{q(tpl_content)}', '{now.isoformat()}');")
emit("")
emit("COMMIT;")
emit("")
emit(f"-- Summary: {len(tenants)} tenants, {len(all_devices)} devices")
# Output
print("\n".join(sql_lines))

View File

@@ -0,0 +1,551 @@
#!/usr/bin/env python3
"""Config Editor smoke test — hits every menu path against a live device.
Authenticates to the TOD API, picks the first online device, and systematically
tests browse on every path from the MenuTree. Also tests add/edit/delete on
safe disposable paths, and tests the CLI execute endpoint.
Usage:
python3 scripts/test_config_editor.py [--base-url URL] [--device HOSTNAME]
Requires: requests (pip install requests)
"""
import argparse
import json
import sys
import time
import requests
# ── Config ──────────────────────────────────────────────────────────────────
DEFAULT_BASE_URL = "http://localhost:8001"
LOGIN_EMAIL = "admin@mikrotik-portal.dev"
LOGIN_PASSWORD = "changeme-in-production"
# Every leaf path from MenuTree.tsx (the ones users can actually click)
BROWSE_PATHS = [
# interface
"/interface",
"/interface/bridge",
"/interface/ethernet",
"/interface/vlan",
"/interface/wifi",
"/interface/bonding",
"/interface/list",
# ip
"/ip/address",
"/ip/route",
"/ip/dns",
"/ip/dhcp-client",
"/ip/dhcp-server",
"/ip/pool",
"/ip/service",
"/ip/neighbor",
# ip firewall
"/ip/firewall/filter",
"/ip/firewall/nat",
"/ip/firewall/mangle",
"/ip/firewall/raw",
"/ip/firewall/address-list",
"/ip/firewall/connection",
# system
"/system/identity",
"/system/clock",
"/system/ntp/client",
"/system/ntp/server",
"/system/resource",
"/system/routerboard",
"/system/scheduler",
"/system/script",
"/system/logging",
"/system/package",
# routing
"/routing/ospf/instance",
"/routing/ospf/area",
"/routing/ospf/interface-template",
"/routing/ospf/static-neighbor",
"/routing/bgp/connection",
"/routing/bgp/template",
"/routing/filter/rule",
"/routing/table",
"/routing/rule",
# queue
"/queue/simple",
"/queue/tree",
"/queue/type",
# tool
"/tool/bandwidth-server",
"/tool/e-mail",
"/tool/graphing",
"/tool/netwatch",
"/tool/sniffer",
# other
"/user",
"/snmp",
"/certificate",
]
# Paths that depend on device hardware/packages — one of these will work
# but not necessarily both. Warn instead of fail.
CONDITIONAL_PATHS = [
"/interface/wireless", # Legacy wireless package (ROS6 or ROS7 with wifi-qcom)
]
# Container / parent paths that the tree shows as expandable folders.
# These typically can't be /print'd directly in RouterOS, so we expect
# them to fail. We test them separately so you know which are real failures
# vs expected container failures.
CONTAINER_PATHS = [
"/ip",
"/ip/firewall",
"/system",
"/routing",
"/routing/ospf",
"/routing/bgp",
"/queue",
"/tool",
]
# Safe path for testing add/edit/delete (firewall address-list is disposable)
SAFE_WRITE_PATH = "/ip/firewall/address-list"
SAFE_ADD_PROPS = {"list": "tod-smoke-test", "address": "192.0.2.1", "comment": "TOD config editor smoke test — safe to delete"}
# ── Helpers ─────────────────────────────────────────────────────────────────
class Colors:
OK = "\033[92m"
FAIL = "\033[91m"
WARN = "\033[93m"
DIM = "\033[90m"
BOLD = "\033[1m"
END = "\033[0m"
def ok(msg):
print(f" {Colors.OK}PASS{Colors.END} {msg}")
def fail(msg, detail=""):
extra = f"{Colors.DIM}{detail}{Colors.END}" if detail else ""
print(f" {Colors.FAIL}FAIL{Colors.END} {msg}{extra}")
def warn(msg, detail=""):
extra = f"{Colors.DIM}{detail}{Colors.END}" if detail else ""
print(f" {Colors.WARN}WARN{Colors.END} {msg}{extra}")
def skip(msg, detail=""):
extra = f"{Colors.DIM}{detail}{Colors.END}" if detail else ""
print(f" {Colors.DIM}SKIP{Colors.END} {msg}{extra}")
def section(title):
print(f"\n{Colors.BOLD}{'' * 60}{Colors.END}")
print(f"{Colors.BOLD} {title}{Colors.END}")
print(f"{Colors.BOLD}{'' * 60}{Colors.END}")
# ── Main ────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Config editor smoke test")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--device", default=None, help="Hostname of device to test (default: first online)")
args = parser.parse_args()
base = args.base_url.rstrip("/")
session = requests.Session()
session.verify = True # use TLS
results = {"pass": 0, "fail": 0, "warn": 0, "skip": 0}
# ── 1. Login ────────────────────────────────────────────────────────────
section("Authentication")
resp = session.post(f"{base}/api/auth/login", json={
"email": LOGIN_EMAIL,
"password": LOGIN_PASSWORD,
})
if resp.status_code != 200:
fail(f"Login failed: HTTP {resp.status_code}")
print(f" Response: {resp.text[:200]}")
sys.exit(1)
auth_data = resp.json()
token = auth_data.get("access_token")
if not token:
fail("Login response missing access_token")
sys.exit(1)
session.headers["Authorization"] = f"Bearer {token}"
ok("Logged in")
results["pass"] += 1
# ── 2. Find tenant and device ───────────────────────────────────────────
section("Device Discovery")
# Get user info to find tenant
resp = session.get(f"{base}/api/auth/me")
if resp.status_code != 200:
fail(f"/api/auth/me failed: HTTP {resp.status_code}")
sys.exit(1)
me = resp.json()
tenant_id = me.get("tenant_id")
if not tenant_id:
# Super admin — need to pick a tenant that has devices
resp = session.get(f"{base}/api/tenants")
if resp.status_code != 200:
fail(f"/api/tenants failed: HTTP {resp.status_code}")
sys.exit(1)
tenants = resp.json()
if isinstance(tenants, dict):
tenants = tenants.get("tenants", tenants.get("data", []))
# Filter out system tenants, prefer tenants with devices
non_system = [t for t in tenants if "system" not in t.get("name", "").lower()]
if non_system:
tenant_id = non_system[0]["id"]
ok(f"Using tenant: {non_system[0].get('name', tenant_id)}")
elif tenants:
tenant_id = tenants[0]["id"]
ok(f"Using tenant: {tenants[0].get('name', tenant_id)}")
else:
fail("No tenants found")
sys.exit(1)
else:
ok(f"Tenant: {tenant_id}")
results["pass"] += 1
# Get devices to find an online device
resp = session.get(f"{base}/api/tenants/{tenant_id}/devices")
if resp.status_code != 200:
fail(f"Devices endpoint failed: HTTP {resp.status_code}")
sys.exit(1)
devices = resp.json()
if isinstance(devices, dict):
devices = devices.get("items", devices.get("devices", devices.get("data", [])))
online_devices = [d for d in devices if d.get("status") == "online"]
if not online_devices:
fail("No online devices found — config editor requires a live device")
sys.exit(1)
if args.device:
matches = [d for d in online_devices if d.get("hostname") == args.device]
if not matches:
fail(f"Device '{args.device}' not found or not online")
print(f" Online devices: {[d.get('hostname') for d in online_devices]}")
sys.exit(1)
device = matches[0]
else:
device = online_devices[0]
device_id = device["id"]
hostname = device.get("hostname", "unknown")
ok(f"Device: {hostname} ({device.get('ip_address', '?')}) — {device.get('model', '?')}")
print(f" {len(online_devices)} online device(s) available")
results["pass"] += 1
# ── 3. Browse every menu path ───────────────────────────────────────────
section(f"Browse Tests — {len(BROWSE_PATHS)} paths")
browse_results = {}
for path in BROWSE_PATHS:
try:
resp = session.get(
f"{base}/api/tenants/{tenant_id}/devices/{device_id}/config-editor/browse",
params={"path": path},
timeout=20,
)
if resp.status_code == 200:
data = resp.json()
entries = data.get("entries", [])
n = len(entries)
cols = list(entries[0].keys()) if entries else []
col_summary = ", ".join(cols[:5]) + ("..." if len(cols) > 5 else "") if cols else "no columns"
ok(f"{path:45s} {n:3d} entries [{col_summary}]")
browse_results[path] = {"status": "pass", "entries": n}
results["pass"] += 1
elif resp.status_code == 403:
warn(f"{path:45s} BLOCKED (403)")
browse_results[path] = {"status": "blocked", "detail": resp.json().get("detail", "")}
results["warn"] += 1
elif resp.status_code == 502:
detail = ""
try:
detail = resp.json().get("detail", "")
except Exception:
detail = resp.text[:100]
fail(f"{path:45s} 502 Bad Gateway", detail)
browse_results[path] = {"status": "fail", "detail": detail}
results["fail"] += 1
else:
detail = resp.text[:100]
fail(f"{path:45s} HTTP {resp.status_code}", detail)
browse_results[path] = {"status": "fail", "detail": detail}
results["fail"] += 1
except requests.exceptions.Timeout:
fail(f"{path:45s} TIMEOUT (20s)")
browse_results[path] = {"status": "fail", "detail": "timeout"}
results["fail"] += 1
except Exception as e:
fail(f"{path:45s} ERROR", str(e))
browse_results[path] = {"status": "fail", "detail": str(e)}
results["fail"] += 1
# Small delay to avoid rate limiting
time.sleep(0.2)
# ── 3b. Conditional paths (hardware-dependent, warn on failure) ────────
section(f"Conditional Paths — {len(CONDITIONAL_PATHS)} paths (device-dependent)")
for path in CONDITIONAL_PATHS:
try:
resp = session.get(
f"{base}/api/tenants/{tenant_id}/devices/{device_id}/config-editor/browse",
params={"path": path},
timeout=20,
)
if resp.status_code == 200:
data = resp.json()
entries = data.get("entries", [])
ok(f"{path:45s} {len(entries):3d} entries")
results["pass"] += 1
else:
detail = ""
try:
detail = resp.json().get("detail", "")[:80]
except Exception:
detail = resp.text[:80]
warn(f"{path:45s} not available on this device", detail)
results["warn"] += 1
except Exception as e:
warn(f"{path:45s} ERROR", str(e)[:80])
results["warn"] += 1
time.sleep(0.2)
# ── 4. Container paths (expected to fail on most RouterOS versions) ─────
section(f"Container Path Tests — {len(CONTAINER_PATHS)} paths (may fail, that's OK)")
for path in CONTAINER_PATHS:
try:
resp = session.get(
f"{base}/api/tenants/{tenant_id}/devices/{device_id}/config-editor/browse",
params={"path": path},
timeout=20,
)
if resp.status_code == 200:
data = resp.json()
entries = data.get("entries", [])
ok(f"{path:45s} {len(entries):3d} entries (container works!)")
results["pass"] += 1
else:
detail = ""
try:
detail = resp.json().get("detail", "")[:80]
except Exception:
detail = resp.text[:80]
skip(f"{path:45s} HTTP {resp.status_code}", detail)
results["skip"] += 1
except Exception as e:
skip(f"{path:45s} ERROR", str(e)[:80])
results["skip"] += 1
time.sleep(0.2)
# ── 5. Add / Edit / Delete test ─────────────────────────────────────────
section("Write Operations (add → edit → delete)")
added_id = None
# Add
try:
resp = session.post(
f"{base}/api/tenants/{tenant_id}/devices/{device_id}/config-editor/add",
json={"path": SAFE_WRITE_PATH, "properties": SAFE_ADD_PROPS},
timeout=20,
)
if resp.status_code == 200:
data = resp.json()
if data.get("success"):
# Try to find the ID of our new entry
ok(f"ADD {SAFE_WRITE_PATH} — success")
results["pass"] += 1
# Browse to find our entry's .id
time.sleep(0.5)
resp2 = session.get(
f"{base}/api/tenants/{tenant_id}/devices/{device_id}/config-editor/browse",
params={"path": SAFE_WRITE_PATH},
timeout=20,
)
if resp2.status_code == 200:
for e in resp2.json().get("entries", []):
if e.get("comment") == SAFE_ADD_PROPS["comment"]:
added_id = e.get(".id")
break
if added_id:
ok(f" Found added entry: {added_id}")
else:
warn(" Could not find added entry by comment — will skip edit/delete")
else:
fail(f"ADD {SAFE_WRITE_PATH}", data.get("error", "unknown error"))
results["fail"] += 1
else:
detail = ""
try:
detail = resp.json().get("detail", "")
except Exception:
detail = resp.text[:100]
fail(f"ADD {SAFE_WRITE_PATH} — HTTP {resp.status_code}", detail)
results["fail"] += 1
except Exception as e:
fail(f"ADD {SAFE_WRITE_PATH}", str(e))
results["fail"] += 1
# Edit
if added_id:
try:
resp = session.post(
f"{base}/api/tenants/{tenant_id}/devices/{device_id}/config-editor/set",
json={
"path": SAFE_WRITE_PATH,
"entry_id": added_id,
"properties": {"comment": "TOD smoke test — edited"},
},
timeout=20,
)
if resp.status_code == 200 and resp.json().get("success"):
ok(f"SET {SAFE_WRITE_PATH} {added_id} — success")
results["pass"] += 1
else:
detail = ""
try:
detail = resp.json().get("detail", resp.json().get("error", ""))
except Exception:
detail = resp.text[:100]
fail(f"SET {SAFE_WRITE_PATH} {added_id}", detail)
results["fail"] += 1
except Exception as e:
fail(f"SET {SAFE_WRITE_PATH} {added_id}", str(e))
results["fail"] += 1
else:
skip("SET — no entry ID from add step")
results["skip"] += 1
# Delete
if added_id:
try:
resp = session.post(
f"{base}/api/tenants/{tenant_id}/devices/{device_id}/config-editor/remove",
json={
"path": SAFE_WRITE_PATH,
"entry_id": added_id,
},
timeout=20,
)
if resp.status_code == 200 and resp.json().get("success"):
ok(f"DEL {SAFE_WRITE_PATH} {added_id} — success (cleaned up)")
results["pass"] += 1
else:
detail = ""
try:
detail = resp.json().get("detail", resp.json().get("error", ""))
except Exception:
detail = resp.text[:100]
fail(f"DEL {SAFE_WRITE_PATH} {added_id}", detail)
results["fail"] += 1
except Exception as e:
fail(f"DEL {SAFE_WRITE_PATH} {added_id}", str(e))
results["fail"] += 1
else:
skip("DEL — no entry ID from add step")
results["skip"] += 1
# ── 6. CLI Execute test ─────────────────────────────────────────────────
section("CLI Execute Tests")
cli_commands = [
"/system/identity/print",
"/ip/address/print",
"/interface/print",
]
for cmd in cli_commands:
try:
resp = session.post(
f"{base}/api/tenants/{tenant_id}/devices/{device_id}/config-editor/execute",
json={"command": cmd},
timeout=20,
)
if resp.status_code == 200:
data = resp.json()
if data.get("success"):
n = len(data.get("data", []))
ok(f"{cmd:45s} {n} result(s)")
results["pass"] += 1
else:
fail(f"{cmd:45s}", data.get("error", "unknown"))
results["fail"] += 1
elif resp.status_code == 403:
warn(f"{cmd:45s} BLOCKED (expected for some commands)")
results["warn"] += 1
else:
fail(f"{cmd:45s} HTTP {resp.status_code}")
results["fail"] += 1
except Exception as e:
fail(f"{cmd:45s}", str(e))
results["fail"] += 1
time.sleep(0.2)
# ── 7. Blocked command tests (should all be 403) ────────────────────────
section("Security Blocklist Tests (all should be blocked)")
blocked_cmds = [
"/system/reset-configuration",
"/system/reboot",
"/user/print",
"/export",
]
for cmd in blocked_cmds:
try:
resp = session.post(
f"{base}/api/tenants/{tenant_id}/devices/{device_id}/config-editor/execute",
json={"command": cmd},
timeout=10,
)
if resp.status_code == 403:
ok(f"{cmd:45s} correctly blocked (403)")
results["pass"] += 1
else:
fail(f"{cmd:45s} NOT BLOCKED — HTTP {resp.status_code}")
results["fail"] += 1
except Exception as e:
fail(f"{cmd:45s}", str(e))
results["fail"] += 1
time.sleep(0.1)
# ── Summary ─────────────────────────────────────────────────────────────
section("Summary")
total = results["pass"] + results["fail"] + results["warn"] + results["skip"]
print(f" {Colors.OK}PASS: {results['pass']}{Colors.END}")
print(f" {Colors.FAIL}FAIL: {results['fail']}{Colors.END}")
print(f" {Colors.WARN}WARN: {results['warn']}{Colors.END}")
print(f" {Colors.DIM}SKIP: {results['skip']}{Colors.END}")
print(f" Total: {total}")
if results["fail"] > 0:
print(f"\n {Colors.FAIL}Some tests failed — see FAIL lines above for details.{Colors.END}")
sys.exit(1)
else:
print(f"\n {Colors.OK}All tests passed (warnings are informational).{Colors.END}")
if __name__ == "__main__":
main()