fix(sse): use ordered consumers to prevent stale consumer accumulation
SSE connections previously created regular push consumers without durable names. When browsers disconnected uncleanly or the API restarted, these orphaned consumers persisted on the NATS server and continued draining messages — each restart added more, eventually saturating the API at 100% CPU. Switched to ordered_consumer=True which: - Creates ephemeral consumers with no server-side ack state - Auto-cleans on disconnect (no orphans) - Still delivers new messages in real-time for SSE Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -11,7 +11,7 @@ from typing import Optional
|
||||
|
||||
import nats
|
||||
import structlog
|
||||
from nats.js.api import ConsumerConfig, DeliverPolicy, StreamConfig
|
||||
from nats.js.api import StreamConfig
|
||||
|
||||
from app.config import settings
|
||||
|
||||
@@ -133,17 +133,12 @@ class SSEConnectionManager:
|
||||
last_event_id=last_event_id,
|
||||
)
|
||||
|
||||
# Build consumer config for replay support
|
||||
if last_event_id is not None:
|
||||
try:
|
||||
start_seq = int(last_event_id) + 1
|
||||
consumer_cfg = ConsumerConfig(
|
||||
deliver_policy=DeliverPolicy.BY_START_SEQUENCE, opt_start_seq=start_seq
|
||||
)
|
||||
except (ValueError, TypeError):
|
||||
consumer_cfg = ConsumerConfig(deliver_policy=DeliverPolicy.NEW)
|
||||
else:
|
||||
consumer_cfg = ConsumerConfig(deliver_policy=DeliverPolicy.NEW)
|
||||
# Use ordered consumers for SSE — ephemeral, no server-side state,
|
||||
# no ack tracking. They auto-clean on disconnect so stale consumers
|
||||
# can't accumulate across API restarts or dropped browser connections.
|
||||
#
|
||||
# ordered_consumer=True implies DeliverPolicy.NEW unless last_event_id
|
||||
# was provided (replay from sequence).
|
||||
|
||||
# Subscribe to device events (DEVICE_EVENTS stream -- created by Go poller)
|
||||
for subject in _DEVICE_EVENT_SUBJECTS:
|
||||
@@ -151,7 +146,7 @@ class SSEConnectionManager:
|
||||
sub = await js.subscribe(
|
||||
subject,
|
||||
stream="DEVICE_EVENTS",
|
||||
config=consumer_cfg,
|
||||
ordered_consumer=True,
|
||||
)
|
||||
self._subscriptions.append(sub)
|
||||
except Exception as exc:
|
||||
@@ -169,7 +164,7 @@ class SSEConnectionManager:
|
||||
sub = await js.subscribe(
|
||||
subject,
|
||||
stream="ALERT_EVENTS",
|
||||
config=consumer_cfg,
|
||||
ordered_consumer=True,
|
||||
)
|
||||
self._subscriptions.append(sub)
|
||||
except Exception as exc:
|
||||
@@ -183,7 +178,7 @@ class SSEConnectionManager:
|
||||
)
|
||||
)
|
||||
sub = await js.subscribe(
|
||||
subject, stream="ALERT_EVENTS", config=consumer_cfg
|
||||
subject, stream="ALERT_EVENTS", ordered_consumer=True
|
||||
)
|
||||
self._subscriptions.append(sub)
|
||||
logger.info("sse.stream_created_lazily", stream="ALERT_EVENTS")
|
||||
@@ -208,7 +203,7 @@ class SSEConnectionManager:
|
||||
sub = await js.subscribe(
|
||||
subject,
|
||||
stream="OPERATION_EVENTS",
|
||||
config=consumer_cfg,
|
||||
ordered_consumer=True,
|
||||
)
|
||||
self._subscriptions.append(sub)
|
||||
except Exception as exc:
|
||||
@@ -222,7 +217,7 @@ class SSEConnectionManager:
|
||||
)
|
||||
)
|
||||
sub = await js.subscribe(
|
||||
subject, stream="OPERATION_EVENTS", config=consumer_cfg
|
||||
subject, stream="OPERATION_EVENTS", ordered_consumer=True
|
||||
)
|
||||
self._subscriptions.append(sub)
|
||||
logger.info("sse.stream_created_lazily", stream="OPERATION_EVENTS")
|
||||
|
||||
Reference in New Issue
Block a user