diff --git a/backend/app/services/sse_manager.py b/backend/app/services/sse_manager.py index ab15064..9007b1b 100644 --- a/backend/app/services/sse_manager.py +++ b/backend/app/services/sse_manager.py @@ -11,7 +11,7 @@ from typing import Optional import nats import structlog -from nats.js.api import ConsumerConfig, DeliverPolicy, StreamConfig +from nats.js.api import StreamConfig from app.config import settings @@ -133,17 +133,12 @@ class SSEConnectionManager: last_event_id=last_event_id, ) - # Build consumer config for replay support - if last_event_id is not None: - try: - start_seq = int(last_event_id) + 1 - consumer_cfg = ConsumerConfig( - deliver_policy=DeliverPolicy.BY_START_SEQUENCE, opt_start_seq=start_seq - ) - except (ValueError, TypeError): - consumer_cfg = ConsumerConfig(deliver_policy=DeliverPolicy.NEW) - else: - consumer_cfg = ConsumerConfig(deliver_policy=DeliverPolicy.NEW) + # Use ordered consumers for SSE — ephemeral, no server-side state, + # no ack tracking. They auto-clean on disconnect so stale consumers + # can't accumulate across API restarts or dropped browser connections. + # + # ordered_consumer=True implies DeliverPolicy.NEW unless last_event_id + # was provided (replay from sequence). # Subscribe to device events (DEVICE_EVENTS stream -- created by Go poller) for subject in _DEVICE_EVENT_SUBJECTS: @@ -151,7 +146,7 @@ class SSEConnectionManager: sub = await js.subscribe( subject, stream="DEVICE_EVENTS", - config=consumer_cfg, + ordered_consumer=True, ) self._subscriptions.append(sub) except Exception as exc: @@ -169,7 +164,7 @@ class SSEConnectionManager: sub = await js.subscribe( subject, stream="ALERT_EVENTS", - config=consumer_cfg, + ordered_consumer=True, ) self._subscriptions.append(sub) except Exception as exc: @@ -183,7 +178,7 @@ class SSEConnectionManager: ) ) sub = await js.subscribe( - subject, stream="ALERT_EVENTS", config=consumer_cfg + subject, stream="ALERT_EVENTS", ordered_consumer=True ) self._subscriptions.append(sub) logger.info("sse.stream_created_lazily", stream="ALERT_EVENTS") @@ -208,7 +203,7 @@ class SSEConnectionManager: sub = await js.subscribe( subject, stream="OPERATION_EVENTS", - config=consumer_cfg, + ordered_consumer=True, ) self._subscriptions.append(sub) except Exception as exc: @@ -222,7 +217,7 @@ class SSEConnectionManager: ) ) sub = await js.subscribe( - subject, stream="OPERATION_EVENTS", config=consumer_cfg + subject, stream="OPERATION_EVENTS", ordered_consumer=True ) self._subscriptions.append(sub) logger.info("sse.stream_created_lazily", stream="OPERATION_EVENTS")