
Using multiple databases at the same time (master/replica)


db_session and the other session functions accept a DBConnect instance as input. This approach allows you to work with multiple hosts simultaneously - for example, with both a master and a replica. This setup lets you scale out reads and handle a change of master.

  • Master: the primary database, which holds the "live" version of your information - your primary data set. The master handles INSERT, UPDATE, and DELETE statements.
  • Replicas: secondary database(s), which serve read-only copies of your information and receive updates from the master.
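As a minimal sketch of this master/replica split - the stand-in class, host names, and route() helper are all illustrative (with the real library you simply pass the appropriate DBConnect instance to db_session):

```python
# Illustrative stand-in: with the real library you would hold one
# DBConnect per role and pass the right one to db_session().
class DBConnectStub:
    def __init__(self, host: str) -> None:
        self.host = host

master = DBConnectStub("db-master.internal")
replica = DBConnectStub("db-replica.internal")

def route(statement: str) -> DBConnectStub:
    # Read-only statements can be served by a replica;
    # INSERT / UPDATE / DELETE must go to the master.
    is_read = statement.lstrip().upper().startswith("SELECT")
    return replica if is_read else master

print(route("SELECT * FROM example").host)    # db-replica.internal
print(route("INSERT INTO example ...").host)  # db-master.internal
```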

DBConnect accepts factory functions instead of ready-made objects, making it easy to switch hosts when needed.
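As a sketch of how those factories plug into the constructor: the stand-in below only mirrors the documented DBConnect(engine_creator, session_maker_creator, host=..., before_create_session_handler=...) signature so it can run on its own; in real code the factories would return SQLAlchemy's create_async_engine(...) and async_sessionmaker(...), and the host name here is illustrative.

```python
import asyncio

class DBConnectStandIn:
    """Mirrors the documented DBConnect signature; illustrative only."""
    def __init__(self, engine_creator, session_maker_creator,
                 host=None, before_create_session_handler=None):
        self.engine_creator = engine_creator
        self.session_maker_creator = session_maker_creator
        self.host = host
        self.before_create_session_handler = before_create_session_handler

# engine_creator: async callable taking a host, returning an engine.
async def engine_creator(host: str):
    return f"engine({host})"  # real code: create_async_engine(dsn)

# session_maker_creator: async callable taking an engine,
# returning a session maker.
async def session_maker_creator(engine):
    return lambda: f"session on {engine}"  # real: async_sessionmaker(engine)

connection = DBConnectStandIn(
    engine_creator, session_maker_creator, host="db-master.internal",
)
engine = asyncio.run(connection.engine_creator(connection.host))
print(engine)  # engine(db-master.internal)
```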


libpq can detect the master and replica when creating an engine, but it only does this once - at creation time. The before_create_session_handler hook allows you to change the host at runtime if the master or replica changes.
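The library's API reference gives exactly this handler as its before_create_session_handler example; it is reproduced below against a minimal stand-in for DBConnect so the flow runs on its own. get_master and the host names are hypothetical - in practice get_master would ask an external service such as pg-status.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class DBConnect:
    # Stand-in exposing only what the handler touches (.host and
    # .change_host); the real DBConnect also rebuilds the engine.
    host: str

    async def change_host(self, host: str) -> None:
        self.host = host

async def get_master() -> str:
    # Hypothetical lookup - e.g. an HTTP call to pg-status; hard-coded here.
    return "db-2.internal"

async def renew_master_connect(connect: DBConnect) -> None:
    """Updates the host if the master has changed."""
    master_host = await get_master()
    if master_host != connect.host:
        await connect.change_host(master_host)

connect = DBConnect(host="db-1.internal")
asyncio.run(renew_master_connect(connect))
print(connect.host)  # db-2.internal
```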


You need third-party functionality to determine which host is the master or the replica.


I have an extremely lightweight microservice, pg-status, that fits perfectly here.


The engine is not created when DBConnect is initialized; it is created only on the first request. This is known as lazy initialization, and the library uses it in many places.
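A minimal sketch of the lazy-initialization pattern, assuming nothing about the library's internals (the class and names here are illustrative): construction only records configuration, and the expensive object is built on first use.

```python
import asyncio

class LazyEngineHolder:
    # Illustrative stand-in: __init__ only stores the host;
    # the engine is created on the first request for it.
    def __init__(self, host: str) -> None:
        self.host = host
        self._engine = None

    async def engine(self) -> str:
        if self._engine is None:
            # The real library would call your engine_creator factory here.
            self._engine = f"engine({self.host})"
        return self._engine

holder = LazyEngineHolder("db-master.internal")
assert holder._engine is None        # constructing creates nothing
print(asyncio.run(holder.engine()))  # engine(db-master.internal)
```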

from context_async_sqlalchemy import DBConnect
 
# Getting started

## Configure the connection to the database

For example, for PostgreSQL's `database.py`:

```python
from sqlalchemy.ext.asyncio import (
    async_sessionmaker,
    AsyncEngine,
    AsyncSession,
    create_async_engine,
)

from context_async_sqlalchemy import DBConnect


def create_engine(host: str) -> AsyncEngine:
    """Database connection parameters."""
    # In production code, you will probably take these parameters from env
    pg_user = "krylosov-aa"
    pg_password = ""
    pg_port = 6432
    pg_db = "test"
    return create_async_engine(
        f"postgresql+asyncpg://"
        f"{pg_user}:{pg_password}"
        f"@{host}:{pg_port}"
        f"/{pg_db}",
        future=True,
        pool_pre_ping=True,
    )


def create_session_maker(
    engine: AsyncEngine,
) -> async_sessionmaker[AsyncSession]:
    """Session parameters."""
    return async_sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )


connection = DBConnect(
    engine_creator=create_engine,
    session_maker_creator=create_session_maker,
    host="127.0.0.1",  # optional
)
```

The `host` parameter is optional if you use a handler that runs before a session is created: `before_create_session_handler`. From there, you can set the host dynamically. Read more in "Master/Replica or several databases at the same time".

## Manage the database connection lifecycle

Close the resources at the end of your application's lifecycle. Example for FastAPI:

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator

from fastapi import FastAPI

from database import connection


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
    """Database connection lifecycle management"""
    yield
    await connection.close()  # Close the engine if it was open
```

## Setup middleware

Middleware handles the most important and complex parts: managing context and sessions. You can use ready-made middleware components:

### Pure ASGI

```python
from context_async_sqlalchemy import ASGIHTTPDBSessionMiddleware

app.add_middleware(ASGIHTTPDBSessionMiddleware)
```

### FastAPI

```python
from context_async_sqlalchemy.fastapi_utils import (
    add_fastapi_http_db_session_middleware,
)

app = FastAPI(...)
add_fastapi_http_db_session_middleware(app)
```

### Starlette

```python
from context_async_sqlalchemy.starlette_utils import (
    add_starlette_http_db_session_middleware,
)

app = Starlette(...)
add_starlette_http_db_session_middleware(app)
```

### Write your own

If there's no ready-made solution that fits your needs, don't worry: you can check out how it works and implement your own.

## Use it

```python
from context_async_sqlalchemy import db_session
from sqlalchemy import insert

from database import connection  # your configured connection to the database
from models import ExampleTable  # just some model for the example


async def some_func() -> None:
    # Creates a session (no connection to the database yet)
    session = await db_session(connection)
    stmt = insert(ExampleTable).values(text="example_with_db_session")
    # On the first request, a connection and a transaction are opened
    await session.execute(stmt)

    # If you call db_session again, it will return the same session,
    # even in child coroutines.
    session = await db_session(connection)
    # The second request will use the same connection and the same transaction
    await session.execute(stmt)

    # The commit and the closing of the session happen automatically
```

## Examples

The repository includes an example integration with FastAPI demonstrating various workflows. It also contains two types of test setups that you can use in your own projects. All library tests are included within the examples, because we aim to test the functionality in the context of a real, asynchronous web application. See the FastAPI tests example.
# How middleware works

Most of the work - and the "magic" - happens inside the middleware. Here is a diagram describing how it works.

At the beginning of a request, the middleware initializes a new asynchronous context. This asynchronous context is implemented using a contextvar. It holds a mutable container that stores sessions. A mutable container is used so that any coroutine, at any level, can create, modify, or close sessions, and those changes will affect the execution of the entire request. Whenever your code accesses the library's functionality, it interacts with this container.

Finally, the middleware checks the container for any active sessions and open transactions. If transactions are open, they are either committed when the request executes successfully or rolled back if it fails. After that, all sessions are closed.

That's precisely why you can safely close transactions and sessions early. The middleware simply works with whatever is in the container: if there's anything left, it will close it properly; if you've already handled it yourself, the middleware only needs to reset the context.
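The shared-container mechanism described above can be modeled in a few lines of plain `contextvars`/`asyncio` code. This is a minimal sketch with made-up names (`_ctx`, `get_session`, `handler`), not the library's actual implementation:

```python
import asyncio
import contextvars
from typing import Optional

# Hypothetical stand-in for the middleware's session container: a contextvar
# holds a mutable dict, so every coroutine spawned after the context is
# initialized sees - and mutates - the same container object.
_ctx: contextvars.ContextVar[Optional[dict]] = contextvars.ContextVar(
    "db_ctx", default=None
)


async def get_session(key: str) -> object:
    container = _ctx.get()
    # "Create the session" on first access; later callers get the same one
    if key not in container:
        container[key] = object()
    return container[key]


async def handler() -> bool:
    token = _ctx.set({})  # middleware: init a fresh context per request
    try:
        # Two child coroutines ask for the session concurrently; because
        # the container object is shared, they receive the same session.
        a, b = await asyncio.gather(get_session("db"), get_session("db"))
        return a is b
    finally:
        _ctx.reset(token)  # middleware: reset the context


print(asyncio.run(handler()))  # prints True: both coroutines got one session
```

Tasks created by `asyncio.gather` copy the current context, but the copy is shallow: they all reference the dict set by the "middleware", which is why a child coroutine can create a session that the rest of the request then reuses.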
## Code example

The library aims to provide ready-made solutions so that you don't have to worry about these details - but they're not always available. So, let's take a look at how the Starlette middleware works. You can use this example as a reference to implement your own.

```python
from starlette.middleware.base import (  # type: ignore[attr-defined]
    Request,
    Response,
    RequestResponseEndpoint,
    BaseHTTPMiddleware,
)

from context_async_sqlalchemy import (
    init_db_session_ctx,
    is_context_initiated,
    reset_db_session_ctx,
    auto_commit_by_status_code,
    rollback_all_sessions,
)


async def starlette_http_db_session_middleware(
    request: Request, call_next: RequestResponseEndpoint
) -> Response:
    """
    Database session lifecycle management.

    The session itself is created on demand in db_session().
    Transaction auto-commit is performed if there is no exception and the
    response status is < 400. Otherwise, a rollback is performed.
    But you can commit or roll back manually in the handler.
    """
    # Tests have different session management rules,
    # so if the context variable is already set, we do nothing
    if is_context_initiated():
        return await call_next(request)

    # We set the context here, meaning all child coroutines will receive
    # the same context. Even if a child coroutine requests the session
    # first, the container itself is shared, and that coroutine will add
    # the session to the container - the shared context.
    token = init_db_session_ctx()
    try:
        response = await call_next(request)
        # Based on the status code, we decide to commit or roll back
        # all sessions
        await auto_commit_by_status_code(response.status_code)
        return response
    except Exception:
        # If an exception occurs, we roll all sessions back
        await rollback_all_sessions()
        raise
    finally:
        # Close all sessions and clear the context
        await reset_db_session_ctx(token)
```
# Using multiple databases at the same time (master/replica)

`db_session` and other functions accept a `DBConnect` instance as input. This approach allows you to work with multiple hosts simultaneously - for example, with both a master and a replica.

`DBConnect` can also accept factory functions instead of ready-made objects, making it easy to switch hosts when needed.

For example, libpq can detect the master and the replica when creating an engine, but it only does this once - at creation time. The `before_create_session_handler` hook allows you to change the host at runtime if the master or replica changes. You'll need third-party functionality to determine which host is the master or the replica; I have an extremely lightweight microservice, pg-status, that fits perfectly here.

The engine is not created immediately when `DBConnect` is initialized - it is created only on the first request (lazy initialization). The library uses lazy initialization in many places.

```python
from context_async_sqlalchemy import DBConnect
from master_replica_helper import get_master, get_replica


async def renew_master_connect(connect: DBConnect) -> None:
    """Updates the host if the master has changed"""
    master_host = await get_master()
    if master_host != connect.host:
        await connect.change_host(master_host)


master = DBConnect(
    ...,
    before_create_session_handler=renew_master_connect,
)


async def renew_replica_connect(connect: DBConnect) -> None:
    """Updates the host if the replica has changed"""
    replica_host = await get_replica()
    if replica_host != connect.host:
        await connect.change_host(replica_host)


replica = DBConnect(
    ...,
    before_create_session_handler=renew_replica_connect,
)
```
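The lazy-initialization idea can be sketched with a minimal stand-in. The `LazyConnect` class below is hypothetical, not the library's actual code:

```python
from typing import Callable, List, Optional


class LazyConnect:
    """Hypothetical stand-in: build the (expensive) engine only on first use."""

    def __init__(self, engine_creator: Callable[[str], str], host: str) -> None:
        self._engine_creator = engine_creator
        self.host = host
        self._engine: Optional[str] = None  # nothing created yet

    @property
    def engine(self) -> str:
        # Deferred until the first request actually needs a connection;
        # switching self.host before this point costs nothing.
        if self._engine is None:
            self._engine = self._engine_creator(self.host)
        return self._engine


calls: List[str] = []
conn = LazyConnect(lambda host: calls.append(host) or f"engine@{host}", "db1")
assert calls == []              # nothing created at init time
_ = conn.engine
_ = conn.engine
assert calls == ["db1"]         # created exactly once, on first access
```

Deferring engine creation is also what makes a `before_create_session_handler` hook cheap: presumably, changing the host before the first request never has to tear down an existing connection pool.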
# The Problem It Solves

SQLAlchemy uses an engine that manages the connection pool. The engine must remain active for as long as the application is running, providing ready-to-use connections whenever the application needs them.

In the application, we work with sessions. A session obtains a single connection from the pool and should have a short lifespan - usually lasting only for the duration of a single request, or even less.

Let's see what existing solutions are available to manage sessions:

## Manual solution

This is how the code gets duplicated, and two connections and two transactions are in use - even when only one connection and one transaction are actually needed.
```python
@app.post("/users/")
async def create_user(name):
    await insert_user(name)
    await insert_user_profile(name)


async def insert_user(name):
    async with get_async_session() as session:
        async with session.begin():
            await session.execute(stmt)


async def insert_user_profile(name):
    async with get_async_session() as session:
        async with session.begin():
            await session.execute(stmt)
```

You can move the duplicated code to a higher level, which will result in using a single connection and a single transaction.

```python
@app.post("/users/")
async def create_user(name):
    async with get_async_session() as session:
        async with session.begin():
            await insert_user(name, session)
            await insert_user_profile(name, session)


async def insert_user(name, session):
    await session.execute(stmt)


async def insert_user_profile(name, session):
    await session.execute(stmt)
```

If you look at it more broadly, the code duplication doesn't go away - you still have to do this in every handler.

```python
@app.post("/dogs/")
async def create_dog(name):
    async with get_async_session() as session:
        async with session.begin():
            ...


@app.post("/cats/")
async def create_cat(name):
    async with get_async_session() as session:
        async with session.begin():
            ...
```

You also have to set everything up manually. No ready-made integration solutions are available - which means freedom, but also a lot of coding.

## Dependency

You can use a dependency. For example, in FastAPI it looks like this:

```python
async def get_atomic_session():
    async with session_maker() as session:
        async with session.begin():
            yield session


@app.post("/dogs/")
async def create_dog(name, session=Depends(get_atomic_session)):
    ...


@app.post("/cats/")
async def create_cat(name, session=Depends(get_atomic_session)):
    ...
```

There are two problems here:

- You can't close the session or transaction prematurely, because the dependency is responsible for that.
- The session has to be passed all the way down the stack to the place where it's actually needed.
By the way, there's no ready-made solution for integrating with the framework - you have to implement the dependency yourself.

## Wrappers over SQLAlchemy

There are various wrappers that often provide more convenient integration. Litestar, for example, has the same advantages and disadvantages as using dependencies:

```python
config = SQLAlchemyAsyncConfig(connection_string=URL)
sqlalchemy_plugin = SQLAlchemyInitPlugin(config)


class UserRepository(SQLAlchemyAsyncRepository[User]):
    model_type = User


@post("/users")
async def create_user(data: User, repo: UserRepository):
    await repo.add(data)  # <- insert into User
```

Here's an example using Ormar:

```python
class BaseMeta(ormar.ModelMeta):
    ...


class User(ormar.Model):
    ...


@app.post("/users/")
async def create_user(name):
    await User.objects.create(name=name)
```

The main problem with wrappers is that they require developers to learn something new. They introduce their own syntax - so even if a developer is familiar with SQLAlchemy, that doesn't mean they'll understand the wrapper. Wrappers are also often designed for convenience when working with simple CRUD operations, but writing complex SQL queries with them can be very challenging.

## Solution

This library solves all of these issues:

- Easy integration with web frameworks
- Automatic management of engine, session, and transaction lifecycles
- Manually close a session at any time, without waiting for the end of the request
- Access a session from the context only where it's actually needed
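As a rough illustration of the last point - toy code with made-up names, not the library's API - a contextvar lets helper functions obtain the same "session" on demand instead of threading it through every call:

```python
import asyncio
import contextvars
from typing import List, Optional, Tuple

# A list stands in for "open connection + transaction"; the handler never
# passes it down the stack - helpers fetch the same one from the context.
_session: contextvars.ContextVar[Optional[list]] = contextvars.ContextVar(
    "session", default=None
)


async def db_session() -> list:
    sess = _session.get()
    if sess is None:
        sess = []            # created lazily on first access
        _session.set(sess)
    return sess


async def insert_user(name: str) -> None:
    (await db_session()).append(("user", name))      # no session argument


async def insert_user_profile(name: str) -> None:
    (await db_session()).append(("profile", name))   # same session reused


async def create_user(name: str) -> List[Tuple[str, str]]:
    await insert_user(name)
    await insert_user_profile(name)
    return await db_session()


statements = asyncio.run(create_user("bob"))
assert statements == [("user", "bob"), ("profile", "bob")]
```

Both helpers end up writing into one shared "session" even though `create_user` never mentions it, which is the ergonomic difference from the dependency-injection examples above.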
Let's see what existing solutions are available to manage sessions:","title":"The Problem It Solves"},{"location":"problem/#manual-solution","text":"This is how the code gets duplicated, and two connections and two transactions are in use - even when only one connection and one transaction are actually needed. @app.post(\"/users/\") async def create_user(name): await insert_user(name) await insert_user_profile(name) async def insert_user(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) async def insert_user_profile(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) You can move the duplicated code to a higher level, which will result in using a single connection and a single transaction. @app.post(\"/users/\") async def create_user(name): async with get_async_session() as session: async with session.begin(): await insert_user(name, session) await insert_user_profile(name, session) async def insert_user(name, session): await session.execute(stmt) async def insert_user_profile(name, session): await session.execute(stmt) If you look at it more broadly, the code duplication doesn\u2019t go away - you still have to do this in every handler. @app.post(\"/dogs/\") async def create_dog(name): async with get_async_session() as session: async with session.begin(): ... @app.post(\"/cats/\") async def create_cat(name): async with get_async_session() as session: async with session.begin(): ... You also have to set everything up manually. No ready-made integration solutions are available - which means freedom but also a lot of coding.","title":"Manual solution"},{"location":"problem/#dependency","text":"You can use a dependency. For example, in FastAPI, it looks like this: async def get_atomic_session(): async with session_maker() as session: async with session.begin(): yield session @app.post(\"/dogs/\") async def create_dog(name, session=Depends(get_atomic_session)): ... 
@app.post(\"/cats/\") async def create_cat(name, session=Depends(get_atomic_session)): ... There are two problems here: You can\u2019t close the session or transaction prematurely, because the dependency is responsible for that. The session has to be passed all the way down the stack to the place where it\u2019s actually needed. By the way, there\u2019s no ready-made solution for integrating with the framework - you have to implement the dependency yourself.","title":"Dependency"},{"location":"problem/#wrappers-over-sqlalachemy","text":"There are various wrappers that often provide more convenient integration. Litestar, for example, has the same advantages and disadvantages as using dependencies: config = SQLAlchemyAsyncConfig( connection_string=URL ) sqlalchemy_plugin = SQLAlchemyInitPlugin(config) class UserRepository(SQLAlchemyAsyncRepository[User]): model_type = User @post(\"/users\") async def create_user(data: User, repo: UserRepository): await repo.add(data) # <- insert into User Here\u2019s an example using Ormar: class BaseMeta(ormar.ModelMeta): ... class User(ormar.Model): ... @app.post(\"/users/\") async def create_user(name): await User.objects.create(name=name) The main problem with wrappers is that they require developers to learn something new. They introduce their own syntax - so even if a developer is familiar with SQLAlchemy, it doesn\u2019t mean they\u2019ll understand the wrapper. 
Wrappers are also often designed for convenience when working with simple CRUD operations, but writing complex SQL queries with them can be very challenging.","title":"Wrappers over SQLAlchemy"},{"location":"problem/#solution","text":"And this library solves all of these issues: Easy integration with web frameworks Automatic management of engine, session, and transaction lifecycles Manually close a session at any time without waiting for the end of the process Access a session from the context only where it\u2019s actually needed","title":"Solution"},{"location":"testing/","text":"Testing When testing with a real database, one important problem needs to be solved - ensuring data isolation between tests. There are basically two approaches: Separate sessions The test has its own session that it uses to prepare data and verify results after execution. The application also has its own session. Data isolation is achieved by clearing all tables at the end of each test (and once before running all tests). Shared session and transaction The test and the application share the same session and transaction. Data isolation is achieved by rolling back the transaction at the end of the test. Personally, I prefer the first option, because it is a more \"honest\" way to test the application. We can verify how it handles sessions and transactions on its own. It\u2019s also convenient to inspect the database state when a test is paused. Sometimes, there are complex session management scenarios (for example, concurrent query execution) where other types of testing are either impossible or very difficult. The main disadvantage of this approach is the slower execution speed. Since we clear all tables after each test, this process takes additional time. This is where the second approach comes in - its main advantage is speed, as rolling back a transaction is very fast. 
In my projects, I use both approaches at the same time: For most tests with simple or common logic, I use a shared transaction for the test and the application For more complex cases, or ones that cannot be tested this way, I use separate transactions. This combination allows for both good performance and convenient testing. The library provides several utilities that can be used in tests - for example, in fixtures. They help create tests that share a common transaction between the test and the application, so data isolation between tests is achieved through fast transaction rollback. You can see these capabilities in the examples: Here are tests with a common transaction between the application and the tests. And here's an example with different transactions. Create session with autorollback rollback_session creates a session that always rolls back automatically. from context_async_sqlalchemy.test_utils import rollback_session @pytest_asyncio.fixture async def db_session_test() -> AsyncGenerator[AsyncSession, None]: \"\"\"The session that is used inside the test\"\"\" async with rollback_session(connection) as session: yield session Override context set_test_context creates a new context put_savepoint_session_in_ctx puts into context a session that uses the same connection as db_session_test , but if you commit in this session, then the transaction will not be committed, but save point will be released from context_async_sqlalchemy.test_utils import ( put_savepoint_session_in_ctx, set_test_context, ) @pytest_asyncio.fixture(autouse=True) async def db_session_override( db_session_test: AsyncSession, ) -> AsyncGenerator[None, None]: \"\"\" The key thing about these tests is that we override the context in advance. The middleware has a special check that won't initialize the context if it already exists. 
\"\"\" async with set_test_context(): async with put_savepoint_session_in_ctx(connection, db_session_test): yield","title":"Testing"},{"location":"testing/#testing","text":"When testing with a real database, one important problem needs to be solved - ensuring data isolation between tests. There are basically two approaches: Separate sessions The test has its own session that it uses to prepare data and verify results after execution. The application also has its own session. Data isolation is achieved by clearing all tables at the end of each test (and once before running all tests). Shared session and transaction The test and the application share the same session and transaction. Data isolation is achieved by rolling back the transaction at the end of the test. Personally, I prefer the first option, because it is a more \"honest\" way to test the application. We can verify how it handles sessions and transactions on its own. It\u2019s also convenient to inspect the database state when a test is paused. Sometimes, there are complex session management scenarios (for example, concurrent query execution) where other types of testing are either impossible or very difficult. The main disadvantage of this approach is the slower execution speed. Since we clear all tables after each test, this process takes additional time. This is where the second approach comes in - its main advantage is speed, as rolling back a transaction is very fast. In my projects, I use both approaches at the same time: For most tests with simple or common logic, I use a shared transaction for the test and the application For more complex cases, or ones that cannot be tested this way, I use separate transactions. This combination allows for both good performance and convenient testing. The library provides several utilities that can be used in tests - for example, in fixtures. 
They help create tests that share a common transaction between the test and the application, so data isolation between tests is achieved through fast transaction rollback. You can see these capabilities in the examples: Here are tests with a common transaction between the application and the tests. And here's an example with different transactions.","title":"Testing"},{"location":"testing/#create-session-with-autorollback","text":"rollback_session creates a session that always rolls back automatically. from context_async_sqlalchemy.test_utils import rollback_session @pytest_asyncio.fixture async def db_session_test() -> AsyncGenerator[AsyncSession, None]: \"\"\"The session that is used inside the test\"\"\" async with rollback_session(connection) as session: yield session","title":"Create session with autorollback"},{"location":"testing/#override-context","text":"set_test_context creates a new context put_savepoint_session_in_ctx puts into context a session that uses the same connection as db_session_test , but if you commit in this session, then the transaction will not be committed, but save point will be released from context_async_sqlalchemy.test_utils import ( put_savepoint_session_in_ctx, set_test_context, ) @pytest_asyncio.fixture(autouse=True) async def db_session_override( db_session_test: AsyncSession, ) -> AsyncGenerator[None, None]: \"\"\" The key thing about these tests is that we override the context in advance. The middleware has a special check that won't initialize the context if it already exists. \"\"\" async with set_test_context(): async with put_savepoint_session_in_ctx(connection, db_session_test): yield","title":"Override context"}]}
\ No newline at end of file
+{"config":{"indexing":"full","lang":["en"],"min_search_length":3,"prebuild_index":false,"separator":"[\\s\\-]+"},"docs":[{"location":"","text":"context-async-sqlalchemy No AI was used in the creation of this library. SOURCE CODE Provides a super convenient way to work with SQLAlchemy in asynchronous applications. It handles the lifecycle management of the engine, sessions, and transactions. The main goal is to provide quick and easy access to a session, without worrying about opening or closing it when it\u2019s not necessary. Key features: Extremely easy to use Automatically manages the lifecycle of the engine, sessions, and transactions (autocommit / autorollback) Does not interfere with manually opening or closing sessions and transactions when needed Framework-agnostic - works with any web framework Not a wrapper around SQLAlchemy Convenient for testing Runtime host switching Supports multiple databases and multiple sessions per database Provides tools for running concurrent SQL queries Fully lazy initialization What does usage look like? from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database from models import ExampleTable # just some model for example async def some_func() -> None: # Created a session (no connection to the database yet) session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # If you call db_session again, it returns the same session # even in child coroutines. 
session = await db_session(connection) # The second request will use the same connection and the same transaction await session.execute(stmt) # The commit and closing of the session will occur automatically How it works Here is a very simplified diagram of how everything works: Before executing your code, the middleware will prepare a container in which the sessions required by your code will be stored. The container is saved in contextvars Your code accesses the library to create new sessions and retrieve existing ones After your code, middleware will automatically commit or roll back open transactions. Closes open sessions and clears the context. The library also provides the ability to commit, rollback, and close at any time, without waiting for the end of the request, without any problems.","title":"context-async-sqlalchemy"},{"location":"#context-async-sqlalchemy","text":"No AI was used in the creation of this library. SOURCE CODE Provides a super convenient way to work with SQLAlchemy in asynchronous applications. It handles the lifecycle management of the engine, sessions, and transactions. The main goal is to provide quick and easy access to a session, without worrying about opening or closing it when it\u2019s not necessary. 
Key features: Extremely easy to use Automatically manages the lifecycle of the engine, sessions, and transactions (autocommit / autorollback) Does not interfere with manually opening or closing sessions and transactions when needed Framework-agnostic - works with any web framework Not a wrapper around SQLAlchemy Convenient for testing Runtime host switching Supports multiple databases and multiple sessions per database Provides tools for running concurrent SQL queries Fully lazy initialization","title":"context-async-sqlalchemy"},{"location":"#what-does-usage-look-like","text":"from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database from models import ExampleTable # just some model for example async def some_func() -> None: # Created a session (no connection to the database yet) session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) # The second request will use the same connection and the same transaction await session.execute(stmt) # The commit and closing of the session will occur automatically","title":"What does usage look like?"},{"location":"#how-it-works","text":"Here is a very simplified diagram of how everything works: Before executing your code, the middleware will prepare a container in which the sessions required by your code will be stored. The container is saved in contextvars Your code accesses the library to create new sessions and retrieve existing ones After your code, middleware will automatically commit or roll back open transactions. Closes open sessions and clears the context. 
The library also provides the ability to commit, rollback, and close at any time, without waiting for the end of the request, without any problems.","title":"How it works"},{"location":"api/","text":"API Reference DBConnect DBConnect is responsible for managing the engine and the session_maker. You need to define two factories. Optionally, you can specify a host to connect to. You can also specify a handler that runs before a session is created - this handler can be used to connect to the host for the first time or to reconnect to a new one. init def __init__( self: DBConnect, engine_creator: EngineCreatorFunc, session_maker_creator: SessionMakerCreatorFunc, host: str | None = None, before_create_session_handler: AsyncFunc | None = None, ) -> None: engine_creator is a factory function for creating engines. It\u2019s an asynchronous callable that takes a host as input and returns an async engine. Example session_maker_creator is a factory function for creating an asynchronous session_maker. It\u2019s an asynchronous callable that takes an async engine as input and returns an async session_maker. Example host is an optional parameter. You can specify only this parameter to make your connection always work with a single host, without dynamic switching. However, you can still change the host in the handler if needed - it won\u2019t cause any issues. before_create_session_handler is a handler that allows you to execute custom logic before creating a session. For example, you can switch the host to another one - this is useful for dynamically determining the master if the previous master fails and a replica takes its place. The handler is an asynchronous callable that takes a DBConnect instance as input and returns nothing. 
Example: async def renew_master_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the master has changed\"\"\" master_host = await get_master() if master_host != connect.host: await connect.change_host(master_host) connect async def connect(self: DBConnect, host: str) -> None: Establishes a connection to the specified host. This method doesn\u2019t need to be called explicitly. If it isn\u2019t called, the first session request will automatically establish the connection. change_host async def change_host(self: DBConnect, host: str) -> None: Establishes a connection to the specified host, but first checks under a lock that the currently connected host is different from the target host. create_session async def create_session(self: DBConnect) -> AsyncSession: Creates a new session. Used internally by the library - you\u2019ll probably never need to call it directly, but it\u2019s good to know it exists. session_maker async def session_maker(self: DBConnect) -> async_sessionmaker[AsyncSession]: Provides access to the session_maker currently used to create sessions. close async def close(self: DBConnect) -> None: Completely closes and cleans up all resources, freeing the connection pool. This should be called at the end of your application\u2019s lifecycle. Middlewares Most of the work - and the \u201cmagic\u201d - happens inside the middleware. You can check out how it works and implement your own. Fastapi from context_async_sqlalchemy.fastapi_utils import ( fastapi_http_db_session_middleware, add_fastapi_http_db_session_middleware, ) app = FastAPI(...) add_fastapi_http_db_session_middleware(app) # OR app.add_middleware( BaseHTTPMiddleware, dispatch=fastapi_http_db_session_middleware ) Starlette from context_async_sqlalchemy.starlette_utils import ( add_starlette_http_db_session_middleware, starlette_http_db_session_middleware, StarletteHTTPDBSessionMiddleware, ) app = Starlette(...) 
add_starlette_http_db_session_middleware(app) # OR app.add_middleware( BaseHTTPMiddleware, dispatch=starlette_http_db_session_middleware ) # OR app.add_middleware(StarletteHTTPDBSessionMiddleware) Pure ASGI from context_async_sqlalchemy import ( ASGIHTTPDBSessionMiddleware, ) app = SomeASGIApp(...) app.add_middleware(ASGIHTTPDBSessionMiddleware) Sessions Here are the functions you\u2019ll use most often from the library. They allow you to work with sessions directly from your asynchronous code. db_session async def db_session(connect: DBConnect) -> AsyncSession: The most important function for obtaining a session in your code. When called for the first time, it returns a new session; subsequent calls return the same session. atomic_db_session @asynccontextmanager async def atomic_db_session( connect: DBConnect, current_transaction: Literal[\"commit\", \"rollback\", \"append\", \"raise\"] = \"commit\", ) -> AsyncGenerator[AsyncSession, None]: A context manager that can be used to wrap another function which uses a context session, making that call isolated within its own transaction. There are several options that define how the function will handle an already open transaction. current_transaction: - commit - commits the open transaction and starts a new one - rollback - rolls back the open transaction and starts a new one - append - continues using the current transaction and commits it - raise - raises an InvalidRequestError commit_db_session async def commit_db_session(connect: DBConnect) -> None: Commits the active session, if there is one. rollback_db_session async def rollback_db_session(connect: DBConnect) -> None: Rollbacks the active session, if there is one. close_db_session async def close_db_session(connect: DBConnect) -> None: Closes the current context session. The connection is returned to the pool. If you close an uncommitted transaction, the connection will be rolled back. 
This is useful if, for example, a database query is needed at the beginning of the handler, followed by some other long-running work, and you don't want to keep the connection open. new_non_ctx_session @asynccontextmanager async def new_non_ctx_session( connect: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that allows you to create a new session without placing it in a context. It's used for manual session management when you don't want to use a context. new_non_ctx_atomic_session @asynccontextmanager async def new_non_ctx_atomic_session( connect: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that allows you to create a new session with a new transaction that isn't placed in a context. It's used for manual session management when you don't want to use a context. Context run_in_new_ctx async def run_in_new_ctx( callable_func: AsyncCallable[AsyncCallableResult], *args: Any, **kwargs: Any, ) -> AsyncCallableResult: Runs a function in a new context with new sessions that have their own connection. It will commit the transaction automatically if callable_func does not raise exceptions. Otherwise, the transaction will be rolled back. The intended use is to run multiple database queries concurrently. Example of use: await asyncio.gather( your_function_with_db_session(...), run_in_new_ctx( your_function_with_db_session, some_arg, some_kwarg=123, ), run_in_new_ctx(your_function_with_db_session, ...), ) Testing You can read more about testing here: Testing rollback_session @asynccontextmanager async def rollback_session( connection: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that creates a session which is automatically rolled back at the end. It\u2019s intended for use in fixtures to execute SQL queries during tests. 
set_test_context @asynccontextmanager async def set_test_context(auto_close: bool = False) -> AsyncGenerator[None, None]: A context manager that creates a new context in which you can place a dedicated test session. It\u2019s intended for use in tests where the test and the application share a single transaction. Use auto_close=False if you\u2019re using a test session and transaction that you close manually elsewhere in your code. Use auto_close=True if, for example, you want to call a function in a test that uses a context bypassing the middleware, and you want all sessions to be closed automatically. put_savepoint_session_in_ctx async def put_savepoint_session_in_ctx( connection: DBConnect, session: AsyncSession, ) -> AsyncGenerator[None, None]: Sets the context to a session that uses a save point instead of creating a transaction. You need to pass the session you're using inside your tests to attach a new session to the same connection. It is important to use this function inside set_test_context.","title":"API Reference"},{"location":"api/#api-reference","text":"","title":"API Reference"},{"location":"api/#dbconnect","text":"DBConnect is responsible for managing the engine and the session_maker. You need to define two factories. Optionally, you can specify a host to connect to. You can also specify a handler that runs before a session is created - this handler can be used to connect to the host for the first time or to reconnect to a new one.","title":"DBConnect"},{"location":"api/#init","text":"def __init__( self: DBConnect, engine_creator: EngineCreatorFunc, session_maker_creator: SessionMakerCreatorFunc, host: str | None = None, before_create_session_handler: AsyncFunc | None = None, ) -> None: engine_creator is a factory function for creating engines. It\u2019s an asynchronous callable that takes a host as input and returns an async engine. Example session_maker_creator is a factory function for creating an asynchronous session_maker. 
It\u2019s an asynchronous callable that takes an async engine as input and returns an async session_maker. Example host is an optional parameter. You can specify only this parameter to make your connection always work with a single host, without dynamic switching. However, you can still change the host in the handler if needed - it won\u2019t cause any issues. before_create_session_handler is a handler that allows you to execute custom logic before creating a session. For example, you can switch the host to another one - this is useful for dynamically determining the master if the previous master fails and a replica takes its place. The handler is an asynchronous callable that takes a DBConnect instance as input and returns nothing. Example: async def renew_master_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the master has changed\"\"\" master_host = await get_master() if master_host != connect.host: await connect.change_host(master_host)","title":"init"},{"location":"api/#connect","text":"async def connect(self: DBConnect, host: str) -> None: Establishes a connection to the specified host. This method doesn\u2019t need to be called explicitly. If it isn\u2019t called, the first session request will automatically establish the connection.","title":"connect"},{"location":"api/#change_host","text":"async def change_host(self: DBConnect, host: str) -> None: Establishes a connection to the specified host, but first checks under a lock that the currently connected host is different from the target host.","title":"change_host"},{"location":"api/#create_session","text":"async def create_session(self: DBConnect) -> AsyncSession: Creates a new session. 
Used internally by the library - you\u2019ll probably never need to call it directly, but it\u2019s good to know it exists.","title":"create_session"},{"location":"api/#session_maker","text":"async def session_maker(self: DBConnect) -> async_sessionmaker[AsyncSession]: Provides access to the session_maker currently used to create sessions.","title":"session_maker"},{"location":"api/#close","text":"async def close(self: DBConnect) -> None: Completely closes and cleans up all resources, freeing the connection pool. This should be called at the end of your application\u2019s lifecycle.","title":"close"},{"location":"api/#middlewares","text":"Most of the work - and the \u201cmagic\u201d - happens inside the middleware. You can check out how it works and implement your own.","title":"Middlewares"},{"location":"api/#fastapi","text":"from context_async_sqlalchemy.fastapi_utils import ( fastapi_http_db_session_middleware, add_fastapi_http_db_session_middleware, ) app = FastAPI(...) add_fastapi_http_db_session_middleware(app) # OR app.add_middleware( BaseHTTPMiddleware, dispatch=fastapi_http_db_session_middleware )","title":"Fastapi"},{"location":"api/#starlette","text":"from context_async_sqlalchemy.starlette_utils import ( add_starlette_http_db_session_middleware, starlette_http_db_session_middleware, StarletteHTTPDBSessionMiddleware, ) app = Starlette(...) add_starlette_http_db_session_middleware(app) # OR app.add_middleware( BaseHTTPMiddleware, dispatch=starlette_http_db_session_middleware ) # OR app.add_middleware(StarletteHTTPDBSessionMiddleware)","title":"Starlette"},{"location":"api/#pure-asgi","text":"from context_async_sqlalchemy import ( ASGIHTTPDBSessionMiddleware, ) app = SomeASGIApp(...) app.add_middleware(ASGIHTTPDBSessionMiddleware)","title":"Pure ASGI"},{"location":"api/#sessions","text":"Here are the functions you\u2019ll use most often from the library. 
They allow you to work with sessions directly from your asynchronous code.","title":"Sessions"},{"location":"api/#db_session","text":"async def db_session(connect: DBConnect) -> AsyncSession: The most important function for obtaining a session in your code. When called for the first time, it returns a new session; subsequent calls return the same session.","title":"db_session"},{"location":"api/#atomic_db_session","text":"@asynccontextmanager async def atomic_db_session( connect: DBConnect, current_transaction: Literal[\"commit\", \"rollback\", \"append\", \"raise\"] = \"commit\", ) -> AsyncGenerator[AsyncSession, None]: A context manager that can be used to wrap another function which uses a context session, making that call isolated within its own transaction. There are several options that define how the function will handle an already open transaction. current_transaction: - commit - commits the open transaction and starts a new one - rollback - rolls back the open transaction and starts a new one - append - continues using the current transaction and commits it - raise - raises an InvalidRequestError","title":"atomic_db_session"},{"location":"api/#commit_db_session","text":"async def commit_db_session(connect: DBConnect) -> None: Commits the active session, if there is one.","title":"commit_db_session"},{"location":"api/#rollback_db_session","text":"async def rollback_db_session(connect: DBConnect) -> None: Rollbacks the active session, if there is one.","title":"rollback_db_session"},{"location":"api/#close_db_session","text":"async def close_db_session(connect: DBConnect) -> None: Closes the current context session. The connection is returned to the pool. If you close an uncommitted transaction, the connection will be rolled back. 
This is useful if, for example, a database query is needed at the beginning of the handler, followed by some other long-running work, and you don't want to keep the connection open.","title":"close_db_session"},{"location":"api/#new_non_ctx_session","text":"@asynccontextmanager async def new_non_ctx_session( connect: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that allows you to create a new session without placing it in a context. It's used for manual session management when you don't want to use a context.","title":"new_non_ctx_session"},{"location":"api/#new_non_ctx_atomic_session","text":"@asynccontextmanager async def new_non_ctx_atomic_session( connect: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that allows you to create a new session with a new transaction that isn't placed in a context. It's used for manual session management when you don't want to use a context.","title":"new_non_ctx_atomic_session"},{"location":"api/#context","text":"","title":"Context"},{"location":"api/#run_in_new_ctx","text":"async def run_in_new_ctx( callable_func: AsyncCallable[AsyncCallableResult], *args: Any, **kwargs: Any, ) -> AsyncCallableResult: Runs a function in a new context with new sessions that have their own connection. It will commit the transaction automatically if callable_func does not raise exceptions. Otherwise, the transaction will be rolled back. The intended use is to run multiple database queries concurrently. 
example of use: await asyncio.gather( your_function_with_db_session(...), run_in_new_ctx( your_function_with_db_session, some_arg, some_kwarg=123, ), run_in_new_ctx(your_function_with_db_session, ...), )","title":"run_in_new_ctx"},{"location":"api/#testing","text":"You can read more about testing here: Testing","title":"Testing"},{"location":"api/#rollback_session","text":"@asynccontextmanager async def rollback_session( connection: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that creates a session which is automatically rolled back at the end. It\u2019s intended for use in fixtures to execute SQL queries during tests.","title":"rollback_session"},{"location":"api/#set_test_context","text":"@asynccontextmanager async def set_test_context(auto_close: bool = False) -> AsyncGenerator[None, None]: A context manager that creates a new context in which you can place a dedicated test session. It\u2019s intended for use in tests where the test and the application share a single transaction. Use auto_close=False if you\u2019re using a test session and transaction that you close manually elsewhere in your code. Use auto_close=True if, for example, you want to call a function in a test that uses a context bypassing the middleware, and you want all sessions to be closed automatically.","title":"set_test_context"},{"location":"api/#put_savepoint_session_in_ctx","text":"async def put_savepoint_session_in_ctx( connection: DBConnect, session: AsyncSession, ) -> AsyncGenerator[None, None]: Sets the context to a session that uses a save point instead of creating a transaction. You need to pass the session you're using inside your tests to attach a new session to the same connection. It is important to use this function inside set_test_context.","title":"put_savepoint_session_in_ctx"},{"location":"concurrent_queries/","text":"Concurrent sql queries Concurrent query execution deserves special attention. 
In SQLAlchemy, you can\u2019t run multiple queries concurrently within the same session - you need to create a new one. The library provides two simple ways to execute queries concurrently: Run a function in a new context - run_in_new_ctx Create a new session that is completely independent of the current context - new_non_ctx_atomic_session or new_non_ctx_session import asyncio from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, new_non_ctx_atomic_session, new_non_ctx_session, run_in_new_ctx, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_multiple_sessions() -> None: \"\"\" You may need to run multiple sessions simultaneously. For example, to run several queries concurrently. You can also use these same techniques to create new sessions whenever you need them. Not necessarily just because of the concurrent processing. \"\"\" await asyncio.gather( _insert(), # context session run_in_new_ctx(_insert), # new context and session with autocommit run_in_new_ctx( # new context and session with manual commit _insert_manual, \"example_multiple_sessions\", ), _insert_non_ctx(), # new non context session _insert_non_ctx_manual(), # new non context session ) async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_manual(text: str) -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=text) await session.execute(stmt) # manually commit the transaction optional await commit_db_session(connection) # manually close the session optional await close_db_session(connection) async def _insert_non_ctx() -> None: \"\"\" While working with sessions, using context is optional \"\"\" async with new_non_ctx_atomic_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await 
session.execute(stmt) async def _insert_non_ctx_manual() -> None: \"\"\" While working with sessions, using context is optional \"\"\" async with new_non_ctx_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) await session.commit()","title":"Concurrent SQL Queries"},{"location":"concurrent_queries/#concurrent-sql-queries","text":"Concurrent query execution deserves special attention. In SQLAlchemy, you can\u2019t run multiple queries concurrently within the same session - you need to create a new one. The library provides two simple ways to execute queries concurrently: Run a function in a new context - run_in_new_ctx Create a new session that is completely independent of the current context - new_non_ctx_atomic_session or new_non_ctx_session import asyncio from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, new_non_ctx_atomic_session, new_non_ctx_session, run_in_new_ctx, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_multiple_sessions() -> None: \"\"\" You may need to run multiple sessions simultaneously. For example, to run several queries concurrently. You can also use these same techniques to create new sessions whenever you need them. Not necessarily just because of the concurrent processing. 
\"\"\" await asyncio.gather( _insert(), # context session run_in_new_ctx(_insert), # new context and session with autocommit run_in_new_ctx( # new context and session with manual commit _insert_manual, \"example_multiple_sessions\", ), _insert_non_ctx(), # new non context session _insert_non_ctx_manual(), # new non context session ) async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_manual(text: str) -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=text) await session.execute(stmt) # manually commit the transaction optional await commit_db_session(connection) # manually close the session optional await close_db_session(connection) async def _insert_non_ctx() -> None: \"\"\" Using the context is optional while working with sessions \"\"\" async with new_non_ctx_atomic_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_non_ctx_manual() -> None: \"\"\" Using the context is optional while working with sessions \"\"\" async with new_non_ctx_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) await session.commit()","title":"Concurrent sql queries"},{"location":"examples/","text":"Usage examples You can see not only fragments of examples, but also web application examples . Basic usage from sqlalchemy import insert from context_async_sqlalchemy import db_session from ..database import connection from ..models import ExampleTable async def handler_with_db_session() -> None: \"\"\" An example of a typical handle that uses a context session to work with a database. Autocommit or autorollback occurs automatically at the end of a request (in middleware). 
\"\"\" # Created a session (no connection to the database yet) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # Commit will happen automatically Atomic from context_async_sqlalchemy import atomic_db_session, db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_atomic() -> None: \"\"\" Let's imagine you already have a function that works with a contextual session, and its use case calls autocommit at the end of the request. You want to reuse this function, but you need to commit immediately, rather than wait for the request to complete. \"\"\" # the transaction will be committed or rolled back automatically # using the context manager async with atomic_db_session(connection): await _insert_1() # This is a new transaction in the same connection await _insert_1() async def _insert_1() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_atomic\" ) await session.execute(stmt) Manually close the transaction and session from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_manual_close() -> None: \"\"\" An example of a handle that uses a session in context but commits manually and closes the session to release the connection. 
\"\"\" # new connect -> new transaction -> commit await _insert_1() # old connect -> new transaction -> commit -> close connect await _insert_2() # new connect -> new transaction await _insert_3() # same connect -> same transaction await _insert_3() # autocommit async def _insert_1() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) # We closed the transaction await session.commit() # or await commit_db_session() async def _insert_2() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) # We closed the transaction await commit_db_session(connection) # We closed the session and returned the connection to the pool # This is useful if, for example, at the beginning of the handle a # database query is needed, and then there is some other long-term work # and you don't want to keep the connection opened. await close_db_session(connection) async def _insert_3() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) Multiple sessions and concurrent execution import asyncio from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, new_non_ctx_atomic_session, new_non_ctx_session, run_in_new_ctx, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_multiple_sessions() -> None: \"\"\" You may need to run multiple sessions simultaneously. For example, to run several queries concurrently. You can also use these same techniques to create new sessions whenever you need them. Not necessarily just because of the concurrent processing. 
\"\"\" await asyncio.gather( _insert(), # context session run_in_new_ctx(_insert), # new context and session with autocommit run_in_new_ctx( # new context and session with manual commit _insert_manual, \"example_multiple_sessions\", ), _insert_non_ctx(), # new non context session _insert_non_ctx_manual(), # new non context session ) async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_manual(text: str) -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=text) await session.execute(stmt) # manually commit the transaction (optional) await commit_db_session(connection) # manually close the session (optional) await close_db_session(connection) async def _insert_non_ctx() -> None: \"\"\" Using the context is optional while working with sessions \"\"\" async with new_non_ctx_atomic_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_non_ctx_manual() -> None: \"\"\" Using the context is optional while working with sessions \"\"\" async with new_non_ctx_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) await session.commit() Rollback from context_async_sqlalchemy import db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_exception() -> None: \"\"\" let's imagine that an exception occurred. 
\"\"\" session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") await session.execute(stmt) raise Exception(\"Some exception\") # transaction rolls back automatically from fastapi import HTTPException from context_async_sqlalchemy import db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_http_exception() -> None: \"\"\" let's imagine that an http exception occurred. \"\"\" session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") await session.execute(stmt) raise HTTPException(status_code=500) # transaction rolls back automatically by status code from context_async_sqlalchemy import db_session, rollback_db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_manual_rollback() -> None: \"\"\" An example of a handle that uses a rollback \"\"\" # it's convenient this way await _insert() await rollback_db_session(connection) # but it's possible this way too await _insert() session = await db_session(connection) await session.rollback() async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt)","title":"Usage Examples"},{"location":"examples/#usage-examples","text":"You can see not only fragments of examples, but also web application examples .","title":"Usage examples"},{"location":"examples/#basic-usage","text":"from sqlalchemy import insert from context_async_sqlalchemy import db_session from ..database import connection from ..models import ExampleTable async def handler_with_db_session() -> None: \"\"\" An example of a typical handle that uses a context session to work with a database. 
Autocommit or autorollback occurs automatically at the end of a request (in middleware). \"\"\" # Created a session (no connection to the database yet) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # Commit will happen automatically","title":"Basic usage"},{"location":"examples/#atomic","text":"from context_async_sqlalchemy import atomic_db_session, db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_atomic() -> None: \"\"\" Let's imagine you already have a function that works with a contextual session, and its use case calls autocommit at the end of the request. You want to reuse this function, but you need to commit immediately, rather than wait for the request to complete. \"\"\" # the transaction will be committed or rolled back automatically # using the context manager async with atomic_db_session(connection): await _insert_1() # This is a new transaction in the same connection await _insert_1() async def _insert_1() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_atomic\" ) await session.execute(stmt)","title":"Atomic"},{"location":"examples/#manually-close-the-transaction-and-session","text":"from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_manual_close() -> None: \"\"\" An example of a handle that uses a session in context but commits manually and closes the session to release the connection. 
\"\"\" # new connect -> new transaction -> commit await _insert_1() # old connect -> new transaction -> commit -> close connect await _insert_2() # new connect -> new transaction await _insert_3() # same connect -> same transaction await _insert_3() # autocommit async def _insert_1() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) # We closed the transaction await session.commit() # or await commit_db_session() async def _insert_2() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) # We closed the transaction await commit_db_session(connection) # We closed the session and returned the connection to the pool # This is useful if, for example, at the beginning of the handle a # database query is needed, and then there is some other long-term work # and you don't want to keep the connection opened. await close_db_session(connection) async def _insert_3() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt)","title":"Manually close the transaction and session"},{"location":"examples/#multiple-sessions-and-concurrent-execution","text":"import asyncio from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, new_non_ctx_atomic_session, new_non_ctx_session, run_in_new_ctx, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_multiple_sessions() -> None: \"\"\" You may need to run multiple sessions simultaneously. For example, to run several queries concurrently. You can also use these same techniques to create new sessions whenever you need them. Not necessarily just because of the concurrent processing. 
\"\"\" await asyncio.gather( _insert(), # context session run_in_new_ctx(_insert), # new context and session with autocommit run_in_new_ctx( # new context and session with manual commit _insert_manual, \"example_multiple_sessions\", ), _insert_non_ctx(), # new non context session _insert_non_ctx_manual(), # new non context session ) async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_manual(text: str) -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=text) await session.execute(stmt) # manually commit the transaction (optional) await commit_db_session(connection) # manually close the session (optional) await close_db_session(connection) async def _insert_non_ctx() -> None: \"\"\" Using the context is optional while working with sessions \"\"\" async with new_non_ctx_atomic_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_non_ctx_manual() -> None: \"\"\" Using the context is optional while working with sessions \"\"\" async with new_non_ctx_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) await session.commit()","title":"Multiple sessions and concurrent execution"},{"location":"examples/#rollback","text":"from context_async_sqlalchemy import db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_exception() -> None: \"\"\" let's imagine that an exception occurred. 
\"\"\" session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") await session.execute(stmt) raise Exception(\"Some exception\") # transaction rolls back automatically from fastapi import HTTPException from context_async_sqlalchemy import db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_http_exception() -> None: \"\"\" let's imagine that an http exception occurred. \"\"\" session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") await session.execute(stmt) raise HTTPException(status_code=500) # transaction rolls back automatically by status code from context_async_sqlalchemy import db_session, rollback_db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_manual_rollback() -> None: \"\"\" An example of a handle that uses a rollback \"\"\" # it's convenient this way await _insert() await rollback_db_session(connection) # but it's possible this way too await _insert() session = await db_session(connection) await session.rollback() async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt)","title":"Rollback"},{"location":"getting_started/","text":"Getting started Configure the connection to the database For example, for PostgreSQL - database.py: from sqlalchemy.ext.asyncio import ( async_sessionmaker, AsyncEngine, AsyncSession, create_async_engine, ) from context_async_sqlalchemy import DBConnect def create_engine(host: str) -> AsyncEngine: \"\"\" database connection parameters. 
\"\"\" # In production code, you will probably take these parameters from env pg_user = \"krylosov-aa\" pg_password = \"\" pg_port = 6432 pg_db = \"test\" return create_async_engine( f\"postgresql+asyncpg://\" f\"{pg_user}:{pg_password}\" f\"@{host}:{pg_port}\" f\"/{pg_db}\", future=True, pool_pre_ping=True, ) def create_session_maker( engine: AsyncEngine, ) -> async_sessionmaker[AsyncSession]: \"\"\"session parameters\"\"\" return async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) connection = DBConnect( engine_creator=create_engine, session_maker_creator=create_session_maker, host=\"127.0.0.1\", # optional ) The host parameter is optional if you use a handler before creating a session - before_create_session_handler . In that case, you can dynamically set the host. Read more in Master/Replica or several databases at the same time Manage Database connection lifecycle Close resources at the end of your application's lifecycle. Example for FastAPI: from contextlib import asynccontextmanager from typing import Any, AsyncGenerator from fastapi import FastAPI from database import connection @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: \"\"\"Database connection lifecycle management\"\"\" yield await connection.close() # Close the engine if it was open Setup middleware Middleware handles the most important and complex part - managing context and sessions. You can use the ready-made middleware components: Pure ASGI from context_async_sqlalchemy import ASGIHTTPDBSessionMiddleware app.add_middleware(ASGIHTTPDBSessionMiddleware) FastAPI from context_async_sqlalchemy.fastapi_utils import ( add_fastapi_http_db_session_middleware, ) app = FastAPI(...) add_fastapi_http_db_session_middleware(app) Starlette from context_async_sqlalchemy.starlette_utils import ( add_starlette_http_db_session_middleware, ) app = Starlette(...) 
add_starlette_http_db_session_middleware(app) Write own If there\u2019s no ready-made solution that fits your needs - don\u2019t worry! You can check out how it works and implement your own. Use it from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database from models import ExampleTable # just some model for example async def some_func() -> None: # Created a session (no connection to the database yet) session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) # The second request will use the same connection and the same transaction await session.execute(stmt) # The commit and closing of the session will occur automatically Examples The repository includes an example integration with FastAPI, demonstrating various workflows: FastAPI example It also contains two types of test setups that you can use in your own projects. All library tests are included within the examples - because we aim to test the functionality not in isolation, but in the context of a real asynchronous web application. FastAPI tests example","title":"Getting Started"},{"location":"getting_started/#getting-started","text":"","title":"Getting started"},{"location":"getting_started/#configure-the-connection-to-the-database","text":"For example, for PostgreSQL - database.py: from sqlalchemy.ext.asyncio import ( async_sessionmaker, AsyncEngine, AsyncSession, create_async_engine, ) from context_async_sqlalchemy import DBConnect def create_engine(host: str) -> AsyncEngine: \"\"\" database connection parameters. 
\"\"\" # In production code, you will probably take these parameters from env pg_user = \"krylosov-aa\" pg_password = \"\" pg_port = 6432 pg_db = \"test\" return create_async_engine( f\"postgresql+asyncpg://\" f\"{pg_user}:{pg_password}\" f\"@{host}:{pg_port}\" f\"/{pg_db}\", future=True, pool_pre_ping=True, ) def create_session_maker( engine: AsyncEngine, ) -> async_sessionmaker[AsyncSession]: \"\"\"session parameters\"\"\" return async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) connection = DBConnect( engine_creator=create_engine, session_maker_creator=create_session_maker, host=\"127.0.0.1\", # optional ) The host parameter is optional if you use a handler before creating a session - before_create_session_handler . In that case, you can dynamically set the host. Read more in Master/Replica or several databases at the same time","title":"Configure the connection to the database"},{"location":"getting_started/#manage-database-connection-lifecycle","text":"Close resources at the end of your application's lifecycle. Example for FastAPI: from contextlib import asynccontextmanager from typing import Any, AsyncGenerator from fastapi import FastAPI from database import connection @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: \"\"\"Database connection lifecycle management\"\"\" yield await connection.close() # Close the engine if it was open","title":"Manage Database connection lifecycle"},{"location":"getting_started/#setup-middleware","text":"Middleware handles the most important and complex part - managing context and sessions. 
You can use the ready-made middleware components:","title":"Setup middleware"},{"location":"getting_started/#pure-asgi","text":"from context_async_sqlalchemy import ASGIHTTPDBSessionMiddleware app.add_middleware(ASGIHTTPDBSessionMiddleware)","title":"Pure ASGI"},{"location":"getting_started/#fastapi","text":"from context_async_sqlalchemy.fastapi_utils import ( add_fastapi_http_db_session_middleware, ) app = FastAPI(...) add_fastapi_http_db_session_middleware(app)","title":"FastAPI"},{"location":"getting_started/#starlette","text":"from context_async_sqlalchemy.starlette_utils import ( add_starlette_http_db_session_middleware, ) app = Starlette(...) add_starlette_http_db_session_middleware(app)","title":"Starlette"},{"location":"getting_started/#write-own","text":"If there\u2019s no ready-made solution that fits your needs - don\u2019t worry! You can check out how it works and implement your own.","title":"Write own"},{"location":"getting_started/#use-it","text":"from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database from models import ExampleTable # just some model for example async def some_func() -> None: # Created a session (no connection to the database yet) session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # If you call db_session again, it will return the same session # even in child coroutines. 
session = await db_session(connection) # The second request will use the same connection and the same transaction await session.execute(stmt) # The commit and closing of the session will occur automatically","title":"Use it"},{"location":"getting_started/#examples","text":"The repository includes an example integration with FastAPI, demonstrating various workflows: FastAPI example It also contains two types of test setups that you can use in your own projects. All library tests are included within the examples - because we aim to test the functionality not in isolation, but in the context of a real asynchronous web application. FastAPI tests example","title":"Examples"},{"location":"how_middleware_works/","text":"How middleware works Most of the work - and the \u201cmagic\u201d - happens inside the middleware. Here is a diagram describing how it works. At the beginning of a request, the middleware initializes a new asynchronous context. This asynchronous context is implemented using a contextvar. It holds a mutable container that stores sessions. A mutable container is used so that any coroutine, at any level, can create, modify, or close sessions, and those changes will affect the execution of the entire request. Whenever your code accesses the library\u2019s functionality, it interacts with this container. Finally, the middleware checks the container for any active sessions and open transactions. If transactions are open, they are either committed when the query execution is successful or rolled back if it fails. After that, all sessions are closed. That\u2019s precisely why you can safely close transactions and sessions early. The middleware simply works with whatever is in the container: if there\u2019s anything left, it will close it properly; if you\u2019ve already handled it yourself, the middleware only needs to reset the context. 
Code example The library aims to provide ready-made solutions so that you don\u2019t have to worry about these details - but they\u2019re not always available. So, let\u2019s take a look at how Starlette middleware works. You can use this example as a reference to implement your own. CODE from starlette.middleware.base import ( # type: ignore[attr-defined] Request, Response, RequestResponseEndpoint, BaseHTTPMiddleware, ) from context_async_sqlalchemy import ( init_db_session_ctx, is_context_initiated, reset_db_session_ctx, auto_commit_by_status_code, rollback_all_sessions, ) async def starlette_http_db_session_middleware( request: Request, call_next: RequestResponseEndpoint ) -> Response: \"\"\" Database session lifecycle management. The session itself is created on demand in db_session(). Transaction auto-commit is implemented if there is no exception and the response status is < 400. Otherwise, a rollback is performed. But you can commit or rollback manually in the handler. \"\"\" # Tests have different session management rules # so if the context variable is already set, we do nothing if is_context_initiated(): return await call_next(request) # We set the context here, meaning all child coroutines will receive the # same context. And even if a child coroutine requests the # session first, the container itself is shared, and this coroutine will # add the session to container = shared context. token = init_db_session_ctx() try: response = await call_next(request) # using the status code, we decide to commit or rollback all sessions await auto_commit_by_status_code(response.status_code) return response except Exception: # If an exception occurs, we roll all sessions back await rollback_all_sessions() raise finally: # Close all sessions and clear the context await reset_db_session_ctx(token)","title":"How middleware works"},{"location":"how_middleware_works/#how-middleware-works","text":"Most of the work - and the \u201cmagic\u201d - happens inside the middleware. 
Here is a diagram describing how it works. At the beginning of a request, the middleware initializes a new asynchronous context. This asynchronous context is implemented using a contextvar. It holds a mutable container that stores sessions. A mutable container is used so that any coroutine, at any level, can create, modify, or close sessions, and those changes will affect the execution of the entire request. Whenever your code accesses the library\u2019s functionality, it interacts with this container. Finally, the middleware checks the container for any active sessions and open transactions. If transactions are open, they are either committed when the query execution is successful or rolled back if it fails. After that, all sessions are closed. That\u2019s precisely why you can safely close transactions and sessions early. The middleware simply works with whatever is in the container: if there\u2019s anything left, it will close it properly; if you\u2019ve already handled it yourself, the middleware only needs to reset the context.","title":"How middleware works"},{"location":"how_middleware_works/#code-example","text":"The library aims to provide ready-made solutions so that you don\u2019t have to worry about these details - but they\u2019re not always available. So, let\u2019s take a look at how Starlette middleware works. You can use this example as a reference to implement your own. CODE from starlette.middleware.base import ( # type: ignore[attr-defined] Request, Response, RequestResponseEndpoint, BaseHTTPMiddleware, ) from context_async_sqlalchemy import ( init_db_session_ctx, is_context_initiated, reset_db_session_ctx, auto_commit_by_status_code, rollback_all_sessions, ) async def starlette_http_db_session_middleware( request: Request, call_next: RequestResponseEndpoint ) -> Response: \"\"\" Database session lifecycle management. The session itself is created on demand in db_session(). 
Transaction auto-commit is implemented if there is no exception and the response status is < 400. Otherwise, a rollback is performed. But you can commit or rollback manually in the handler. \"\"\" # Tests have different session management rules # so if the context variable is already set, we do nothing if is_context_initiated(): return await call_next(request) # We set the context here, meaning all child coroutines will receive the # same context. And even if a child coroutine requests the # session first, the container itself is shared, and this coroutine will # add the session to container = shared context. token = init_db_session_ctx() try: response = await call_next(request) # using the status code, we decide to commit or rollback all sessions await auto_commit_by_status_code(response.status_code) return response except Exception: # If an exception occurs, we roll all sessions back await rollback_all_sessions() raise finally: # Close all sessions and clear the context await reset_db_session_ctx(token)","title":"Code example"},{"location":"master_replica/","text":"Master/Replica or several databases at the same time db_session and other functions accept a DBConnect instance as input. This approach allows you to work with multiple hosts simultaneously - for example, with both a master and a replica. DBConnect can also accept factory functions instead of ready-made objects, making it easy to switch hosts when needed. For example, libpq can detect the master and replica when creating an engine, but it only does this once - at creation time. The before_create_session_handler hook allows you to change the host at runtime if the master or replica changes. You\u2019ll need third-party functionality to determine which host is the master or the replica. I have an extremely lightweight microservice pg-status that fits perfectly here. The engine is not created immediately when DBConnect is initialized - it is created only on the first request.
The library uses lazy initialization in many places. from context_async_sqlalchemy import DBConnect from master_replica_helper import get_master, get_replica async def renew_master_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the master has changed\"\"\" master_host = await get_master() if master_host != connect.host: await connect.change_host(master_host) master = DBConnect( ..., before_create_session_handler=renew_master_connect, ) async def renew_replica_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the replica has changed\"\"\" replica_host = await get_replica() if replica_host != connect.host: await connect.change_host(replica_host) replica = DBConnect( ..., before_create_session_handler=renew_replica_connect, )","title":"Master/Replica or several databases at the same time"},{"location":"master_replica/#masterreplica-or-several-databases-at-the-same-time","text":"db_session and other functions accept a DBConnect instance as input. This approach allows you to work with multiple hosts simultaneously - for example, with both a master and a replica. DBConnect can also accept factory functions instead of ready-made objects, making it easy to switch hosts when needed. For example, libpq can detect the master and replica when creating an engine, but it only does this once - at creation time. The before_create_session_handler hook allows you to change the host at runtime if the master or replica changes. You\u2019ll need third-party functionality to determine which host is the master or the replica. I have an extremely lightweight microservice pg-status that fits perfectly here. The engine is not created immediately when DBConnect is initialized - it is created only on the first request. The library uses lazy initialization in many places.
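As a self-contained illustration of lazy engine creation combined with a pre-create hook, here is a simplified model. `LazyConnect`, `renew`, and `discover_master` are invented stand-ins for this sketch, not the library's real `DBConnect` implementation:

```python
import asyncio

class LazyConnect:
    """Simplified model (not the real DBConnect) of lazy engine creation
    with a hook that can switch the host before each session."""

    def __init__(self, host: str, before_create=None):
        self.host = host
        self._before_create = before_create
        self.engine = None            # nothing is created at init time

    async def change_host(self, host: str) -> None:
        self.host = host
        self.engine = None            # force re-creation on next use

    async def session(self) -> str:
        if self._before_create:
            await self._before_create(self)   # the pre-create hook
        if self.engine is None:               # lazy: created on first use
            self.engine = f"engine@{self.host}"
        return f"session via {self.engine}"

async def renew(connect: LazyConnect) -> None:
    # Third-party master discovery (e.g. a pg-status-like check).
    current_master = await discover_master()
    if current_master != connect.host:
        await connect.change_host(current_master)

async def discover_master() -> str:
    return MASTER  # stand-in for an external health/role check

MASTER = "db1"
connect = LazyConnect("db1", before_create=renew)

async def demo():
    global MASTER
    s1 = await connect.session()   # engine created here, on first request
    MASTER = "db2"                 # a failover happens
    s2 = await connect.session()   # hook notices and switches the host
    return s1, s2

s1, s2 = asyncio.run(demo())
print(s1)  # session via engine@db1
print(s2)  # session via engine@db2
```

The point of running the hook before every session, rather than once at engine creation, is visible in the second call: the failover is picked up at runtime without restarting the application.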
from context_async_sqlalchemy import DBConnect from master_replica_helper import get_master, get_replica async def renew_master_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the master has changed\"\"\" master_host = await get_master() if master_host != connect.host: await connect.change_host(master_host) master = DBConnect( ..., before_create_session_handler=renew_master_connect, ) async def renew_replica_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the replica has changed\"\"\" replica_host = await get_replica() if replica_host != connect.host: await connect.change_host(replica_host) replica = DBConnect( ..., before_create_session_handler=renew_replica_connect, )","title":"Master/Replica or several databases at the same time"},{"location":"problem/","text":"The Problem It Solves SQLAlchemy uses an engine that manages the connection pool. The engine must remain active for as long as the application is running, so it can quickly provide ready-to-use connections whenever the application needs them. In the application, we work with sessions. A session obtains a single connection from the pool and should have a short lifespan - usually lasting only for the duration of a single request, or even less. Let's see what existing solutions are available to manage sessions: Manual solution This is how the code ends up being duplicated, and two connections and two transactions are used - even though in many cases only one connection and one transaction are actually needed. 
@app.post(\"/users/\") async def create_user(name): await insert_user(name) await insert_user_profile(name) async def insert_user(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) async def insert_user_profile(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) You can move the duplicated code to a higher level, which will result in using a single connection and a single transaction. @app.post(\"/users/\") async def create_user(name): async with get_async_session() as session: async with session.begin(): await insert_user(name, session) await insert_user_profile(name, session) async def insert_user(name, session): await session.execute(stmt) async def insert_user_profile(name, session): await session.execute(stmt) But if you look at it more broadly, the code duplication doesn\u2019t actually go away - you still have to do this in every handler. @app.post(\"/dogs/\") async def create_dog(name): async with get_async_session() as session: async with session.begin(): ... @app.post(\"/cats\") async def create_cat(name): async with get_async_session() as session: async with session.begin(): ... You also have to set everything up yourself. No ready-made integration solutions are used - which means freedom on one hand, but a lot of code on the other. Dependency You can use a dependency. For example, in FastAPI, it looks like this: async def get_atomic_session(): async with session_maker() as session: async with session.begin(): yield session @app.post(\"/dogs/\") async def create_dog(name, session=Depends(get_atomic_session)): ... @app.post(\"/cats/\") async def create_cat(name, session=Depends(get_atomic_session)): ... There are two problems here: You can\u2019t close the session or transaction prematurely, because the dependency is responsible for that. The session has to be passed all the way down the stack to the place where it\u2019s actually needed.
By the way, there\u2019s no ready-made solution for integrating with the framework - you have to implement the dependency yourself. Wrappers over SQLAlchemy There are various wrappers that often provide more convenient integration. Litestar, for example, has the same advantages and disadvantages as using dependencies: config = SQLAlchemyAsyncConfig( connection_string=URL ) sqlalchemy_plugin = SQLAlchemyInitPlugin(config) class UserRepository(SQLAlchemyAsyncRepository[User]): model_type = User @post(\"/users\") async def create_user(data: User, repo: UserRepository): await repo.add(data) # <- insert into User Here\u2019s an example using Ormar: class BaseMeta(ormar.ModelMeta): ... class User(ormar.Model): ... @app.post(\"/users/\") async def create_user(name): await User.objects.create(name=name) The main problem with wrappers is that they require developers to learn something new. They introduce their own syntax - so even if a developer is familiar with SQLAlchemy, it doesn\u2019t mean they\u2019ll understand the wrapper. Wrappers are also often designed for convenience when working with simple CRUD operations, but writing complex SQL queries with them can be very challenging. Solution And this library solves all of these issues: Easy integration with web frameworks Automatic management of engine, session, and transaction lifecycles Ability to manually close a session at any time, without waiting for the end of the process Access to a session from the context only where it\u2019s actually needed","title":"The Problem It Solves"},{"location":"problem/#the-problem-it-solves","text":"SQLAlchemy uses an engine that manages the connection pool. The engine must remain active for as long as the application is running, so it can quickly provide ready-to-use connections whenever the application needs them. In the application, we work with sessions.
A session obtains a single connection from the pool and should have a short lifespan - usually lasting only for the duration of a single request, or even less. Let's see what existing solutions are available to manage sessions:","title":"The Problem It Solves"},{"location":"problem/#manual-solution","text":"This is how the code ends up being duplicated, and two connections and two transactions are used - even though in many cases only one connection and one transaction are actually needed. @app.post(\"/users/\") async def create_user(name): await insert_user(name) await insert_user_profile(name) async def insert_user(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) async def insert_user_profile(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) You can move the duplicated code to a higher level, which will result in using a single connection and a single transaction. @app.post(\"/users/\") async def create_user(name): async with get_async_session() as session: async with session.begin(): await insert_user(name, session) await insert_user_profile(name, session) async def insert_user(name, session): await session.execute(stmt) async def insert_user_profile(name, session): await session.execute(stmt) But if you look at it more broadly, the code duplication doesn\u2019t actually go away - you still have to do this in every handler. @app.post(\"/dogs/\") async def create_dog(name): async with get_async_session() as session: async with session.begin(): ... @app.post(\"/cats\") async def create_cat(name): async with get_async_session() as session: async with session.begin(): ... You also have to set everything up yourself. No ready-made integration solutions are used - which means freedom on one hand, but a lot of code on the other.","title":"Manual solution"},{"location":"problem/#dependency","text":"You can use a dependency.
For example, in FastAPI, it looks like this: async def get_atomic_session(): async with session_maker() as session: async with session.begin(): yield session @app.post(\"/dogs/\") async def create_dog(name, session=Depends(get_atomic_session)): ... @app.post(\"/cats/\") async def create_cat(name, session=Depends(get_atomic_session)): ... There are two problems here: You can\u2019t close the session or transaction prematurely, because the dependency is responsible for that. The session has to be passed all the way down the stack to the place where it\u2019s actually needed. By the way, there\u2019s no ready-made solution for integrating with the framework - you have to implement the dependency yourself.","title":"Dependency"},{"location":"problem/#wrappers-over-sqlalachemy","text":"There are various wrappers that often provide more convenient integration. Litestar, for example, has the same advantages and disadvantages as using dependencies: config = SQLAlchemyAsyncConfig( connection_string=URL ) sqlalchemy_plugin = SQLAlchemyInitPlugin(config) class UserRepository(SQLAlchemyAsyncRepository[User]): model_type = User @post(\"/users\") async def create_user(data: User, repo: UserRepository): await repo.add(data) # <- insert into User Here\u2019s an example using Ormar: class BaseMeta(ormar.ModelMeta): ... class User(ormar.Model): ... @app.post(\"/users/\") async def create_user(name): await User.objects.create(name=name) The main problem with wrappers is that they require developers to learn something new. They introduce their own syntax - so even if a developer is familiar with SQLAlchemy, it doesn\u2019t mean they\u2019ll understand the wrapper. 
Wrappers are also often designed for convenience when working with simple CRUD operations, but writing complex SQL queries with them can be very challenging.","title":"Wrappers over SQLAlchemy"},{"location":"problem/#solution","text":"And this library solves all of these issues: Easy integration with web frameworks Automatic management of engine, session, and transaction lifecycles Ability to manually close a session at any time, without waiting for the end of the process Access to a session from the context only where it\u2019s actually needed","title":"Solution"},{"location":"testing/","text":"Testing When testing with a real database, one important problem needs to be solved - ensuring data isolation between tests. There are basically two approaches: Separate sessions The test has its own session that it uses to prepare data and verify results after execution. The application also has its own session. Data isolation is achieved by clearing all tables at the end of each test (and once before running all tests). Shared session and transaction The test and the application share the same session and transaction. Data isolation is achieved by rolling back the transaction at the end of the test. Personally, I prefer the first option, because it is a more \"honest\" way to test the application. We can verify how it handles sessions and transactions on its own. It\u2019s also convenient to inspect the database state when a test is paused. Sometimes, there are complex session management scenarios (for example, concurrent query execution) where other types of testing are either impossible or very difficult. The main disadvantage of this approach is the slower execution speed. Since we clear all tables after each test, this process takes additional time. This is where the second approach comes in - its main advantage is speed, as rolling back a transaction is very fast.
In my projects, I use both approaches at the same time: For most tests with simple or common logic, I use a shared transaction for the test and the application For more complex cases, or ones that cannot be tested this way, I use separate transactions. This combination allows for both good performance and convenient testing. The library provides several utilities that can be used in tests - for example, in fixtures. They help create tests that share a common transaction between the test and the application, so data isolation between tests is achieved through fast transaction rollback. You can see these capabilities in the examples: Here are tests with a common transaction between the application and the tests. And here's an example with different transactions. Create session with autorollback rollback_session creates a session that always rolls back automatically. from context_async_sqlalchemy.test_utils import rollback_session @pytest_asyncio.fixture async def db_session_test() -> AsyncGenerator[AsyncSession, None]: \"\"\"The session that is used inside the test\"\"\" async with rollback_session(connection) as session: yield session Override context set_test_context creates a new context put_savepoint_session_in_ctx puts a session into the context that uses the same connection as db_session_test; if you commit in this session, the transaction will not be committed - instead, the savepoint will be released from context_async_sqlalchemy.test_utils import ( put_savepoint_session_in_ctx, set_test_context, ) @pytest_asyncio.fixture(autouse=True) async def db_session_override( db_session_test: AsyncSession, ) -> AsyncGenerator[None, None]: \"\"\" The key thing about these tests is that we override the context in advance. The middleware has a special check that won't initialize the context if it already exists.
\"\"\" async with set_test_context(): async with put_savepoint_session_in_ctx(connection, db_session_test): yield","title":"Testing"},{"location":"testing/#testing","text":"When testing with a real database, one important problem needs to be solved - ensuring data isolation between tests. There are basically two approaches: Separate sessions The test has its own session that it uses to prepare data and verify results after execution. The application also has its own session. Data isolation is achieved by clearing all tables at the end of each test (and once before running all tests). Shared session and transaction The test and the application share the same session and transaction. Data isolation is achieved by rolling back the transaction at the end of the test. Personally, I prefer the first option, because it is a more \"honest\" way to test the application. We can verify how it handles sessions and transactions on its own. It\u2019s also convenient to inspect the database state when a test is paused. Sometimes, there are complex session management scenarios (for example, concurrent query execution) where other types of testing are either impossible or very difficult. The main disadvantage of this approach is the slower execution speed. Since we clear all tables after each test, this process takes additional time. This is where the second approach comes in - its main advantage is speed, as rolling back a transaction is very fast. In my projects, I use both approaches at the same time: For most tests with simple or common logic, I use a shared transaction for the test and the application For more complex cases, or ones that cannot be tested this way, I use separate transactions. This combination allows for both good performance and convenient testing. The library provides several utilities that can be used in tests - for example, in fixtures. 
They help create tests that share a common transaction between the test and the application, so data isolation between tests is achieved through fast transaction rollback. You can see these capabilities in the examples: Here are tests with a common transaction between the application and the tests. And here's an example with different transactions.","title":"Testing"},{"location":"testing/#create-session-with-autorollback","text":"rollback_session creates a session that always rolls back automatically. from context_async_sqlalchemy.test_utils import rollback_session @pytest_asyncio.fixture async def db_session_test() -> AsyncGenerator[AsyncSession, None]: \"\"\"The session that is used inside the test\"\"\" async with rollback_session(connection) as session: yield session","title":"Create session with autorollback"},{"location":"testing/#override-context","text":"set_test_context creates a new context put_savepoint_session_in_ctx puts a session into the context that uses the same connection as db_session_test; if you commit in this session, the transaction will not be committed - instead, the savepoint will be released from context_async_sqlalchemy.test_utils import ( put_savepoint_session_in_ctx, set_test_context, ) @pytest_asyncio.fixture(autouse=True) async def db_session_override( db_session_test: AsyncSession, ) -> AsyncGenerator[None, None]: \"\"\" The key thing about these tests is that we override the context in advance. The middleware has a special check that won't initialize the context if it already exists. \"\"\" async with set_test_context(): async with put_savepoint_session_in_ctx(connection, db_session_test): yield","title":"Override context"}]}
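The savepoint trick behind put_savepoint_session_in_ctx can be demonstrated with nothing but the standard library's sqlite3 (a stand-alone sketch, not the library's API): the application's "commit" only releases a savepoint, so the outer test transaction can still be rolled back at the end.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.isolation_level = None               # manage transactions manually
conn.execute("CREATE TABLE users (name TEXT)")

conn.execute("BEGIN")                     # outer test transaction
conn.execute("SAVEPOINT app")             # the application's "session"
conn.execute("INSERT INTO users VALUES ('alice')")
conn.execute("RELEASE SAVEPOINT app")     # the application's "commit"

inside = conn.execute("SELECT count(*) FROM users").fetchone()[0]
conn.execute("ROLLBACK")                  # end of test: everything is undone
after = conn.execute("SELECT count(*) FROM users").fetchone()[0]
print(inside, after)  # 1 0
```

Because releasing a savepoint does not end the enclosing transaction, the application code behaves as if it committed, while the test harness still gets a clean database after the final rollback.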
diff --git a/docs/testing/index.html b/docs/testing/index.html
index 4ebffa6..b45413a 100644
--- a/docs/testing/index.html
+++ b/docs/testing/index.html
@@ -106,47 +106,30 @@
             

Testing

-

When testing with a real database, one important problem needs to be -solved - ensuring data isolation between tests.

-

There are basically two approaches:

-
    -
  1. Separate sessions
  2. -
-

The test has its own session that it uses to prepare data and verify -results after execution. -The application also has its own session. -Data isolation is achieved by clearing all tables at the end of each test -(and once before running all tests).

-
    -
  1. Shared session and transaction -The test and the application share the same session and transaction. -Data isolation is achieved by rolling back the transaction at -the end of the test.
  2. -
-

Personally, I prefer the first option, because it is a more "honest" way -to test the application. -We can verify how it handles sessions and transactions on its own. -It’s also convenient to inspect the database state when a test is paused.

-

Sometimes, there are complex session management scenarios (for example, -concurrent query execution) where other types of testing are either -impossible or very difficult.

-

The main disadvantage of this approach is the slower execution speed. -Since we clear all tables after each test, this process takes additional time.

-

This is where the second approach comes in - its main advantage is speed, -as rolling back a transaction is very fast.

+

When testing with a real database, one important problem needs to be solved: ensuring data isolation between tests. There are two approaches: +

    +
  1. Separate sessions: The test uses its own session that prepares data and verifies results after execution. The application also has its own session. +
      +
    • It is a more "honest" way to test the application.
    • +
    • Verifies how the application handles sessions and transactions on its own.
    • +
    • The ability to inspect the database state when the test is paused.
    • +
    • Complex session management scenarios make other test methods difficult or impossible (e.g., concurrent query execution).
    • +
    +
  2. Shared session and transaction: The test and the application share the same session and transaction. +
      +
    • Rolling back a transaction is much faster than clearing all tables after each test.
    • +
    +
+

+

In my projects, I use both approaches at the same time:

    -
  • For most tests with simple or common logic, I use a shared transaction -for the test and the application
  • -
  • For more complex cases, or ones that cannot be tested this way, -I use separate transactions.
  • +
  • For most tests with simple or common logic, I use a shared transaction for the test and the application
  • +
  • For more complex cases, or ones that cannot be tested with a shared transaction, I use separate sessions.
-

This combination allows for both good performance and convenient testing.

-

The library provides several utilities that can be used in tests - for -example, in fixtures. -They help create tests that share a common transaction between the test -and the application, so data isolation between tests is achieved through -fast transaction rollback.

+

The library provides several utilities that can be used in tests (e.g., fixtures). They help create tests that share a common transaction between the test +and the application. You achieve data isolation by rolling back transactions. +

You can see these capabilities in the examples:

Here are tests with a common transaction between the application and the tests.

diff --git a/docs_sources/docs/api.md b/docs_sources/docs/api.md index 422aba1..257e8a0 100644 --- a/docs_sources/docs/api.md +++ b/docs_sources/docs/api.md @@ -88,8 +88,7 @@ from the target host. ```python async def create_session(self: DBConnect) -> AsyncSession: ``` -Creates a new session the library uses internally. -You never need to call it directly. (Only maybe in some special cases.) +Creates a new session. Used internally by the library. You may never need to call it directly. --- ### session_maker @@ -196,7 +195,8 @@ async def atomic_db_session( A context manager you can use to wrap another function which uses a context session, making that call isolated within its own transaction. -Several options define how a function handles an open transaction. +There are several options that define how the function will handle +an open transaction. current_transaction: @@ -219,7 +219,7 @@ Commits the active session. ```python async def rollback_db_session(connect: DBConnect) -> None: ``` -Rolls back an active session. +Rolls back the active session. --- @@ -227,11 +227,10 @@ Rolls back an active session. ```python async def close_db_session(connect: DBConnect) -> None: ``` -Closes the current context session and returns the connection to the pool. -If you close an uncommitted transaction, the connection rolls back +Closes the current context session. The connection is returned to the pool. +If you close an uncommitted transaction, the connection rolls back. -This is useful when you need to run a database query at the start of the -handler, then continue working over time without keeping the connection open. +Use this when you still have work to do but no longer need to keep the connection open.