feat(20-02): add parse-mib and test-profile API endpoints

- POST /snmp-profiles/parse-mib: upload MIB file, subprocess-call tod-mib-parser, return OID tree JSON
- POST /snmp-profiles/{id}/test: test profile connectivity via NATS discovery probe to poller
- New snmp_proxy service module following routeros_proxy.py lazy NATS pattern
- Pydantic schemas: MIBParseResponse, ProfileTestRequest, ProfileTestResponse, ProfileTestOIDResult
- MIB_PARSER_PATH config setting with /app/tod-mib-parser default
- MIB parse errors return 422, not 500; temp file cleanup in finally block

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jason Staack
2026-03-21 20:21:08 -05:00
parent 77e9b680ae
commit 655f1eadae
7 changed files with 412 additions and 28 deletions

View File

@@ -139,6 +139,9 @@ class Settings(BaseSettings):
# Commercial license required above this limit.
LICENSE_DEVICES: int = 250
# MIB parser binary path (tod-mib-parser Go binary)
MIB_PARSER_PATH: str = "/app/tod-mib-parser"
# App settings
APP_NAME: str = "TOD - The Other Dude"
APP_VERSION: str = "9.7.2"

View File

@@ -7,33 +7,52 @@ Provides listing, creation, update, and deletion of SNMP device profiles.
System-shipped profiles (is_system=True, tenant_id IS NULL) are visible to
all tenants but cannot be modified or deleted.
Additional endpoints:
- POST /snmp-profiles/parse-mib: Upload a MIB file, parse via tod-mib-parser binary
- POST /snmp-profiles/{id}/test: Test a profile against a live device via NATS
RBAC:
- devices:read scope: GET (list, detail)
- operator+: POST, PUT (create, update tenant profiles)
- operator+: POST, PUT (create, update tenant profiles, parse-mib, test)
- tenant_admin+: DELETE (delete tenant profiles)
"""
import json
import logging
import shutil
import subprocess
import tempfile
import uuid
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, UploadFile, status
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db
from app.middleware.rbac import require_operator_or_above, require_scope, require_tenant_admin_or_above
from app.middleware.tenant_context import CurrentUser, get_current_user
from app.routers.devices import _check_tenant_access
from app.schemas.snmp_profile import (
MIBParseResponse,
ProfileTestRequest,
ProfileTestResponse,
SNMPProfileCreate,
SNMPProfileDetailResponse,
SNMPProfileListResponse,
SNMPProfileResponse,
SNMPProfileUpdate,
)
from app.services import snmp_proxy
logger = logging.getLogger(__name__)
router = APIRouter(tags=["snmp-profiles"])
# Resolve MIB parser binary path: prefer settings, fall back to PATH lookup
MIB_PARSER_BINARY = shutil.which("tod-mib-parser") or settings.MIB_PARSER_PATH
# ---------------------------------------------------------------------------
# List profiles (system + tenant)
@@ -303,3 +322,179 @@ async def delete_profile(
{"profile_id": str(profile_id), "tenant_id": str(tenant_id)},
)
await db.commit()
# ---------------------------------------------------------------------------
# Parse MIB file
# ---------------------------------------------------------------------------
@router.post(
"/tenants/{tenant_id}/snmp-profiles/parse-mib",
response_model=MIBParseResponse,
summary="Upload and parse a MIB file",
dependencies=[Depends(require_operator_or_above)],
)
async def parse_mib(
tenant_id: uuid.UUID,
file: UploadFile,
current_user: CurrentUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> MIBParseResponse:
"""Upload a MIB file and parse it using the tod-mib-parser binary.
The binary reads the MIB, extracts OID definitions, and returns a JSON
tree of nodes suitable for building an SNMP profile.
Returns 422 if the MIB file is invalid or the parser encounters an error.
"""
await _check_tenant_access(current_user, tenant_id, db)
tmp_path: str | None = None
try:
# Save uploaded file to a temporary path
with tempfile.NamedTemporaryFile(suffix=".mib", delete=False) as tmp:
content = await file.read()
tmp.write(content)
tmp_path = tmp.name
# Call the MIB parser binary
try:
result = subprocess.run(
[MIB_PARSER_BINARY, tmp_path],
capture_output=True,
text=True,
timeout=30,
)
except FileNotFoundError:
logger.error("MIB parser binary not found at %s", MIB_PARSER_BINARY)
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="MIB parser not available",
)
except subprocess.TimeoutExpired:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="MIB parser timed out",
)
if result.returncode != 0:
error_msg = result.stderr.strip() or "MIB parser failed"
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=error_msg,
)
# Parse the JSON output
try:
parsed = json.loads(result.stdout)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="MIB parser returned invalid JSON",
)
# Check for parser-level error in the output
if "error" in parsed and parsed["error"]:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=parsed["error"],
)
# Build response
nodes = parsed.get("nodes", [])
return MIBParseResponse(
module_name=parsed.get("module_name", file.filename or "unknown"),
nodes=nodes,
node_count=len(nodes),
)
finally:
# Clean up temporary file
if tmp_path:
import os
try:
os.unlink(tmp_path)
except OSError:
pass
# ---------------------------------------------------------------------------
# Test profile against live device
# ---------------------------------------------------------------------------
@router.post(
"/tenants/{tenant_id}/snmp-profiles/{profile_id}/test",
response_model=ProfileTestResponse,
summary="Test an SNMP profile against a live device",
dependencies=[Depends(require_operator_or_above)],
)
async def test_profile(
tenant_id: uuid.UUID,
profile_id: uuid.UUID,
data: ProfileTestRequest,
current_user: CurrentUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> ProfileTestResponse:
"""Test connectivity to a device using the provided SNMP credentials.
Sends an SNMP discovery probe to the target device via the Go poller's
DiscoveryResponder (NATS request-reply to device.discover.snmp). Returns
the device's sysObjectID, sysDescr, and sysName if reachable.
This validates that the device is reachable with the given credentials,
which is the core requirement for PROF-05.
"""
await _check_tenant_access(current_user, tenant_id, db)
# Verify the profile exists and is visible to this tenant
result = await db.execute(
text("""
SELECT id, sys_object_id
FROM snmp_profiles
WHERE id = :profile_id
AND (tenant_id = :tenant_id OR tenant_id IS NULL)
"""),
{"profile_id": str(profile_id), "tenant_id": str(tenant_id)},
)
profile_row = result.mappings().first()
if not profile_row:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="SNMP profile not found",
)
# Send discovery probe via NATS
discovery = await snmp_proxy.snmp_discover(
ip_address=data.ip_address,
snmp_port=data.snmp_port,
snmp_version=data.snmp_version,
community=data.community,
security_level=data.security_level,
username=data.username,
auth_protocol=data.auth_protocol,
auth_passphrase=data.auth_passphrase,
priv_protocol=data.priv_protocol,
priv_passphrase=data.priv_passphrase,
)
# Check for discovery error
if discovery.get("error"):
return ProfileTestResponse(
success=False,
error=discovery["error"],
)
# Build device info from discovery response
device_info = {
"sys_object_id": discovery.get("sys_object_id", ""),
"sys_descr": discovery.get("sys_descr", ""),
"sys_name": discovery.get("sys_name", ""),
}
return ProfileTestResponse(
success=True,
device_info=device_info,
)

View File

@@ -91,3 +91,69 @@ class SNMPProfileListResponse(BaseModel):
"""List of SNMP profiles."""
profiles: list[SNMPProfileResponse]
# ---------------------------------------------------------------------------
# MIB Parse schemas
# ---------------------------------------------------------------------------
class MIBParseResponse(BaseModel):
"""Response from MIB file parsing."""
module_name: str
nodes: list[dict] # OIDNode tree from tod-mib-parser
node_count: int
class MIBParseErrorResponse(BaseModel):
"""Error response when MIB parsing fails."""
error: str
# ---------------------------------------------------------------------------
# Profile Test schemas
# ---------------------------------------------------------------------------
class ProfileTestRequest(BaseModel):
"""Request to test a profile's OIDs against a live device."""
ip_address: str
snmp_port: int = 161
snmp_version: str # "v1", "v2c", "v3"
# v1/v2c
community: Optional[str] = None
# v3
security_level: Optional[str] = None
username: Optional[str] = None
auth_protocol: Optional[str] = None
auth_passphrase: Optional[str] = None
priv_protocol: Optional[str] = None
priv_passphrase: Optional[str] = None
@field_validator("snmp_version")
@classmethod
def validate_version(cls, v: str) -> str:
if v not in ("v1", "v2c", "v3"):
raise ValueError("snmp_version must be v1, v2c, or v3")
return v
class ProfileTestOIDResult(BaseModel):
"""Result of polling a single OID against a live device."""
oid: str
name: str
value: Optional[str] = None
error: Optional[str] = None
class ProfileTestResponse(BaseModel):
"""Response from testing a profile against a live device."""
success: bool
device_info: Optional[dict] = None # sys_object_id, sys_descr, sys_name
results: list[ProfileTestOIDResult] = []
error: Optional[str] = None

View File

@@ -0,0 +1,112 @@
"""SNMP proxy via NATS request-reply.
Sends SNMP discovery/test requests to the Go poller's DiscoveryResponder
subscription (device.discover.snmp) and returns structured response data.
Used by:
- SNMP profile test endpoint (verify device connectivity with credentials)
"""
import json
import logging
from typing import Any, Optional
import nats
import nats.aio.client
from app.config import settings
logger = logging.getLogger(__name__)
# Module-level NATS connection (lazy initialized)
_nc: nats.aio.client.Client | None = None
async def _get_nats() -> nats.aio.client.Client:
"""Get or create a NATS connection for SNMP proxy requests."""
global _nc
if _nc is None or _nc.is_closed:
_nc = await nats.connect(settings.NATS_URL)
logger.info("SNMP proxy NATS connection established")
return _nc
async def snmp_discover(
ip_address: str,
snmp_port: int = 161,
snmp_version: str = "v2c",
community: Optional[str] = None,
security_level: Optional[str] = None,
username: Optional[str] = None,
auth_protocol: Optional[str] = None,
auth_passphrase: Optional[str] = None,
priv_protocol: Optional[str] = None,
priv_passphrase: Optional[str] = None,
) -> dict[str, Any]:
"""Send an SNMP discovery probe to a device via the Go poller.
Builds a DiscoveryRequest payload matching the Go DiscoveryRequest struct
and sends it to the poller's DiscoveryResponder via NATS request-reply.
Args:
ip_address: Target device IP address.
snmp_port: SNMP port (default 161).
snmp_version: "v1", "v2c", or "v3".
community: Community string for v1/v2c.
security_level: SNMPv3 security level.
username: SNMPv3 username.
auth_protocol: SNMPv3 auth protocol (e.g. "SHA").
auth_passphrase: SNMPv3 auth passphrase.
priv_protocol: SNMPv3 privacy protocol (e.g. "AES").
priv_passphrase: SNMPv3 privacy passphrase.
Returns:
{"sys_object_id": str, "sys_descr": str, "sys_name": str, "error": str|None}
"""
nc = await _get_nats()
payload: dict[str, Any] = {
"ip_address": ip_address,
"snmp_port": snmp_port,
"snmp_version": snmp_version,
}
# v1/v2c credentials
if community is not None:
payload["community"] = community
# v3 credentials
if security_level is not None:
payload["security_level"] = security_level
if username is not None:
payload["username"] = username
if auth_protocol is not None:
payload["auth_protocol"] = auth_protocol
if auth_passphrase is not None:
payload["auth_passphrase"] = auth_passphrase
if priv_protocol is not None:
payload["priv_protocol"] = priv_protocol
if priv_passphrase is not None:
payload["priv_passphrase"] = priv_passphrase
try:
reply = await nc.request(
"device.discover.snmp",
json.dumps(payload).encode(),
timeout=10.0,
)
return json.loads(reply.data)
except nats.errors.TimeoutError:
return {"error": "Device unreachable or SNMP timeout"}
except Exception as exc:
logger.error("SNMP discovery NATS request failed for %s: %s", ip_address, exc)
return {"error": str(exc)}
async def close() -> None:
"""Close the NATS connection. Called on application shutdown."""
global _nc
if _nc and not _nc.is_closed:
await _nc.drain()
_nc = None
logger.info("SNMP proxy NATS connection closed")