"""Alembic environment configuration for async SQLAlchemy with PostgreSQL.""" import asyncio import os from logging.config import fileConfig from alembic import context from sqlalchemy import pool from sqlalchemy.engine import Connection from sqlalchemy.ext.asyncio import async_engine_from_config # Import Base without triggering engine creation (app.database creates engines # at module level, which would fail if DATABASE_URL points to a non-existent DB). from app.models.base import Base import app.models.tenant # noqa: F401 import app.models.user # noqa: F401 import app.models.device # noqa: F401 import app.models.config_backup # noqa: F401 # this is the Alembic Config object, which provides # access to the values within the .ini file in use. config = context.config # Override sqlalchemy.url from environment variable so alembic never uses the # hardcoded URL in alembic.ini. DATABASE_URL takes precedence; TEST_DATABASE_URL # is a fallback for test runners that set only that variable. _db_url = os.environ.get("DATABASE_URL") or os.environ.get("TEST_DATABASE_URL") if _db_url: config.set_main_option("sqlalchemy.url", _db_url) # Interpret the config file for Python logging. if config.config_file_name is not None: fileConfig(config.config_file_name) # Add your model's MetaData object here for 'autogenerate' support target_metadata = Base.metadata def run_migrations_offline() -> None: """Run migrations in 'offline' mode.""" url = config.get_main_option("sqlalchemy.url") context.configure( url=url, target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, ) with context.begin_transaction(): context.run_migrations() def do_run_migrations(connection: Connection) -> None: context.configure(connection=connection, target_metadata=target_metadata) with context.begin_transaction(): context.run_migrations() async def run_async_migrations() -> None: """Run migrations in 'online' mode with async engine.""" connectable = async_engine_from_config( config.get_section(config.config_ini_section, {}), prefix="sqlalchemy.", poolclass=pool.NullPool, ) async with connectable.connect() as connection: await connection.run_sync(do_run_migrations) await connectable.dispose() def run_migrations_online() -> None: """Run migrations in 'online' mode.""" asyncio.run(run_async_migrations()) if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online()