From eb60b219b84b3fb106c358db1c424a26f5f6c81e Mon Sep 17 00:00:00 2001 From: Jason Staack Date: Sun, 15 Mar 2026 06:19:12 -0500 Subject: [PATCH] fix(ci): switch to commit-and-cleanup test isolation Replace savepoint/shared-connection approach with real commits and table cleanup in teardown. This ensures test data is visible to API endpoint sessions without connection sharing deadlocks. Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/tests/integration/conftest.py | 171 ++++++++++++++------------ 1 file changed, 93 insertions(+), 78 deletions(-) diff --git a/backend/tests/integration/conftest.py b/backend/tests/integration/conftest.py index 4a3ce56..0189b83 100644 --- a/backend/tests/integration/conftest.py +++ b/backend/tests/integration/conftest.py @@ -3,18 +3,19 @@ Integration test fixtures for the TOD backend. Provides: - Database engines (admin + app_user) pointing at real PostgreSQL+TimescaleDB -- Per-test session fixtures with transaction rollback for isolation -- app_session_factory for RLS multi-tenant tests (creates sessions with tenant context) +- Per-test session with commit-and-cleanup (no savepoint tricks) +- app_session_factory for RLS multi-tenant tests - FastAPI test client with dependency overrides - Entity factory fixtures (tenants, users, devices) -- Auth helper for getting login tokens +- Auth helper that mints JWTs directly All fixtures use the existing docker-compose PostgreSQL instance. Set TEST_DATABASE_URL / TEST_APP_USER_DATABASE_URL env vars to override defaults. -Event loop strategy: All async fixtures are function-scoped to avoid the -pytest-asyncio 0.26 session/function loop mismatch. Engine creation and DB -setup use synchronous subprocess calls (Alembic) and module-level singletons. +Test isolation strategy: tests commit data to the real database, then +clean up all test-created rows in teardown. This avoids the savepoint +visibility issues that break FK checks when API endpoints use separate +DB sessions. """ import os @@ -31,6 +32,7 @@ from httpx import ASGITransport, AsyncClient from sqlalchemy import text from sqlalchemy.ext.asyncio import ( AsyncSession, + async_sessionmaker, create_async_engine, ) @@ -64,16 +66,9 @@ def _ensure_database_setup(): backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) env = os.environ.copy() - # Ensure DATABASE_URL points at the test database, not the dev/prod URL - # hardcoded in alembic.ini. alembic/env.py reads this variable and overrides - # sqlalchemy.url before opening any connection. env["DATABASE_URL"] = TEST_DATABASE_URL - # Migration 029 (VPN tenant isolation) encrypts a WireGuard server private key - # and requires CREDENTIAL_ENCRYPTION_KEY. Provide the dev default if the - # environment does not already supply it (CI always sets this explicitly). env.setdefault("CREDENTIAL_ENCRYPTION_KEY", "LLLjnfBZTSycvL2U07HDSxUeTtLxb9cZzryQl0R9E4w=") - # Run Alembic migrations via subprocess (handles DB creation and schema) result = subprocess.run( [sys.executable, "-m", "alembic", "upgrade", "head"], capture_output=True, @@ -99,13 +94,9 @@ def setup_database(): @pytest_asyncio.fixture async def admin_engine(): - """Admin engine (superuser) -- bypasses RLS. - - Created fresh per-test to avoid event loop issues. - pool_size=2 since each test only needs a few connections. - """ + """Admin engine (superuser) -- bypasses RLS.""" engine = create_async_engine( - TEST_DATABASE_URL, echo=False, pool_pre_ping=True, pool_size=2, max_overflow=3 + TEST_DATABASE_URL, echo=False, pool_pre_ping=True, pool_size=5, max_overflow=5 ) yield engine await engine.dispose() @@ -113,46 +104,77 @@ async def admin_engine(): @pytest_asyncio.fixture async def app_engine(): - """App-user engine -- RLS enforced. - - Created fresh per-test to avoid event loop issues. - """ + """App-user engine -- RLS enforced.""" engine = create_async_engine( - TEST_APP_USER_DATABASE_URL, echo=False, pool_pre_ping=True, pool_size=2, max_overflow=3 + TEST_APP_USER_DATABASE_URL, echo=False, pool_pre_ping=True, pool_size=5, max_overflow=5 ) yield engine await engine.dispose() # --------------------------------------------------------------------------- -# Function-scoped session fixtures (fresh per test) +# Tables to clean up after each test (reverse FK order) +# --------------------------------------------------------------------------- + +_CLEANUP_TABLES = [ + "key_access_log", + "user_key_sets", + "device_certificates", + "certificate_authorities", + "template_push_jobs", + "config_template_tags", + "config_templates", + "config_push_operations", + "config_backup_schedules", + "config_backup_runs", + "router_config_changes", + "router_config_diffs", + "router_config_snapshots", + "firmware_upgrade_jobs", + "alert_rule_channels", + "alert_events", + "alert_rules", + "notification_channels", + "maintenance_windows", + "vpn_peers", + "vpn_config", + "device_tag_assignments", + "device_group_memberships", + "device_tags", + "device_groups", + "api_keys", + "audit_logs", + "devices", + "invites", + "user_tenants", + "users", + "tenants", +] + + +# --------------------------------------------------------------------------- +# Session fixtures (commit-and-cleanup, no savepoints) # --------------------------------------------------------------------------- @pytest_asyncio.fixture -async def admin_conn(admin_engine): - """Shared admin connection with transaction rollback. +async def admin_session(admin_engine) -> AsyncGenerator[AsyncSession, None]: + """Per-test admin session that commits to disk. - Both admin_session and test_app bind to the same connection so that - data created in the test (via admin_session) is visible to API - endpoints (via get_db / get_admin_db overrides). + Data is visible to all connections (including API endpoint sessions). + Cleanup deletes all rows from test tables after the test. """ - conn = await admin_engine.connect() - trans = await conn.begin() - try: - yield conn - finally: - await trans.rollback() - await conn.close() - - -@pytest_asyncio.fixture -async def admin_session(admin_conn) -> AsyncGenerator[AsyncSession, None]: - """Per-test admin session sharing the admin_conn transaction.""" - session = AsyncSession(bind=admin_conn, expire_on_commit=False) + session = AsyncSession(admin_engine, expire_on_commit=False) try: yield session finally: + # Clean up all test data in reverse FK order + for table in _CLEANUP_TABLES: + try: + await session.execute(text(f"DELETE FROM {table}")) + except Exception: + pass # Table might not exist in some migration states + await session.commit() await session.close() @@ -165,7 +187,6 @@ async def app_session(app_engine) -> AsyncGenerator[AsyncSession, None]: async with app_engine.connect() as conn: trans = await conn.begin() session = AsyncSession(bind=conn, expire_on_commit=False) - # Reset tenant context await session.execute(text("RESET app.current_tenant")) try: yield session @@ -180,10 +201,6 @@ def app_session_factory(app_engine): Each session gets its own connection and transaction (rolled back on exit). Caller can pass tenant_id to auto-set RLS context. - - Usage: - async with app_session_factory(tenant_id=str(tenant.id)) as session: - result = await session.execute(select(Device)) """ from app.database import set_tenant_context @@ -192,7 +209,6 @@ def app_session_factory(app_engine): async with app_engine.connect() as conn: trans = await conn.begin() session = AsyncSession(bind=conn, expire_on_commit=False) - # Reset tenant context to prevent leakage await session.execute(text("RESET app.current_tenant")) if tenant_id: await set_tenant_context(session, tenant_id) @@ -211,21 +227,19 @@ def app_session_factory(app_engine): @pytest_asyncio.fixture -async def test_app(admin_conn, app_engine): +async def test_app(admin_engine, app_engine): """Create a FastAPI app instance with test database dependency overrides. - Both get_db and get_admin_db bind to admin_conn (the shared connection - from admin_conn fixture). This means data created via admin_session - is visible to API endpoints, and everything rolls back after the test. + Both get_db and get_admin_db create independent sessions from the admin + engine. Since admin_session commits data to disk, it is visible to + these sessions without needing shared connections. """ from fastapi import FastAPI from app.database import get_admin_db, get_db - # Create a minimal app without lifespan app = FastAPI(lifespan=None) - # Import and mount all routers (same as main app) from app.routers.alerts import router as alerts_router from app.routers.auth import router as auth_router from app.routers.config_backups import router as config_router @@ -254,26 +268,31 @@ async def test_app(admin_conn, app_engine): app.include_router(templates_router, prefix="/api") app.include_router(vpn_router, prefix="/api") - # Register rate limiter (auth endpoints use @limiter.limit) from app.middleware.rate_limit import setup_rate_limiting setup_rate_limiting(app) - # API endpoints bind to the same shared connection as admin_session - # so test-created data is visible across the transaction. + test_session_factory = async_sessionmaker( + admin_engine, class_=AsyncSession, expire_on_commit=False + ) + async def override_get_db() -> AsyncGenerator[AsyncSession, None]: - session = AsyncSession(bind=admin_conn, expire_on_commit=False) - try: - yield session - finally: - await session.close() + async with test_session_factory() as session: + try: + yield session + await session.commit() + except Exception: + await session.rollback() + raise async def override_get_admin_db() -> AsyncGenerator[AsyncSession, None]: - session = AsyncSession(bind=admin_conn, expire_on_commit=False) - try: - yield session - finally: - await session.close() + async with test_session_factory() as session: + try: + yield session + await session.commit() + except Exception: + await session.rollback() + raise app.dependency_overrides[get_db] = override_get_db app.dependency_overrides[get_admin_db] = override_get_admin_db @@ -293,12 +312,11 @@ async def client(test_app) -> AsyncGenerator[AsyncClient, None]: import redis try: - # Rate limiter uses Redis DB 1 (see app/middleware/rate_limit.py) r = redis.Redis(host="localhost", port=6379, db=1) r.flushdb() r.close() except Exception: - pass # Redis not available -- skip clearing + pass transport = ASGITransport(app=test_app) async with AsyncClient(transport=transport, base_url="http://test") as ac: @@ -394,12 +412,8 @@ def create_test_device(): def auth_headers_factory(create_test_tenant, create_test_user): """Factory to create authenticated headers for a test user. - Creates a tenant + user, generates a JWT directly (no HTTP login - round-trip), and returns the Authorization headers dict. - - We mint the token directly rather than going through /api/auth/login - because the test admin_session uses a savepoint transaction that is - invisible to the login endpoint's own DB session. + Creates a tenant + user, commits to disk, then mints a JWT directly. + The commit ensures data is visible to API endpoint sessions. """ async def _create( @@ -410,7 +424,7 @@ def auth_headers_factory(create_test_tenant, create_test_user): tenant_name: str | None = None, existing_tenant_id: uuid.UUID | None = None, ) -> dict[str, Any]: - """Create user, mint JWT, return headers + tenant/user info.""" + """Create user, commit, mint JWT, return headers + tenant/user info.""" from app.services.auth import create_access_token if existing_tenant_id: @@ -426,7 +440,8 @@ def auth_headers_factory(create_test_tenant, create_test_user): password=password, role=role, ) - await admin_session.flush() + # Commit to disk so API endpoint sessions can see this data + await admin_session.commit() access_token = create_access_token( user_id=user.id,