diff --git a/backend/app/main.py b/backend/app/main.py index 3cf7e01..918386a 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -245,6 +245,30 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.error("Wireless registration subscriber failed to start (non-fatal): %s", e) + # Start NATS subscriber for device interface data (MAC resolution for link discovery). + interface_nc = None + try: + from app.services.interface_subscriber import ( + start_interface_subscriber, + stop_interface_subscriber, + ) + + interface_nc = await start_interface_subscriber() + except Exception as e: + logger.error("Interface subscriber failed to start (non-fatal): %s", e) + + # Start NATS subscriber for wireless link discovery (MAC resolution + state machine). + link_discovery_nc = None + try: + from app.services.link_discovery_subscriber import ( + start_link_discovery_subscriber, + stop_link_discovery_subscriber, + ) + + link_discovery_nc = await start_link_discovery_subscriber() + except Exception as e: + logger.error("Link discovery subscriber failed to start (non-fatal): %s", e) + # Start retention cleanup scheduler (daily purge of expired config snapshots) try: await start_retention_scheduler() @@ -340,6 +364,10 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: await stop_config_snapshot_subscriber() if wireless_reg_nc: await stop_wireless_registration_subscriber(wireless_reg_nc) + if interface_nc: + await stop_interface_subscriber(interface_nc) + if link_discovery_nc: + await stop_link_discovery_subscriber(link_discovery_nc) await stop_retention_scheduler() # Dispose database engine connections to release all pooled connections cleanly. @@ -405,6 +433,7 @@ def create_app() -> FastAPI: from app.routers.remote_access import router as remote_access_router from app.routers.winbox_remote import router as winbox_remote_router from app.routers.sites import router as sites_router + from app.routers.links import router as links_router app.include_router(auth_router, prefix="/api") app.include_router(tenants_router, prefix="/api") @@ -435,6 +464,7 @@ def create_app() -> FastAPI: app.include_router(remote_access_router, prefix="/api") app.include_router(winbox_remote_router, prefix="/api") app.include_router(sites_router, prefix="/api") + app.include_router(links_router, prefix="/api") # Health check endpoints @app.get("/health", tags=["health"]) diff --git a/backend/app/routers/links.py b/backend/app/routers/links.py new file mode 100644 index 0000000..ff8a9e7 --- /dev/null +++ b/backend/app/routers/links.py @@ -0,0 +1,89 @@ +""" +Wireless link API endpoints. + +Routes: /api/tenants/{tenant_id}/links, /api/tenants/{tenant_id}/devices/{device_id}/links, + /api/tenants/{tenant_id}/sites/{site_id}/links, + /api/tenants/{tenant_id}/devices/{device_id}/unknown-clients + +RBAC: +- viewer: GET (read-only) -- all endpoints are GET-only +""" + +import uuid +from typing import Optional + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import get_db +from app.middleware.tenant_context import CurrentUser, get_current_user +from app.routers.devices import _check_tenant_access +from app.schemas.link import LinkListResponse, UnknownClientListResponse +from app.services import link_service + +router = APIRouter(tags=["links"]) + + +@router.get( + "/tenants/{tenant_id}/links", + response_model=LinkListResponse, + summary="List wireless links", +) +async def list_links( + tenant_id: uuid.UUID, + state: Optional[str] = Query(None, description="Filter by link state (active, degraded, down, stale)"), + device_id: Optional[uuid.UUID] = Query(None, description="Filter by device (AP or CPE side)"), + current_user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> LinkListResponse: + """List all wireless links for a tenant with optional state and device filters.""" + await _check_tenant_access(current_user, tenant_id, db) + return await link_service.get_links(db=db, tenant_id=tenant_id, state=state, device_id=device_id) + + +@router.get( + "/tenants/{tenant_id}/devices/{device_id}/links", + response_model=LinkListResponse, + summary="List device links", +) +async def list_device_links( + tenant_id: uuid.UUID, + device_id: uuid.UUID, + current_user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> LinkListResponse: + """List wireless links where the given device is either the AP or CPE.""" + await _check_tenant_access(current_user, tenant_id, db) + return await link_service.get_device_links(db=db, tenant_id=tenant_id, device_id=device_id) + + +@router.get( + "/tenants/{tenant_id}/sites/{site_id}/links", + response_model=LinkListResponse, + summary="List site links", +) +async def list_site_links( + tenant_id: uuid.UUID, + site_id: uuid.UUID, + current_user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> LinkListResponse: + """List wireless links where either the AP or CPE device belongs to the given site.""" + await _check_tenant_access(current_user, tenant_id, db) + return await link_service.get_site_links(db=db, tenant_id=tenant_id, site_id=site_id) + + +@router.get( + "/tenants/{tenant_id}/devices/{device_id}/unknown-clients", + response_model=UnknownClientListResponse, + summary="List unknown wireless clients", +) +async def list_unknown_clients( + tenant_id: uuid.UUID, + device_id: uuid.UUID, + current_user: CurrentUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +) -> UnknownClientListResponse: + """List wireless clients whose MAC does not resolve to any known device interface.""" + await _check_tenant_access(current_user, tenant_id, db) + return await link_service.get_unknown_clients(db=db, tenant_id=tenant_id, device_id=device_id) diff --git a/backend/app/schemas/link.py b/backend/app/schemas/link.py new file mode 100644 index 0000000..095e00a --- /dev/null +++ b/backend/app/schemas/link.py @@ -0,0 +1,56 @@ +"""Pydantic schemas for Link and Unknown Client endpoints.""" + +import uuid +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, ConfigDict + + +class LinkResponse(BaseModel): + """Single wireless link between an AP and CPE device.""" + + id: uuid.UUID + ap_device_id: uuid.UUID + cpe_device_id: uuid.UUID + ap_hostname: str | None = None + cpe_hostname: str | None = None + interface: str | None = None + client_mac: str + signal_strength: int | None = None + tx_ccq: int | None = None + tx_rate: str | None = None + rx_rate: str | None = None + state: str + missed_polls: int + discovered_at: datetime + last_seen: datetime + + model_config = ConfigDict(from_attributes=True) + + +class LinkListResponse(BaseModel): + """List of wireless links with total count.""" + + items: list[LinkResponse] + total: int + + +class UnknownClientResponse(BaseModel): + """A wireless client whose MAC does not resolve to any known device interface.""" + + mac_address: str + interface: str | None = None + signal_strength: int | None = None + tx_rate: str | None = None + rx_rate: str | None = None + last_seen: datetime + + model_config = ConfigDict(from_attributes=True) + + +class UnknownClientListResponse(BaseModel): + """List of unknown clients with total count.""" + + items: list[UnknownClientResponse] + total: int diff --git a/backend/app/services/link_service.py b/backend/app/services/link_service.py new file mode 100644 index 0000000..d3bf6e3 --- /dev/null +++ b/backend/app/services/link_service.py @@ -0,0 +1,182 @@ +"""Link service -- query layer for wireless links and unknown clients. + +All functions use raw SQL via the app_user engine (RLS enforced). +Tenant isolation is handled automatically by PostgreSQL RLS policies +once the tenant context is set by the middleware. +""" + +import uuid +from typing import Optional + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + +from app.schemas.link import ( + LinkListResponse, + LinkResponse, + UnknownClientListResponse, + UnknownClientResponse, +) + + +async def get_links( + db: AsyncSession, + tenant_id: uuid.UUID, + state: Optional[str] = None, + device_id: Optional[uuid.UUID] = None, +) -> LinkListResponse: + """List wireless links for a tenant with optional state and device filters. + + The device_id filter matches links where the device is either the AP or CPE side. + """ + conditions = ["wl.tenant_id = :tenant_id"] + params: dict = {"tenant_id": str(tenant_id)} + + if state: + conditions.append("wl.state = :state") + params["state"] = state + + if device_id: + conditions.append("(wl.ap_device_id = :device_id OR wl.cpe_device_id = :device_id)") + params["device_id"] = str(device_id) + + where_clause = " AND ".join(conditions) + + result = await db.execute( + text(f""" + SELECT wl.id, wl.ap_device_id, wl.cpe_device_id, + ap.hostname AS ap_hostname, cpe.hostname AS cpe_hostname, + wl.interface, wl.client_mac, wl.signal_strength, + wl.tx_ccq, wl.tx_rate, wl.rx_rate, wl.state, + wl.missed_polls, wl.discovered_at, wl.last_seen + FROM wireless_links wl + LEFT JOIN devices ap ON ap.id = wl.ap_device_id + LEFT JOIN devices cpe ON cpe.id = wl.cpe_device_id + WHERE {where_clause} + ORDER BY wl.last_seen DESC + """), + params, + ) + rows = result.fetchall() + + items = [ + LinkResponse( + id=row.id, + ap_device_id=row.ap_device_id, + cpe_device_id=row.cpe_device_id, + ap_hostname=row.ap_hostname, + cpe_hostname=row.cpe_hostname, + interface=row.interface, + client_mac=row.client_mac, + signal_strength=row.signal_strength, + tx_ccq=row.tx_ccq, + tx_rate=row.tx_rate, + rx_rate=row.rx_rate, + state=row.state, + missed_polls=row.missed_polls, + discovered_at=row.discovered_at, + last_seen=row.last_seen, + ) + for row in rows + ] + + return LinkListResponse(items=items, total=len(items)) + + +async def get_device_links( + db: AsyncSession, + tenant_id: uuid.UUID, + device_id: uuid.UUID, +) -> LinkListResponse: + """List wireless links where the given device is either the AP or CPE.""" + return await get_links(db=db, tenant_id=tenant_id, device_id=device_id) + + +async def get_site_links( + db: AsyncSession, + tenant_id: uuid.UUID, + site_id: uuid.UUID, +) -> LinkListResponse: + """List wireless links where either the AP or CPE device belongs to the given site.""" + result = await db.execute( + text(""" + SELECT wl.id, wl.ap_device_id, wl.cpe_device_id, + ap.hostname AS ap_hostname, cpe.hostname AS cpe_hostname, + wl.interface, wl.client_mac, wl.signal_strength, + wl.tx_ccq, wl.tx_rate, wl.rx_rate, wl.state, + wl.missed_polls, wl.discovered_at, wl.last_seen + FROM wireless_links wl + LEFT JOIN devices ap ON ap.id = wl.ap_device_id + LEFT JOIN devices cpe ON cpe.id = wl.cpe_device_id + WHERE wl.tenant_id = :tenant_id + AND (ap.site_id = :site_id OR cpe.site_id = :site_id) + ORDER BY wl.last_seen DESC + """), + {"tenant_id": str(tenant_id), "site_id": str(site_id)}, + ) + rows = result.fetchall() + + items = [ + LinkResponse( + id=row.id, + ap_device_id=row.ap_device_id, + cpe_device_id=row.cpe_device_id, + ap_hostname=row.ap_hostname, + cpe_hostname=row.cpe_hostname, + interface=row.interface, + client_mac=row.client_mac, + signal_strength=row.signal_strength, + tx_ccq=row.tx_ccq, + tx_rate=row.tx_rate, + rx_rate=row.rx_rate, + state=row.state, + missed_polls=row.missed_polls, + discovered_at=row.discovered_at, + last_seen=row.last_seen, + ) + for row in rows + ] + + return LinkListResponse(items=items, total=len(items)) + + +async def get_unknown_clients( + db: AsyncSession, + tenant_id: uuid.UUID, + device_id: uuid.UUID, +) -> UnknownClientListResponse: + """List wireless clients connected to a device whose MAC does not match any known device interface. + + Uses DISTINCT ON to return the most recent registration per unique MAC address. + """ + result = await db.execute( + text(""" + SELECT DISTINCT ON (wr.mac_address) + wr.mac_address, wr.interface, wr.signal_strength, + wr.tx_rate, wr.rx_rate, wr.time AS last_seen + FROM wireless_registrations wr + WHERE wr.device_id = :device_id + AND wr.tenant_id = :tenant_id + AND wr.mac_address NOT IN ( + SELECT di.mac_address FROM device_interfaces di + WHERE di.tenant_id = :tenant_id + ) + ORDER BY wr.mac_address, wr.time DESC + """), + {"device_id": str(device_id), "tenant_id": str(tenant_id)}, + ) + rows = result.fetchall() + + items = [ + UnknownClientResponse( + mac_address=row.mac_address, + interface=row.interface, + signal_strength=row.signal_strength, + tx_rate=row.tx_rate, + rx_rate=row.rx_rate, + last_seen=row.last_seen, + ) + for row in rows + ] + + return UnknownClientListResponse(items=items, total=len(items))