import csv import io import json import os import re import sqlite3 import requests from flask import Flask, jsonify, request, render_template from flask_cors import CORS app = Flask(__name__) CORS(app) DB_PATH = os.path.join("data", "wisp.db") # ── Optional SNMP support ──────────────────────────────────────────────────── try: import puresnmp import puresnmp.exc SNMP_AVAILABLE = True except ImportError: SNMP_AVAILABLE = False # ── Optional gspread support (private Google Sheets) ──────────────────────── try: import gspread from google.oauth2.service_account import Credentials as SACredentials GSPREAD_AVAILABLE = True except ImportError: GSPREAD_AVAILABLE = False # ── Database ───────────────────────────────────────────────────────────────── def get_db(): os.makedirs("data", exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): conn = get_db() conn.execute(""" CREATE TABLE IF NOT EXISTS access_points ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, ssid TEXT, lat REAL NOT NULL, lon REAL NOT NULL, frequency REAL NOT NULL, channel INTEGER, antenna_type TEXT DEFAULT 'omni', azimuth REAL DEFAULT 0, beamwidth REAL DEFAULT 360, coverage_radius INTEGER DEFAULT 2000, signal_strength REAL DEFAULT -65, height REAL DEFAULT 30, notes TEXT DEFAULT '' ) """) count = conn.execute("SELECT COUNT(*) FROM access_points").fetchone()[0] if count == 0: demo = [ # Moscow Mountain — high-elevation hub serving the Palouse region ("Moscow Mountain", "Palouse-Net", 46.7950, -116.9600, 5800, 149, "omni", 0, 360, 8000, -58, 55, "High-elevation 5 GHz hub"), # Downtown Moscow water tower — 2.4 GHz fill-in for dense urban core ("Downtown Moscow", "Palouse-Net", 46.7317, -117.0002, 2400, 6, "omni", 0, 360, 2500, -67, 30, "Urban 2.4 GHz fill-in"), # East sector toward Pullman, WA ("East Sector - UI", "Palouse-Net", 46.7280, -116.9700, 5800, 157, "sector", 90, 120, 6000, -60, 40, "Sector toward Pullman / WSU"), # South sector toward Troy, ID ("South Sector - Troy","Palouse-Net", 46.6900, -116.9950, 900,None, "sector", 180, 90, 10000, -63, 45, "900 MHz long-range toward Troy"), # West sector toward Genesee, ID ("West Sector - Gen", "Palouse-Net", 46.7350, -117.0800, 5800, 161, "sector", 270, 120, 5000, -62, 38, "Sector toward Genesee"), ] conn.executemany( "INSERT INTO access_points (name,ssid,lat,lon,frequency,channel,antenna_type,azimuth,beamwidth,coverage_radius,signal_strength,height,notes) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)", demo, ) conn.commit() conn.close() def row_to_dict(row): return dict(row) def insert_ap(conn, data): cur = conn.execute( """INSERT INTO access_points (name,ssid,lat,lon,frequency,channel,antenna_type,azimuth,beamwidth,coverage_radius,signal_strength,height,notes) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""", ( data["name"], data.get("ssid", ""), float(data["lat"]), float(data["lon"]), float(data["frequency"]), data.get("channel") or None, data.get("antenna_type", "omni"), float(data.get("azimuth", 0)), float(data.get("beamwidth", 360)), int(data.get("coverage_radius", 2000)), float(data.get("signal_strength", -65)), float(data.get("height", 30)), data.get("notes", ""), ), ) return cur.lastrowid # ── Core AP routes ──────────────────────────────────────────────────────────── @app.route("/") def index(): return render_template("index.html") @app.route("/api/aps", methods=["GET"]) def get_aps(): conn = get_db() rows = conn.execute("SELECT * FROM access_points ORDER BY name").fetchall() conn.close() return jsonify([row_to_dict(r) for r in rows]) @app.route("/api/aps", methods=["POST"]) def create_ap(): data = request.get_json() if not all(k in data for k in ("name", "lat", "lon", "frequency")): return jsonify({"error": "Missing required fields: name, lat, lon, frequency"}), 400 conn = get_db() new_id = insert_ap(conn, data) conn.commit() row = conn.execute("SELECT * FROM access_points WHERE id=?", (new_id,)).fetchone() conn.close() return jsonify(row_to_dict(row)), 201 @app.route("/api/aps/", methods=["PUT"]) def update_ap(ap_id): data = request.get_json() conn = get_db() existing = conn.execute("SELECT * FROM access_points WHERE id=?", (ap_id,)).fetchone() if not existing: conn.close() return jsonify({"error": "Not found"}), 404 merged = {**row_to_dict(existing), **data, "id": ap_id} conn.execute( """UPDATE access_points SET name=?,ssid=?,lat=?,lon=?,frequency=?,channel=?,antenna_type=?, azimuth=?,beamwidth=?,coverage_radius=?,signal_strength=?,height=?,notes=? WHERE id=?""", ( merged["name"], merged["ssid"], merged["lat"], merged["lon"], merged["frequency"], merged["channel"], merged["antenna_type"], merged["azimuth"], merged["beamwidth"], merged["coverage_radius"], merged["signal_strength"], merged["height"], merged["notes"], ap_id, ), ) conn.commit() row = conn.execute("SELECT * FROM access_points WHERE id=?", (ap_id,)).fetchone() conn.close() return jsonify(row_to_dict(row)) @app.route("/api/aps/", methods=["DELETE"]) def delete_ap(ap_id): conn = get_db() result = conn.execute("DELETE FROM access_points WHERE id=?", (ap_id,)) conn.commit() conn.close() if result.rowcount == 0: return jsonify({"error": "Not found"}), 404 return jsonify({"deleted": ap_id}) # ── Google Sheets import ────────────────────────────────────────────────────── # Column name aliases (sheet header → internal field) COLUMN_ALIASES = { "name": "name", "tower": "name", "ap name": "name", "ap_name": "name", "ssid": "ssid", "network": "ssid", "lat": "lat", "latitude": "lat", "lon": "lon", "lng": "lon", "longitude": "lon", "frequency": "frequency", "freq": "frequency", "freq (mhz)": "frequency", "frequency (mhz)": "frequency", "channel": "channel", "ch": "channel", "antenna_type": "antenna_type", "antenna type": "antenna_type", "antenna": "antenna_type", "azimuth": "azimuth", "bearing": "azimuth", "beamwidth": "beamwidth", "beam_width": "beamwidth", "beam": "beamwidth", "coverage_radius": "coverage_radius", "coverage radius": "coverage_radius", "radius": "coverage_radius", "radius (m)": "coverage_radius", "signal_strength": "signal_strength", "signal": "signal_strength", "signal (dbm)": "signal_strength", "rssi": "signal_strength", "height": "height", "tower height": "height", "height (m)": "height", "notes": "notes", "comments": "notes", "description": "notes", } def _sheets_csv_url(url: str) -> str | None: """Convert any Google Sheets URL to a CSV export URL.""" m = re.search(r"/spreadsheets/d/([a-zA-Z0-9_-]+)", url) if not m: return None sheet_id = m.group(1) gid_m = re.search(r"[#&?]gid=(\d+)", url) gid = gid_m.group(1) if gid_m else "0" return f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv&gid={gid}" def _normalise_row(raw: dict) -> dict: """Map sheet column names to internal field names (case-insensitive).""" out = {} for k, v in raw.items(): normalised = COLUMN_ALIASES.get(k.strip().lower()) if normalised and v.strip(): out[normalised] = v.strip() return out def _import_csv_rows(reader) -> tuple[list, list]: imported, errors = [], [] conn = get_db() for i, raw in enumerate(reader, start=2): # row 1 = header row = _normalise_row(raw) missing = [f for f in ("name", "lat", "lon", "frequency") if f not in row] if missing: errors.append({"row": i, "error": f"Missing columns: {', '.join(missing)}", "data": raw}) continue try: new_id = insert_ap(conn, row) conn.commit() imported.append({"id": new_id, "name": row["name"]}) except Exception as exc: errors.append({"row": i, "error": str(exc), "data": raw}) conn.close() return imported, errors @app.route("/api/import/sheets", methods=["POST"]) def import_from_sheets(): """Import APs from a Google Sheet. Body JSON: url – Sheets URL (required for public sheets) credentials – service account JSON string (optional, for private sheets) """ data = request.get_json() or {} url = data.get("url", "").strip() credentials_json = data.get("credentials") # optional service-account JSON string if not url: return jsonify({"error": "url is required"}), 400 # ── Private sheet via service account ── if credentials_json: if not GSPREAD_AVAILABLE: return jsonify({"error": "gspread is not installed"}), 500 try: creds_dict = json.loads(credentials_json) creds = SACredentials.from_service_account_info( creds_dict, scopes=["https://www.googleapis.com/auth/spreadsheets.readonly"], ) gc = gspread.authorize(creds) sh = gc.open_by_url(url) # Use first worksheet ws = sh.get_worksheet(0) records = ws.get_all_records() # Convert to csv.DictReader-like list of dicts imported, errors = [], [] conn = get_db() for i, raw in enumerate(records, start=2): row = _normalise_row({k: str(v) for k, v in raw.items()}) missing = [f for f in ("name", "lat", "lon", "frequency") if f not in row] if missing: errors.append({"row": i, "error": f"Missing columns: {', '.join(missing)}", "data": raw}) continue try: new_id = insert_ap(conn, row) conn.commit() imported.append({"id": new_id, "name": row["name"]}) except Exception as exc: errors.append({"row": i, "error": str(exc)}) conn.close() return jsonify({"imported": len(imported), "errors": errors, "aps": imported}) except json.JSONDecodeError: return jsonify({"error": "Invalid service account JSON"}), 400 except Exception as exc: return jsonify({"error": str(exc)}), 400 # ── Public sheet via CSV export ── csv_url = _sheets_csv_url(url) if not csv_url: return jsonify({"error": "Could not parse a Google Sheets ID from that URL"}), 400 try: resp = requests.get(csv_url, timeout=15) resp.raise_for_status() except requests.RequestException as exc: return jsonify({"error": f"Failed to fetch sheet: {exc}"}), 400 content = resp.text # Google returns an HTML login page for private sheets if "accounts.google.com" in content or "Sign in" in content[:500]: return jsonify({ "error": "Sheet is private. Share it publicly ('Anyone with the link can view') or provide a service account JSON." }), 403 reader = csv.DictReader(io.StringIO(content)) imported, errors = _import_csv_rows(reader) return jsonify({"imported": len(imported), "errors": errors, "aps": imported}) @app.route("/api/import/sheets/preview", methods=["POST"]) def preview_sheets(): """Return first 5 data rows + detected column mapping without importing.""" data = request.get_json() or {} url = data.get("url", "").strip() if not url: return jsonify({"error": "url is required"}), 400 csv_url = _sheets_csv_url(url) if not csv_url: return jsonify({"error": "Could not parse a Google Sheets ID from that URL"}), 400 try: resp = requests.get(csv_url, timeout=15) resp.raise_for_status() except requests.RequestException as exc: return jsonify({"error": str(exc)}), 400 if "accounts.google.com" in resp.text or "Sign in" in resp.text[:500]: return jsonify({"error": "Sheet is private. Share it publicly or provide credentials."}), 403 reader = csv.DictReader(io.StringIO(resp.text)) headers = reader.fieldnames or [] mapping = {h: COLUMN_ALIASES.get(h.strip().lower(), None) for h in headers} rows = [dict(r) for r, _ in zip(reader, range(5))] return jsonify({"headers": headers, "mapping": mapping, "sample_rows": rows}) # ── SNMP polling ────────────────────────────────────────────────────────────── _STANDARD_OIDS = { "sysDescr": "1.3.6.1.2.1.1.1.0", "sysName": "1.3.6.1.2.1.1.5.0", "sysLocation": "1.3.6.1.2.1.1.6.0", "sysContact": "1.3.6.1.2.1.1.4.0", "sysObjectID": "1.3.6.1.2.1.1.2.0", # critical for reliable vendor detection } # ── Ubiquiti ────────────────────────────────────────────────────────────────── # Each key maps to an ordered list of OIDs to try; first non-None wins. # This covers three hardware generations in one pass: # idx .0 — AirOS 6.x / M-series (Rocket M2/M5, Bullet M, NanoStation M, # NanoBridge M, PowerBeam M, AirGrid M …) # idx .1 — AirOS 8.x / AC-series / XC platform (Rocket Prism 5AC Gen1/Gen2, # LiteBeam AC, NanoBeam AC, PowerBeam AC, IsoStation 5AC …) # .1.6 — airFiber backhaul (AF-5X, AF-5XHD, AF-24, AF-60-LR …) _UBNT_OID_CHAINS = { "frequency": [ "1.3.6.1.4.1.41112.1.4.1.1.3.0", # AirOS 6.x M-series "1.3.6.1.4.1.41112.1.4.1.1.3.1", # AirOS 8.x AC-series / XC "1.3.6.1.4.1.41112.1.6.1.1.3.0", # airFiber ], "channel_width": [ "1.3.6.1.4.1.41112.1.4.1.1.4.0", "1.3.6.1.4.1.41112.1.4.1.1.4.1", "1.3.6.1.4.1.41112.1.6.1.1.4.0", ], "tx_power": [ "1.3.6.1.4.1.41112.1.4.1.1.9.0", "1.3.6.1.4.1.41112.1.4.1.1.9.1", "1.3.6.1.4.1.41112.1.6.1.1.9.0", ], "ssid": [ "1.3.6.1.4.1.41112.1.4.5.0", # AirMax SSID "1.3.6.1.4.1.41112.1.6.4.0", # airFiber network name ], "signal": [ "1.3.6.1.4.1.41112.1.4.7.1.5.1", # AirMax station 1 signal "1.3.6.1.4.1.41112.1.4.7.1.5.2", # chain 1 fallback "1.3.6.1.4.1.41112.1.6.7.1.3.1", # airFiber remote signal ], "noise": [ "1.3.6.1.4.1.41112.1.4.7.1.6.1", "1.3.6.1.4.1.41112.1.4.7.1.6.2", ], "connected_stations": [ "1.3.6.1.4.1.41112.1.4.4.0", # AirMax registered clients ], } # ── Cambium ePMP ────────────────────────────────────────────────────────────── # ePMP 1000 / 2000 / 3000 / 4500 / Force 180/200/300/425/4525 (MIB .22) _EPMP_OIDS = { "frequency": "1.3.6.1.4.1.17713.22.1.1.1.2.0", "channel_width": "1.3.6.1.4.1.17713.22.1.1.1.6.0", "tx_power": "1.3.6.1.4.1.17713.22.1.1.1.9.0", "ssid": "1.3.6.1.4.1.17713.22.1.1.3.1.0", "color_code": "1.3.6.1.4.1.17713.22.1.1.3.7.0", "mode": "1.3.6.1.4.1.17713.22.1.1.1.5.0", # 1=AP 2=SM "connected_stations": "1.3.6.1.4.1.17713.22.1.2.1.0", "dl_mcs": "1.3.6.1.4.1.17713.22.1.1.5.2.0", "ul_mcs": "1.3.6.1.4.1.17713.22.1.1.5.3.0", } # ── Cambium PMP 450 family ──────────────────────────────────────────────────── # PMP 450 / 450i / 450m / 450d / 450b / 450v (MIB .21) _PMP450_OIDS = { "frequency": "1.3.6.1.4.1.17713.21.1.1.18.0", "channel_width": "1.3.6.1.4.1.17713.21.1.1.22.0", "tx_power": "1.3.6.1.4.1.17713.21.1.1.27.0", "ssid": "1.3.6.1.4.1.17713.21.1.1.3.0", "signal": "1.3.6.1.4.1.17713.21.1.2.1.0", "color_code": "1.3.6.1.4.1.17713.21.1.1.25.0", "connected_stations": "1.3.6.1.4.1.17713.21.1.4.1.0", } # ── Cambium legacy Canopy / Motorola PMP 100 / PMP 400 / PMP 430 ───────────── # (Enterprise OID 161 = Motorola; devices pre-date the Cambium rebrand) _CANOPY_OIDS = { "frequency": "1.3.6.1.4.1.161.19.89.1.1.5.0", "ssid": "1.3.6.1.4.1.161.19.89.1.1.1.0", "signal": "1.3.6.1.4.1.161.19.89.2.1.33.0", "tx_power": "1.3.6.1.4.1.161.19.89.1.1.6.0", } # ── MikroTik ────────────────────────────────────────────────────────────────── # RouterOS wireless (wAP, SXT, BaseBox, Groove, LHG, mANTBox, Audience…) _MIKROTIK_OIDS = { "frequency": "1.3.6.1.4.1.14988.1.1.1.7.0", "channel_width": "1.3.6.1.4.1.14988.1.1.1.8.0", "tx_power": "1.3.6.1.4.1.14988.1.1.1.3.0", "ssid": "1.3.6.1.4.1.14988.1.1.1.9.0", "signal": "1.3.6.1.4.1.14988.1.1.1.2.0", # overall RX signal "noise": "1.3.6.1.4.1.14988.1.1.1.11.0", "connected_stations": "1.3.6.1.4.1.14988.1.1.1.1.0", } # Human-readable display names for each vendor key VENDOR_DISPLAY = { "ubiquiti": "Ubiquiti (AirMax / airFiber)", "cambium_epmp": "Cambium ePMP", "cambium_pmp450": "Cambium PMP 450", "cambium_canopy": "Cambium / Motorola Canopy", "mikrotik": "MikroTik RouterOS", "unknown": "Unknown", } def _snmp_decode(val) -> str | None: """Convert any puresnmp return type to a plain string. puresnmp / x690 returns typed objects: OctetString → bytes (sysDescr, sysName, SSID …) Integer → int (frequency, signal …) ObjectIdentifier → x690 OID object (sysObjectID) TimeTicks → int We need to handle all of these without raising. """ if val is None: return None # x690 ObjectIdentifier — convert to dotted-decimal string type_name = type(val).__name__ if type_name == "ObjectIdentifier": try: # x690 stores the OID as a tuple in .value if hasattr(val, "value"): return "." + ".".join(str(n) for n in val.value) # Fallback: str() on some versions gives dotted notation directly s = str(val) if s.startswith("(") or s.startswith("ObjectIdentifier"): # Parse tuple repr like "(1, 3, 6, 1, 4, 1, 41112, …)" nums = re.findall(r"\d+", s) return "." + ".".join(nums) if nums else s return s except Exception: return str(val) if isinstance(val, bytes): try: return val.decode("utf-8", errors="replace").strip() except Exception: return repr(val) # int, float, or anything else return str(val).strip() def _snmp_get(host: str, oid: str, community: str, port: int, version: int) -> tuple[str | None, str | None]: """Single SNMP GET. Returns (value_str, error_str).""" try: val = puresnmp.get(host, community, oid, port=port, version=version) return _snmp_decode(val), None except Exception as exc: return None, str(exc) def _snmp_get_chain(host: str, oids: list, community: str, port: int, version: int) -> str | None: """Try each OID in order; return the first non-None, non-empty result.""" for oid in oids: val, _ = _snmp_get(host, oid, community, port, version) if val and val not in ("0", "None"): return val return None # Keywords checked against the combined sysDescr + sysName string _UBNT_KEYWORDS = ( "ubiquiti", "ubnt", "airmax", "airos", "airfiber", # AirMax AC / XC platform (AirOS 8.x) "liteap", "lite ap", "lap-gps", "litap", # LiteAP / LiteAP GPS "litebeam", "lite beam", # LiteBeam AC "nanobeam", "nano beam", # NanoBeam AC "nanostation", "nano station", # NanoStation AC / Loco AC "nanobridge", "powerbeam", "power beam", # PowerBeam AC "isostation", "iso station", # IsoStation AC "rocket prism", "rocketprism", "rp5ac", "rp-5ac",# Rocket Prism 5AC "rocket m", "rocketm", # Rocket M (legacy) "loco m", "locom", # NanoStation Loco M "picostation", "airstation", "edgepoint", "af-5", "af-24", "af-60", "airfiber", # airFiber "edgeos", # EdgeRouter (not a radio but same vendor) ) def _detect_vendor(descr: str, name: str, obj_id: str) -> str: """Determine vendor key from sysObjectID (reliable), then sysDescr + sysName keywords.""" o = (obj_id or "").lower() # Combined text — AirOS puts the model in sysName on some firmware combined = ((descr or "") + " " + (name or "")).lower() # ── sysObjectID enterprise number (most reliable) ────────────────────── if "41112" in o: return "ubiquiti" if "17713" in o: # Distinguish ePMP (.22) from PMP 450 (.21) by the subtree number after = o.split("17713")[-1] if after.startswith(".22") or after.startswith("22"): return "cambium_epmp" return "cambium_pmp450" if "161.19" in o or re.search(r"\.161\.1[^4]", o): return "cambium_canopy" if "14988" in o: return "mikrotik" # ── Keyword fallback ──────────────────────────────────────────────────── if any(k in combined for k in _UBNT_KEYWORDS): return "ubiquiti" if any(k in combined for k in ( "epmp", "force 1", "force1", "force 180", "force180", "force 200", "force200", "force 300", "force300", "force 400", "force400", "force 425", "force425", "cnpilot", "ptp 550", "ptp550", )): return "cambium_epmp" if any(k in combined for k in ( "pmp 450", "pmp450", "pmp 430", "pmp430", "cambium pmp", "cambium networks pmp", )): return "cambium_pmp450" if any(k in combined for k in ("canopy", "motorola pmp", "motorola bh")): return "cambium_canopy" if any(k in combined for k in ("cambium", "pmp")): return "cambium_pmp450" if any(k in combined for k in ("mikrotik", "routeros")): return "mikrotik" return "unknown" # Probe OIDs tried (in order) when vendor is still unknown after sysObjectID + keywords. # Uses well-known scalars from different UBNT MIB subtrees so at least one # should respond on any AirMax / AirOS 8.x / airFiber device. _UBNT_PROBE_OIDS = [ "1.3.6.1.4.1.41112.1.4.5.0", # AirMax SSID "1.3.6.1.4.1.41112.1.4.1.1.3.0", # AirMax frequency (M-series) "1.3.6.1.4.1.41112.1.4.1.1.3.1", # AirMax frequency (AC/XC, e.g. LiteAP GPS) "1.3.6.1.4.1.41112.1.6.1.1.3.0", # airFiber frequency ] def poll_device(host: str, community: str = "public", port: int = 161, version: int = 2) -> dict: raw: dict[str, str | None] = {} errors: dict[str, str] = {} # OID key → error message, for diagnostics # Standard OIDs first (sysObjectID drives vendor detection) for key, oid in _STANDARD_OIDS.items(): val, err = _snmp_get(host, oid, community, port, version) raw[key] = val if err and key in ("sysDescr", "sysName", "sysObjectID"): errors[key] = err vendor = _detect_vendor( raw.get("sysDescr") or "", raw.get("sysName") or "", raw.get("sysObjectID") or "", ) # If still unknown, probe several UBNT OIDs — handles AirOS devices whose # sysDescr is bare "Linux " and sysObjectID didn't decode cleanly. if vendor == "unknown": for probe_oid in _UBNT_PROBE_OIDS: val, _ = _snmp_get(host, probe_oid, community, port, version) if val and val not in ("0", "None"): vendor = "ubiquiti" break # Fetch vendor-specific OIDs if vendor == "ubiquiti": for key, oids in _UBNT_OID_CHAINS.items(): raw[key] = _snmp_get_chain(host, oids, community, port, version) elif vendor == "cambium_epmp": for key, oid in _EPMP_OIDS.items(): raw[key], err = _snmp_get(host, oid, community, port, version) if err: errors[key] = err elif vendor == "cambium_pmp450": for key, oid in _PMP450_OIDS.items(): raw[key], err = _snmp_get(host, oid, community, port, version) if err: errors[key] = err elif vendor == "cambium_canopy": for key, oid in _CANOPY_OIDS.items(): raw[key], err = _snmp_get(host, oid, community, port, version) if err: errors[key] = err elif vendor == "mikrotik": for key, oid in _MIKROTIK_OIDS.items(): raw[key], err = _snmp_get(host, oid, community, port, version) if err: errors[key] = err # Build pre-filled AP suggestion suggested = { "name": raw.get("sysName") or host, "ssid": raw.get("ssid") or "", "lat": 0.0, "lon": 0.0, "frequency": 0.0, "signal_strength": -65.0, "notes": f"SNMP import from {host} · {VENDOR_DISPLAY.get(vendor, vendor)}", } for field, key in (("frequency", "frequency"), ("signal_strength", "signal")): v = raw.get(key) if v is not None: try: suggested[field] = float(v) except ValueError: pass # Normalise kHz → MHz (some firmware reports in kHz) if suggested["frequency"] > 100_000: suggested["frequency"] = round(suggested["frequency"] / 1000) return { "host": host, "vendor": vendor, "vendor_label": VENDOR_DISPLAY.get(vendor, vendor), "raw": raw, "suggested": suggested, "snmp_available": True, "errors": errors, # diagnostic — shown in UI if non-empty } @app.route("/api/snmp/poll", methods=["POST"]) def snmp_poll(): if not SNMP_AVAILABLE: return jsonify({"error": "puresnmp is not installed"}), 501 data = request.get_json() or {} host = data.get("host", "").strip() if not host: return jsonify({"error": "host is required"}), 400 community = data.get("community", "public").strip() port = int(data.get("port", 161)) version_str = str(data.get("version", "2c")).strip() version = 1 if version_str in ("1", "v1") else 2 try: result = poll_device(host, community, port, version) return jsonify(result) except Exception as exc: return jsonify({"error": str(exc)}), 500 @app.route("/api/snmp/status", methods=["GET"]) def snmp_status(): return jsonify({"available": SNMP_AVAILABLE, "gspread_available": GSPREAD_AVAILABLE}) if __name__ == "__main__": init_db() app.run(host="0.0.0.0", port=5000, debug=False)