Files
the-other-dude/backend/app/services/rsc_parser.py
Jason Staack 06a41ca9bf fix(lint): resolve all ruff lint errors
Add ruff config to exclude alembic E402, SQLAlchemy F821, and pre-existing
E501 line-length issues. Auto-fix 69 unused imports and 2 f-strings without
placeholders. Manually fix 8 unused variables. Apply ruff format to 127 files.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-14 22:17:50 -05:00

243 lines
7.6 KiB
Python

"""RouterOS RSC export parser — extracts categories, validates syntax, computes impact."""
import re
import logging
from typing import Any
logger = logging.getLogger(__name__)
HIGH_RISK_PATHS = {
"/ip address",
"/ip route",
"/ip firewall filter",
"/ip firewall nat",
"/interface",
"/interface bridge",
"/interface vlan",
"/system identity",
"/ip service",
"/ip ssh",
"/user",
}
MANAGEMENT_PATTERNS = [
(
re.compile(r"chain=input.*dst-port=(22|8291|8728|8729|443|80)", re.I),
"Modifies firewall rules for management ports (SSH/WinBox/API/Web)",
),
(
re.compile(r"chain=input.*action=drop", re.I),
"Adds drop rule on input chain — may block management access",
),
(re.compile(r"/ip service", re.I), "Modifies IP services — may disable API/SSH/WinBox access"),
(
re.compile(r"/user.*set.*password", re.I),
"Changes user password — may affect automated access",
),
]
def _join_continuation_lines(text: str) -> list[str]:
"""Join lines ending with \\ into single logical lines."""
lines = text.split("\n")
joined: list[str] = []
buf = ""
for line in lines:
stripped = line.rstrip()
if stripped.endswith("\\"):
buf += stripped[:-1].rstrip() + " "
else:
if buf:
buf += stripped
joined.append(buf)
buf = ""
else:
joined.append(stripped)
if buf:
joined.append(buf + " <<TRUNCATED>>")
return joined
def parse_rsc(text: str) -> dict[str, Any]:
"""Parse a RouterOS /export compact output.
Returns a dict with a "categories" list, each containing:
- path: the RouterOS command path (e.g. "/ip address")
- adds: count of "add" commands
- sets: count of "set" commands
- removes: count of "remove" commands
- commands: list of command strings under this path
"""
lines = _join_continuation_lines(text)
categories: dict[str, dict] = {}
current_path: str | None = None
for line in lines:
line = line.strip()
if not line or line.startswith("#"):
continue
if line.startswith("/"):
# Could be just a path header, or a path followed by a command
parts = line.split(None, 1)
if len(parts) == 1:
# Pure path header like "/interface bridge"
current_path = parts[0]
else:
# Check if second part starts with a known command verb
cmd_check = parts[1].strip().split(None, 1)
if cmd_check and cmd_check[0] in (
"add",
"set",
"remove",
"print",
"enable",
"disable",
):
current_path = parts[0]
line = parts[1].strip()
else:
# The whole line is a path (e.g. "/ip firewall filter")
current_path = line
continue
if current_path and current_path not in categories:
categories[current_path] = {
"path": current_path,
"adds": 0,
"sets": 0,
"removes": 0,
"commands": [],
}
if len(parts) == 1:
continue
if current_path is None:
continue
if current_path not in categories:
categories[current_path] = {
"path": current_path,
"adds": 0,
"sets": 0,
"removes": 0,
"commands": [],
}
cat = categories[current_path]
cat["commands"].append(line)
if line.startswith("add ") or line.startswith("add\t"):
cat["adds"] += 1
elif line.startswith("set "):
cat["sets"] += 1
elif line.startswith("remove "):
cat["removes"] += 1
return {"categories": list(categories.values())}
def validate_rsc(text: str) -> dict[str, Any]:
"""Validate RSC export syntax.
Checks for:
- Unbalanced quotes (indicates truncation or corruption)
- Trailing continuation lines (indicates truncated export)
Returns dict with "valid" (bool) and "errors" (list of strings).
"""
errors: list[str] = []
# Check for unbalanced quotes across the entire file
in_quote = False
for line in text.split("\n"):
stripped = line.rstrip()
if stripped.endswith("\\"):
stripped = stripped[:-1]
# Count unescaped quotes
count = stripped.count('"') - stripped.count('\\"')
if count % 2 != 0:
in_quote = not in_quote
if in_quote:
errors.append("Unbalanced quote detected — file may be truncated")
# Check if file ends with a continuation backslash
lines = text.rstrip().split("\n")
if lines and lines[-1].rstrip().endswith("\\"):
errors.append("File ends with continuation line (\\) — truncated export")
return {"valid": len(errors) == 0, "errors": errors}
def compute_impact(
current_parsed: dict[str, Any],
target_parsed: dict[str, Any],
) -> dict[str, Any]:
"""Compare current vs target parsed RSC and compute impact analysis.
Returns dict with:
- categories: list of per-path diffs with risk levels
- warnings: list of human-readable warning strings
- diff: summary counts (added, removed, modified)
"""
current_map = {c["path"]: c for c in current_parsed["categories"]}
target_map = {c["path"]: c for c in target_parsed["categories"]}
all_paths = sorted(set(list(current_map.keys()) + list(target_map.keys())))
result_categories = []
warnings: list[str] = []
total_added = total_removed = total_modified = 0
for path in all_paths:
curr = current_map.get(path, {"adds": 0, "sets": 0, "removes": 0, "commands": []})
tgt = target_map.get(path, {"adds": 0, "sets": 0, "removes": 0, "commands": []})
curr_cmds = set(curr.get("commands", []))
tgt_cmds = set(tgt.get("commands", []))
added = len(tgt_cmds - curr_cmds)
removed = len(curr_cmds - tgt_cmds)
total_added += added
total_removed += removed
has_changes = added > 0 or removed > 0
risk = "none"
if has_changes:
risk = "high" if path in HIGH_RISK_PATHS else "low"
result_categories.append(
{
"path": path,
"adds": added,
"removes": removed,
"risk": risk,
}
)
# Check target commands against management patterns
target_text = "\n".join(
cmd for cat in target_parsed["categories"] for cmd in cat.get("commands", [])
)
for pattern, message in MANAGEMENT_PATTERNS:
if pattern.search(target_text):
warnings.append(message)
# Warn about removed IP addresses
if "/ip address" in current_map and "/ip address" in target_map:
curr_addrs = current_map["/ip address"].get("commands", [])
tgt_addrs = target_map["/ip address"].get("commands", [])
removed_addrs = set(curr_addrs) - set(tgt_addrs)
if removed_addrs:
warnings.append(
f"Removes {len(removed_addrs)} IP address(es) — verify none are management interfaces"
)
return {
"categories": result_categories,
"warnings": warnings,
"diff": {
"added": total_added,
"removed": total_removed,
"modified": total_modified,
},
}