feat(05-02): wire change parser into diff service with RETURNING id
- Diff INSERT now uses RETURNING id to capture diff_id - parse_diff_changes called after diff commit, results stored in router_config_changes - Change parser errors are best-effort (logged, never block diff storage) - Added tests for change storage and parser error resilience Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -16,6 +16,7 @@ import time
|
||||
from prometheus_client import Counter, Histogram
|
||||
from sqlalchemy import text
|
||||
|
||||
from app.services.config_change_parser import parse_diff_changes
|
||||
from app.services.openbao_service import OpenBaoTransitService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -125,15 +126,16 @@ async def generate_and_store_diff(
|
||||
if line.startswith("-") and not line.startswith("--")
|
||||
)
|
||||
|
||||
# 9. INSERT into router_config_diffs
|
||||
await session.execute(
|
||||
# 9. INSERT into router_config_diffs (RETURNING id for change parser)
|
||||
diff_result = await session.execute(
|
||||
text(
|
||||
"INSERT INTO router_config_diffs "
|
||||
"(device_id, tenant_id, old_snapshot_id, new_snapshot_id, "
|
||||
"diff_text, lines_added, lines_removed) "
|
||||
"VALUES (CAST(:device_id AS uuid), CAST(:tenant_id AS uuid), "
|
||||
"CAST(:old_snapshot_id AS uuid), CAST(:new_snapshot_id AS uuid), "
|
||||
":diff_text, :lines_added, :lines_removed)"
|
||||
":diff_text, :lines_added, :lines_removed) "
|
||||
"RETURNING id"
|
||||
),
|
||||
{
|
||||
"device_id": device_id,
|
||||
@@ -145,8 +147,9 @@ async def generate_and_store_diff(
|
||||
"lines_removed": lines_removed,
|
||||
},
|
||||
)
|
||||
diff_id = diff_result.scalar_one()
|
||||
|
||||
# 10. Commit
|
||||
# 10. Commit diff
|
||||
await session.commit()
|
||||
|
||||
config_diff_generated_total.inc()
|
||||
@@ -159,6 +162,39 @@ async def generate_and_store_diff(
|
||||
lines_removed,
|
||||
)
|
||||
|
||||
# 11. Parse structured changes (best-effort)
|
||||
try:
|
||||
changes = parse_diff_changes(diff_text)
|
||||
for change in changes:
|
||||
await session.execute(
|
||||
text(
|
||||
"INSERT INTO router_config_changes "
|
||||
"(diff_id, device_id, tenant_id, component, summary, raw_line) "
|
||||
"VALUES (CAST(:diff_id AS uuid), CAST(:device_id AS uuid), "
|
||||
"CAST(:tenant_id AS uuid), :component, :summary, :raw_line)"
|
||||
),
|
||||
{
|
||||
"diff_id": str(diff_id),
|
||||
"device_id": device_id,
|
||||
"tenant_id": tenant_id,
|
||||
"component": change["component"],
|
||||
"summary": change["summary"],
|
||||
"raw_line": change["raw_line"],
|
||||
},
|
||||
)
|
||||
if changes:
|
||||
await session.commit()
|
||||
logger.info(
|
||||
"Stored %d config changes for device %s diff %s",
|
||||
len(changes), device_id, diff_id,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Change parser error for device %s diff %s (non-fatal): %s",
|
||||
device_id, diff_id, exc,
|
||||
)
|
||||
config_diff_errors_total.labels(error_type="change_parser").inc()
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Diff generation error for device %s (non-fatal): %s",
|
||||
|
||||
@@ -42,8 +42,10 @@ async def test_diff_generated_and_stored():
|
||||
new_result = MagicMock()
|
||||
new_result.scalar_one.return_value = "vault:v1:new_encrypted"
|
||||
|
||||
# Query 3: INSERT
|
||||
# Query 3: INSERT RETURNING id
|
||||
insert_result = MagicMock()
|
||||
diff_id = str(uuid4())
|
||||
insert_result.scalar_one.return_value = diff_id
|
||||
|
||||
mock_session.execute = AsyncMock(side_effect=[prev_result, new_result, insert_result])
|
||||
mock_session.commit = AsyncMock()
|
||||
@@ -57,12 +59,15 @@ async def test_diff_generated_and_stored():
|
||||
with patch(
|
||||
"app.services.config_diff_service.OpenBaoTransitService",
|
||||
return_value=mock_openbao,
|
||||
), patch(
|
||||
"app.services.config_diff_service.parse_diff_changes",
|
||||
return_value=[],
|
||||
):
|
||||
await generate_and_store_diff(device_id, tenant_id, new_snapshot_id, mock_session)
|
||||
|
||||
# Should decrypt both configs
|
||||
assert mock_openbao.decrypt.call_count == 2
|
||||
# Should INSERT (3 executes: prev query, new query, INSERT)
|
||||
# Should INSERT (3 executes: prev query, new query, INSERT RETURNING id)
|
||||
assert mock_session.execute.call_count == 3
|
||||
# Should commit
|
||||
mock_session.commit.assert_called_once()
|
||||
@@ -167,6 +172,7 @@ async def test_line_counts_correct():
|
||||
new_result = MagicMock()
|
||||
new_result.scalar_one.return_value = "vault:v1:new"
|
||||
insert_result = MagicMock()
|
||||
insert_result.scalar_one.return_value = str(uuid4())
|
||||
|
||||
mock_session.execute = AsyncMock(side_effect=[prev_result, new_result, insert_result])
|
||||
mock_session.commit = AsyncMock()
|
||||
@@ -180,6 +186,9 @@ async def test_line_counts_correct():
|
||||
with patch(
|
||||
"app.services.config_diff_service.OpenBaoTransitService",
|
||||
return_value=mock_openbao,
|
||||
), patch(
|
||||
"app.services.config_diff_service.parse_diff_changes",
|
||||
return_value=[],
|
||||
):
|
||||
await generate_and_store_diff(device_id, tenant_id, new_snapshot_id, mock_session)
|
||||
|
||||
@@ -227,3 +236,118 @@ async def test_empty_diff_skips_insert():
|
||||
# Only 2 queries (prev + new), no INSERT
|
||||
assert mock_session.execute.call_count == 2
|
||||
mock_session.commit.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_change_parser_called_and_changes_stored():
|
||||
"""Test 6: After diff INSERT, parse_diff_changes is called and results stored in router_config_changes."""
|
||||
from app.services.config_diff_service import generate_and_store_diff
|
||||
|
||||
device_id = str(uuid4())
|
||||
tenant_id = str(uuid4())
|
||||
new_snapshot_id = str(uuid4())
|
||||
old_snapshot_id = str(uuid4())
|
||||
diff_id = str(uuid4())
|
||||
|
||||
old_config = "/ip firewall filter\nadd chain=input action=accept"
|
||||
new_config = "/ip firewall filter\nadd chain=input action=accept\nadd chain=forward action=drop"
|
||||
|
||||
mock_session = AsyncMock()
|
||||
|
||||
prev_result = MagicMock()
|
||||
prev_result.fetchone.return_value = MagicMock(
|
||||
_mapping={"id": old_snapshot_id, "config_text": "vault:v1:old"}
|
||||
)
|
||||
new_result = MagicMock()
|
||||
new_result.scalar_one.return_value = "vault:v1:new"
|
||||
insert_result = MagicMock()
|
||||
insert_result.scalar_one.return_value = diff_id
|
||||
|
||||
# Allow unlimited execute calls (diff INSERT + change INSERTs)
|
||||
change_insert_result = MagicMock()
|
||||
mock_session.execute = AsyncMock(
|
||||
side_effect=[prev_result, new_result, insert_result, change_insert_result]
|
||||
)
|
||||
mock_session.commit = AsyncMock()
|
||||
|
||||
mock_openbao = AsyncMock()
|
||||
mock_openbao.decrypt = AsyncMock(side_effect=[
|
||||
old_config.encode("utf-8"),
|
||||
new_config.encode("utf-8"),
|
||||
])
|
||||
|
||||
mock_changes = [
|
||||
{"component": "ip/firewall/filter", "summary": "Added 1 firewall filter rule", "raw_line": "+add chain=forward action=drop"},
|
||||
]
|
||||
|
||||
with patch(
|
||||
"app.services.config_diff_service.OpenBaoTransitService",
|
||||
return_value=mock_openbao,
|
||||
), patch(
|
||||
"app.services.config_diff_service.parse_diff_changes",
|
||||
return_value=mock_changes,
|
||||
) as mock_parser:
|
||||
await generate_and_store_diff(device_id, tenant_id, new_snapshot_id, mock_session)
|
||||
|
||||
# parse_diff_changes called with the diff text
|
||||
mock_parser.assert_called_once()
|
||||
# 4 execute calls: prev query, new query, diff INSERT, change INSERT
|
||||
assert mock_session.execute.call_count == 4
|
||||
# 2 commits: one for diff, one for changes
|
||||
assert mock_session.commit.call_count == 2
|
||||
# Verify change INSERT params
|
||||
change_call = mock_session.execute.call_args_list[3]
|
||||
change_params = change_call[0][1]
|
||||
assert change_params["diff_id"] == diff_id
|
||||
assert change_params["component"] == "ip/firewall/filter"
|
||||
assert change_params["summary"] == "Added 1 firewall filter rule"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_change_parser_error_does_not_block_diff():
|
||||
"""Test 7: parse_diff_changes error does not prevent diff from being stored."""
|
||||
from app.services.config_diff_service import generate_and_store_diff
|
||||
|
||||
device_id = str(uuid4())
|
||||
tenant_id = str(uuid4())
|
||||
new_snapshot_id = str(uuid4())
|
||||
old_snapshot_id = str(uuid4())
|
||||
diff_id = str(uuid4())
|
||||
|
||||
old_config = "line1\nline2"
|
||||
new_config = "line1\nline2_modified"
|
||||
|
||||
mock_session = AsyncMock()
|
||||
|
||||
prev_result = MagicMock()
|
||||
prev_result.fetchone.return_value = MagicMock(
|
||||
_mapping={"id": old_snapshot_id, "config_text": "vault:v1:old"}
|
||||
)
|
||||
new_result = MagicMock()
|
||||
new_result.scalar_one.return_value = "vault:v1:new"
|
||||
insert_result = MagicMock()
|
||||
insert_result.scalar_one.return_value = diff_id
|
||||
|
||||
mock_session.execute = AsyncMock(side_effect=[prev_result, new_result, insert_result])
|
||||
mock_session.commit = AsyncMock()
|
||||
|
||||
mock_openbao = AsyncMock()
|
||||
mock_openbao.decrypt = AsyncMock(side_effect=[
|
||||
old_config.encode("utf-8"),
|
||||
new_config.encode("utf-8"),
|
||||
])
|
||||
|
||||
with patch(
|
||||
"app.services.config_diff_service.OpenBaoTransitService",
|
||||
return_value=mock_openbao,
|
||||
), patch(
|
||||
"app.services.config_diff_service.parse_diff_changes",
|
||||
side_effect=Exception("Parser exploded"),
|
||||
):
|
||||
# Should NOT raise
|
||||
await generate_and_store_diff(device_id, tenant_id, new_snapshot_id, mock_session)
|
||||
|
||||
# Diff INSERT still happened (3 executes)
|
||||
assert mock_session.execute.call_count == 3
|
||||
# Diff commit still happened
|
||||
mock_session.commit.assert_called_once()
|
||||
|
||||
Reference in New Issue
Block a user