695 lines
28 KiB
Python
695 lines
28 KiB
Python
import csv
import io
import json
import os
import re
import sqlite3

import requests
from flask import Flask, jsonify, request, render_template
from flask_cors import CORS

app = Flask(__name__)
CORS(app)  # the map front-end may be served from a different origin

# SQLite database file, created on demand under ./data (relative to CWD).
DB_PATH = os.path.join("data", "wisp.db")
|
||
|
||
# ── Optional SNMP support ────────────────────────────────────────────────────
# puresnmp is an optional dependency: when it is missing the app still runs,
# and the /api/snmp/* endpoints report the feature as unavailable instead of
# failing at import time.
try:
    import puresnmp
    import puresnmp.exc
    SNMP_AVAILABLE = True
except ImportError:
    SNMP_AVAILABLE = False
|
||
|
||
# ── Optional gspread support (private Google Sheets) ────────────────────────
# Only needed when importing from a private sheet via a service account;
# public-sheet CSV import works without it.
try:
    import gspread
    from google.oauth2.service_account import Credentials as SACredentials
    GSPREAD_AVAILABLE = True
except ImportError:
    GSPREAD_AVAILABLE = False
|
||
|
||
|
||
# ── Database ─────────────────────────────────────────────────────────────────
|
||
|
||
def get_db():
    """Open a SQLite connection to DB_PATH, creating its directory if needed.

    Rows are returned as sqlite3.Row so callers can access columns by name.
    The caller is responsible for closing the connection.
    """
    # Derive the directory from DB_PATH instead of hard-coding "data" so the
    # two can never drift apart if DB_PATH is relocated.
    os.makedirs(os.path.dirname(DB_PATH) or ".", exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn
|
||
|
||
|
||
def init_db():
    """Create the access_points table and seed demo data if it is empty.

    Safe to call repeatedly: the table is created with IF NOT EXISTS, and the
    demo rows are only inserted when the table has zero rows.
    """
    conn = get_db()
    conn.execute("""
        CREATE TABLE IF NOT EXISTS access_points (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            ssid TEXT,
            lat REAL NOT NULL,
            lon REAL NOT NULL,
            frequency REAL NOT NULL,
            channel INTEGER,
            antenna_type TEXT DEFAULT 'omni',
            azimuth REAL DEFAULT 0,
            beamwidth REAL DEFAULT 360,
            coverage_radius INTEGER DEFAULT 2000,
            signal_strength REAL DEFAULT -65,
            height REAL DEFAULT 30,
            notes TEXT DEFAULT ''
        )
    """)
    count = conn.execute("SELECT COUNT(*) FROM access_points").fetchone()[0]
    if count == 0:
        # Demo network around Moscow, ID: one hub, one urban fill-in, three
        # sectors. Tuple order matches the INSERT column list below.
        demo = [
            # Moscow Mountain — high-elevation hub serving the Palouse region
            ("Moscow Mountain", "Palouse-Net", 46.7950, -116.9600, 5800, 149, "omni", 0, 360, 8000, -58, 55, "High-elevation 5 GHz hub"),
            # Downtown Moscow water tower — 2.4 GHz fill-in for dense urban core
            ("Downtown Moscow", "Palouse-Net", 46.7317, -117.0002, 2400, 6, "omni", 0, 360, 2500, -67, 30, "Urban 2.4 GHz fill-in"),
            # East sector toward Pullman, WA
            ("East Sector - UI", "Palouse-Net", 46.7280, -116.9700, 5800, 157, "sector", 90, 120, 6000, -60, 40, "Sector toward Pullman / WSU"),
            # South sector toward Troy, ID (900 MHz → channel is NULL)
            ("South Sector - Troy", "Palouse-Net", 46.6900, -116.9950, 900, None, "sector", 180, 90, 10000, -63, 45, "900 MHz long-range toward Troy"),
            # West sector toward Genesee, ID
            ("West Sector - Gen", "Palouse-Net", 46.7350, -117.0800, 5800, 161, "sector", 270, 120, 5000, -62, 38, "Sector toward Genesee"),
        ]
        conn.executemany(
            "INSERT INTO access_points (name,ssid,lat,lon,frequency,channel,antenna_type,azimuth,beamwidth,coverage_radius,signal_strength,height,notes) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)",
            demo,
        )
    conn.commit()
    conn.close()
|
||
|
||
|
||
def row_to_dict(row):
    """Convert a sqlite3.Row (or any mapping with .keys()) to a plain dict."""
    return {key: row[key] for key in row.keys()}
|
||
|
||
|
||
def insert_ap(conn, data):
    """Insert one access-point row and return the new row id.

    ``data`` must contain name/lat/lon/frequency; every other field falls
    back to the same defaults the table schema uses. Numeric fields are
    coerced so string values from CSV imports work too. Does NOT commit.
    """
    values = (
        data["name"],
        data.get("ssid", ""),
        float(data["lat"]),
        float(data["lon"]),
        float(data["frequency"]),
        data.get("channel") or None,  # blank/0 channel stored as NULL
        data.get("antenna_type", "omni"),
        float(data.get("azimuth", 0)),
        float(data.get("beamwidth", 360)),
        int(data.get("coverage_radius", 2000)),
        float(data.get("signal_strength", -65)),
        float(data.get("height", 30)),
        data.get("notes", ""),
    )
    cursor = conn.execute(
        """INSERT INTO access_points
           (name,ssid,lat,lon,frequency,channel,antenna_type,azimuth,beamwidth,coverage_radius,signal_strength,height,notes)
           VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""",
        values,
    )
    return cursor.lastrowid
|
||
|
||
|
||
# ── Core AP routes ────────────────────────────────────────────────────────────
|
||
|
||
@app.route("/")
def index():
    """Serve the single-page map UI (templates/index.html)."""
    return render_template("index.html")
|
||
|
||
|
||
@app.route("/api/aps", methods=["GET"])
def get_aps():
    """Return every access point as a JSON array, ordered by name."""
    conn = get_db()
    records = conn.execute("SELECT * FROM access_points ORDER BY name").fetchall()
    conn.close()
    payload = [row_to_dict(record) for record in records]
    return jsonify(payload)
|
||
|
||
|
||
@app.route("/api/aps", methods=["POST"])
def create_ap():
    """Create an access point from the JSON request body.

    Required fields: name, lat, lon, frequency. Returns the stored row with
    status 201, or 400 when the body is missing or incomplete.
    """
    # silent=True → None for an absent/malformed JSON body instead of raising,
    # so bad requests get a clean 400 rather than a 500 traceback.
    data = request.get_json(silent=True) or {}
    if not all(k in data for k in ("name", "lat", "lon", "frequency")):
        return jsonify({"error": "Missing required fields: name, lat, lon, frequency"}), 400
    conn = get_db()
    try:
        # insert_ap can raise (e.g. ValueError on a non-numeric lat); the
        # finally ensures the connection is not leaked in that case.
        new_id = insert_ap(conn, data)
        conn.commit()
        row = conn.execute("SELECT * FROM access_points WHERE id=?", (new_id,)).fetchone()
    finally:
        conn.close()
    return jsonify(row_to_dict(row)), 201
|
||
|
||
|
||
@app.route("/api/aps/<int:ap_id>", methods=["PUT"])
def update_ap(ap_id):
    """Partially update an AP: merge the JSON body over the stored row.

    Returns the updated row, or 404 when the id does not exist. A missing or
    malformed body is treated as an empty update.
    """
    # silent=True avoids a 500 on a malformed body (original crashed merging None).
    data = request.get_json(silent=True) or {}
    conn = get_db()
    try:
        existing = conn.execute("SELECT * FROM access_points WHERE id=?", (ap_id,)).fetchone()
        if not existing:
            return jsonify({"error": "Not found"}), 404
        # Body fields win over stored values; id can never be overridden.
        merged = {**row_to_dict(existing), **data, "id": ap_id}
        conn.execute(
            """UPDATE access_points SET
               name=?,ssid=?,lat=?,lon=?,frequency=?,channel=?,antenna_type=?,
               azimuth=?,beamwidth=?,coverage_radius=?,signal_strength=?,height=?,notes=?
               WHERE id=?""",
            (
                merged["name"], merged["ssid"], merged["lat"], merged["lon"],
                merged["frequency"], merged["channel"], merged["antenna_type"],
                merged["azimuth"], merged["beamwidth"], merged["coverage_radius"],
                merged["signal_strength"], merged["height"], merged["notes"], ap_id,
            ),
        )
        conn.commit()
        row = conn.execute("SELECT * FROM access_points WHERE id=?", (ap_id,)).fetchone()
    finally:
        # Original leaked the connection when any statement raised.
        conn.close()
    return jsonify(row_to_dict(row))
|
||
|
||
|
||
@app.route("/api/aps/<int:ap_id>", methods=["DELETE"])
def delete_ap(ap_id):
    """Delete an AP by id; 404 when no row matched."""
    conn = get_db()
    cursor = conn.execute("DELETE FROM access_points WHERE id=?", (ap_id,))
    deleted_count = cursor.rowcount
    conn.commit()
    conn.close()
    if deleted_count:
        return jsonify({"deleted": ap_id})
    return jsonify({"error": "Not found"}), 404
|
||
|
||
|
||
# ── Google Sheets import ──────────────────────────────────────────────────────
|
||
|
||
# Column name aliases (sheet header → internal field). Matching is done on
# the lower-cased, stripped header, so "Latitude", " LAT " and "lat" all map
# to the same field. Unrecognised headers are silently ignored on import.
COLUMN_ALIASES = {
    "name": "name", "tower": "name", "ap name": "name", "ap_name": "name",
    "ssid": "ssid", "network": "ssid",
    "lat": "lat", "latitude": "lat",
    "lon": "lon", "lng": "lon", "longitude": "lon",
    "frequency": "frequency", "freq": "frequency", "freq (mhz)": "frequency", "frequency (mhz)": "frequency",
    "channel": "channel", "ch": "channel",
    "antenna_type": "antenna_type", "antenna type": "antenna_type", "antenna": "antenna_type",
    "azimuth": "azimuth", "bearing": "azimuth",
    "beamwidth": "beamwidth", "beam_width": "beamwidth", "beam": "beamwidth",
    "coverage_radius": "coverage_radius", "coverage radius": "coverage_radius",
    "radius": "coverage_radius", "radius (m)": "coverage_radius",
    "signal_strength": "signal_strength", "signal": "signal_strength",
    "signal (dbm)": "signal_strength", "rssi": "signal_strength",
    "height": "height", "tower height": "height", "height (m)": "height",
    "notes": "notes", "comments": "notes", "description": "notes",
}
|
||
|
||
|
||
def _sheets_csv_url(url: str) -> str | None:
|
||
"""Convert any Google Sheets URL to a CSV export URL."""
|
||
m = re.search(r"/spreadsheets/d/([a-zA-Z0-9_-]+)", url)
|
||
if not m:
|
||
return None
|
||
sheet_id = m.group(1)
|
||
gid_m = re.search(r"[#&?]gid=(\d+)", url)
|
||
gid = gid_m.group(1) if gid_m else "0"
|
||
return f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv&gid={gid}"
|
||
|
||
|
||
def _normalise_row(raw: dict) -> dict:
    """Map sheet column names to internal field names (case-insensitive).

    Unrecognised columns and blank cells are dropped; surviving values are
    whitespace-stripped strings.
    """
    result = {}
    for header, cell in raw.items():
        field = COLUMN_ALIASES.get(header.strip().lower())
        value = cell.strip()
        if field and value:
            result[field] = value
    return result
|
||
|
||
|
||
def _import_csv_rows(reader) -> tuple[list, list]:
    """Insert each CSV row as an AP; return (imported, errors).

    Row numbers in error records are 1-based sheet rows (row 1 is the header,
    so data starts at 2). Each insert is committed individually, so one bad
    row never rolls back the rows imported before it.
    """
    imported: list = []
    errors: list = []
    required = ("name", "lat", "lon", "frequency")
    conn = get_db()
    for row_no, raw in enumerate(reader, start=2):
        fields = _normalise_row(raw)
        absent = [name for name in required if name not in fields]
        if absent:
            errors.append({"row": row_no, "error": f"Missing columns: {', '.join(absent)}", "data": raw})
            continue
        try:
            new_id = insert_ap(conn, fields)
            conn.commit()
            imported.append({"id": new_id, "name": fields["name"]})
        except Exception as exc:
            errors.append({"row": row_no, "error": str(exc), "data": raw})
    conn.close()
    return imported, errors
|
||
|
||
|
||
@app.route("/api/import/sheets", methods=["POST"])
def import_from_sheets():
    """Import APs from a Google Sheet.

    Body JSON:
        url         – Sheets URL (required)
        credentials – service account JSON string (optional, for private sheets)

    Returns {"imported": N, "errors": [...], "aps": [...]}.
    """
    data = request.get_json(silent=True) or {}
    url = data.get("url", "").strip()
    credentials_json = data.get("credentials")  # optional service-account JSON string

    if not url:
        return jsonify({"error": "url is required"}), 400

    # ── Private sheet via service account ──
    if credentials_json:
        if not GSPREAD_AVAILABLE:
            return jsonify({"error": "gspread is not installed"}), 500
        try:
            creds_dict = json.loads(credentials_json)
            creds = SACredentials.from_service_account_info(
                creds_dict,
                scopes=["https://www.googleapis.com/auth/spreadsheets.readonly"],
            )
            gc = gspread.authorize(creds)
            sh = gc.open_by_url(url)
            ws = sh.get_worksheet(0)  # first worksheet only
            records = ws.get_all_records()
            # Reuse the CSV import path instead of duplicating its loop:
            # stringify each record so it looks like a csv.DictReader row.
            # This also makes error records consistent between public and
            # private imports (the duplicated loop omitted the "data" key).
            rows = ({k: str(v) for k, v in rec.items()} for rec in records)
            imported, errors = _import_csv_rows(rows)
            return jsonify({"imported": len(imported), "errors": errors, "aps": imported})
        except json.JSONDecodeError:
            return jsonify({"error": "Invalid service account JSON"}), 400
        except Exception as exc:
            return jsonify({"error": str(exc)}), 400

    # ── Public sheet via CSV export ──
    csv_url = _sheets_csv_url(url)
    if not csv_url:
        return jsonify({"error": "Could not parse a Google Sheets ID from that URL"}), 400

    try:
        resp = requests.get(csv_url, timeout=15)
        resp.raise_for_status()
    except requests.RequestException as exc:
        return jsonify({"error": f"Failed to fetch sheet: {exc}"}), 400

    content = resp.text
    # Google returns an HTML login page (HTTP 200, not an error) for private
    # sheets, so sniff the body rather than relying on the status code.
    if "accounts.google.com" in content or "Sign in" in content[:500]:
        return jsonify({
            "error": "Sheet is private. Share it publicly ('Anyone with the link can view') or provide a service account JSON."
        }), 403

    reader = csv.DictReader(io.StringIO(content))
    imported, errors = _import_csv_rows(reader)
    return jsonify({"imported": len(imported), "errors": errors, "aps": imported})
|
||
|
||
|
||
@app.route("/api/import/sheets/preview", methods=["POST"])
def preview_sheets():
    """Return the first 5 data rows + detected column mapping without importing."""
    body = request.get_json() or {}
    url = body.get("url", "").strip()
    if not url:
        return jsonify({"error": "url is required"}), 400

    csv_url = _sheets_csv_url(url)
    if csv_url is None:
        return jsonify({"error": "Could not parse a Google Sheets ID from that URL"}), 400

    try:
        resp = requests.get(csv_url, timeout=15)
        resp.raise_for_status()
    except requests.RequestException as exc:
        return jsonify({"error": str(exc)}), 400

    text = resp.text
    # Private sheets come back as an HTML login page with HTTP 200.
    if "accounts.google.com" in text or "Sign in" in text[:500]:
        return jsonify({"error": "Sheet is private. Share it publicly or provide credentials."}), 403

    reader = csv.DictReader(io.StringIO(text))
    headers = reader.fieldnames or []
    # None marks a header with no known alias, so the UI can flag it.
    mapping = {header: COLUMN_ALIASES.get(header.strip().lower(), None) for header in headers}
    sample_rows = []
    for record in reader:
        if len(sample_rows) == 5:
            break
        sample_rows.append(dict(record))
    return jsonify({"headers": headers, "mapping": mapping, "sample_rows": sample_rows})
|
||
|
||
|
||
# ── SNMP polling ──────────────────────────────────────────────────────────────

# RFC 1213 MIB-II system-group scalars, polled on every device regardless of
# vendor. sysObjectID contains the vendor's enterprise OID and is the primary
# input to vendor detection; the text fields are the keyword fallback.
_STANDARD_OIDS = {
    "sysDescr": "1.3.6.1.2.1.1.1.0",
    "sysName": "1.3.6.1.2.1.1.5.0",
    "sysLocation": "1.3.6.1.2.1.1.6.0",
    "sysContact": "1.3.6.1.2.1.1.4.0",
    "sysObjectID": "1.3.6.1.2.1.1.2.0",  # critical for reliable vendor detection
}
|
||
|
||
# ── Ubiquiti ──────────────────────────────────────────────────────────────────
# Each key maps to an ordered list of OIDs to try; the first non-None wins
# (see _snmp_get_chain). This covers three hardware generations in one pass:
#   idx .0 — AirOS 6.x / M-series (Rocket M2/M5, Bullet M, NanoStation M,
#            NanoBridge M, PowerBeam M, AirGrid M …)
#   idx .1 — AirOS 8.x / AC-series / XC platform (Rocket Prism 5AC Gen1/Gen2,
#            LiteBeam AC, NanoBeam AC, PowerBeam AC, IsoStation 5AC …)
#   .1.6  — airFiber backhaul (AF-5X, AF-5XHD, AF-24, AF-60-LR …)
_UBNT_OID_CHAINS = {
    "frequency": [
        "1.3.6.1.4.1.41112.1.4.1.1.3.0",  # AirOS 6.x M-series
        "1.3.6.1.4.1.41112.1.4.1.1.3.1",  # AirOS 8.x AC-series / XC
        "1.3.6.1.4.1.41112.1.6.1.1.3.0",  # airFiber
    ],
    "channel_width": [
        "1.3.6.1.4.1.41112.1.4.1.1.4.0",
        "1.3.6.1.4.1.41112.1.4.1.1.4.1",
        "1.3.6.1.4.1.41112.1.6.1.1.4.0",
    ],
    "tx_power": [
        "1.3.6.1.4.1.41112.1.4.1.1.9.0",
        "1.3.6.1.4.1.41112.1.4.1.1.9.1",
        "1.3.6.1.4.1.41112.1.6.1.1.9.0",
    ],
    "ssid": [
        "1.3.6.1.4.1.41112.1.4.5.0",  # AirMax SSID
        "1.3.6.1.4.1.41112.1.6.4.0",  # airFiber network name
    ],
    "signal": [
        "1.3.6.1.4.1.41112.1.4.7.1.5.1",  # AirMax station 1 signal
        "1.3.6.1.4.1.41112.1.4.7.1.5.2",  # chain 1 fallback
        "1.3.6.1.4.1.41112.1.6.7.1.3.1",  # airFiber remote signal
    ],
    "noise": [
        "1.3.6.1.4.1.41112.1.4.7.1.6.1",
        "1.3.6.1.4.1.41112.1.4.7.1.6.2",
    ],
    "connected_stations": [
        "1.3.6.1.4.1.41112.1.4.4.0",  # AirMax registered clients
    ],
}
|
||
|
||
# ── Cambium ePMP ──────────────────────────────────────────────────────────────
# ePMP 1000 / 2000 / 3000 / 4500 / Force 180/200/300/425/4525 (MIB .22)
_EPMP_OIDS = {
    "frequency": "1.3.6.1.4.1.17713.22.1.1.1.2.0",
    "channel_width": "1.3.6.1.4.1.17713.22.1.1.1.6.0",
    "tx_power": "1.3.6.1.4.1.17713.22.1.1.1.9.0",
    "ssid": "1.3.6.1.4.1.17713.22.1.1.3.1.0",
    "color_code": "1.3.6.1.4.1.17713.22.1.1.3.7.0",
    "mode": "1.3.6.1.4.1.17713.22.1.1.1.5.0",  # 1=AP 2=SM
    "connected_stations": "1.3.6.1.4.1.17713.22.1.2.1.0",
    "dl_mcs": "1.3.6.1.4.1.17713.22.1.1.5.2.0",
    "ul_mcs": "1.3.6.1.4.1.17713.22.1.1.5.3.0",
}

# ── Cambium PMP 450 family ────────────────────────────────────────────────────
# PMP 450 / 450i / 450m / 450d / 450b / 450v (MIB .21)
_PMP450_OIDS = {
    "frequency": "1.3.6.1.4.1.17713.21.1.1.18.0",
    "channel_width": "1.3.6.1.4.1.17713.21.1.1.22.0",
    "tx_power": "1.3.6.1.4.1.17713.21.1.1.27.0",
    "ssid": "1.3.6.1.4.1.17713.21.1.1.3.0",
    "signal": "1.3.6.1.4.1.17713.21.1.2.1.0",
    "color_code": "1.3.6.1.4.1.17713.21.1.1.25.0",
    "connected_stations": "1.3.6.1.4.1.17713.21.1.4.1.0",
}

# ── Cambium legacy Canopy / Motorola PMP 100 / PMP 400 / PMP 430 ─────────────
# (Enterprise OID 161 = Motorola; devices pre-date the Cambium rebrand)
_CANOPY_OIDS = {
    "frequency": "1.3.6.1.4.1.161.19.89.1.1.5.0",
    "ssid": "1.3.6.1.4.1.161.19.89.1.1.1.0",
    "signal": "1.3.6.1.4.1.161.19.89.2.1.33.0",
    "tx_power": "1.3.6.1.4.1.161.19.89.1.1.6.0",
}

# ── MikroTik ──────────────────────────────────────────────────────────────────
# RouterOS wireless (wAP, SXT, BaseBox, Groove, LHG, mANTBox, Audience…)
_MIKROTIK_OIDS = {
    "frequency": "1.3.6.1.4.1.14988.1.1.1.7.0",
    "channel_width": "1.3.6.1.4.1.14988.1.1.1.8.0",
    "tx_power": "1.3.6.1.4.1.14988.1.1.1.3.0",
    "ssid": "1.3.6.1.4.1.14988.1.1.1.9.0",
    "signal": "1.3.6.1.4.1.14988.1.1.1.2.0",  # overall RX signal
    "noise": "1.3.6.1.4.1.14988.1.1.1.11.0",
    "connected_stations": "1.3.6.1.4.1.14988.1.1.1.1.0",
}

# Human-readable display names for each vendor key (used in the UI and in
# the auto-generated "notes" field of a suggested AP).
VENDOR_DISPLAY = {
    "ubiquiti": "Ubiquiti (AirMax / airFiber)",
    "cambium_epmp": "Cambium ePMP",
    "cambium_pmp450": "Cambium PMP 450",
    "cambium_canopy": "Cambium / Motorola Canopy",
    "mikrotik": "MikroTik RouterOS",
    "unknown": "Unknown",
}
|
||
|
||
|
||
def _snmp_decode(val) -> str | None:
|
||
"""Convert any puresnmp return type to a plain string.
|
||
|
||
puresnmp / x690 returns typed objects:
|
||
OctetString → bytes (sysDescr, sysName, SSID …)
|
||
Integer → int (frequency, signal …)
|
||
ObjectIdentifier → x690 OID object (sysObjectID)
|
||
TimeTicks → int
|
||
We need to handle all of these without raising.
|
||
"""
|
||
if val is None:
|
||
return None
|
||
# x690 ObjectIdentifier — convert to dotted-decimal string
|
||
type_name = type(val).__name__
|
||
if type_name == "ObjectIdentifier":
|
||
try:
|
||
# x690 stores the OID as a tuple in .value
|
||
if hasattr(val, "value"):
|
||
return "." + ".".join(str(n) for n in val.value)
|
||
# Fallback: str() on some versions gives dotted notation directly
|
||
s = str(val)
|
||
if s.startswith("(") or s.startswith("ObjectIdentifier"):
|
||
# Parse tuple repr like "(1, 3, 6, 1, 4, 1, 41112, …)"
|
||
nums = re.findall(r"\d+", s)
|
||
return "." + ".".join(nums) if nums else s
|
||
return s
|
||
except Exception:
|
||
return str(val)
|
||
if isinstance(val, bytes):
|
||
try:
|
||
return val.decode("utf-8", errors="replace").strip()
|
||
except Exception:
|
||
return repr(val)
|
||
# int, float, or anything else
|
||
return str(val).strip()
|
||
|
||
|
||
def _snmp_get(host: str, oid: str, community: str, port: int, version: int) -> tuple[str | None, str | None]:
    """Single SNMP GET. Returns (value_str, error_str) — never raises."""
    try:
        result = puresnmp.get(host, community, oid, port=port, version=version)
    except Exception as exc:
        return None, str(exc)
    return _snmp_decode(result), None
|
||
|
||
|
||
def _snmp_get_chain(host: str, oids: list, community: str, port: int, version: int) -> str | None:
    """Try each OID in order; return the first non-None, non-empty result.

    "0" and "None" strings are treated as "no data" and skipped. Per-OID
    errors are ignored — an absent OID on one generation is expected.
    """
    no_data = ("0", "None")
    for candidate in oids:
        value, _err = _snmp_get(host, candidate, community, port, version)
        if value and value not in no_data:
            return value
    return None
|
||
|
||
|
||
# Keywords checked against the combined sysDescr + sysName string
# (lower-cased) when sysObjectID alone does not identify the vendor.
_UBNT_KEYWORDS = (
    "ubiquiti", "ubnt", "airmax", "airos", "airfiber",
    # AirMax AC / XC platform (AirOS 8.x)
    "liteap", "lite ap", "lap-gps", "litap",          # LiteAP / LiteAP GPS
    "litebeam", "lite beam",                          # LiteBeam AC
    "nanobeam", "nano beam",                          # NanoBeam AC
    "nanostation", "nano station",                    # NanoStation AC / Loco AC
    "nanobridge",
    "powerbeam", "power beam",                        # PowerBeam AC
    "isostation", "iso station",                      # IsoStation AC
    "rocket prism", "rocketprism", "rp5ac", "rp-5ac", # Rocket Prism 5AC
    "rocket m", "rocketm",                            # Rocket M (legacy)
    "loco m", "locom",                                # NanoStation Loco M
    "picostation",
    # NOTE(review): "airstation" is a Buffalo product line, not Ubiquiti —
    # confirm whether this keyword is intentional before relying on it.
    "airstation",
    "edgepoint",
    "af-5", "af-24", "af-60", "airfiber",             # airFiber
    "edgeos",                                         # EdgeRouter (not a radio but same vendor)
)
|
||
|
||
|
||
def _detect_vendor(descr: str, name: str, obj_id: str) -> str:
    """Determine vendor key from sysObjectID (reliable), then sysDescr + sysName keywords."""
    oid = (obj_id or "").lower()
    # AirOS puts the model in sysName on some firmware, so match both fields.
    haystack = f"{descr or ''} {name or ''}".lower()

    # ── sysObjectID enterprise number (most reliable) ──────────────────────
    if "41112" in oid:
        return "ubiquiti"
    if "17713" in oid:
        # Distinguish ePMP (.22) from PMP 450 (.21) by the subtree number
        tail = oid.split("17713")[-1]
        return "cambium_epmp" if tail.startswith((".22", "22")) else "cambium_pmp450"
    if "161.19" in oid or re.search(r"\.161\.1[^4]", oid):
        return "cambium_canopy"
    if "14988" in oid:
        return "mikrotik"

    # ── Keyword fallback ────────────────────────────────────────────────────
    epmp_words = (
        "epmp", "force 1", "force1", "force 180", "force180",
        "force 200", "force200", "force 300", "force300",
        "force 400", "force400", "force 425", "force425",
        "cnpilot", "ptp 550", "ptp550",
    )
    pmp450_words = (
        "pmp 450", "pmp450", "pmp 430", "pmp430",
        "cambium pmp", "cambium networks pmp",
    )
    if any(word in haystack for word in _UBNT_KEYWORDS):
        return "ubiquiti"
    if any(word in haystack for word in epmp_words):
        return "cambium_epmp"
    if any(word in haystack for word in pmp450_words):
        return "cambium_pmp450"
    if any(word in haystack for word in ("canopy", "motorola pmp", "motorola bh")):
        return "cambium_canopy"
    # Generic Cambium wording defaults to the current PMP 450 family.
    if any(word in haystack for word in ("cambium", "pmp")):
        return "cambium_pmp450"
    if any(word in haystack for word in ("mikrotik", "routeros")):
        return "mikrotik"

    return "unknown"
|
||
|
||
|
||
# Probe OIDs tried (in order) when vendor is still unknown after sysObjectID +
# keyword matching. Uses well-known scalars from different UBNT MIB subtrees
# so at least one should respond on any AirMax / AirOS 8.x / airFiber device.
_UBNT_PROBE_OIDS = [
    "1.3.6.1.4.1.41112.1.4.5.0",      # AirMax SSID
    "1.3.6.1.4.1.41112.1.4.1.1.3.0",  # AirMax frequency (M-series)
    "1.3.6.1.4.1.41112.1.4.1.1.3.1",  # AirMax frequency (AC/XC, e.g. LiteAP GPS)
    "1.3.6.1.4.1.41112.1.6.1.1.3.0",  # airFiber frequency
]
|
||
|
||
|
||
# Vendor key → scalar-OID table polled after detection. Ubiquiti is handled
# separately because its fields are ordered OID *chains*, not single OIDs.
_VENDOR_SCALAR_OIDS = {
    "cambium_epmp": _EPMP_OIDS,
    "cambium_pmp450": _PMP450_OIDS,
    "cambium_canopy": _CANOPY_OIDS,
    "mikrotik": _MIKROTIK_OIDS,
}


def poll_device(host: str, community: str = "public", port: int = 161, version: int = 2) -> dict:
    """Poll one radio over SNMP and build a pre-filled AP suggestion.

    Returns a dict with: host, detected vendor key, human-readable vendor
    label, the raw per-OID values, a ``suggested`` AP record for the create
    form, and any per-OID errors collected for diagnostics.
    """
    raw: dict[str, str | None] = {}
    errors: dict[str, str] = {}  # OID key → error message, for diagnostics

    # Standard OIDs first (sysObjectID drives vendor detection); only the
    # detection-relevant keys surface their errors.
    for key, oid in _STANDARD_OIDS.items():
        val, err = _snmp_get(host, oid, community, port, version)
        raw[key] = val
        if err and key in ("sysDescr", "sysName", "sysObjectID"):
            errors[key] = err

    vendor = _detect_vendor(
        raw.get("sysDescr") or "",
        raw.get("sysName") or "",
        raw.get("sysObjectID") or "",
    )

    # If still unknown, probe several UBNT OIDs — handles AirOS devices whose
    # sysDescr is bare "Linux <hostname>" and sysObjectID didn't decode cleanly.
    if vendor == "unknown":
        for probe_oid in _UBNT_PROBE_OIDS:
            val, _ = _snmp_get(host, probe_oid, community, port, version)
            if val and val not in ("0", "None"):
                vendor = "ubiquiti"
                break

    # Fetch vendor-specific OIDs. One table lookup replaces the four
    # copy-pasted per-vendor loops; behavior is unchanged.
    if vendor == "ubiquiti":
        for key, oids in _UBNT_OID_CHAINS.items():
            raw[key] = _snmp_get_chain(host, oids, community, port, version)
    elif vendor in _VENDOR_SCALAR_OIDS:
        for key, oid in _VENDOR_SCALAR_OIDS[vendor].items():
            raw[key], err = _snmp_get(host, oid, community, port, version)
            if err:
                errors[key] = err

    # Build pre-filled AP suggestion (lat/lon unknown over SNMP → 0.0).
    suggested = {
        "name": raw.get("sysName") or host,
        "ssid": raw.get("ssid") or "",
        "lat": 0.0,
        "lon": 0.0,
        "frequency": 0.0,
        "signal_strength": -65.0,
        "notes": f"SNMP import from {host} · {VENDOR_DISPLAY.get(vendor, vendor)}",
    }

    for field, key in (("frequency", "frequency"), ("signal_strength", "signal")):
        v = raw.get(key)
        if v is not None:
            try:
                suggested[field] = float(v)
            except ValueError:
                pass  # non-numeric SNMP value — keep the default

    # Normalise kHz → MHz (some firmware reports frequency in kHz)
    if suggested["frequency"] > 100_000:
        suggested["frequency"] = round(suggested["frequency"] / 1000)

    return {
        "host": host,
        "vendor": vendor,
        "vendor_label": VENDOR_DISPLAY.get(vendor, vendor),
        "raw": raw,
        "suggested": suggested,
        "snmp_available": True,
        "errors": errors,  # diagnostic — shown in UI if non-empty
    }
|
||
|
||
|
||
@app.route("/api/snmp/poll", methods=["POST"])
def snmp_poll():
    """Poll a device over SNMP; return detected vendor + suggested AP.

    Body JSON: host (required), community (default "public"),
    port (default 161), version ("1"/"v1" → SNMPv1, anything else → v2c).
    """
    if not SNMP_AVAILABLE:
        return jsonify({"error": "puresnmp is not installed"}), 501

    # silent=True → empty dict on a missing/malformed body instead of a 500.
    data = request.get_json(silent=True) or {}
    host = data.get("host", "").strip()
    if not host:
        return jsonify({"error": "host is required"}), 400

    community = data.get("community", "public").strip()
    try:
        port = int(data.get("port", 161))
    except (TypeError, ValueError):
        # A non-numeric port is a client error, not a server crash
        # (original raised an unhandled ValueError → 500 traceback).
        return jsonify({"error": "port must be an integer"}), 400
    version_str = str(data.get("version", "2c")).strip()
    version = 1 if version_str in ("1", "v1") else 2

    try:
        result = poll_device(host, community, port, version)
        return jsonify(result)
    except Exception as exc:
        # Network/SNMP failures come back as a structured 500, not a traceback.
        return jsonify({"error": str(exc)}), 500
|
||
|
||
|
||
@app.route("/api/snmp/status", methods=["GET"])
def snmp_status():
    """Report which optional integrations (puresnmp, gspread) are installed."""
    return jsonify({"available": SNMP_AVAILABLE, "gspread_available": GSPREAD_AVAILABLE})
|
||
|
||
|
||
if __name__ == "__main__":
    # Ensure the schema (and demo data on first run) exists before serving.
    init_db()
    # NOTE(review): binds to all interfaces and no endpoint has authentication —
    # fine on a trusted LAN; confirm before exposing this host publicly.
    app.run(host="0.0.0.0", port=5000, debug=False)
|