feat(13-03): add link service, schemas, router, and wire subscribers into lifespan

- LinkResponse/UnknownClientResponse Pydantic schemas with from_attributes
- Link service with get_links, get_device_links, get_site_links, get_unknown_clients
- Unknown clients query uses DISTINCT ON for latest registration per MAC
- 4 REST endpoints: tenant links, device links, site links, unknown clients
- Interface and link discovery subscribers wired into FastAPI lifespan start/stop
- Links router registered at /api prefix

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jason Staack
2026-03-19 06:12:06 -05:00
parent 3209a7d9be
commit 0434d31030
4 changed files with 357 additions and 0 deletions

View File

@@ -245,6 +245,30 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.error("Wireless registration subscriber failed to start (non-fatal): %s", e)
# Start NATS subscriber for device interface data (MAC resolution for link discovery).
interface_nc = None
try:
from app.services.interface_subscriber import (
start_interface_subscriber,
stop_interface_subscriber,
)
interface_nc = await start_interface_subscriber()
except Exception as e:
logger.error("Interface subscriber failed to start (non-fatal): %s", e)
# Start NATS subscriber for wireless link discovery (MAC resolution + state machine).
link_discovery_nc = None
try:
from app.services.link_discovery_subscriber import (
start_link_discovery_subscriber,
stop_link_discovery_subscriber,
)
link_discovery_nc = await start_link_discovery_subscriber()
except Exception as e:
logger.error("Link discovery subscriber failed to start (non-fatal): %s", e)
# Start retention cleanup scheduler (daily purge of expired config snapshots)
try:
await start_retention_scheduler()
@@ -340,6 +364,10 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
await stop_config_snapshot_subscriber()
if wireless_reg_nc:
await stop_wireless_registration_subscriber(wireless_reg_nc)
if interface_nc:
await stop_interface_subscriber(interface_nc)
if link_discovery_nc:
await stop_link_discovery_subscriber(link_discovery_nc)
await stop_retention_scheduler()
# Dispose database engine connections to release all pooled connections cleanly.
@@ -405,6 +433,7 @@ def create_app() -> FastAPI:
from app.routers.remote_access import router as remote_access_router
from app.routers.winbox_remote import router as winbox_remote_router
from app.routers.sites import router as sites_router
from app.routers.links import router as links_router
app.include_router(auth_router, prefix="/api")
app.include_router(tenants_router, prefix="/api")
@@ -435,6 +464,7 @@ def create_app() -> FastAPI:
app.include_router(remote_access_router, prefix="/api")
app.include_router(winbox_remote_router, prefix="/api")
app.include_router(sites_router, prefix="/api")
app.include_router(links_router, prefix="/api")
# Health check endpoints
@app.get("/health", tags=["health"])

View File

@@ -0,0 +1,89 @@
"""
Wireless link API endpoints.
Routes: /api/tenants/{tenant_id}/links, /api/tenants/{tenant_id}/devices/{device_id}/links,
/api/tenants/{tenant_id}/sites/{site_id}/links,
/api/tenants/{tenant_id}/devices/{device_id}/unknown-clients
RBAC:
- viewer: GET (read-only) -- all endpoints are GET-only
"""
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.middleware.tenant_context import CurrentUser, get_current_user
from app.routers.devices import _check_tenant_access
from app.schemas.link import LinkListResponse, UnknownClientListResponse
from app.services import link_service
router = APIRouter(tags=["links"])
@router.get(
"/tenants/{tenant_id}/links",
response_model=LinkListResponse,
summary="List wireless links",
)
async def list_links(
tenant_id: uuid.UUID,
state: Optional[str] = Query(None, description="Filter by link state (active, degraded, down, stale)"),
device_id: Optional[uuid.UUID] = Query(None, description="Filter by device (AP or CPE side)"),
current_user: CurrentUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> LinkListResponse:
"""List all wireless links for a tenant with optional state and device filters."""
await _check_tenant_access(current_user, tenant_id, db)
return await link_service.get_links(db=db, tenant_id=tenant_id, state=state, device_id=device_id)
@router.get(
"/tenants/{tenant_id}/devices/{device_id}/links",
response_model=LinkListResponse,
summary="List device links",
)
async def list_device_links(
tenant_id: uuid.UUID,
device_id: uuid.UUID,
current_user: CurrentUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> LinkListResponse:
"""List wireless links where the given device is either the AP or CPE."""
await _check_tenant_access(current_user, tenant_id, db)
return await link_service.get_device_links(db=db, tenant_id=tenant_id, device_id=device_id)
@router.get(
"/tenants/{tenant_id}/sites/{site_id}/links",
response_model=LinkListResponse,
summary="List site links",
)
async def list_site_links(
tenant_id: uuid.UUID,
site_id: uuid.UUID,
current_user: CurrentUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> LinkListResponse:
"""List wireless links where either the AP or CPE device belongs to the given site."""
await _check_tenant_access(current_user, tenant_id, db)
return await link_service.get_site_links(db=db, tenant_id=tenant_id, site_id=site_id)
@router.get(
"/tenants/{tenant_id}/devices/{device_id}/unknown-clients",
response_model=UnknownClientListResponse,
summary="List unknown wireless clients",
)
async def list_unknown_clients(
tenant_id: uuid.UUID,
device_id: uuid.UUID,
current_user: CurrentUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> UnknownClientListResponse:
"""List wireless clients whose MAC does not resolve to any known device interface."""
await _check_tenant_access(current_user, tenant_id, db)
return await link_service.get_unknown_clients(db=db, tenant_id=tenant_id, device_id=device_id)

View File

@@ -0,0 +1,56 @@
"""Pydantic schemas for Link and Unknown Client endpoints."""
import uuid
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, ConfigDict
class LinkResponse(BaseModel):
"""Single wireless link between an AP and CPE device."""
id: uuid.UUID
ap_device_id: uuid.UUID
cpe_device_id: uuid.UUID
ap_hostname: str | None = None
cpe_hostname: str | None = None
interface: str | None = None
client_mac: str
signal_strength: int | None = None
tx_ccq: int | None = None
tx_rate: str | None = None
rx_rate: str | None = None
state: str
missed_polls: int
discovered_at: datetime
last_seen: datetime
model_config = ConfigDict(from_attributes=True)
class LinkListResponse(BaseModel):
"""List of wireless links with total count."""
items: list[LinkResponse]
total: int
class UnknownClientResponse(BaseModel):
"""A wireless client whose MAC does not resolve to any known device interface."""
mac_address: str
interface: str | None = None
signal_strength: int | None = None
tx_rate: str | None = None
rx_rate: str | None = None
last_seen: datetime
model_config = ConfigDict(from_attributes=True)
class UnknownClientListResponse(BaseModel):
"""List of unknown clients with total count."""
items: list[UnknownClientResponse]
total: int

View File

@@ -0,0 +1,182 @@
"""Link service -- query layer for wireless links and unknown clients.
All functions use raw SQL via the app_user engine (RLS enforced).
Tenant isolation is handled automatically by PostgreSQL RLS policies
once the tenant context is set by the middleware.
"""
import uuid
from typing import Optional
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.schemas.link import (
LinkListResponse,
LinkResponse,
UnknownClientListResponse,
UnknownClientResponse,
)
async def get_links(
db: AsyncSession,
tenant_id: uuid.UUID,
state: Optional[str] = None,
device_id: Optional[uuid.UUID] = None,
) -> LinkListResponse:
"""List wireless links for a tenant with optional state and device filters.
The device_id filter matches links where the device is either the AP or CPE side.
"""
conditions = ["wl.tenant_id = :tenant_id"]
params: dict = {"tenant_id": str(tenant_id)}
if state:
conditions.append("wl.state = :state")
params["state"] = state
if device_id:
conditions.append("(wl.ap_device_id = :device_id OR wl.cpe_device_id = :device_id)")
params["device_id"] = str(device_id)
where_clause = " AND ".join(conditions)
result = await db.execute(
text(f"""
SELECT wl.id, wl.ap_device_id, wl.cpe_device_id,
ap.hostname AS ap_hostname, cpe.hostname AS cpe_hostname,
wl.interface, wl.client_mac, wl.signal_strength,
wl.tx_ccq, wl.tx_rate, wl.rx_rate, wl.state,
wl.missed_polls, wl.discovered_at, wl.last_seen
FROM wireless_links wl
LEFT JOIN devices ap ON ap.id = wl.ap_device_id
LEFT JOIN devices cpe ON cpe.id = wl.cpe_device_id
WHERE {where_clause}
ORDER BY wl.last_seen DESC
"""),
params,
)
rows = result.fetchall()
items = [
LinkResponse(
id=row.id,
ap_device_id=row.ap_device_id,
cpe_device_id=row.cpe_device_id,
ap_hostname=row.ap_hostname,
cpe_hostname=row.cpe_hostname,
interface=row.interface,
client_mac=row.client_mac,
signal_strength=row.signal_strength,
tx_ccq=row.tx_ccq,
tx_rate=row.tx_rate,
rx_rate=row.rx_rate,
state=row.state,
missed_polls=row.missed_polls,
discovered_at=row.discovered_at,
last_seen=row.last_seen,
)
for row in rows
]
return LinkListResponse(items=items, total=len(items))
async def get_device_links(
db: AsyncSession,
tenant_id: uuid.UUID,
device_id: uuid.UUID,
) -> LinkListResponse:
"""List wireless links where the given device is either the AP or CPE."""
return await get_links(db=db, tenant_id=tenant_id, device_id=device_id)
async def get_site_links(
db: AsyncSession,
tenant_id: uuid.UUID,
site_id: uuid.UUID,
) -> LinkListResponse:
"""List wireless links where either the AP or CPE device belongs to the given site."""
result = await db.execute(
text("""
SELECT wl.id, wl.ap_device_id, wl.cpe_device_id,
ap.hostname AS ap_hostname, cpe.hostname AS cpe_hostname,
wl.interface, wl.client_mac, wl.signal_strength,
wl.tx_ccq, wl.tx_rate, wl.rx_rate, wl.state,
wl.missed_polls, wl.discovered_at, wl.last_seen
FROM wireless_links wl
LEFT JOIN devices ap ON ap.id = wl.ap_device_id
LEFT JOIN devices cpe ON cpe.id = wl.cpe_device_id
WHERE wl.tenant_id = :tenant_id
AND (ap.site_id = :site_id OR cpe.site_id = :site_id)
ORDER BY wl.last_seen DESC
"""),
{"tenant_id": str(tenant_id), "site_id": str(site_id)},
)
rows = result.fetchall()
items = [
LinkResponse(
id=row.id,
ap_device_id=row.ap_device_id,
cpe_device_id=row.cpe_device_id,
ap_hostname=row.ap_hostname,
cpe_hostname=row.cpe_hostname,
interface=row.interface,
client_mac=row.client_mac,
signal_strength=row.signal_strength,
tx_ccq=row.tx_ccq,
tx_rate=row.tx_rate,
rx_rate=row.rx_rate,
state=row.state,
missed_polls=row.missed_polls,
discovered_at=row.discovered_at,
last_seen=row.last_seen,
)
for row in rows
]
return LinkListResponse(items=items, total=len(items))
async def get_unknown_clients(
db: AsyncSession,
tenant_id: uuid.UUID,
device_id: uuid.UUID,
) -> UnknownClientListResponse:
"""List wireless clients connected to a device whose MAC does not match any known device interface.
Uses DISTINCT ON to return the most recent registration per unique MAC address.
"""
result = await db.execute(
text("""
SELECT DISTINCT ON (wr.mac_address)
wr.mac_address, wr.interface, wr.signal_strength,
wr.tx_rate, wr.rx_rate, wr.time AS last_seen
FROM wireless_registrations wr
WHERE wr.device_id = :device_id
AND wr.tenant_id = :tenant_id
AND wr.mac_address NOT IN (
SELECT di.mac_address FROM device_interfaces di
WHERE di.tenant_id = :tenant_id
)
ORDER BY wr.mac_address, wr.time DESC
"""),
{"device_id": str(device_id), "tenant_id": str(tenant_id)},
)
rows = result.fetchall()
items = [
UnknownClientResponse(
mac_address=row.mac_address,
interface=row.interface,
signal_strength=row.signal_strength,
tx_rate=row.tx_rate,
rx_rate=row.rx_rate,
last_seen=row.last_seen,
)
for row in rows
]
return UnknownClientListResponse(items=items, total=len(items))