From 222b7c2b25b3e2c39e98767b0d16c3b140560255 Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Thu, 19 Mar 2026 18:11:49 -0500 Subject: [PATCH] fix(sse): use ordered consumers to prevent stale consumer accumulation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SSE connections previously created regular push consumers without durable names. When browsers disconnected uncleanly or the API restarted, these orphaned consumers persisted on the NATS server and continued draining messages — each restart added more, eventually saturating the API at 100% CPU. Switched to ordered_consumer=True which: - Creates ephemeral consumers with no server-side ack state - Auto-cleans on disconnect (no orphans) - Still delivers new messages in real-time for SSE Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/services/sse_manager.py | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/backend/app/services/sse_manager.py b/backend/app/services/sse_manager.py index ab15064..9007b1b 100644 --- a/backend/app/services/sse_manager.py +++ b/backend/app/services/sse_manager.py @@ -11,7 +11,7 @@ from typing import Optional import nats import structlog -from nats.js.api import ConsumerConfig, DeliverPolicy, StreamConfig +from nats.js.api import StreamConfig from app.config import settings @@ -133,17 +133,12 @@ class SSEConnectionManager: last_event_id=last_event_id, ) - # Build consumer config for replay support - if last_event_id is not None: - try: - start_seq = int(last_event_id) + 1 - consumer_cfg = ConsumerConfig( - deliver_policy=DeliverPolicy.BY_START_SEQUENCE, opt_start_seq=start_seq - ) - except (ValueError, TypeError): - consumer_cfg = ConsumerConfig(deliver_policy=DeliverPolicy.NEW) - else: - consumer_cfg = ConsumerConfig(deliver_policy=DeliverPolicy.NEW) + # Use ordered consumers for SSE — ephemeral, no server-side state, + # no ack tracking. They auto-clean on disconnect so stale consumers + # can't accumulate across API restarts or dropped browser connections. + # + # ordered_consumer=True implies DeliverPolicy.NEW unless last_event_id + # was provided (replay from sequence). # Subscribe to device events (DEVICE_EVENTS stream -- created by Go poller) for subject in _DEVICE_EVENT_SUBJECTS: @@ -151,7 +146,7 @@ class SSEConnectionManager: sub = await js.subscribe( subject, stream="DEVICE_EVENTS", - config=consumer_cfg, + ordered_consumer=True, ) self._subscriptions.append(sub) except Exception as exc: @@ -169,7 +164,7 @@ class SSEConnectionManager: sub = await js.subscribe( subject, stream="ALERT_EVENTS", - config=consumer_cfg, + ordered_consumer=True, ) self._subscriptions.append(sub) except Exception as exc: @@ -183,7 +178,7 @@ class SSEConnectionManager: ) ) sub = await js.subscribe( - subject, stream="ALERT_EVENTS", config=consumer_cfg + subject, stream="ALERT_EVENTS", ordered_consumer=True ) self._subscriptions.append(sub) logger.info("sse.stream_created_lazily", stream="ALERT_EVENTS") @@ -208,7 +203,7 @@ class SSEConnectionManager: sub = await js.subscribe( subject, stream="OPERATION_EVENTS", - config=consumer_cfg, + ordered_consumer=True, ) self._subscriptions.append(sub) except Exception as exc: @@ -222,7 +217,7 @@ class SSEConnectionManager: ) ) sub = await js.subscribe( - subject, stream="OPERATION_EVENTS", config=consumer_cfg + subject, stream="OPERATION_EVENTS", ordered_consumer=True ) self._subscriptions.append(sub) logger.info("sse.stream_created_lazily", stream="OPERATION_EVENTS")