Files
Jason Staack 06a41ca9bf fix(lint): resolve all ruff lint errors
Add ruff config to exclude alembic E402, SQLAlchemy F821, and pre-existing
E501 line-length issues. Auto-fix 69 unused imports and 2 f-strings without
placeholders. Manually fix 8 unused variables. Apply ruff format to 127 files.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-14 22:17:50 -05:00

1115 lines
35 KiB
Python

"""Alert management API endpoints.
Tenant-scoped routes under /api/tenants/{tenant_id}/ for:
- Alert rules CRUD (list, create, update, delete, toggle)
- Notification channels CRUD (list, create, update, delete, test)
- Alert events listing with pagination and filtering
- Active alert count for nav badge
- Acknowledge and silence actions
- Device-scoped alert listing
RLS enforced via get_db() (app_user engine with tenant context).
RBAC: viewer = read-only (GET); operator and above = write (POST/PUT/PATCH/DELETE).
"""
import logging
import uuid
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
import re
from pydantic import BaseModel, ConfigDict, model_validator
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db, set_tenant_context
from app.middleware.rate_limit import limiter
from app.middleware.rbac import require_scope
from app.middleware.tenant_context import CurrentUser, get_current_user
from app.services.audit_service import log_action
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that collects every endpoint declared in this file under the "alerts" tag.
router = APIRouter(tags=["alerts"])
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _check_tenant_access(
    current_user: CurrentUser, tenant_id: uuid.UUID, db: AsyncSession
) -> None:
    """Ensure the caller may operate on ``tenant_id``.

    Super admins are always allowed; for them the tenant context of ``db`` is
    switched to the requested tenant via ``set_tenant_context``. Any other
    user must belong to the requested tenant.

    Raises:
        HTTPException: 403 when a non-super-admin targets a tenant other than
            their own.
    """
    if current_user.is_super_admin:
        # Super admins are not bound to one tenant, so point the session at
        # the tenant named in the URL before any query runs.
        await set_tenant_context(db, str(tenant_id))
        return

    if current_user.tenant_id != tenant_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied to this tenant",
        )
def _require_write(current_user: CurrentUser) -> None:
    """Allow the request to proceed only for users with write access.

    Raises:
        HTTPException: 403 when the user's role is ``"viewer"``.
    """
    is_read_only = current_user.role == "viewer"
    if not is_read_only:
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Viewers have read-only access.",
    )
# Basic e-mail address shape check used by the channel models.
# Anchored with \A ... \Z instead of ^ ... $: in Python "$" also matches just
# before a trailing newline, so "user@example.com\n" would otherwise be
# accepted as a valid address.
EMAIL_REGEX = re.compile(r"\A[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\Z")

# Metric names an alert rule may reference.
ALLOWED_METRICS = {
    "cpu_load",
    "memory_used_pct",
    "disk_used_pct",
    "temperature",
    "signal_strength",
    "ccq",
    "client_count",
}

# Comparison operators an alert rule may use against its threshold.
ALLOWED_OPERATORS = {"gt", "lt", "gte", "lte"}

# Severity levels an alert rule may carry.
ALLOWED_SEVERITIES = {"critical", "warning", "info"}
# ---------------------------------------------------------------------------
# Request/response models
# ---------------------------------------------------------------------------
class AlertRuleCreate(BaseModel):
    """Request body for creating an alert rule."""

    # Reject request bodies that carry fields not declared below.
    model_config = ConfigDict(extra="forbid")
    name: str
    # Checked against ALLOWED_METRICS by the create endpoint.
    metric: str
    # Checked against ALLOWED_OPERATORS by the create endpoint.
    operator: str
    threshold: float
    # NOTE(review): presumably the number of consecutive polls the condition
    # must hold before the rule fires — confirm against the evaluator.
    duration_polls: int = 1
    # Checked against ALLOWED_SEVERITIES by the create endpoint.
    severity: str = "warning"
    # Optional scoping; both are cast to uuid in the INSERT statement.
    device_id: Optional[str] = None
    group_id: Optional[str] = None
    # Notification channel ids to associate with the rule (cast to uuid in SQL).
    channel_ids: list[str] = []
    enabled: bool = True
class AlertRuleUpdate(BaseModel):
    """Request body for replacing an alert rule.

    The update endpoint writes every field below, so omitted optional fields
    fall back to the defaults declared here rather than keeping stored values.
    """

    # Reject request bodies that carry fields not declared below.
    model_config = ConfigDict(extra="forbid")
    name: str
    # Checked against ALLOWED_METRICS by the update endpoint.
    metric: str
    # Checked against ALLOWED_OPERATORS by the update endpoint.
    operator: str
    threshold: float
    # NOTE(review): presumably the number of consecutive polls the condition
    # must hold before the rule fires — confirm against the evaluator.
    duration_polls: int = 1
    # Checked against ALLOWED_SEVERITIES by the update endpoint.
    severity: str = "warning"
    # Optional scoping; both are cast to uuid in the UPDATE statement.
    device_id: Optional[str] = None
    group_id: Optional[str] = None
    # Replaces the rule's existing channel associations (cast to uuid in SQL).
    channel_ids: list[str] = []
    enabled: bool = True
class ChannelCreate(BaseModel):
    """Request body for creating a notification channel.

    Email channels are validated by ``validate_email_fields``; other channel
    types pass through without field-level checks in this model.
    """

    # Reject request bodies that carry fields not declared below.
    model_config = ConfigDict(extra="forbid")
    name: str
    channel_type: str  # "email", "webhook", or "slack"
    smtp_host: Optional[str] = None
    smtp_port: Optional[int] = None
    smtp_user: Optional[str] = None
    smtp_password: Optional[str] = None  # plaintext — will be encrypted before storage
    smtp_use_tls: bool = False
    from_address: Optional[str] = None
    to_address: Optional[str] = None
    webhook_url: Optional[str] = None
    slack_webhook_url: Optional[str] = None

    @model_validator(mode="after")
    def validate_email_fields(self):
        """Require SMTP host, port and recipient for email channels.

        Also checks the recipient and (if given) sender address against
        ``EMAIL_REGEX``. Non-email channels are returned unchanged.

        Raises:
            ValueError: when a required field is missing or an address does
                not match ``EMAIL_REGEX``.
        """
        if self.channel_type != "email":
            return self

        # Checked in this order so the error message lists fields predictably.
        required = (
            ("smtp_host", self.smtp_host),
            ("smtp_port", self.smtp_port),
            ("to_address", self.to_address),
        )
        missing = [field_name for field_name, value in required if not value]
        if missing:
            raise ValueError(f"Email channels require: {', '.join(missing)}")

        if self.to_address and not EMAIL_REGEX.match(self.to_address):
            raise ValueError(f"Invalid email address: {self.to_address}")
        if self.from_address and not EMAIL_REGEX.match(self.from_address):
            raise ValueError(f"Invalid from address: {self.from_address}")
        return self
class ChannelUpdate(BaseModel):
    """Request body for replacing a notification channel's settings."""

    # NOTE(review): unlike ChannelCreate, this model declares no validator, so
    # email-specific required fields and address formats are not checked on
    # update — confirm whether that is intentional.

    # Reject request bodies that carry fields not declared below.
    model_config = ConfigDict(extra="forbid")
    name: str
    # Restricted to "email", "webhook" or "slack" by the update endpoint.
    channel_type: str
    smtp_host: Optional[str] = None
    smtp_port: Optional[int] = None
    smtp_user: Optional[str] = None
    smtp_password: Optional[str] = None  # if None, keep existing
    smtp_use_tls: bool = False
    from_address: Optional[str] = None
    to_address: Optional[str] = None
    webhook_url: Optional[str] = None
    slack_webhook_url: Optional[str] = None
class SilenceRequest(BaseModel):
    """Request body for silencing an alert."""

    # Reject request bodies that carry fields not declared below.
    model_config = ConfigDict(extra="forbid")
    # How long to silence the alert; the silence endpoint rejects values below 1.
    duration_minutes: int
# =========================================================================
# ALERT RULES CRUD
# =========================================================================
@router.get(
    "/tenants/{tenant_id}/alert-rules",
    summary="List all alert rules for tenant",
    dependencies=[require_scope("alerts:read")],
)
async def list_alert_rules(
    tenant_id: uuid.UUID,
    enabled: Optional[bool] = Query(None),
    metric: Optional[str] = Query(None),
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[dict[str, Any]]:
    """Return alert rules, newest first, optionally filtered.

    The query has no explicit tenant predicate; tenant scoping relies on the
    row-level security described in the module docstring.

    Args:
        tenant_id: Tenant named in the URL; access is verified first.
        enabled: When given, only rules with this enabled state are returned.
        metric: When non-empty, only rules on this metric are returned.

    Returns:
        One dict per rule, including the ids of its associated channels.

    Raises:
        HTTPException: 403 if the caller may not access the tenant.
    """
    await _check_tenant_access(current_user, tenant_id, db)

    params: dict[str, Any] = {}
    # "1=1" keeps the WHERE clause valid when no optional filter applies.
    filters = ["1=1"]
    if enabled is not None:
        params["enabled"] = enabled
        filters.append("ar.enabled = :enabled")
    if metric:
        params["metric"] = metric
        filters.append("ar.metric = :metric")
    where = " AND ".join(filters)

    # Only fixed clause fragments are interpolated; user values stay bound.
    query = text(f"""
        SELECT ar.id, ar.tenant_id, ar.device_id, ar.group_id,
        ar.name, ar.metric, ar.operator, ar.threshold,
        ar.duration_polls, ar.severity, ar.enabled, ar.is_default,
        ar.created_at,
        COALESCE(
        (SELECT json_agg(arc.channel_id)
        FROM alert_rule_channels arc
        WHERE arc.rule_id = ar.id),
        '[]'::json
        ) AS channel_ids
        FROM alert_rules ar
        WHERE {where}
        ORDER BY ar.created_at DESC
    """)
    result = await db.execute(query, params)

    rules: list[dict[str, Any]] = []
    for record in result.fetchall():
        (
            rule_pk,
            owner_tenant,
            device,
            group,
            rule_name,
            rule_metric,
            comparison,
            threshold,
            polls,
            rule_severity,
            is_enabled,
            is_default,
            created_at,
            channels,
        ) = record
        # Anything other than a list is treated as "no channels".
        channel_list = channels if isinstance(channels, list) else []
        rules.append(
            {
                "id": str(rule_pk),
                "tenant_id": str(owner_tenant),
                "device_id": str(device) if device else None,
                "group_id": str(group) if group else None,
                "name": rule_name,
                "metric": rule_metric,
                "operator": comparison,
                "threshold": float(threshold),
                "duration_polls": polls,
                "severity": rule_severity,
                "enabled": is_enabled,
                "is_default": is_default,
                "created_at": created_at.isoformat() if created_at else None,
                "channel_ids": [str(c) for c in channel_list],
            }
        )
    return rules
@router.post(
    "/tenants/{tenant_id}/alert-rules",
    summary="Create alert rule",
    status_code=status.HTTP_201_CREATED,
)
@limiter.limit("20/minute")
async def create_alert_rule(
    request: Request,
    tenant_id: uuid.UUID,
    body: AlertRuleCreate,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict[str, Any]:
    """Create an alert rule and its channel associations.

    Args:
        request: Incoming request; not used in the body (presumably required
            by the rate-limit decorator — confirm).
        tenant_id: Tenant that will own the rule.
        body: Rule definition.

    Returns:
        The created rule, echoing the submitted values plus the generated id.

    Raises:
        HTTPException: 403 if the caller lacks tenant or write access; 422 if
            metric, operator or severity is not an allowed value.
    """
    await _check_tenant_access(current_user, tenant_id, db)
    _require_write(current_user)
    if body.metric not in ALLOWED_METRICS:
        raise HTTPException(422, f"metric must be one of: {', '.join(sorted(ALLOWED_METRICS))}")
    if body.operator not in ALLOWED_OPERATORS:
        raise HTTPException(422, f"operator must be one of: {', '.join(sorted(ALLOWED_OPERATORS))}")
    if body.severity not in ALLOWED_SEVERITIES:
        raise HTTPException(
            422, f"severity must be one of: {', '.join(sorted(ALLOWED_SEVERITIES))}"
        )

    rule_id = str(uuid.uuid4())
    await db.execute(
        text("""
            INSERT INTO alert_rules
            (id, tenant_id, device_id, group_id, name, metric, operator,
            threshold, duration_polls, severity, enabled)
            VALUES
            (CAST(:id AS uuid), CAST(:tenant_id AS uuid),
            CAST(:device_id AS uuid), CAST(:group_id AS uuid),
            :name, :metric, :operator, :threshold, :duration_polls,
            :severity, :enabled)
        """),
        {
            "id": rule_id,
            "tenant_id": str(tenant_id),
            "device_id": body.device_id,
            "group_id": body.group_id,
            "name": body.name,
            "metric": body.metric,
            "operator": body.operator,
            "threshold": body.threshold,
            "duration_polls": body.duration_polls,
            "severity": body.severity,
            "enabled": body.enabled,
        },
    )

    # Create channel associations
    for ch_id in body.channel_ids:
        await db.execute(
            text("""
                INSERT INTO alert_rule_channels (rule_id, channel_id)
                VALUES (CAST(:rule_id AS uuid), CAST(:channel_id AS uuid))
            """),
            {"rule_id": rule_id, "channel_id": ch_id},
        )
    await db.commit()

    try:
        await log_action(
            db,
            tenant_id,
            current_user.user_id,
            "alert_rule_create",
            resource_type="alert_rule",
            resource_id=rule_id,
            details={"name": body.name, "metric": body.metric, "severity": body.severity},
        )
    except Exception:
        # Audit logging is best-effort: the rule is already committed, so a
        # failure here must not fail the request, but it must not vanish
        # silently either.
        logger.warning(
            "Audit log write failed for alert_rule_create (rule %s)",
            rule_id,
            exc_info=True,
        )

    return {
        "id": rule_id,
        "tenant_id": str(tenant_id),
        "name": body.name,
        "metric": body.metric,
        "operator": body.operator,
        "threshold": body.threshold,
        "duration_polls": body.duration_polls,
        "severity": body.severity,
        "enabled": body.enabled,
        "channel_ids": body.channel_ids,
    }
@router.put(
    "/tenants/{tenant_id}/alert-rules/{rule_id}",
    summary="Update alert rule",
)
@limiter.limit("20/minute")
async def update_alert_rule(
    request: Request,
    tenant_id: uuid.UUID,
    rule_id: uuid.UUID,
    body: AlertRuleUpdate,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict[str, Any]:
    """Replace an alert rule's fields and its channel associations.

    Args:
        request: Incoming request; not used in the body (presumably required
            by the rate-limit decorator — confirm).
        tenant_id: Tenant named in the URL; access is verified first.
        rule_id: Rule to update.
        body: Full replacement definition.

    Returns:
        The updated rule, echoing the submitted values.

    Raises:
        HTTPException: 403 if the caller lacks tenant or write access; 422 if
            metric, operator or severity is not an allowed value; 404 if no
            rule row was updated.
    """
    await _check_tenant_access(current_user, tenant_id, db)
    _require_write(current_user)
    if body.metric not in ALLOWED_METRICS:
        raise HTTPException(422, f"metric must be one of: {', '.join(sorted(ALLOWED_METRICS))}")
    if body.operator not in ALLOWED_OPERATORS:
        raise HTTPException(422, f"operator must be one of: {', '.join(sorted(ALLOWED_OPERATORS))}")
    if body.severity not in ALLOWED_SEVERITIES:
        raise HTTPException(
            422, f"severity must be one of: {', '.join(sorted(ALLOWED_SEVERITIES))}"
        )

    result = await db.execute(
        text("""
            UPDATE alert_rules
            SET name = :name, metric = :metric, operator = :operator,
            threshold = :threshold, duration_polls = :duration_polls,
            severity = :severity, device_id = CAST(:device_id AS uuid),
            group_id = CAST(:group_id AS uuid), enabled = :enabled
            WHERE id = CAST(:rule_id AS uuid)
            RETURNING id
        """),
        {
            "rule_id": str(rule_id),
            "name": body.name,
            "metric": body.metric,
            "operator": body.operator,
            "threshold": body.threshold,
            "duration_polls": body.duration_polls,
            "severity": body.severity,
            "device_id": body.device_id,
            "group_id": body.group_id,
            "enabled": body.enabled,
        },
    )
    # RETURNING yields no row when nothing matched the id.
    if not result.fetchone():
        raise HTTPException(404, "Alert rule not found")

    # Replace channel associations
    await db.execute(
        text("DELETE FROM alert_rule_channels WHERE rule_id = CAST(:rule_id AS uuid)"),
        {"rule_id": str(rule_id)},
    )
    for ch_id in body.channel_ids:
        await db.execute(
            text("""
                INSERT INTO alert_rule_channels (rule_id, channel_id)
                VALUES (CAST(:rule_id AS uuid), CAST(:channel_id AS uuid))
            """),
            {"rule_id": str(rule_id), "channel_id": ch_id},
        )
    await db.commit()

    try:
        await log_action(
            db,
            tenant_id,
            current_user.user_id,
            "alert_rule_update",
            resource_type="alert_rule",
            resource_id=str(rule_id),
            details={"name": body.name, "metric": body.metric, "severity": body.severity},
        )
    except Exception:
        # Audit logging is best-effort: the update is already committed, so a
        # failure here must not fail the request, but it must not vanish
        # silently either.
        logger.warning(
            "Audit log write failed for alert_rule_update (rule %s)",
            rule_id,
            exc_info=True,
        )

    return {
        "id": str(rule_id),
        "name": body.name,
        "metric": body.metric,
        "operator": body.operator,
        "threshold": body.threshold,
        "duration_polls": body.duration_polls,
        "severity": body.severity,
        "enabled": body.enabled,
        "channel_ids": body.channel_ids,
    }
@router.delete(
    "/tenants/{tenant_id}/alert-rules/{rule_id}",
    summary="Delete alert rule",
    status_code=status.HTTP_204_NO_CONTENT,
)
@limiter.limit("5/minute")
async def delete_alert_rule(
    request: Request,
    tenant_id: uuid.UUID,
    rule_id: uuid.UUID,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Delete a non-default alert rule.

    Args:
        request: Incoming request; not used in the body (presumably required
            by the rate-limit decorator — confirm).
        tenant_id: Tenant named in the URL; access is verified first.
        rule_id: Rule to delete.

    Raises:
        HTTPException: 403 if the caller lacks tenant or write access; 404 if
            the rule does not exist; 422 if the rule is a default rule.
    """
    await _check_tenant_access(current_user, tenant_id, db)
    _require_write(current_user)

    # Prevent deletion of default rules
    check = await db.execute(
        text("SELECT is_default FROM alert_rules WHERE id = CAST(:id AS uuid)"),
        {"id": str(rule_id)},
    )
    row = check.fetchone()
    if not row:
        raise HTTPException(404, "Alert rule not found")
    if row[0]:
        raise HTTPException(422, "Cannot delete default alert rules. Disable them instead.")

    await db.execute(
        text("DELETE FROM alert_rules WHERE id = CAST(:id AS uuid)"),
        {"id": str(rule_id)},
    )
    await db.commit()

    try:
        await log_action(
            db,
            tenant_id,
            current_user.user_id,
            "alert_rule_delete",
            resource_type="alert_rule",
            resource_id=str(rule_id),
        )
    except Exception:
        # Audit logging is best-effort: the delete is already committed, so a
        # failure here must not fail the request, but it must not vanish
        # silently either.
        logger.warning(
            "Audit log write failed for alert_rule_delete (rule %s)",
            rule_id,
            exc_info=True,
        )
@router.patch(
    "/tenants/{tenant_id}/alert-rules/{rule_id}/toggle",
    summary="Toggle alert rule enabled/disabled",
)
@limiter.limit("20/minute")
async def toggle_alert_rule(
    request: Request,
    tenant_id: uuid.UUID,
    rule_id: uuid.UUID,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict[str, Any]:
    """Flip a rule's ``enabled`` flag and return the new state.

    Raises:
        HTTPException: 403 if the caller lacks tenant or write access; 404 if
            no rule row was updated.
    """
    await _check_tenant_access(current_user, tenant_id, db)
    _require_write(current_user)

    # The flag is inverted inside the UPDATE itself, so no prior read is needed.
    flip_stmt = text("""
        UPDATE alert_rules SET enabled = NOT enabled
        WHERE id = CAST(:id AS uuid)
        RETURNING id, enabled
    """)
    outcome = await db.execute(flip_stmt, {"id": str(rule_id)})
    updated = outcome.fetchone()
    if not updated:
        raise HTTPException(404, "Alert rule not found")
    await db.commit()

    toggled_id, now_enabled = updated[0], updated[1]
    return {"id": str(toggled_id), "enabled": now_enabled}
# =========================================================================
# NOTIFICATION CHANNELS CRUD
# =========================================================================
class SMTPTestRequest(BaseModel):
    """Request body for trying out SMTP settings before a channel is saved."""

    # NOTE(review): no model_config is declared here, unlike the other request
    # models in this file, so extra fields are not explicitly forbidden —
    # confirm whether that is intentional.
    smtp_host: str
    smtp_port: int = 587
    smtp_user: Optional[str] = None
    smtp_password: Optional[str] = None
    smtp_use_tls: bool = False
    # Sender used for the test message when the caller supplies none.
    from_address: str = "alerts@example.com"
    # Recipient of the test message.
    to_address: str
# NOTE(review): this is the only mutating endpoint in the file without a
# @limiter.limit decorator, yet it sends mail via a caller-supplied SMTP
# host — consider adding a rate limit.
@router.post(
    "/tenants/{tenant_id}/notification-channels/test-smtp",
    summary="Test SMTP settings before creating a channel",
)
async def test_channel_smtp(
    tenant_id: uuid.UUID,
    data: SMTPTestRequest,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict[str, Any]:
    """Test SMTP settings before creating a channel.

    Builds an ``SMTPConfig`` from the request and sends a test e-mail to
    ``data.to_address``. Nothing is persisted.

    Returns:
        The result dict produced by ``send_test_email`` when it reports success.

    Raises:
        HTTPException: 403 if the caller lacks tenant or write access; 400
            with the service's message when the test e-mail fails.
    """
    await _check_tenant_access(current_user, tenant_id, db)
    _require_write(current_user)

    from app.services.email_service import SMTPConfig, send_test_email

    smtp_settings = SMTPConfig(
        host=data.smtp_host,
        port=data.smtp_port,
        user=data.smtp_user,
        password=data.smtp_password,
        use_tls=data.smtp_use_tls,
        from_address=data.from_address,
    )
    outcome = await send_test_email(data.to_address, smtp_settings)
    if outcome["success"]:
        return outcome
    raise HTTPException(status_code=400, detail=outcome["message"])
@router.get(
    "/tenants/{tenant_id}/notification-channels",
    summary="List notification channels for tenant",
    dependencies=[require_scope("alerts:read")],
)
async def list_notification_channels(
    tenant_id: uuid.UUID,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[dict[str, Any]]:
    """Return notification channels, newest first.

    Password columns are not selected, so they never appear in the response.
    The query has no explicit tenant predicate; tenant scoping relies on the
    row-level security described in the module docstring.

    Raises:
        HTTPException: 403 if the caller may not access the tenant.
    """
    await _check_tenant_access(current_user, tenant_id, db)

    listing = await db.execute(
        text("""
            SELECT id, tenant_id, name, channel_type,
            smtp_host, smtp_port, smtp_user, smtp_use_tls,
            from_address, to_address, webhook_url,
            created_at, slack_webhook_url
            FROM notification_channels
            ORDER BY created_at DESC
        """)
    )

    channels: list[dict[str, Any]] = []
    for record in listing.fetchall():
        (
            channel_pk,
            owner_tenant,
            name,
            channel_type,
            smtp_host,
            smtp_port,
            smtp_user,
            smtp_use_tls,
            from_address,
            to_address,
            webhook_url,
            created_at,
            slack_webhook_url,
        ) = record
        channels.append(
            {
                "id": str(channel_pk),
                "tenant_id": str(owner_tenant),
                "name": name,
                "channel_type": channel_type,
                "smtp_host": smtp_host,
                "smtp_port": smtp_port,
                "smtp_user": smtp_user,
                "smtp_use_tls": smtp_use_tls,
                "from_address": from_address,
                "to_address": to_address,
                "webhook_url": webhook_url,
                "created_at": created_at.isoformat() if created_at else None,
                "slack_webhook_url": slack_webhook_url,
            }
        )
    return channels
@router.post(
    "/tenants/{tenant_id}/notification-channels",
    summary="Create notification channel",
    status_code=status.HTTP_201_CREATED,
)
@limiter.limit("20/minute")
async def create_notification_channel(
    request: Request,
    tenant_id: uuid.UUID,
    body: ChannelCreate,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict[str, Any]:
    """Create a notification channel for the tenant.

    A supplied SMTP password is passed through ``encrypt_credentials_transit``
    and only the encrypted value is stored; the password is never echoed back.

    Returns:
        The created channel (without any password field) plus its new id.

    Raises:
        HTTPException: 403 if the caller lacks tenant or write access; 422 if
            ``channel_type`` is not email, webhook or slack.
    """
    await _check_tenant_access(current_user, tenant_id, db)
    _require_write(current_user)
    if body.channel_type not in ("email", "webhook", "slack"):
        raise HTTPException(422, "channel_type must be 'email', 'webhook', or 'slack'")

    channel_id = str(uuid.uuid4())

    from app.services.crypto import encrypt_credentials_transit

    # Encrypt SMTP password via Transit if provided
    if body.smtp_password:
        encrypted_password_transit = await encrypt_credentials_transit(
            body.smtp_password,
            str(tenant_id),
        )
    else:
        encrypted_password_transit = None

    # The response doubles as the base of the bind parameters: it carries
    # every stored column except the encrypted password.
    created = {
        "id": channel_id,
        "tenant_id": str(tenant_id),
        "name": body.name,
        "channel_type": body.channel_type,
        "smtp_host": body.smtp_host,
        "smtp_port": body.smtp_port,
        "smtp_user": body.smtp_user,
        "smtp_use_tls": body.smtp_use_tls,
        "from_address": body.from_address,
        "to_address": body.to_address,
        "webhook_url": body.webhook_url,
        "slack_webhook_url": body.slack_webhook_url,
    }
    bind_params = {**created, "smtp_password_transit": encrypted_password_transit}

    await db.execute(
        text("""
            INSERT INTO notification_channels
            (id, tenant_id, name, channel_type, smtp_host, smtp_port,
            smtp_user, smtp_password_transit, smtp_use_tls, from_address,
            to_address, webhook_url, slack_webhook_url)
            VALUES
            (CAST(:id AS uuid), CAST(:tenant_id AS uuid),
            :name, :channel_type, :smtp_host, :smtp_port,
            :smtp_user, :smtp_password_transit, :smtp_use_tls,
            :from_address, :to_address, :webhook_url,
            :slack_webhook_url)
        """),
        bind_params,
    )
    await db.commit()
    return created
@router.put(
    "/tenants/{tenant_id}/notification-channels/{channel_id}",
    summary="Update notification channel",
)
@limiter.limit("20/minute")
async def update_notification_channel(
    request: Request,
    tenant_id: uuid.UUID,
    channel_id: uuid.UUID,
    body: ChannelUpdate,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict[str, Any]:
    """Replace a notification channel's settings.

    The stored password is only touched when a new ``smtp_password`` is
    supplied: the new value is encrypted into ``smtp_password_transit`` and
    the legacy ``smtp_password`` column is cleared.

    Returns:
        The updated channel, echoing the submitted values (no password field).

    Raises:
        HTTPException: 403 if the caller lacks tenant or write access; 422 if
            ``channel_type`` is not email, webhook or slack; 404 if no channel
            row was updated.
    """
    await _check_tenant_access(current_user, tenant_id, db)
    _require_write(current_user)
    if body.channel_type not in ("email", "webhook", "slack"):
        raise HTTPException(422, "channel_type must be 'email', 'webhook', or 'slack'")

    from app.services.crypto import encrypt_credentials_transit

    # Columns that are always overwritten, in SET-clause order.
    always_written = (
        "name",
        "channel_type",
        "smtp_host",
        "smtp_port",
        "smtp_user",
        "smtp_use_tls",
        "from_address",
        "to_address",
        "webhook_url",
        "slack_webhook_url",
    )
    updated = {
        "id": str(channel_id),
        "name": body.name,
        "channel_type": body.channel_type,
        "smtp_host": body.smtp_host,
        "smtp_port": body.smtp_port,
        "smtp_user": body.smtp_user,
        "smtp_use_tls": body.smtp_use_tls,
        "from_address": body.from_address,
        "to_address": body.to_address,
        "webhook_url": body.webhook_url,
        "slack_webhook_url": body.slack_webhook_url,
    }
    # Bind parameters start as a copy of the response; secrets are added only
    # to the copy so they never leak into the returned payload.
    params: dict[str, Any] = dict(updated)
    set_parts = [f"{column} = :{column}" for column in always_written]

    if body.smtp_password:
        params["smtp_password_transit"] = await encrypt_credentials_transit(
            body.smtp_password,
            str(tenant_id),
        )
        set_parts.append("smtp_password_transit = :smtp_password_transit")
        # Clear legacy column
        set_parts.append("smtp_password = NULL")

    set_clause = ", ".join(set_parts)
    outcome = await db.execute(
        text(f"""
            UPDATE notification_channels
            SET {set_clause}
            WHERE id = CAST(:id AS uuid)
            RETURNING id
        """),
        params,
    )
    if not outcome.fetchone():
        raise HTTPException(404, "Notification channel not found")
    await db.commit()
    return updated
@router.delete(
    "/tenants/{tenant_id}/notification-channels/{channel_id}",
    summary="Delete notification channel",
    status_code=status.HTTP_204_NO_CONTENT,
)
@limiter.limit("5/minute")
async def delete_notification_channel(
    request: Request,
    tenant_id: uuid.UUID,
    channel_id: uuid.UUID,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> None:
    """Delete a notification channel.

    Raises:
        HTTPException: 403 if the caller lacks tenant or write access; 404 if
            no channel row was deleted.
    """
    await _check_tenant_access(current_user, tenant_id, db)
    _require_write(current_user)

    # RETURNING lets a single statement both delete and report existence.
    removal = await db.execute(
        text("DELETE FROM notification_channels WHERE id = CAST(:id AS uuid) RETURNING id"),
        {"id": str(channel_id)},
    )
    deleted_row = removal.fetchone()
    if not deleted_row:
        raise HTTPException(404, "Notification channel not found")
    await db.commit()
@router.post(
    "/tenants/{tenant_id}/notification-channels/{channel_id}/test",
    summary="Send test notification via channel",
)
@limiter.limit("5/minute")
async def test_notification_channel(
    request: Request,
    tenant_id: uuid.UUID,
    channel_id: uuid.UUID,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict[str, Any]:
    """Send a test notification through a stored channel.

    The channel row (including both password columns) is loaded into a dict
    and handed to ``send_test_notification``.

    Returns:
        ``{"status": "ok", ...}`` when the service reports success.

    Raises:
        HTTPException: 403 if the caller lacks tenant or write access; 404 if
            the channel does not exist; 422 if delivery fails or the service
            raises.
    """
    await _check_tenant_access(current_user, tenant_id, db)
    _require_write(current_user)

    # Fetch channel as dict for notification_service
    lookup = await db.execute(
        text("""
            SELECT id, tenant_id, name, channel_type,
            smtp_host, smtp_port, smtp_user, smtp_password,
            smtp_use_tls, from_address, to_address,
            webhook_url, smtp_password_transit,
            slack_webhook_url
            FROM notification_channels
            WHERE id = CAST(:id AS uuid)
        """),
        {"id": str(channel_id)},
    )
    record = lookup.fetchone()
    if not record:
        raise HTTPException(404, "Notification channel not found")

    # Keys listed in SELECT order so they pair up positionally with the row.
    column_names = (
        "id",
        "tenant_id",
        "name",
        "channel_type",
        "smtp_host",
        "smtp_port",
        "smtp_user",
        "smtp_password",
        "smtp_use_tls",
        "from_address",
        "to_address",
        "webhook_url",
        "smtp_password_transit",
        "slack_webhook_url",
    )
    channel = dict(zip(column_names, record))
    channel["id"] = str(channel["id"])
    channel["tenant_id"] = str(channel["tenant_id"])

    from app.services.notification_service import send_test_notification

    try:
        delivered = await send_test_notification(channel)
    except HTTPException:
        # HTTP errors raised by the service propagate unchanged.
        raise
    except Exception as exc:
        raise HTTPException(422, f"Test notification failed: {str(exc)}")

    if not delivered:
        raise HTTPException(422, "Test notification delivery failed")
    return {"status": "ok", "message": "Test notification sent successfully"}
# =========================================================================
# ALERT EVENTS (read + actions)
# =========================================================================
@router.get(
    "/tenants/{tenant_id}/alerts",
    summary="List alert events with filtering and pagination",
    dependencies=[require_scope("alerts:read")],
)
async def list_alerts(
    tenant_id: uuid.UUID,
    alert_status: Optional[str] = Query(None, alias="status"),
    severity: Optional[str] = Query(None),
    device_id: Optional[str] = Query(None),
    rule_id: Optional[str] = Query(None),
    start_date: Optional[str] = Query(None),
    end_date: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=200),
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict[str, Any]:
    """Return one page of alert events, newest first.

    Every filter is optional and only applied when a non-empty value is given.
    ``device_id``/``rule_id`` are cast to uuid and the dates to timestamptz
    inside the query. The query has no explicit tenant predicate; tenant
    scoping relies on the row-level security described in the module docstring.

    Returns:
        ``{"items": [...], "total": int, "page": int, "per_page": int}``
        where ``total`` counts all rows matching the filters.

    Raises:
        HTTPException: 403 if the caller may not access the tenant.
    """
    await _check_tenant_access(current_user, tenant_id, db)

    # (bind name, supplied value, SQL predicate), in the order they are applied.
    optional_filters = (
        ("status", alert_status, "ae.status = :status"),
        ("severity", severity, "ae.severity = :severity"),
        ("device_id", device_id, "ae.device_id = CAST(:device_id AS uuid)"),
        ("rule_id", rule_id, "ae.rule_id = CAST(:rule_id AS uuid)"),
        ("start_date", start_date, "ae.fired_at >= CAST(:start_date AS timestamptz)"),
        ("end_date", end_date, "ae.fired_at <= CAST(:end_date AS timestamptz)"),
    )
    # "1=1" keeps the WHERE clause valid when no optional filter applies.
    filters = ["1=1"]
    params: dict[str, Any] = {}
    for bind_name, supplied, predicate in optional_filters:
        if supplied:
            filters.append(predicate)
            params[bind_name] = supplied
    where = " AND ".join(filters)
    offset = (page - 1) * per_page

    # Get total count
    count_result = await db.execute(
        text(f"SELECT COUNT(*) FROM alert_events ae WHERE {where}"),
        params,
    )
    total = count_result.scalar() or 0

    # Get page of results with device hostname and rule name
    page_result = await db.execute(
        text(f"""
            SELECT ae.id, ae.rule_id, ae.device_id, ae.tenant_id,
            ae.status, ae.severity, ae.metric, ae.value,
            ae.threshold, ae.message, ae.is_flapping,
            ae.acknowledged_at, ae.silenced_until,
            ae.fired_at, ae.resolved_at,
            d.hostname AS device_hostname,
            ar.name AS rule_name
            FROM alert_events ae
            LEFT JOIN devices d ON d.id = ae.device_id
            LEFT JOIN alert_rules ar ON ar.id = ae.rule_id
            WHERE {where}
            ORDER BY ae.fired_at DESC
            LIMIT :limit OFFSET :offset
        """),
        {**params, "limit": per_page, "offset": offset},
    )

    def _iso(moment: Any) -> Optional[str]:
        # Timestamps are serialised as ISO-8601 strings; missing ones as None.
        return moment.isoformat() if moment else None

    def _number(raw: Any) -> Optional[float]:
        return float(raw) if raw is not None else None

    items = []
    for record in page_result.fetchall():
        (
            ev_id,
            ev_rule,
            ev_device,
            ev_tenant,
            ev_status,
            ev_severity,
            ev_metric,
            ev_value,
            ev_threshold,
            ev_message,
            ev_flapping,
            ev_acknowledged,
            ev_silenced,
            ev_fired,
            ev_resolved,
            hostname,
            rule_name,
        ) = record
        items.append(
            {
                "id": str(ev_id),
                "rule_id": str(ev_rule) if ev_rule else None,
                "device_id": str(ev_device),
                "tenant_id": str(ev_tenant),
                "status": ev_status,
                "severity": ev_severity,
                "metric": ev_metric,
                "value": _number(ev_value),
                "threshold": _number(ev_threshold),
                "message": ev_message,
                "is_flapping": ev_flapping,
                "acknowledged_at": _iso(ev_acknowledged),
                "silenced_until": _iso(ev_silenced),
                "fired_at": _iso(ev_fired),
                "resolved_at": _iso(ev_resolved),
                "device_hostname": hostname,
                "rule_name": rule_name,
            }
        )

    return {
        "items": items,
        "total": total,
        "page": page,
        "per_page": per_page,
    }
@router.get(
    "/tenants/{tenant_id}/alerts/active-count",
    summary="Get count of active (firing) alerts for nav badge",
    dependencies=[require_scope("alerts:read")],
)
async def get_active_alert_count(
    tenant_id: uuid.UUID,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict[str, int]:
    """Count alerts that are firing, unresolved and not currently silenced.

    An alert whose ``silenced_until`` lies in the past counts as active again.

    Returns:
        ``{"count": n}``; ``n`` is 0 when the query yields no value.

    Raises:
        HTTPException: 403 if the caller may not access the tenant.
    """
    await _check_tenant_access(current_user, tenant_id, db)

    active_query = text("""
        SELECT COUNT(*) FROM alert_events
        WHERE status = 'firing'
        AND resolved_at IS NULL
        AND (silenced_until IS NULL OR silenced_until < NOW())
    """)
    tally = await db.execute(active_query)
    return {"count": tally.scalar() or 0}
@router.post(
    "/tenants/{tenant_id}/alerts/{alert_id}/acknowledge",
    summary="Acknowledge an active alert",
)
@limiter.limit("20/minute")
async def acknowledge_alert(
    request: Request,
    tenant_id: uuid.UUID,
    alert_id: uuid.UUID,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict[str, str]:
    """Mark an alert as acknowledged by the current user, timestamped now.

    Raises:
        HTTPException: 403 if the caller lacks tenant or write access; 404 if
            no alert row was updated.
    """
    await _check_tenant_access(current_user, tenant_id, db)
    _require_write(current_user)

    ack_stmt = text("""
        UPDATE alert_events
        SET acknowledged_at = NOW(), acknowledged_by = CAST(:user_id AS uuid)
        WHERE id = CAST(:id AS uuid)
        RETURNING id
    """)
    bind_values = {"id": str(alert_id), "user_id": str(current_user.user_id)}
    outcome = await db.execute(ack_stmt, bind_values)
    # RETURNING yields no row when nothing matched the id.
    if not outcome.fetchone():
        raise HTTPException(404, "Alert not found")
    await db.commit()
    return {"status": "ok", "message": "Alert acknowledged"}
@router.post(
    "/tenants/{tenant_id}/alerts/{alert_id}/silence",
    summary="Silence an alert for a specified duration",
)
@limiter.limit("20/minute")
async def silence_alert(
    request: Request,
    tenant_id: uuid.UUID,
    alert_id: uuid.UUID,
    body: SilenceRequest,
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict[str, str]:
    """Silence an alert until now plus ``body.duration_minutes``.

    Raises:
        HTTPException: 403 if the caller lacks tenant or write access; 422 if
            the duration is below one minute; 404 if no alert row was updated.
    """
    await _check_tenant_access(current_user, tenant_id, db)
    _require_write(current_user)

    minutes = body.duration_minutes
    if minutes < 1:
        raise HTTPException(422, "duration_minutes must be at least 1")

    # The minute count is bound as text and turned into an interval in SQL.
    silence_stmt = text("""
        UPDATE alert_events
        SET silenced_until = NOW() + (:minutes || ' minutes')::interval
        WHERE id = CAST(:id AS uuid)
        RETURNING id
    """)
    outcome = await db.execute(
        silence_stmt,
        {"id": str(alert_id), "minutes": str(minutes)},
    )
    if not outcome.fetchone():
        raise HTTPException(404, "Alert not found")
    await db.commit()
    return {"status": "ok", "message": f"Alert silenced for {body.duration_minutes} minutes"}
@router.get(
    "/tenants/{tenant_id}/devices/{device_id}/alerts",
    summary="List alerts for a specific device",
    dependencies=[require_scope("alerts:read")],
)
async def list_device_alerts(
    tenant_id: uuid.UUID,
    device_id: uuid.UUID,
    alert_status: Optional[str] = Query(None, alias="status"),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    current_user: CurrentUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> dict[str, Any]:
    """Return one page of alert events for a single device, newest first.

    The query filters on the device id only; tenant scoping relies on the
    row-level security described in the module docstring.

    Returns:
        ``{"items": [...], "total": int, "page": int, "per_page": int}``
        where ``total`` counts all rows matching the filters.

    Raises:
        HTTPException: 403 if the caller may not access the tenant.
    """
    await _check_tenant_access(current_user, tenant_id, db)

    params: dict[str, Any] = {"device_id": str(device_id)}
    filters = ["ae.device_id = CAST(:device_id AS uuid)"]
    if alert_status:
        params["status"] = alert_status
        filters.append("ae.status = :status")
    where = " AND ".join(filters)
    offset = (page - 1) * per_page

    count_result = await db.execute(
        text(f"SELECT COUNT(*) FROM alert_events ae WHERE {where}"),
        params,
    )
    total = count_result.scalar() or 0

    page_result = await db.execute(
        text(f"""
            SELECT ae.id, ae.rule_id, ae.device_id, ae.tenant_id,
            ae.status, ae.severity, ae.metric, ae.value,
            ae.threshold, ae.message, ae.is_flapping,
            ae.acknowledged_at, ae.silenced_until,
            ae.fired_at, ae.resolved_at,
            ar.name AS rule_name
            FROM alert_events ae
            LEFT JOIN alert_rules ar ON ar.id = ae.rule_id
            WHERE {where}
            ORDER BY ae.fired_at DESC
            LIMIT :limit OFFSET :offset
        """),
        {**params, "limit": per_page, "offset": offset},
    )

    def _iso(moment: Any) -> Optional[str]:
        # Timestamps are serialised as ISO-8601 strings; missing ones as None.
        return moment.isoformat() if moment else None

    def _number(raw: Any) -> Optional[float]:
        return float(raw) if raw is not None else None

    items = []
    for record in page_result.fetchall():
        (
            ev_id,
            ev_rule,
            ev_device,
            ev_tenant,
            ev_status,
            ev_severity,
            ev_metric,
            ev_value,
            ev_threshold,
            ev_message,
            ev_flapping,
            ev_acknowledged,
            ev_silenced,
            ev_fired,
            ev_resolved,
            rule_name,
        ) = record
        items.append(
            {
                "id": str(ev_id),
                "rule_id": str(ev_rule) if ev_rule else None,
                "device_id": str(ev_device),
                "tenant_id": str(ev_tenant),
                "status": ev_status,
                "severity": ev_severity,
                "metric": ev_metric,
                "value": _number(ev_value),
                "threshold": _number(ev_threshold),
                "message": ev_message,
                "is_flapping": ev_flapping,
                "acknowledged_at": _iso(ev_acknowledged),
                "silenced_until": _iso(ev_silenced),
                "fired_at": _iso(ev_fired),
                "resolved_at": _iso(ev_resolved),
                "rule_name": rule_name,
            }
        )

    return {
        "items": items,
        "total": total,
        "page": page,
        "per_page": per_page,
    }