Files
wisp-map/app.py
2026-04-10 15:36:34 -07:00

695 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import csv
import io
import json
import os
import re
import sqlite3
import requests
from flask import Flask, jsonify, request, render_template
from flask_cors import CORS
# Flask application object; every @app.route handler in this file registers on it.
app = Flask(__name__)
# Enable flask-cors on the application with its default settings.
CORS(app)
# SQLite database file; a relative path, so it resolves against the working directory.
DB_PATH = os.path.join("data", "wisp.db")
# ── Optional SNMP support ────────────────────────────────────────────────────
try:
    import puresnmp
    import puresnmp.exc
except ImportError:
    # puresnmp is optional; without it the SNMP poll endpoint answers 501.
    SNMP_AVAILABLE = False
else:
    SNMP_AVAILABLE = True
# ── Optional gspread support (private Google Sheets) ────────────────────────
try:
    import gspread
    from google.oauth2.service_account import Credentials as SACredentials
except ImportError:
    # gspread / google-auth are optional; private-sheet imports are refused without them.
    GSPREAD_AVAILABLE = False
else:
    GSPREAD_AVAILABLE = True
# ── Database ─────────────────────────────────────────────────────────────────
def get_db():
    """Open a connection to the SQLite file at ``DB_PATH``.

    The ``data`` directory is created first if it is missing.  Rows are
    returned as ``sqlite3.Row`` so columns can be read by name.  Closing the
    connection is left to the caller.
    """
    os.makedirs("data", exist_ok=True)
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the ``access_points`` table and seed demo rows on first run.

    The table is created only if it does not exist, and the five demo access
    points are inserted only while the table is empty, so repeated calls do
    not duplicate data.  The connection is closed even if a statement fails.
    """
    conn = get_db()
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS access_points (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                ssid TEXT,
                lat REAL NOT NULL,
                lon REAL NOT NULL,
                frequency REAL NOT NULL,
                channel INTEGER,
                antenna_type TEXT DEFAULT 'omni',
                azimuth REAL DEFAULT 0,
                beamwidth REAL DEFAULT 360,
                coverage_radius INTEGER DEFAULT 2000,
                signal_strength REAL DEFAULT -65,
                height REAL DEFAULT 30,
                notes TEXT DEFAULT ''
            )
        """)
        count = conn.execute("SELECT COUNT(*) FROM access_points").fetchone()[0]
        if count == 0:
            # Column order matches the INSERT below:
            # name, ssid, lat, lon, frequency, channel, antenna_type, azimuth,
            # beamwidth, coverage_radius, signal_strength, height, notes
            demo = [
                # Moscow Mountain — high-elevation hub serving the Palouse region
                ("Moscow Mountain", "Palouse-Net", 46.7950, -116.9600, 5800, 149, "omni", 0, 360, 8000, -58, 55, "High-elevation 5 GHz hub"),
                # Downtown Moscow water tower — 2.4 GHz fill-in for dense urban core
                ("Downtown Moscow", "Palouse-Net", 46.7317, -117.0002, 2400, 6, "omni", 0, 360, 2500, -67, 30, "Urban 2.4 GHz fill-in"),
                # East sector toward Pullman, WA
                ("East Sector - UI", "Palouse-Net", 46.7280, -116.9700, 5800, 157, "sector", 90, 120, 6000, -60, 40, "Sector toward Pullman / WSU"),
                # South sector toward Troy, ID (no channel number recorded)
                ("South Sector - Troy", "Palouse-Net", 46.6900, -116.9950, 900, None, "sector", 180, 90, 10000, -63, 45, "900 MHz long-range toward Troy"),
                # West sector toward Genesee, ID
                ("West Sector - Gen", "Palouse-Net", 46.7350, -117.0800, 5800, 161, "sector", 270, 120, 5000, -62, 38, "Sector toward Genesee"),
            ]
            conn.executemany(
                "INSERT INTO access_points (name,ssid,lat,lon,frequency,channel,antenna_type,azimuth,beamwidth,coverage_radius,signal_strength,height,notes) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)",
                demo,
            )
        conn.commit()
    finally:
        # Release the database handle on both the success and the failure path.
        conn.close()
def row_to_dict(row):
    """Return a plain ``dict`` copy of *row* so it can be JSON-serialised."""
    as_mapping = dict(row)
    return as_mapping
def insert_ap(conn, data):
    """Insert one access point and return the id of the new row.

    Args:
        conn: Open ``sqlite3`` connection.  Nothing is committed here; the
            caller decides when to commit.
        data: Mapping of field name to value.  ``name``, ``lat``, ``lon`` and
            ``frequency`` are required.  Optional numeric fields that are
            absent, ``None`` or blank fall back to the same defaults the
            table declares.

    Returns:
        ``lastrowid`` of the inserted row.

    Raises:
        KeyError: A required field is missing.
        TypeError, ValueError: A numeric field cannot be converted.
    """

    def optional(key, default, convert):
        # A JSON ``null`` or an empty cell means "not provided"; converting it
        # directly would raise instead of applying the default.
        value = data.get(key)
        if value is None or (isinstance(value, str) and not value.strip()):
            return default
        return convert(value)

    cur = conn.execute(
        """INSERT INTO access_points
        (name,ssid,lat,lon,frequency,channel,antenna_type,azimuth,beamwidth,coverage_radius,signal_strength,height,notes)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""",
        (
            data["name"],
            data.get("ssid", ""),
            float(data["lat"]),
            float(data["lon"]),
            float(data["frequency"]),
            # Any falsy channel (missing, "", 0) is stored as NULL.
            data.get("channel") or None,
            data.get("antenna_type", "omni"),
            optional("azimuth", 0.0, float),
            optional("beamwidth", 360.0, float),
            optional("coverage_radius", 2000, int),
            optional("signal_strength", -65.0, float),
            optional("height", 30.0, float),
            data.get("notes", ""),
        ),
    )
    return cur.lastrowid
# ── Core AP routes ────────────────────────────────────────────────────────────
@app.route("/")
def index():
    """Render and return the ``index.html`` template for the root URL."""
    page = render_template("index.html")
    return page
@app.route("/api/aps", methods=["GET"])
def get_aps():
    """Return every stored access point as a JSON array, ordered by name."""
    conn = get_db()
    try:
        rows = conn.execute("SELECT * FROM access_points ORDER BY name").fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return jsonify([row_to_dict(r) for r in rows])
@app.route("/api/aps", methods=["POST"])
def create_ap():
    """Create an access point from a JSON object body.

    Responds 201 with the stored row.  Responds 400 when the body is not a
    JSON object, when ``name``, ``lat``, ``lon`` or ``frequency`` is missing,
    or when a value cannot be converted or stored.
    """
    required = ("name", "lat", "lon", "frequency")
    data = request.get_json()
    # A literal ``null`` body parses to None and an array to a list; neither
    # can carry the required fields, so reject both up front.
    if not isinstance(data, dict) or not all(k in data for k in required):
        return jsonify({"error": "Missing required fields: name, lat, lon, frequency"}), 400
    conn = get_db()
    try:
        try:
            new_id = insert_ap(conn, data)
        except (TypeError, ValueError, sqlite3.IntegrityError) as exc:
            # Bad client input (e.g. lat="abc" or name=null) is a 400, not a server error.
            return jsonify({"error": f"Invalid field value: {exc}"}), 400
        conn.commit()
        row = conn.execute("SELECT * FROM access_points WHERE id=?", (new_id,)).fetchone()
    finally:
        conn.close()
    return jsonify(row_to_dict(row)), 201
@app.route("/api/aps/<int:ap_id>", methods=["PUT"])
def update_ap(ap_id):
    """Update the access point *ap_id* from a JSON object body.

    Fields absent from the body keep their stored values; the id itself cannot
    be changed.  Responds with the updated row, 404 when the id does not
    exist, or 400 when the body is not a JSON object or a value is rejected
    by the database.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        # Merging a None/list body into the stored row below would raise.
        return jsonify({"error": "Request body must be a JSON object"}), 400
    conn = get_db()
    try:
        existing = conn.execute("SELECT * FROM access_points WHERE id=?", (ap_id,)).fetchone()
        if not existing:
            return jsonify({"error": "Not found"}), 404
        # Stored values first, then the client's overrides; "id" is pinned last.
        merged = {**row_to_dict(existing), **data, "id": ap_id}
        try:
            conn.execute(
                """UPDATE access_points SET
                name=?,ssid=?,lat=?,lon=?,frequency=?,channel=?,antenna_type=?,
                azimuth=?,beamwidth=?,coverage_radius=?,signal_strength=?,height=?,notes=?
                WHERE id=?""",
                (
                    merged["name"], merged["ssid"], merged["lat"], merged["lon"],
                    merged["frequency"], merged["channel"], merged["antenna_type"],
                    merged["azimuth"], merged["beamwidth"], merged["coverage_radius"],
                    merged["signal_strength"], merged["height"], merged["notes"], ap_id,
                ),
            )
        except (sqlite3.IntegrityError, sqlite3.InterfaceError, sqlite3.ProgrammingError) as exc:
            # NOT NULL violations and values of a type sqlite3 cannot bind
            # (nested lists/objects) come from the request, so answer 400.
            return jsonify({"error": f"Invalid field value: {exc}"}), 400
        conn.commit()
        row = conn.execute("SELECT * FROM access_points WHERE id=?", (ap_id,)).fetchone()
    finally:
        conn.close()
    return jsonify(row_to_dict(row))
@app.route("/api/aps/<int:ap_id>", methods=["DELETE"])
def delete_ap(ap_id):
    """Delete the access point *ap_id*.

    Responds ``{"deleted": ap_id}`` on success or 404 when no row had that id.
    """
    conn = get_db()
    try:
        # Read the affected-row count before the connection goes away.
        deleted = conn.execute("DELETE FROM access_points WHERE id=?", (ap_id,)).rowcount
        conn.commit()
    finally:
        conn.close()
    if deleted == 0:
        return jsonify({"error": "Not found"}), 404
    return jsonify({"deleted": ap_id})
# ── Google Sheets import ──────────────────────────────────────────────────────
# Column name aliases (sheet header → internal field).
# Keys are compared against the stripped, lower-cased sheet header, so they
# must be lower-case themselves.  Headers that are not listed here are ignored
# by the importer and reported as unmapped (null) by the preview endpoint.
COLUMN_ALIASES = {
"name": "name", "tower": "name", "ap name": "name", "ap_name": "name",
"ssid": "ssid", "network": "ssid",
"lat": "lat", "latitude": "lat",
"lon": "lon", "lng": "lon", "longitude": "lon",
"frequency": "frequency", "freq": "frequency", "freq (mhz)": "frequency", "frequency (mhz)": "frequency",
"channel": "channel", "ch": "channel",
"antenna_type": "antenna_type", "antenna type": "antenna_type", "antenna": "antenna_type",
"azimuth": "azimuth", "bearing": "azimuth",
"beamwidth": "beamwidth", "beam_width": "beamwidth", "beam": "beamwidth",
"coverage_radius": "coverage_radius", "coverage radius": "coverage_radius",
"radius": "coverage_radius", "radius (m)": "coverage_radius",
"signal_strength": "signal_strength", "signal": "signal_strength",
"signal (dbm)": "signal_strength", "rssi": "signal_strength",
"height": "height", "tower height": "height", "height (m)": "height",
"notes": "notes", "comments": "notes", "description": "notes",
}
def _sheets_csv_url(url: str) -> str | None:
"""Convert any Google Sheets URL to a CSV export URL."""
m = re.search(r"/spreadsheets/d/([a-zA-Z0-9_-]+)", url)
if not m:
return None
sheet_id = m.group(1)
gid_m = re.search(r"[#&?]gid=(\d+)", url)
gid = gid_m.group(1) if gid_m else "0"
return f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv&gid={gid}"
def _normalise_row(raw: dict) -> dict:
    """Map sheet column names to internal field names (case-insensitive).

    Columns whose header is not in ``COLUMN_ALIASES`` and cells that are blank
    after stripping are dropped.  Headers or cells that are not strings are
    skipped as well: for a ragged CSV row ``csv.DictReader`` files surplus
    cells under a ``None`` key and fills missing cells with ``None``.
    """
    out = {}
    for header, cell in raw.items():
        if not isinstance(header, str) or not isinstance(cell, str):
            continue
        field = COLUMN_ALIASES.get(header.strip().lower())
        value = cell.strip()
        if field and value:
            out[field] = value
    return out
def _import_csv_rows(reader) -> tuple[list, list]:
    """Insert every usable row from *reader* and report what happened.

    Args:
        reader: Iterable of ``{header: cell}`` dicts such as a
            ``csv.DictReader``.

    Returns:
        ``(imported, errors)``.  ``imported`` holds ``{"id", "name"}`` for each
        inserted row; ``errors`` holds ``{"row", "error", "data"}`` for each
        rejected one.  Row numbers start at 2 because row 1 is the header.

    Each row is committed on its own, so one bad row never undoes earlier
    ones, and the connection is closed even if iteration fails.
    """
    imported, errors = [], []
    conn = get_db()
    try:
        for i, raw in enumerate(reader, start=2):  # row 1 = header
            # DictReader stores surplus cells under a None key; drop it from
            # the echoed data because a None key cannot be sorted alongside
            # string keys when the response is serialised.
            shown = {k: v for k, v in raw.items() if k is not None}
            try:
                row = _normalise_row(raw)
                missing = [f for f in ("name", "lat", "lon", "frequency") if f not in row]
                if missing:
                    errors.append({"row": i, "error": f"Missing columns: {', '.join(missing)}", "data": shown})
                    continue
                new_id = insert_ap(conn, row)
                conn.commit()
                imported.append({"id": new_id, "name": row["name"]})
            except Exception as exc:
                # Deliberately broad: any failure is recorded against this row
                # and the import carries on with the next one.
                errors.append({"row": i, "error": str(exc), "data": shown})
    finally:
        conn.close()
    return imported, errors
@app.route("/api/import/sheets", methods=["POST"])
def import_from_sheets():
    """Import APs from a Google Sheet.

    Body JSON:
        url          Sheets URL (required)
        credentials  service account JSON, either as a string or as an
                     already-parsed object (optional, for private sheets)

    With credentials the first worksheet is read through gspread; without
    them the sheet is fetched through its public CSV export.  Both paths feed
    the rows to ``_import_csv_rows`` and respond with
    ``{"imported": <count>, "errors": [...], "aps": [...]}``.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        # An array body has no "url"; fall through to the 400 below.
        data = {}
    url = str(data.get("url") or "").strip()
    credentials_json = data.get("credentials")  # optional service-account JSON
    if not url:
        return jsonify({"error": "url is required"}), 400

    # ── Private sheet via service account ──
    if credentials_json:
        if not GSPREAD_AVAILABLE:
            return jsonify({"error": "gspread is not installed"}), 500
        try:
            if isinstance(credentials_json, dict):
                # Client sent the key file as a JSON object rather than a string.
                creds_dict = credentials_json
            else:
                creds_dict = json.loads(credentials_json)
            creds = SACredentials.from_service_account_info(
                creds_dict,
                scopes=["https://www.googleapis.com/auth/spreadsheets.readonly"],
            )
            gc = gspread.authorize(creds)
            sh = gc.open_by_url(url)
            # Use first worksheet
            ws = sh.get_worksheet(0)
            records = ws.get_all_records()
            # Stringify cells so the rows look like csv.DictReader output and
            # can go through the same import routine as public sheets.
            imported, errors = _import_csv_rows(
                {str(k): str(v) for k, v in raw.items()} for raw in records
            )
            return jsonify({"imported": len(imported), "errors": errors, "aps": imported})
        except json.JSONDecodeError:
            return jsonify({"error": "Invalid service account JSON"}), 400
        except Exception as exc:
            # Credential, permission and API failures are all reported to the client.
            return jsonify({"error": str(exc)}), 400

    # ── Public sheet via CSV export ──
    csv_url = _sheets_csv_url(url)
    if not csv_url:
        return jsonify({"error": "Could not parse a Google Sheets ID from that URL"}), 400
    try:
        resp = requests.get(csv_url, timeout=15)
        resp.raise_for_status()
    except requests.RequestException as exc:
        return jsonify({"error": f"Failed to fetch sheet: {exc}"}), 400
    content = resp.text
    # Google returns an HTML login page for private sheets
    if "accounts.google.com" in content or "Sign in" in content[:500]:
        return jsonify({
            "error": "Sheet is private. Share it publicly ('Anyone with the link can view') or provide a service account JSON."
        }), 403
    reader = csv.DictReader(io.StringIO(content))
    imported, errors = _import_csv_rows(reader)
    return jsonify({"imported": len(imported), "errors": errors, "aps": imported})
@app.route("/api/import/sheets/preview", methods=["POST"])
def preview_sheets():
    """Return first 5 data rows + detected column mapping without importing.

    Body JSON: ``url`` (required) — a publicly shared Google Sheets URL.
    Responds with ``headers`` (as found in the sheet), ``mapping``
    (header → internal field, or null when unrecognised) and ``sample_rows``.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        # An array body has no "url"; fall through to the 400 below.
        data = {}
    url = str(data.get("url") or "").strip()
    if not url:
        return jsonify({"error": "url is required"}), 400
    csv_url = _sheets_csv_url(url)
    if not csv_url:
        return jsonify({"error": "Could not parse a Google Sheets ID from that URL"}), 400
    try:
        resp = requests.get(csv_url, timeout=15)
        resp.raise_for_status()
    except requests.RequestException as exc:
        return jsonify({"error": str(exc)}), 400
    # Google answers with an HTML login page when the sheet is not public.
    if "accounts.google.com" in resp.text or "Sign in" in resp.text[:500]:
        return jsonify({"error": "Sheet is private. Share it publicly or provide credentials."}), 403
    reader = csv.DictReader(io.StringIO(resp.text))
    headers = reader.fieldnames or []
    mapping = {h: COLUMN_ALIASES.get(h.strip().lower(), None) for h in headers}
    # range() goes first so zip stops after five rows without pulling a sixth.
    # DictReader's None key (surplus cells of a ragged row) is dropped because
    # it cannot be sorted alongside string keys when the response is serialised.
    rows = [
        {k: v for k, v in record.items() if k is not None}
        for _, record in zip(range(5), reader)
    ]
    return jsonify({"headers": headers, "mapping": mapping, "sample_rows": rows})
# ── SNMP polling ──────────────────────────────────────────────────────────────
# Scalars under 1.3.6.1.2.1.1 that poll_device() reads from every device
# before vendor detection.  Each key becomes a key of the returned "raw" dict;
# sysDescr, sysName and sysObjectID are the inputs to _detect_vendor().
_STANDARD_OIDS = {
"sysDescr": "1.3.6.1.2.1.1.1.0",
"sysName": "1.3.6.1.2.1.1.5.0",
"sysLocation": "1.3.6.1.2.1.1.6.0",
"sysContact": "1.3.6.1.2.1.1.4.0",
"sysObjectID": "1.3.6.1.2.1.1.2.0", # critical for reliable vendor detection
}
# ── Ubiquiti ──────────────────────────────────────────────────────────────────
# Each key maps to an ordered list of OIDs to try; the first usable answer wins
# (_snmp_get_chain() also skips the answers "0" and "None").
# This covers three hardware generations in one pass:
# idx .0 — AirOS 6.x / M-series (Rocket M2/M5, Bullet M, NanoStation M,
# NanoBridge M, PowerBeam M, AirGrid M …)
# idx .1 — AirOS 8.x / AC-series / XC platform (Rocket Prism 5AC Gen1/Gen2,
# LiteBeam AC, NanoBeam AC, PowerBeam AC, IsoStation 5AC …)
# .1.6 — airFiber backhaul (AF-5X, AF-5XHD, AF-24, AF-60-LR …)
# NOTE(review): the OIDs are taken as given; confirm against the vendor MIBs
# before adding or reordering entries.
_UBNT_OID_CHAINS = {
"frequency": [
"1.3.6.1.4.1.41112.1.4.1.1.3.0", # AirOS 6.x M-series
"1.3.6.1.4.1.41112.1.4.1.1.3.1", # AirOS 8.x AC-series / XC
"1.3.6.1.4.1.41112.1.6.1.1.3.0", # airFiber
],
"channel_width": [
"1.3.6.1.4.1.41112.1.4.1.1.4.0",
"1.3.6.1.4.1.41112.1.4.1.1.4.1",
"1.3.6.1.4.1.41112.1.6.1.1.4.0",
],
"tx_power": [
"1.3.6.1.4.1.41112.1.4.1.1.9.0",
"1.3.6.1.4.1.41112.1.4.1.1.9.1",
"1.3.6.1.4.1.41112.1.6.1.1.9.0",
],
"ssid": [
"1.3.6.1.4.1.41112.1.4.5.0", # AirMax SSID
"1.3.6.1.4.1.41112.1.6.4.0", # airFiber network name
],
"signal": [
"1.3.6.1.4.1.41112.1.4.7.1.5.1", # AirMax station 1 signal
"1.3.6.1.4.1.41112.1.4.7.1.5.2", # chain 1 fallback
"1.3.6.1.4.1.41112.1.6.7.1.3.1", # airFiber remote signal
],
"noise": [
"1.3.6.1.4.1.41112.1.4.7.1.6.1",
"1.3.6.1.4.1.41112.1.4.7.1.6.2",
],
"connected_stations": [
"1.3.6.1.4.1.41112.1.4.4.0", # AirMax registered clients
],
}
# ── Cambium ePMP ──────────────────────────────────────────────────────────────
# ePMP 1000 / 2000 / 3000 / 4500 / Force 180/200/300/425/4525 (MIB .22)
# Flat key → single OID table: poll_device() issues one GET per entry when the
# vendor is "cambium_epmp" and records each failure under the same key.
# There is no "signal" entry, so the suggested signal_strength keeps its
# default for this vendor.
_EPMP_OIDS = {
"frequency": "1.3.6.1.4.1.17713.22.1.1.1.2.0",
"channel_width": "1.3.6.1.4.1.17713.22.1.1.1.6.0",
"tx_power": "1.3.6.1.4.1.17713.22.1.1.1.9.0",
"ssid": "1.3.6.1.4.1.17713.22.1.1.3.1.0",
"color_code": "1.3.6.1.4.1.17713.22.1.1.3.7.0",
"mode": "1.3.6.1.4.1.17713.22.1.1.1.5.0", # 1=AP 2=SM
"connected_stations": "1.3.6.1.4.1.17713.22.1.2.1.0",
"dl_mcs": "1.3.6.1.4.1.17713.22.1.1.5.2.0",
"ul_mcs": "1.3.6.1.4.1.17713.22.1.1.5.3.0",
}
# ── Cambium PMP 450 family ────────────────────────────────────────────────────
# PMP 450 / 450i / 450m / 450d / 450b / 450v (MIB .21)
# Flat key → single OID table used by poll_device() for "cambium_pmp450",
# which is also the fallback for any other device under enterprise 17713.
_PMP450_OIDS = {
"frequency": "1.3.6.1.4.1.17713.21.1.1.18.0",
"channel_width": "1.3.6.1.4.1.17713.21.1.1.22.0",
"tx_power": "1.3.6.1.4.1.17713.21.1.1.27.0",
"ssid": "1.3.6.1.4.1.17713.21.1.1.3.0",
"signal": "1.3.6.1.4.1.17713.21.1.2.1.0",
"color_code": "1.3.6.1.4.1.17713.21.1.1.25.0",
"connected_stations": "1.3.6.1.4.1.17713.21.1.4.1.0",
}
# ── Cambium legacy Canopy / Motorola PMP 100 / PMP 400 / PMP 430 ─────────────
# (Enterprise OID 161 = Motorola; devices pre-date the Cambium rebrand)
# Flat key → single OID table used by poll_device() for "cambium_canopy".
_CANOPY_OIDS = {
"frequency": "1.3.6.1.4.1.161.19.89.1.1.5.0",
"ssid": "1.3.6.1.4.1.161.19.89.1.1.1.0",
"signal": "1.3.6.1.4.1.161.19.89.2.1.33.0",
"tx_power": "1.3.6.1.4.1.161.19.89.1.1.6.0",
}
# ── MikroTik ──────────────────────────────────────────────────────────────────
# RouterOS wireless (wAP, SXT, BaseBox, Groove, LHG, mANTBox, Audience…)
# Flat key → single OID table used by poll_device() for "mikrotik".
_MIKROTIK_OIDS = {
"frequency": "1.3.6.1.4.1.14988.1.1.1.7.0",
"channel_width": "1.3.6.1.4.1.14988.1.1.1.8.0",
"tx_power": "1.3.6.1.4.1.14988.1.1.1.3.0",
"ssid": "1.3.6.1.4.1.14988.1.1.1.9.0",
"signal": "1.3.6.1.4.1.14988.1.1.1.2.0", # overall RX signal
"noise": "1.3.6.1.4.1.14988.1.1.1.11.0",
"connected_stations": "1.3.6.1.4.1.14988.1.1.1.1.0",
}
# Human-readable display names for each vendor key.
# The keys are exactly the values _detect_vendor() can return; poll_device()
# uses the label in "vendor_label" and in the suggested AP's notes.
VENDOR_DISPLAY = {
"ubiquiti": "Ubiquiti (AirMax / airFiber)",
"cambium_epmp": "Cambium ePMP",
"cambium_pmp450": "Cambium PMP 450",
"cambium_canopy": "Cambium / Motorola Canopy",
"mikrotik": "MikroTik RouterOS",
"unknown": "Unknown",
}
def _snmp_decode(val) -> str | None:
"""Convert any puresnmp return type to a plain string.
puresnmp / x690 returns typed objects:
OctetString → bytes (sysDescr, sysName, SSID …)
Integer → int (frequency, signal …)
ObjectIdentifier → x690 OID object (sysObjectID)
TimeTicks → int
We need to handle all of these without raising.
"""
if val is None:
return None
# x690 ObjectIdentifier — convert to dotted-decimal string
type_name = type(val).__name__
if type_name == "ObjectIdentifier":
try:
# x690 stores the OID as a tuple in .value
if hasattr(val, "value"):
return "." + ".".join(str(n) for n in val.value)
# Fallback: str() on some versions gives dotted notation directly
s = str(val)
if s.startswith("(") or s.startswith("ObjectIdentifier"):
# Parse tuple repr like "(1, 3, 6, 1, 4, 1, 41112, …)"
nums = re.findall(r"\d+", s)
return "." + ".".join(nums) if nums else s
return s
except Exception:
return str(val)
if isinstance(val, bytes):
try:
return val.decode("utf-8", errors="replace").strip()
except Exception:
return repr(val)
# int, float, or anything else
return str(val).strip()
def _snmp_get(host: str, oid: str, community: str, port: int, version: int) -> tuple[str | None, str | None]:
"""Single SNMP GET. Returns (value_str, error_str)."""
try:
val = puresnmp.get(host, community, oid, port=port, version=version)
return _snmp_decode(val), None
except Exception as exc:
return None, str(exc)
def _snmp_get_chain(host: str, oids: list, community: str, port: int, version: int) -> str | None:
"""Try each OID in order; return the first non-None, non-empty result."""
for oid in oids:
val, _ = _snmp_get(host, oid, community, port, version)
if val and val not in ("0", "None"):
return val
return None
# Keywords checked against the combined sysDescr + sysName string.
# _detect_vendor() lower-cases that string and tests each entry as a plain
# substring, so entries must be lower-case.  ("airfiber" appears twice below;
# the duplicate is harmless.)
_UBNT_KEYWORDS = (
"ubiquiti", "ubnt", "airmax", "airos", "airfiber",
# AirMax AC / XC platform (AirOS 8.x)
"liteap", "lite ap", "lap-gps", "litap", # LiteAP / LiteAP GPS
"litebeam", "lite beam", # LiteBeam AC
"nanobeam", "nano beam", # NanoBeam AC
"nanostation", "nano station", # NanoStation AC / Loco AC
"nanobridge",
"powerbeam", "power beam", # PowerBeam AC
"isostation", "iso station", # IsoStation AC
"rocket prism", "rocketprism", "rp5ac", "rp-5ac",# Rocket Prism 5AC
"rocket m", "rocketm", # Rocket M (legacy)
"loco m", "locom", # NanoStation Loco M
"picostation",
"airstation",
"edgepoint",
"af-5", "af-24", "af-60", "airfiber", # airFiber
"edgeos", # EdgeRouter (not a radio but same vendor)
)
def _detect_vendor(descr: str, name: str, obj_id: str) -> str:
    """Choose the vendor key for a polled device.

    The enterprise number found in *obj_id* (sysObjectID) is consulted first.
    If it gives no answer, keyword matching on the lower-cased
    ``descr + " " + name`` decides.  Returns one of the ``VENDOR_DISPLAY``
    keys, ``"unknown"`` when nothing matches.
    """
    oid_text = (obj_id or "").lower()
    # sysName is included because AirOS puts the model there on some firmware.
    haystack = " ".join(((descr or ""), (name or ""))).lower()

    # --- sysObjectID enterprise number (most reliable) ---
    if "41112" in oid_text:
        return "ubiquiti"
    if "17713" in oid_text:
        # Subtree 22 is ePMP; everything else under 17713 is treated as PMP 450.
        subtree = oid_text.rpartition("17713")[2]
        return "cambium_epmp" if subtree.startswith((".22", "22")) else "cambium_pmp450"
    if "161.19" in oid_text or re.search(r"\.161\.1[^4]", oid_text):
        return "cambium_canopy"
    if "14988" in oid_text:
        return "mikrotik"

    # --- Keyword fallback: first group with a hit wins, so order matters ---
    keyword_groups = (
        ("ubiquiti", _UBNT_KEYWORDS),
        ("cambium_epmp", (
            "epmp", "force 1", "force1", "force 180", "force180",
            "force 200", "force200", "force 300", "force300",
            "force 400", "force400", "force 425", "force425",
            "cnpilot", "ptp 550", "ptp550",
        )),
        ("cambium_pmp450", (
            "pmp 450", "pmp450", "pmp 430", "pmp430",
            "cambium pmp", "cambium networks pmp",
        )),
        ("cambium_canopy", ("canopy", "motorola pmp", "motorola bh")),
        # Generic Cambium/PMP mentions are only checked after the Canopy keywords.
        ("cambium_pmp450", ("cambium", "pmp")),
        ("mikrotik", ("mikrotik", "routeros")),
    )
    for vendor, keywords in keyword_groups:
        if any(word in haystack for word in keywords):
            return vendor
    return "unknown"
# Probe OIDs tried (in order) when vendor is still unknown after sysObjectID + keywords.
# Uses well-known scalars from different UBNT MIB subtrees so at least one
# should respond on any AirMax / AirOS 8.x / airFiber device.
# poll_device() stops at the first probe that returns a non-empty value other
# than "0" or "None" and then treats the device as "ubiquiti".
_UBNT_PROBE_OIDS = [
"1.3.6.1.4.1.41112.1.4.5.0", # AirMax SSID
"1.3.6.1.4.1.41112.1.4.1.1.3.0", # AirMax frequency (M-series)
"1.3.6.1.4.1.41112.1.4.1.1.3.1", # AirMax frequency (AC/XC, e.g. LiteAP GPS)
"1.3.6.1.4.1.41112.1.6.1.1.3.0", # airFiber frequency
]
def poll_device(host: str, community: str = "public", port: int = 161, version: int = 2) -> dict:
    """Poll one device over SNMP and build a pre-filled access point suggestion.

    Steps: read the standard system scalars, detect the vendor (probing the
    Ubiquiti OIDs if detection fails), read that vendor's OID table, then
    derive the suggested AP fields from the collected values.

    Returns a dict with ``host``, ``vendor``, ``vendor_label``, ``raw`` (every
    value read, keyed by name), ``suggested``, ``snmp_available`` and
    ``errors`` (key → message for failed reads).
    """

    def fetch(oid):
        # Same credentials and transport settings for every request of this poll.
        return _snmp_get(host, oid, community, port, version)

    raw: dict[str, str | None] = {}
    errors: dict[str, str] = {}  # OID key → error message, for diagnostics

    # Standard scalars come first because sysObjectID drives vendor detection.
    for key, oid in _STANDARD_OIDS.items():
        raw[key], failure = fetch(oid)
        # Only failures on the fields used for detection are reported.
        if failure and key in ("sysDescr", "sysName", "sysObjectID"):
            errors[key] = failure

    vendor = _detect_vendor(
        raw.get("sysDescr") or "",
        raw.get("sysName") or "",
        raw.get("sysObjectID") or "",
    )

    # Still unknown: AirOS devices may report a bare "Linux <hostname>" sysDescr
    # and an unusable sysObjectID, so see whether any UBNT OID answers.
    if vendor == "unknown":
        answers = (fetch(probe_oid)[0] for probe_oid in _UBNT_PROBE_OIDS)
        if any(a and a not in ("0", "None") for a in answers):
            vendor = "ubiquiti"

    # Vendors whose table maps each key to exactly one OID.
    flat_tables = {
        "cambium_epmp": _EPMP_OIDS,
        "cambium_pmp450": _PMP450_OIDS,
        "cambium_canopy": _CANOPY_OIDS,
        "mikrotik": _MIKROTIK_OIDS,
    }
    if vendor == "ubiquiti":
        # Ubiquiti keys carry fallback chains; per-OID errors are not recorded.
        for key, oids in _UBNT_OID_CHAINS.items():
            raw[key] = _snmp_get_chain(host, oids, community, port, version)
    elif vendor in flat_tables:
        for key, oid in flat_tables[vendor].items():
            raw[key], failure = fetch(oid)
            if failure:
                errors[key] = failure

    # Pre-filled AP suggestion; coordinates are placeholders for the user to set.
    suggested = {
        "name": raw.get("sysName") or host,
        "ssid": raw.get("ssid") or "",
        "lat": 0.0,
        "lon": 0.0,
        "frequency": 0.0,
        "signal_strength": -65.0,
        "notes": f"SNMP import from {host} · {VENDOR_DISPLAY.get(vendor, vendor)}",
    }
    for field, key in (("frequency", "frequency"), ("signal_strength", "signal")):
        text = raw.get(key)
        if text is None:
            continue
        try:
            suggested[field] = float(text)
        except ValueError:
            # Non-numeric answer: keep the default.
            pass

    # Normalise kHz → MHz (some firmware reports in kHz)
    if suggested["frequency"] > 100_000:
        suggested["frequency"] = round(suggested["frequency"] / 1000)

    return {
        "host": host,
        "vendor": vendor,
        "vendor_label": VENDOR_DISPLAY.get(vendor, vendor),
        "raw": raw,
        "suggested": suggested,
        "snmp_available": True,
        "errors": errors,  # diagnostic — shown in UI if non-empty
    }
@app.route("/api/snmp/poll", methods=["POST"])
def snmp_poll():
    """Poll a device over SNMP and return ``poll_device``'s result as JSON.

    Body JSON:
        host       address to poll (required)
        community  community string (default "public")
        port       UDP port, 1-65535 (default 161)
        version    "1"/"v1" for SNMPv1; anything else selects 2 (default "2c")

    Responds 501 when puresnmp is not installed, 400 for a missing host or an
    invalid port, and 500 when the poll itself raises.
    """
    if not SNMP_AVAILABLE:
        return jsonify({"error": "puresnmp is not installed"}), 501
    data = request.get_json() or {}
    if not isinstance(data, dict):
        # An array body has no "host"; fall through to the 400 below.
        data = {}
    host = str(data.get("host") or "").strip()
    if not host:
        return jsonify({"error": "host is required"}), 400
    community = data.get("community", "public")
    if community is None:
        community = "public"
    community = str(community).strip()
    # Validate the port here so bad input is a 400 rather than an uncaught 500.
    try:
        port = int(data.get("port", 161))
    except (TypeError, ValueError):
        return jsonify({"error": "port must be an integer"}), 400
    if not 1 <= port <= 65535:
        return jsonify({"error": "port must be between 1 and 65535"}), 400
    version_str = str(data.get("version", "2c")).strip()
    version = 1 if version_str in ("1", "v1") else 2
    try:
        result = poll_device(host, community, port, version)
        return jsonify(result)
    except Exception as exc:
        # Top-level boundary: report any unexpected poll failure to the client.
        return jsonify({"error": str(exc)}), 500
@app.route("/api/snmp/status", methods=["GET"])
def snmp_status():
    """Report whether the optional puresnmp and gspread imports succeeded."""
    flags = {"available": SNMP_AVAILABLE, "gspread_available": GSPREAD_AVAILABLE}
    return jsonify(flags)
if __name__ == "__main__":
    # Make sure the table (and, on first run, the demo rows) exist before serving.
    init_db()
    # Listen on all interfaces, port 5000, with the Flask debugger off.
    app.run(debug=False, port=5000, host="0.0.0.0")