diff --git a/README.md b/README.md index f790b98..6e21a0c 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ transactions - Not a wrapper around SQLAlchemy - Convenient for testing - Runtime host switching -- Supports multiple databases and multiple sessions per database +- Supports multiple databases and sessions per database - Provides tools for running concurrent SQL queries - Fully lazy initialization @@ -37,25 +37,25 @@ from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database -from models import ExampleTable # just some model for example +from models import ExampleTable # a model for example async def some_func() -> None: - # Created a session (no connection to the database yet) + # Creates a session with no connection to the database yet session = await db_session(connection) stmt = insert(ExampleTable).values(text="example_with_db_session") - # On the first request, a connection and transaction were opened + # A connection and transaction open in the first request. await session.execute(stmt) - # If you call db_session again, it will return the same session + # If you call db_session again, it returns the same session # even in child coroutines. session = await db_session(connection) - # The second request will use the same connection and the same transaction + # The second request uses the same connection and the same transaction await session.execute(stmt) - # The commit and closing of the session will occur automatically + # The commit and closing of the session occurs automatically ``` ## How it works @@ -70,5 +70,4 @@ existing ones. 3. The middleware automatically commits or rolls back open transactions. It also closes open sessions and clears the context. -The library also provides the ability to commit, roll back, and close at any -time without waiting for the end of the request. \ No newline at end of file +The library provides the ability to commit, roll back, and close at any \ No newline at end of file diff --git a/context_async_sqlalchemy/session.py b/context_async_sqlalchemy/session.py index 1603a4e..f7181ed 100644 --- a/context_async_sqlalchemy/session.py +++ b/context_async_sqlalchemy/session.py @@ -40,12 +40,12 @@ async def atomic_db_session( current_transaction: _current_transaction_choices = "commit", ) -> AsyncGenerator[AsyncSession, None]: """ - A context manager that can be used to wrap another function which - uses a context session, making that call isolated within its + A context manager that you can use to wrap another function which + uses a context session, isolating the call within its own transaction. - There are several options that define how the function will handle - an already open transaction. + There are several options that define how the function handles + an open transaction. current_transaction: "commit" - commits the open transaction and starts a new one "rollback" - rolls back the open transaction and starts a new one @@ -80,7 +80,7 @@ async def atomic_db_session( async def commit_db_session(connect: DBConnect) -> None: """ - Commits the active session, if there is one. + Commits the active session. example of use: await your_function_with_db_session() @@ -93,7 +93,7 @@ async def commit_db_session(connect: DBConnect) -> None: async def rollback_db_session(connect: DBConnect) -> None: """ - Rollbacks the active session, if there is one. + Rolls back the active session. example of use: await your_function_with_db_session() @@ -108,9 +108,7 @@ async def close_db_session(connect: DBConnect) -> None: """ Closes the active session (and connection), if there is one. - This is useful if, for example, at the beginning of the handle a - database query is needed, and then there is some other long-term work - and you don't want to keep the connection opened. + Use if you have more work you need to complete without keeping the connection open. example of use: await your_function_with_db_session() diff --git a/context_async_sqlalchemy/test_utils.py b/context_async_sqlalchemy/test_utils.py index 047c35e..0cc6a68 100644 --- a/context_async_sqlalchemy/test_utils.py +++ b/context_async_sqlalchemy/test_utils.py @@ -64,7 +64,7 @@ async def put_savepoint_session_in_ctx( a transaction. You need to pass the session you're using inside your tests to attach a new session to the same connection. - It is important to use this function inside set_test_context. + Use this function inside set_test_context. """ session_maker = await connection.session_maker() async with session_maker( diff --git a/docs/api/index.html b/docs/api/index.html index d72bcd8..f9df919 100644 --- a/docs/api/index.html +++ b/docs/api/index.html @@ -76,7 +76,7 @@
DBConnect is responsible for managing the engine and the session_maker. -You need to define two factories. -Optionally, you can specify a host to connect to. -You can also specify a handler that runs before a session is created - -this handler can be used to connect to the host for the first time or -to reconnect to a new one.
+DBConnect is responsible for managing the engine and session_maker.
+You need to define two factories:
+
def __init__(
self: DBConnect,
@@ -208,39 +210,36 @@ connect
async def connect(self: DBConnect, host: str) -> None:
Establishes a connection to the specified host.
-This method doesn’t need to be called explicitly.
-If it isn’t called, the first session request will automatically
-establish the connection.
+It doesn’t need to be called explicitly.
+The first session request automatically
+establishes the connection.
change_host
async def change_host(self: DBConnect, host: str) -> None:
-Establishes a connection to the specified host, but first
-checks under a lock that the currently connected host is different
+Establishes a connection to the specified host. It validates that the currently connected host is different
from the target host.
create_session
async def create_session(self: DBConnect) -> AsyncSession:
-Creates a new session. Used internally by the library -
-you’ll probably never need to call it directly, but it’s
-good to know it exists.
+Creates a new session the library uses internally.
+You never need to call it directly.
session_maker
async def session_maker(self: DBConnect) -> async_sessionmaker[AsyncSession]:
-Provides access to the session_maker currently used to create sessions.
+Provides access to the session_maker currently used to create sessions.
close
async def close(self: DBConnect) -> None:
-Completely closes and cleans up all resources, freeing the connection pool.
-This should be called at the end of your application’s lifecycle.
+Closes and cleans up all resources, freeing the connection pool.
+Use this call at the end of your application’s life cycle.
Middlewares
-Most of the work - and the “magic” - happens inside the middleware.
-You can check out how it works and implement your
+
Most of the work the “magic” happens inside the middleware. Check out how it works and implement your
own.
-Fastapi
+FastAPI
from context_async_sqlalchemy.fastapi_utils import (
fastapi_http_db_session_middleware,
add_fastapi_http_db_session_middleware,
@@ -285,7 +284,7 @@ Pure ASGI
app.add_middleware(ASGIHTTPDBSessionMiddleware)
Sessions
-Here are the functions you’ll use most often from the library.
+
Here are the library functions you will use most often.
They allow you to work with sessions directly from your asynchronous code.
db_session
async def db_session(connect: DBConnect) -> AsyncSession:
@@ -303,8 +302,7 @@ atomic_db_session
A context manager that can be used to wrap another function which
uses a context session, making that call isolated within its own transaction.
-There are several options that define how the function will handle
-an already open transaction.
+Several options define how a function handles an open transaction.
current_transaction:
commit - commits the open transaction and starts a new one
@@ -316,21 +314,20 @@ atomic_db_session
commit_db_session
async def commit_db_session(connect: DBConnect) -> None:
-Commits the active session, if there is one.
+Commits the active session.
rollback_db_session
async def rollback_db_session(connect: DBConnect) -> None:
-Rollbacks the active session, if there is one.
+Rolls back an active session.
close_db_session
async def close_db_session(connect: DBConnect) -> None:
-Closes the current context session. The connection is returned to the pool.
-If you close an uncommitted transaction, the connection will be rolled back.
-This is useful if, for example, at the beginning of the handle a
- database query is needed, and then there is some other long-term work
- and you don't want to keep the connection opened.
+Closes the current context session and returns the connection to the pool.
+If you close an uncommitted transaction, the connection rolls back.
+This is useful if a database querly is needed at the beginning of the handle.
+You have have more work you need to complete over time without being able to keep the connection open.
new_non_ctx_session
@asynccontextmanager
@@ -338,9 +335,7 @@ new_non_ctx_session
connect: DBConnect,
) -> AsyncGenerator[AsyncSession, None]:
-A context manager that allows you to create a new session without placing
-it in a context. It's used for manual session management when you
-don't want to use a context.
+A context manager that allows you to create a new session without placing it in a context.
new_non_ctx_atomic_session
@asynccontextmanager
@@ -359,11 +354,11 @@ run_in_new_ctx
**kwargs: Any,
) -> AsyncCallableResult:
-Runs a function in a new context with new sessions that have their
- own connection.
-It will commit the transaction automatically if callable_func does not
- raise exceptions. Otherwise, the transaction will be rolled back.
-The intended use is to run multiple database queries concurrently.
+Runs a function in a new context with new session(s) that have a
+ separate connection.
+It commits the transaction automatically if callable_func does not
+ raise exceptions. Otherwise, the transaction rolls back.
+It is intended to allow you to run multiple database queries concurrently.
example of use:
await asyncio.gather(
your_function_with_db_session(...),
@@ -374,16 +369,15 @@ run_in_new_ctx
)
Testing
-You can read more about testing here: Testing
+
rollback_session
@asynccontextmanager
async def rollback_session(
connection: DBConnect,
) -> AsyncGenerator[AsyncSession, None]:
-A context manager that creates a session which is automatically rolled
-back at the end.
-It’s intended for use in fixtures to execute SQL queries during tests.
+A context manager that creates a session which automatically rolls back at the end of the session.
+It is intended for you to use in fixtures to execute SQL queries during tests.
set_test_context
@asynccontextmanager
@@ -391,13 +385,12 @@ set_test_context
A context manager that creates a new context in which you can place a
dedicated test session.
-It’s intended for use in tests where the test and the application share
-a single transaction.
+It is intended to test the application when it shares a single transaction.
Use auto_close=False if you’re using a test session and transaction
that you close elsewhere in your code.
-Use auto_close=True if you want to call a function
+
Use auto_close=True to call a function
in a test that uses a context while the middleware is not
-active, and you want all sessions to be closed automatically.
+active. All sessions close automatically.
put_savepoint_session_in_ctx
async def put_savepoint_session_in_ctx(
@@ -408,8 +401,11 @@ put_savepoint_session_in_ctx
Sets the context to a session that uses a save point instead of creating
a transaction. You need to pass the session you're using inside
your tests to attach a new session to the same connection.
+
It is important to use this function inside set_test_context.
+
+Learn more about testing.
from fastapi import HTTPException
@@ -346,7 +341,7 @@ Rollback
await session.execute(stmt)
raise HTTPException(status_code=500)
- # transaction will be automatically rolled back by status code
+ # transaction rolls back automatically by status code
from context_async_sqlalchemy import db_session, rollback_db_session
from sqlalchemy import insert
diff --git a/docs/problem/index.html b/docs/problem/index.html
index ce28930..d593fd8 100644
--- a/docs/problem/index.html
+++ b/docs/problem/index.html
@@ -241,7 +241,7 @@ Solution
- Easy integration with web frameworks
- Automatic management of engine, session, and transaction lifecycles
-- Ability to manually close a session at any time, without waiting for the
+
- Manually close a session at any time without waiting for the
end of process
- Access to a session from the context only where it’s actually needed
diff --git a/docs/search/search_index.json b/docs/search/search_index.json
index 4daaae3..39c3926 100644
--- a/docs/search/search_index.json
+++ b/docs/search/search_index.json
@@ -1 +1 @@
-{"config":{"indexing":"full","lang":["en"],"min_search_length":3,"prebuild_index":false,"separator":"[\\s\\-]+"},"docs":[{"location":"","text":"context-async-sqlalchemy No AI was used in the creation of this library. SOURCE CODE Provides a super convenient way to work with SQLAlchemy in asynchronous applications. It handles the lifecycle management of the engine, sessions, and transactions. The main goal is to provide quick and easy access to a session, without worrying about opening or closing it when it\u2019s not necessary. Key features: Extremely easy to use Automatically manages the lifecycle of the engine, sessions, and transactions (autocommit / autorollback) Does not interfere with manually opening or closing sessions and transactions when needed Framework-agnostic - works with any web framework Not a wrapper around SQLAlchemy Convenient for testing Runtime host switching Supports multiple databases and multiple sessions per database Provides tools for running concurrent SQL queries Fully lazy initialization What does usage look like? from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database from models import ExampleTable # just some model for example async def some_func() -> None: # Created a session (no connection to the database yet) session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) # The second request will use the same connection and the same transaction await session.execute(stmt) # The commit and closing of the session will occur automatically How it works Here is a very simplified diagram of how everything works: Before executing your code, the middleware will prepare a container in which the sessions required by your code will be stored. The container is saved in contextvars Your code accesses the library to create new sessions and retrieve existing ones After your code, middleware will automatically commit or roll back open transactions. Closes open sessions and clears the context. The library also provides the ability to commit, rollback, and close at any time, without waiting for the end of the request, without any problems.","title":"context-async-sqlalchemy"},{"location":"#context-async-sqlalchemy","text":"No AI was used in the creation of this library. SOURCE CODE Provides a super convenient way to work with SQLAlchemy in asynchronous applications. It handles the lifecycle management of the engine, sessions, and transactions. The main goal is to provide quick and easy access to a session, without worrying about opening or closing it when it\u2019s not necessary. Key features: Extremely easy to use Automatically manages the lifecycle of the engine, sessions, and transactions (autocommit / autorollback) Does not interfere with manually opening or closing sessions and transactions when needed Framework-agnostic - works with any web framework Not a wrapper around SQLAlchemy Convenient for testing Runtime host switching Supports multiple databases and multiple sessions per database Provides tools for running concurrent SQL queries Fully lazy initialization","title":"context-async-sqlalchemy"},{"location":"#what-does-usage-look-like","text":"from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database from models import ExampleTable # just some model for example async def some_func() -> None: # Created a session (no connection to the database yet) session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) # The second request will use the same connection and the same transaction await session.execute(stmt) # The commit and closing of the session will occur automatically","title":"What does usage look like?"},{"location":"#how-it-works","text":"Here is a very simplified diagram of how everything works: Before executing your code, the middleware will prepare a container in which the sessions required by your code will be stored. The container is saved in contextvars Your code accesses the library to create new sessions and retrieve existing ones After your code, middleware will automatically commit or roll back open transactions. Closes open sessions and clears the context. The library also provides the ability to commit, rollback, and close at any time, without waiting for the end of the request, without any problems.","title":"How it works"},{"location":"api/","text":"API Reference DBConnect DBConnect is responsible for managing the engine and the session_maker. You need to define two factories. Optionally, you can specify a host to connect to. You can also specify a handler that runs before a session is created - this handler can be used to connect to the host for the first time or to reconnect to a new one. init def __init__( self: DBConnect, engine_creator: EngineCreatorFunc, session_maker_creator: SessionMakerCreatorFunc, host: str | None = None, before_create_session_handler: AsyncFunc | None = None, ) -> None: engine_creator is a factory function for creating engines. It\u2019s an asynchronous callable that takes a host as input and returns an async engine. Example session_maker_creator is a factory function for creating an asynchronous session_maker. It\u2019s an asynchronous callable that takes an async engine as input and returns an async session_maker. Example host is an optional parameter. You can specify only this parameter to make your connection always work with a single host, without dynamic switching. However, you can still change the host in the handler if needed - it won\u2019t cause any issues. before_create_session_handler is a handler that allows you to execute custom logic before creating a session. For example, you can switch the host to another one - this is useful for dynamically determining the master if the previous master fails and a replica takes its place. The handler is an asynchronous callable that takes a DBConnect instance as input and returns nothing. Example: async def renew_master_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the master has changed\"\"\" master_host = await get_master() if master_host != connect.host: await connect.change_host(master_host) connect async def connect(self: DBConnect, host: str) -> None: Establishes a connection to the specified host. This method doesn\u2019t need to be called explicitly. If it isn\u2019t called, the first session request will automatically establish the connection. change_host async def change_host(self: DBConnect, host: str) -> None: Establishes a connection to the specified host, but first checks under a lock that the currently connected host is different from the target host. create_session async def create_session(self: DBConnect) -> AsyncSession: Creates a new session. Used internally by the library - you\u2019ll probably never need to call it directly, but it\u2019s good to know it exists. session_maker async def session_maker(self: DBConnect) -> async_sessionmaker[AsyncSession]: Provides access to the session_maker currently used to create sessions. close async def close(self: DBConnect) -> None: Completely closes and cleans up all resources, freeing the connection pool. This should be called at the end of your application\u2019s lifecycle. Middlewares Most of the work - and the \u201cmagic\u201d - happens inside the middleware. You can check out how it works and implement your own. Fastapi from context_async_sqlalchemy.fastapi_utils import ( fastapi_http_db_session_middleware, add_fastapi_http_db_session_middleware, ) app = FastAPI(...) add_fastapi_http_db_session_middleware(app) # OR app.add_middleware( BaseHTTPMiddleware, dispatch=fastapi_http_db_session_middleware ) Starlette from context_async_sqlalchemy.starlette_utils import ( add_starlette_http_db_session_middleware, starlette_http_db_session_middleware, StarletteHTTPDBSessionMiddleware, ) app = Starlette(...) add_starlette_http_db_session_middleware(app) # OR app.add_middleware( BaseHTTPMiddleware, dispatch=starlette_http_db_session_middleware ) # OR app.add_middleware(StarletteHTTPDBSessionMiddleware) Pure ASGI from context_async_sqlalchemy import ( ASGIHTTPDBSessionMiddleware, ) app = SomeASGIApp(...) app.add_middleware(ASGIHTTPDBSessionMiddleware) Sessions Here are the functions you\u2019ll use most often from the library. They allow you to work with sessions directly from your asynchronous code. db_session async def db_session(connect: DBConnect) -> AsyncSession: The most important function for obtaining a session in your code. When called for the first time, it returns a new session; subsequent calls return the same session. atomic_db_session @asynccontextmanager async def atomic_db_session( connect: DBConnect, current_transaction: Literal[\"commit\", \"rollback\", \"append\", \"raise\"] = \"commit\", ) -> AsyncGenerator[AsyncSession, None]: A context manager that can be used to wrap another function which uses a context session, making that call isolated within its own transaction. There are several options that define how the function will handle an already open transaction. current_transaction: - commit - commits the open transaction and starts a new one - rollback - rolls back the open transaction and starts a new one - append - continues using the current transaction and commits it - raise - raises an InvalidRequestError commit_db_session async def commit_db_session(connect: DBConnect) -> None: Commits the active session, if there is one. rollback_db_session async def rollback_db_session(connect: DBConnect) -> None: Rollbacks the active session, if there is one. close_db_session async def close_db_session(connect: DBConnect) -> None: Closes the current context session. The connection is returned to the pool. If you close an uncommitted transaction, the connection will be rolled back. This is useful if, for example, at the beginning of the handle a database query is needed, and then there is some other long-term work and you don't want to keep the connection opened. new_non_ctx_session @asynccontextmanager async def new_non_ctx_session( connect: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that allows you to create a new session without placing it in a context. It's used for manual session management when you don't want to use a context. new_non_ctx_atomic_session @asynccontextmanager async def new_non_ctx_atomic_session( connect: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that allows you to create a new session with a new transaction that isn't placed in a context. It's used for manual session management when you don't want to use a context. Context run_in_new_ctx async def run_in_new_ctx( callable_func: AsyncCallable[AsyncCallableResult], *args: Any, **kwargs: Any, ) -> AsyncCallableResult: Runs a function in a new context with new sessions that have their own connection. It will commit the transaction automatically if callable_func does not raise exceptions. Otherwise, the transaction will be rolled back. The intended use is to run multiple database queries concurrently. example of use: await asyncio.gather( your_function_with_db_session(...), run_in_new_ctx( your_function_with_db_session, some_arg, some_kwarg=123, ), run_in_new_ctx(your_function_with_db_session, ...), ) Testing You can read more about testing here: Testing rollback_session @asynccontextmanager async def rollback_session( connection: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that creates a session which is automatically rolled back at the end. It\u2019s intended for use in fixtures to execute SQL queries during tests. set_test_context @asynccontextmanager async def set_test_context(auto_close: bool = False) -> AsyncGenerator[None, None]: A context manager that creates a new context in which you can place a dedicated test session. It\u2019s intended for use in tests where the test and the application share a single transaction. Use auto_close=False if you\u2019re using a test session and transaction that you close manually elsewhere in your code. Use auto_close=True if, for example, you want to call a function in a test that uses a context bypassing the middleware, and you want all sessions to be closed automatically. put_savepoint_session_in_ctx async def put_savepoint_session_in_ctx( connection: DBConnect, session: AsyncSession, ) -> AsyncGenerator[None, None]: Sets the context to a session that uses a save point instead of creating a transaction. You need to pass the session you're using inside your tests to attach a new session to the same connection. It is important to use this function inside set_test_context.","title":"API Reference"},{"location":"api/#api-reference","text":"","title":"API Reference"},{"location":"api/#dbconnect","text":"DBConnect is responsible for managing the engine and the session_maker. You need to define two factories. Optionally, you can specify a host to connect to. You can also specify a handler that runs before a session is created - this handler can be used to connect to the host for the first time or to reconnect to a new one.","title":"DBConnect"},{"location":"api/#init","text":"def __init__( self: DBConnect, engine_creator: EngineCreatorFunc, session_maker_creator: SessionMakerCreatorFunc, host: str | None = None, before_create_session_handler: AsyncFunc | None = None, ) -> None: engine_creator is a factory function for creating engines. It\u2019s an asynchronous callable that takes a host as input and returns an async engine. Example session_maker_creator is a factory function for creating an asynchronous session_maker. It\u2019s an asynchronous callable that takes an async engine as input and returns an async session_maker. Example host is an optional parameter. You can specify only this parameter to make your connection always work with a single host, without dynamic switching. However, you can still change the host in the handler if needed - it won\u2019t cause any issues. before_create_session_handler is a handler that allows you to execute custom logic before creating a session. For example, you can switch the host to another one - this is useful for dynamically determining the master if the previous master fails and a replica takes its place. The handler is an asynchronous callable that takes a DBConnect instance as input and returns nothing. Example: async def renew_master_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the master has changed\"\"\" master_host = await get_master() if master_host != connect.host: await connect.change_host(master_host)","title":"init"},{"location":"api/#connect","text":"async def connect(self: DBConnect, host: str) -> None: Establishes a connection to the specified host. This method doesn\u2019t need to be called explicitly. If it isn\u2019t called, the first session request will automatically establish the connection.","title":"connect"},{"location":"api/#change_host","text":"async def change_host(self: DBConnect, host: str) -> None: Establishes a connection to the specified host, but first checks under a lock that the currently connected host is different from the target host.","title":"change_host"},{"location":"api/#create_session","text":"async def create_session(self: DBConnect) -> AsyncSession: Creates a new session. Used internally by the library - you\u2019ll probably never need to call it directly, but it\u2019s good to know it exists.","title":"create_session"},{"location":"api/#session_maker","text":"async def session_maker(self: DBConnect) -> async_sessionmaker[AsyncSession]: Provides access to the session_maker currently used to create sessions.","title":"session_maker"},{"location":"api/#close","text":"async def close(self: DBConnect) -> None: Completely closes and cleans up all resources, freeing the connection pool. This should be called at the end of your application\u2019s lifecycle.","title":"close"},{"location":"api/#middlewares","text":"Most of the work - and the \u201cmagic\u201d - happens inside the middleware. You can check out how it works and implement your own.","title":"Middlewares"},{"location":"api/#fastapi","text":"from context_async_sqlalchemy.fastapi_utils import ( fastapi_http_db_session_middleware, add_fastapi_http_db_session_middleware, ) app = FastAPI(...) add_fastapi_http_db_session_middleware(app) # OR app.add_middleware( BaseHTTPMiddleware, dispatch=fastapi_http_db_session_middleware )","title":"Fastapi"},{"location":"api/#starlette","text":"from context_async_sqlalchemy.starlette_utils import ( add_starlette_http_db_session_middleware, starlette_http_db_session_middleware, StarletteHTTPDBSessionMiddleware, ) app = Starlette(...) add_starlette_http_db_session_middleware(app) # OR app.add_middleware( BaseHTTPMiddleware, dispatch=starlette_http_db_session_middleware ) # OR app.add_middleware(StarletteHTTPDBSessionMiddleware)","title":"Starlette"},{"location":"api/#pure-asgi","text":"from context_async_sqlalchemy import ( ASGIHTTPDBSessionMiddleware, ) app = SomeASGIApp(...) app.add_middleware(ASGIHTTPDBSessionMiddleware)","title":"Pure ASGI"},{"location":"api/#sessions","text":"Here are the functions you\u2019ll use most often from the library. They allow you to work with sessions directly from your asynchronous code.","title":"Sessions"},{"location":"api/#db_session","text":"async def db_session(connect: DBConnect) -> AsyncSession: The most important function for obtaining a session in your code. When called for the first time, it returns a new session; subsequent calls return the same session.","title":"db_session"},{"location":"api/#atomic_db_session","text":"@asynccontextmanager async def atomic_db_session( connect: DBConnect, current_transaction: Literal[\"commit\", \"rollback\", \"append\", \"raise\"] = \"commit\", ) -> AsyncGenerator[AsyncSession, None]: A context manager that can be used to wrap another function which uses a context session, making that call isolated within its own transaction. There are several options that define how the function will handle an already open transaction. current_transaction: - commit - commits the open transaction and starts a new one - rollback - rolls back the open transaction and starts a new one - append - continues using the current transaction and commits it - raise - raises an InvalidRequestError","title":"atomic_db_session"},{"location":"api/#commit_db_session","text":"async def commit_db_session(connect: DBConnect) -> None: Commits the active session, if there is one.","title":"commit_db_session"},{"location":"api/#rollback_db_session","text":"async def rollback_db_session(connect: DBConnect) -> None: Rollbacks the active session, if there is one.","title":"rollback_db_session"},{"location":"api/#close_db_session","text":"async def close_db_session(connect: DBConnect) -> None: Closes the current context session. The connection is returned to the pool. If you close an uncommitted transaction, the connection will be rolled back. This is useful if, for example, at the beginning of the handle a database query is needed, and then there is some other long-term work and you don't want to keep the connection opened.","title":"close_db_session"},{"location":"api/#new_non_ctx_session","text":"@asynccontextmanager async def new_non_ctx_session( connect: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that allows you to create a new session without placing it in a context. It's used for manual session management when you don't want to use a context.","title":"new_non_ctx_session"},{"location":"api/#new_non_ctx_atomic_session","text":"@asynccontextmanager async def new_non_ctx_atomic_session( connect: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that allows you to create a new session with a new transaction that isn't placed in a context. It's used for manual session management when you don't want to use a context.","title":"new_non_ctx_atomic_session"},{"location":"api/#context","text":"","title":"Context"},{"location":"api/#run_in_new_ctx","text":"async def run_in_new_ctx( callable_func: AsyncCallable[AsyncCallableResult], *args: Any, **kwargs: Any, ) -> AsyncCallableResult: Runs a function in a new context with new sessions that have their own connection. It will commit the transaction automatically if callable_func does not raise exceptions. Otherwise, the transaction will be rolled back. The intended use is to run multiple database queries concurrently. example of use: await asyncio.gather( your_function_with_db_session(...), run_in_new_ctx( your_function_with_db_session, some_arg, some_kwarg=123, ), run_in_new_ctx(your_function_with_db_session, ...), )","title":"run_in_new_ctx"},{"location":"api/#testing","text":"You can read more about testing here: Testing","title":"Testing"},{"location":"api/#rollback_session","text":"@asynccontextmanager async def rollback_session( connection: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that creates a session which is automatically rolled back at the end. It\u2019s intended for use in fixtures to execute SQL queries during tests.","title":"rollback_session"},{"location":"api/#set_test_context","text":"@asynccontextmanager async def set_test_context(auto_close: bool = False) -> AsyncGenerator[None, None]: A context manager that creates a new context in which you can place a dedicated test session. It\u2019s intended for use in tests where the test and the application share a single transaction. Use auto_close=False if you\u2019re using a test session and transaction that you close manually elsewhere in your code. Use auto_close=True if, for example, you want to call a function in a test that uses a context bypassing the middleware, and you want all sessions to be closed automatically.","title":"set_test_context"},{"location":"api/#put_savepoint_session_in_ctx","text":"async def put_savepoint_session_in_ctx( connection: DBConnect, session: AsyncSession, ) -> AsyncGenerator[None, None]: Sets the context to a session that uses a save point instead of creating a transaction. You need to pass the session you're using inside your tests to attach a new session to the same connection. It is important to use this function inside set_test_context.","title":"put_savepoint_session_in_ctx"},{"location":"concurrent_queries/","text":"Concurrent sql queries Concurrent query execution deserves special attention. In SQLAlchemy, you can\u2019t run multiple queries concurrently within the same session - you need to create a new one. The library provides two simple ways to execute queries concurrently: Run a function in a new context - run_in_new_ctx Create a new session that is completely independent of the current context - new_non_ctx_atomic_session or new_non_ctx_session import asyncio from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, new_non_ctx_atomic_session, new_non_ctx_session, run_in_new_ctx, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_multiple_sessions() -> None: \"\"\" In some situations, you need to have multiple sessions running simultaneously. For example, to run several queries concurrently. You can also use these same techniques to create new sessions whenever you need them. Not necessarily just because of the concurrent processing. \"\"\" await asyncio.gather( _insert(), # context session run_in_new_ctx(_insert), # new context and session with autocommit run_in_new_ctx( # new context and session with manual commit _insert_manual, \"example_multiple_sessions\", ), _insert_non_ctx(), # new non context session _insert_non_ctx_manual(), # new non context session ) async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_manual(text: str) -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=text) await session.execute(stmt) # You can manually commit the transaction if you want, but it is not # necessary await commit_db_session(connection) # You can manually close the session if you want, but it is not necessary await close_db_session(connection) async def _insert_non_ctx() -> None: \"\"\" You don't have to use the context to work with sessions at all \"\"\" async with new_non_ctx_atomic_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_non_ctx_manual() -> None: \"\"\" You don't have to use the context to work with sessions at all \"\"\" async with new_non_ctx_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) await session.commit()","title":"Concurrent SQL Queries"},{"location":"concurrent_queries/#concurrent-sql-queries","text":"Concurrent query execution deserves special attention. In SQLAlchemy, you can\u2019t run multiple queries concurrently within the same session - you need to create a new one. The library provides two simple ways to execute queries concurrently: Run a function in a new context - run_in_new_ctx Create a new session that is completely independent of the current context - new_non_ctx_atomic_session or new_non_ctx_session import asyncio from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, new_non_ctx_atomic_session, new_non_ctx_session, run_in_new_ctx, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_multiple_sessions() -> None: \"\"\" In some situations, you need to have multiple sessions running simultaneously. For example, to run several queries concurrently. You can also use these same techniques to create new sessions whenever you need them. Not necessarily just because of the concurrent processing. \"\"\" await asyncio.gather( _insert(), # context session run_in_new_ctx(_insert), # new context and session with autocommit run_in_new_ctx( # new context and session with manual commit _insert_manual, \"example_multiple_sessions\", ), _insert_non_ctx(), # new non context session _insert_non_ctx_manual(), # new non context session ) async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_manual(text: str) -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=text) await session.execute(stmt) # You can manually commit the transaction if you want, but it is not # necessary await commit_db_session(connection) # You can manually close the session if you want, but it is not necessary await close_db_session(connection) async def _insert_non_ctx() -> None: \"\"\" You don't have to use the context to work with sessions at all \"\"\" async with new_non_ctx_atomic_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_non_ctx_manual() -> None: \"\"\" You don't have to use the context to work with sessions at all \"\"\" async with new_non_ctx_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) await session.commit()","title":"Concurrent sql queries"},{"location":"examples/","text":"Usage examples You can see not only fragments of examples, but also web application examples . Basic usage from sqlalchemy import insert from context_async_sqlalchemy import db_session from ..database import connection from ..models import ExampleTable async def handler_with_db_session() -> None: \"\"\" An example of a typical handle that uses a context session to work with a database. Autocommit or autorollback occurs automatically at the end of a request (in middleware). \"\"\" # Created a session (no connection to the database yet) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # Commit will happen automatically Atomic from context_async_sqlalchemy import atomic_db_session, db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_atomic() -> None: \"\"\" Let's imagine you already have a function that works with a contextual session, and its use case calls autocommit at the end of the request. You want to reuse this function, but you need to commit immediately, rather than wait for the request to complete. \"\"\" # the transaction will be committed or rolled back automatically # using the context manager async with atomic_db_session(connection): await _insert_1() # This is a new transaction in the same connection await _insert_1() async def _insert_1() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_atomic\" ) await session.execute(stmt) Manually close the transaction and session from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_manual_close() -> None: \"\"\" An example of a handle that uses a session in context, but commits manually and even closes the session to release the connection. \"\"\" # new connect -> new transaction -> commit await _insert_1() # old connect -> new transaction -> commit -> close connect await _insert_2() # new connect -> new transaction await _insert_3() # same connect -> same transaction await _insert_3() # autocommit async def _insert_1() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) # Here we closed the transaction await session.commit() # or await commit_db_session() async def _insert_2() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) # Here we closed the transaction await commit_db_session(connection) # And here we closed the session = returned the connection to the pool # This is useful if, for example, at the beginning of the handle a # database query is needed, and then there is some other long-term work # and you don't want to keep the connection opened. await close_db_session(connection) async def _insert_3() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) Multiple sessions and concurrent execution import asyncio from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, new_non_ctx_atomic_session, new_non_ctx_session, run_in_new_ctx, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_multiple_sessions() -> None: \"\"\" In some situations, you need to have multiple sessions running simultaneously. For example, to run several queries concurrently. You can also use these same techniques to create new sessions whenever you need them. Not necessarily just because of the concurrent processing. \"\"\" await asyncio.gather( _insert(), # context session run_in_new_ctx(_insert), # new context and session with autocommit run_in_new_ctx( # new context and session with manual commit _insert_manual, \"example_multiple_sessions\", ), _insert_non_ctx(), # new non context session _insert_non_ctx_manual(), # new non context session ) async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_manual(text: str) -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=text) await session.execute(stmt) # You can manually commit the transaction if you want, but it is not # necessary await commit_db_session(connection) # You can manually close the session if you want, but it is not necessary await close_db_session(connection) async def _insert_non_ctx() -> None: \"\"\" You don't have to use the context to work with sessions at all \"\"\" async with new_non_ctx_atomic_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_non_ctx_manual() -> None: \"\"\" You don't have to use the context to work with sessions at all \"\"\" async with new_non_ctx_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) await session.commit() Rollback from context_async_sqlalchemy import db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_exception() -> None: \"\"\" let's imagine that an exception occurred. \"\"\" session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") await session.execute(stmt) raise Exception(\"Some exception\") # transaction will be automatically rolled back from fastapi import HTTPException from context_async_sqlalchemy import db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_http_exception() -> None: \"\"\" let's imagine that an http exception occurred. \"\"\" session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") await session.execute(stmt) raise HTTPException(status_code=500) # transaction will be automatically rolled back by status code from context_async_sqlalchemy import db_session, rollback_db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_manual_rollback() -> None: \"\"\" An example of a handle that uses a rollback \"\"\" # it's convenient this way await _insert() await rollback_db_session(connection) # but it's possible this way too await _insert() session = await db_session(connection) await session.rollback() async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt)","title":"Usage Examples"},{"location":"examples/#usage-examples","text":"You can see not only fragments of examples, but also web application examples .","title":"Usage examples"},{"location":"examples/#basic-usage","text":"from sqlalchemy import insert from context_async_sqlalchemy import db_session from ..database import connection from ..models import ExampleTable async def handler_with_db_session() -> None: \"\"\" An example of a typical handle that uses a context session to work with a database. Autocommit or autorollback occurs automatically at the end of a request (in middleware). \"\"\" # Created a session (no connection to the database yet) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # Commit will happen automatically","title":"Basic usage"},{"location":"examples/#atomic","text":"from context_async_sqlalchemy import atomic_db_session, db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_atomic() -> None: \"\"\" Let's imagine you already have a function that works with a contextual session, and its use case calls autocommit at the end of the request. You want to reuse this function, but you need to commit immediately, rather than wait for the request to complete. \"\"\" # the transaction will be committed or rolled back automatically # using the context manager async with atomic_db_session(connection): await _insert_1() # This is a new transaction in the same connection await _insert_1() async def _insert_1() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_atomic\" ) await session.execute(stmt)","title":"Atomic"},{"location":"examples/#manually-close-the-transaction-and-session","text":"from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_manual_close() -> None: \"\"\" An example of a handle that uses a session in context, but commits manually and even closes the session to release the connection. \"\"\" # new connect -> new transaction -> commit await _insert_1() # old connect -> new transaction -> commit -> close connect await _insert_2() # new connect -> new transaction await _insert_3() # same connect -> same transaction await _insert_3() # autocommit async def _insert_1() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) # Here we closed the transaction await session.commit() # or await commit_db_session() async def _insert_2() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) # Here we closed the transaction await commit_db_session(connection) # And here we closed the session = returned the connection to the pool # This is useful if, for example, at the beginning of the handle a # database query is needed, and then there is some other long-term work # and you don't want to keep the connection opened. await close_db_session(connection) async def _insert_3() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt)","title":"Manually close the transaction and session"},{"location":"examples/#multiple-sessions-and-concurrent-execution","text":"import asyncio from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, new_non_ctx_atomic_session, new_non_ctx_session, run_in_new_ctx, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_multiple_sessions() -> None: \"\"\" In some situations, you need to have multiple sessions running simultaneously. For example, to run several queries concurrently. You can also use these same techniques to create new sessions whenever you need them. Not necessarily just because of the concurrent processing. \"\"\" await asyncio.gather( _insert(), # context session run_in_new_ctx(_insert), # new context and session with autocommit run_in_new_ctx( # new context and session with manual commit _insert_manual, \"example_multiple_sessions\", ), _insert_non_ctx(), # new non context session _insert_non_ctx_manual(), # new non context session ) async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_manual(text: str) -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=text) await session.execute(stmt) # You can manually commit the transaction if you want, but it is not # necessary await commit_db_session(connection) # You can manually close the session if you want, but it is not necessary await close_db_session(connection) async def _insert_non_ctx() -> None: \"\"\" You don't have to use the context to work with sessions at all \"\"\" async with new_non_ctx_atomic_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_non_ctx_manual() -> None: \"\"\" You don't have to use the context to work with sessions at all \"\"\" async with new_non_ctx_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) await session.commit()","title":"Multiple sessions and concurrent execution"},{"location":"examples/#rollback","text":"from context_async_sqlalchemy import db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_exception() -> None: \"\"\" let's imagine that an exception occurred. \"\"\" session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") await session.execute(stmt) raise Exception(\"Some exception\") # transaction will be automatically rolled back from fastapi import HTTPException from context_async_sqlalchemy import db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_http_exception() -> None: \"\"\" let's imagine that an http exception occurred. \"\"\" session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") await session.execute(stmt) raise HTTPException(status_code=500) # transaction will be automatically rolled back by status code from context_async_sqlalchemy import db_session, rollback_db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_manual_rollback() -> None: \"\"\" An example of a handle that uses a rollback \"\"\" # it's convenient this way await _insert() await rollback_db_session(connection) # but it's possible this way too await _insert() session = await db_session(connection) await session.rollback() async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt)","title":"Rollback"},{"location":"getting_started/","text":"Getting started Configure the connection to the database For example, for PostgreSQL - database.py: from sqlalchemy.ext.asyncio import ( async_sessionmaker, AsyncEngine, AsyncSession, create_async_engine, ) from context_async_sqlalchemy import DBConnect def create_engine(host: str) -> AsyncEngine: \"\"\" database connection parameters. \"\"\" # In production code, you will probably take these parameters from env pg_user = \"krylosov-aa\" pg_password = \"\" pg_port = 6432 pg_db = \"test\" return create_async_engine( f\"postgresql+asyncpg://\" f\"{pg_user}:{pg_password}\" f\"@{host}:{pg_port}\" f\"/{pg_db}\", future=True, pool_pre_ping=True, ) def create_session_maker( engine: AsyncEngine, ) -> async_sessionmaker[AsyncSession]: \"\"\"session parameters\"\"\" return async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) connection = DBConnect( engine_creator=create_engine, session_maker_creator=create_session_maker, host=\"127.0.0.1\", # optional ) The host parameter is optional if you use a handler before creating a session - before_create_session_handler . In that case, you can dynamically set the host. Read more in Master/Replica or several databases at the same time Manage Database connection lifecycle Close resources at the end of your application's lifecycle. Example for FastAPI: from contextlib import asynccontextmanager from typing import Any, AsyncGenerator from fastapi import FastAPI from database import connection @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: \"\"\"Database connection lifecycle management\"\"\" yield await connection.close() # Close the engine if it was open Setup middleware Middleware handles the most important and complex part - managing context and sessions. You can use the ready-made middleware components: Pure ASGI from context_async_sqlalchemy import ASGIHTTPDBSessionMiddleware app.add_middleware(ASGIHTTPDBSessionMiddleware) FastAPI from context_async_sqlalchemy.fastapi_utils import ( add_fastapi_http_db_session_middleware, ) app = FastAPI(...) add_fastapi_http_db_session_middleware(app) Starlette from context_async_sqlalchemy.starlette_utils import ( add_starlette_http_db_session_middleware, ) app = Starlette(...) add_starlette_http_db_session_middleware(app) Write own If there\u2019s no ready-made solution that fits your needs - don\u2019t worry! You can check out how it works and implement your own. Use it from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database from models import ExampleTable # just some model for example async def some_func() -> None: # Created a session (no connection to the database yet) session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) # The second request will use the same connection and the same transaction await session.execute(stmt) # The commit and closing of the session will occur automatically Examples The repository includes an example integration with FastAPI, demonstrating various workflows: FastAPI example It also contains two types of test setups that you can use in your own projects. All library tests are included within the examples - because we aim to test the functionality not in isolation, but in the context of a real asynchronous web application. FastAPI tests example","title":"Getting Started"},{"location":"getting_started/#getting-started","text":"","title":"Getting started"},{"location":"getting_started/#configure-the-connection-to-the-database","text":"For example, for PostgreSQL - database.py: from sqlalchemy.ext.asyncio import ( async_sessionmaker, AsyncEngine, AsyncSession, create_async_engine, ) from context_async_sqlalchemy import DBConnect def create_engine(host: str) -> AsyncEngine: \"\"\" database connection parameters. \"\"\" # In production code, you will probably take these parameters from env pg_user = \"krylosov-aa\" pg_password = \"\" pg_port = 6432 pg_db = \"test\" return create_async_engine( f\"postgresql+asyncpg://\" f\"{pg_user}:{pg_password}\" f\"@{host}:{pg_port}\" f\"/{pg_db}\", future=True, pool_pre_ping=True, ) def create_session_maker( engine: AsyncEngine, ) -> async_sessionmaker[AsyncSession]: \"\"\"session parameters\"\"\" return async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) connection = DBConnect( engine_creator=create_engine, session_maker_creator=create_session_maker, host=\"127.0.0.1\", # optional ) The host parameter is optional if you use a handler before creating a session - before_create_session_handler . In that case, you can dynamically set the host. Read more in Master/Replica or several databases at the same time","title":"Configure the connection to the database"},{"location":"getting_started/#manage-database-connection-lifecycle","text":"Close resources at the end of your application's lifecycle. Example for FastAPI: from contextlib import asynccontextmanager from typing import Any, AsyncGenerator from fastapi import FastAPI from database import connection @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: \"\"\"Database connection lifecycle management\"\"\" yield await connection.close() # Close the engine if it was open","title":"Manage Database connection lifecycle"},{"location":"getting_started/#setup-middleware","text":"Middleware handles the most important and complex part - managing context and sessions. You can use the ready-made middleware components:","title":"Setup middleware"},{"location":"getting_started/#pure-asgi","text":"from context_async_sqlalchemy import ASGIHTTPDBSessionMiddleware app.add_middleware(ASGIHTTPDBSessionMiddleware)","title":"Pure ASGI"},{"location":"getting_started/#fastapi","text":"from context_async_sqlalchemy.fastapi_utils import ( add_fastapi_http_db_session_middleware, ) app = FastAPI(...) add_fastapi_http_db_session_middleware(app)","title":"FastAPI"},{"location":"getting_started/#starlette","text":"from context_async_sqlalchemy.starlette_utils import ( add_starlette_http_db_session_middleware, ) app = Starlette(...) add_starlette_http_db_session_middleware(app)","title":"Starlette"},{"location":"getting_started/#write-own","text":"If there\u2019s no ready-made solution that fits your needs - don\u2019t worry! You can check out how it works and implement your own.","title":"Write own"},{"location":"getting_started/#use-it","text":"from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database from models import ExampleTable # just some model for example async def some_func() -> None: # Created a session (no connection to the database yet) session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) # The second request will use the same connection and the same transaction await session.execute(stmt) # The commit and closing of the session will occur automatically","title":"Use it"},{"location":"getting_started/#examples","text":"The repository includes an example integration with FastAPI, demonstrating various workflows: FastAPI example It also contains two types of test setups that you can use in your own projects. All library tests are included within the examples - because we aim to test the functionality not in isolation, but in the context of a real asynchronous web application. FastAPI tests example","title":"Examples"},{"location":"how_middleware_works/","text":"How middleware works Most of the work - and the \u201cmagic\u201d - happens inside the middleware. Here is a diagram describing how it works. At the beginning of a request, the middleware initializes a new asynchronous context. This asynchronous context is implemented using a contextvar. It holds a mutable container that stores sessions. A mutable container is used so that any coroutine, at any level, can create, modify, or close sessions, and those changes will affect the execution of the entire request. Whenever your code accesses the library\u2019s functionality, it interacts with this container. Finally, the middleware checks the container for any active sessions and open transactions. If transactions are open, they are either committed when the query execution is successful or rolled back if it fails. After that, all sessions are closed. That\u2019s precisely why you can safely close transactions and sessions early. The middleware simply works with whatever is in the container: if there\u2019s anything left, it will close it properly; if you\u2019ve already handled it yourself, the middleware only needs to reset the context. Code example The library aims to provide ready-made solutions so that you don\u2019t have to worry about these details - but they\u2019re not always available. So, let\u2019s take a look at how Starlette middleware works. You can use this example as a reference to implement your own. CODE from starlette.middleware.base import ( # type: ignore[attr-defined] Request, Response, RequestResponseEndpoint, BaseHTTPMiddleware, ) from context_async_sqlalchemy import ( init_db_session_ctx, is_context_initiated, reset_db_session_ctx, auto_commit_by_status_code, rollback_all_sessions, ) async def starlette_http_db_session_middleware( request: Request, call_next: RequestResponseEndpoint ) -> Response: \"\"\" Database session lifecycle management. The session itself is created on demand in db_session(). Transaction auto-commit is implemented if there is no exception and the response status is < 400. Otherwise, a rollback is performed. But you can commit or rollback manually in the handler. \"\"\" # Tests have different session management rules # so if the context variable is already set, we do nothing if is_context_initiated(): return await call_next(request) # We set the context here, meaning all child coroutines will receive the # same context. And even if a child coroutine requests the # session first, the container itself is shared, and this coroutine will # add the session to container = shared context. token = init_db_session_ctx() try: response = await call_next(request) # using the status code, we decide to commit or rollback all sessions await auto_commit_by_status_code(response.status_code) return response except Exception: # If an exception occurs, we roll all sessions back await rollback_all_sessions() raise finally: # Close all sessions and clear the context await reset_db_session_ctx(token)","title":"How middleware works"},{"location":"how_middleware_works/#how-middleware-works","text":"Most of the work - and the \u201cmagic\u201d - happens inside the middleware. Here is a diagram describing how it works. At the beginning of a request, the middleware initializes a new asynchronous context. This asynchronous context is implemented using a contextvar. It holds a mutable container that stores sessions. A mutable container is used so that any coroutine, at any level, can create, modify, or close sessions, and those changes will affect the execution of the entire request. Whenever your code accesses the library\u2019s functionality, it interacts with this container. Finally, the middleware checks the container for any active sessions and open transactions. If transactions are open, they are either committed when the query execution is successful or rolled back if it fails. After that, all sessions are closed. That\u2019s precisely why you can safely close transactions and sessions early. The middleware simply works with whatever is in the container: if there\u2019s anything left, it will close it properly; if you\u2019ve already handled it yourself, the middleware only needs to reset the context.","title":"How middleware works"},{"location":"how_middleware_works/#code-example","text":"The library aims to provide ready-made solutions so that you don\u2019t have to worry about these details - but they\u2019re not always available. So, let\u2019s take a look at how Starlette middleware works. You can use this example as a reference to implement your own. CODE from starlette.middleware.base import ( # type: ignore[attr-defined] Request, Response, RequestResponseEndpoint, BaseHTTPMiddleware, ) from context_async_sqlalchemy import ( init_db_session_ctx, is_context_initiated, reset_db_session_ctx, auto_commit_by_status_code, rollback_all_sessions, ) async def starlette_http_db_session_middleware( request: Request, call_next: RequestResponseEndpoint ) -> Response: \"\"\" Database session lifecycle management. The session itself is created on demand in db_session(). Transaction auto-commit is implemented if there is no exception and the response status is < 400. Otherwise, a rollback is performed. But you can commit or rollback manually in the handler. \"\"\" # Tests have different session management rules # so if the context variable is already set, we do nothing if is_context_initiated(): return await call_next(request) # We set the context here, meaning all child coroutines will receive the # same context. And even if a child coroutine requests the # session first, the container itself is shared, and this coroutine will # add the session to container = shared context. token = init_db_session_ctx() try: response = await call_next(request) # using the status code, we decide to commit or rollback all sessions await auto_commit_by_status_code(response.status_code) return response except Exception: # If an exception occurs, we roll all sessions back await rollback_all_sessions() raise finally: # Close all sessions and clear the context await reset_db_session_ctx(token)","title":"Code example"},{"location":"master_replica/","text":"Master/Replica or several databases at the same time This is why db_session and other functions accept a DBConnect instance as input. This approach allows you to work with multiple hosts simultaneously - for example, with both a master and a replica. DBConnect can also accept factory functions instead of ready-made objects, making it easy to switch hosts when needed. For example, libpq can detect the master and replica when creating an engine, but it only does this once - at creation time. The before_create_session_handler hook allows you to change the host at runtime if the master or replica changes. You\u2019ll need third-party functionality to determine which host is the master or the replica. Hopefully, I\u2019ll be able to provide a ready-made solution for this soon. . The engine is not created immediately when DBConnect is initialized - it is created only on the first request. The library uses lazy initialization in many places. from context_async_sqlalchemy import DBConnect from master_replica_helper import get_master, get_replica async def renew_master_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the master has changed\"\"\" master_host = await get_master() if master_host != connect.host: await connect.change_host(master_host) master = DBConnect( ..., before_create_session_handler=renew_master_connect, ) async def renew_replica_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the replica has changed\"\"\" replica_host = await get_replica() if replica_host != connect.host: await connect.change_host(replica_host) replica = DBConnect( ..., before_create_session_handler=renew_replica_connect, )","title":"Master/Replica or several databases at the same time"},{"location":"master_replica/#masterreplica-or-several-databases-at-the-same-time","text":"This is why db_session and other functions accept a DBConnect instance as input. This approach allows you to work with multiple hosts simultaneously - for example, with both a master and a replica. DBConnect can also accept factory functions instead of ready-made objects, making it easy to switch hosts when needed. For example, libpq can detect the master and replica when creating an engine, but it only does this once - at creation time. The before_create_session_handler hook allows you to change the host at runtime if the master or replica changes. You\u2019ll need third-party functionality to determine which host is the master or the replica. Hopefully, I\u2019ll be able to provide a ready-made solution for this soon. . The engine is not created immediately when DBConnect is initialized - it is created only on the first request. The library uses lazy initialization in many places. from context_async_sqlalchemy import DBConnect from master_replica_helper import get_master, get_replica async def renew_master_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the master has changed\"\"\" master_host = await get_master() if master_host != connect.host: await connect.change_host(master_host) master = DBConnect( ..., before_create_session_handler=renew_master_connect, ) async def renew_replica_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the replica has changed\"\"\" replica_host = await get_replica() if replica_host != connect.host: await connect.change_host(replica_host) replica = DBConnect( ..., before_create_session_handler=renew_replica_connect, )","title":"Master/Replica or several databases at the same time"},{"location":"problem/","text":"The Problem It Solves SQLAlchemy uses an engine that manages the connection pool. The engine must remain active for as long as the application is running, so it can quickly provide ready-to-use connections whenever the application needs them. In the application, we work with sessions. A session obtains a single connection from the pool and should have a short lifespan - usually lasting only for the duration of a single request, or even less. Let's see what existing solutions are available to manage sessions: Manual solution This is how the code ends up being duplicated, and two connections and two transactions are used - even though in many cases only one connection and one transaction are actually needed. @app.post(\"/users/\") async def create_user(name): await insert_user(name) await insert_user_profile(name) async def insert_user(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) async def insert_user_profile(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) You can move the duplicated code to a higher level, which will result in using a single connection and a single transaction. @app.post(\"/users/\") async def create_user(name:): async with get_async_session() as session: async with session.begin(): await insert_user(name, session) await insert_user_profile(name, session) async def insert_user(name, session): await session.execute(stmt) async def insert_user_profile(name, session): await session.execute(stmt) But if you look at it more broadly, the code duplication doesn\u2019t actually go away - you still have to do this in every handler. @app.post(\"/dogs/\") async def create_dog(name): async with get_async_session() as session: async with session.begin(): ... @app.post(\"/cats\") async def create_cat(name): async with get_async_session() as session: async with session.begin(): ... You also have to set everything up yourself. No ready-made integration solutions are used - which means freedom on one hand, but a lot of code on the other. Dependency You can use a dependency. For example, in FastAPI, it looks like this: async def get_atomic_session(): async with session_maker() as session: async with session.begin(): yield session @app.post(\"/dogs/\") async def create_dog(name, session=Depends(get_atomic_session)): ... @app.post(\"/cats/\") async def create_cat(name, session=Depends(get_atomic_session)): ... There are two problems here: You can\u2019t close the session or transaction prematurely, because the dependency is responsible for that. The session has to be passed all the way down the stack to the place where it\u2019s actually needed. By the way, there\u2019s no ready-made solution for integrating with the framework - you have to implement the dependency yourself. Wrappers over sqlalachemy There are various wrappers that often provide more convenient integration. Litestar, for example, has the same advantages and disadvantages as using dependencies: config = SQLAlchemyAsyncConfig( connection_string=URL ) sqlalchemy_plugin = SQLAlchemyInitPlugin(config) class UserRepository(SQLAlchemyAsyncRepository[User]): model_type = User @post(\"/users\") async def create_user(data: User, repo: UserRepository): await repo.add(data) # <- insert into User Here\u2019s an example using Ormar: class BaseMeta(ormar.ModelMeta): ... class User(ormar.Model): ... @app.post(\"/users/\") async def create_user(name): await User.objects.create(name=name) The main problem with wrappers is that they require developers to learn something new. They introduce their own syntax - so even if a developer is familiar with SQLAlchemy, it doesn\u2019t mean they\u2019ll understand the wrapper. Wrappers are also often designed for convenience when working with simple CRUD operations, but writing complex SQL queries with them can be very challenging. Solution And this library solves all of these issues: Easy integration with web frameworks Automatic management of engine, session, and transaction lifecycles Ability to manually close a session at any time, without waiting for the end of process Access to a session from the context only where it\u2019s actually needed","title":"The Problem It Solves"},{"location":"problem/#the-problem-it-solves","text":"SQLAlchemy uses an engine that manages the connection pool. The engine must remain active for as long as the application is running, so it can quickly provide ready-to-use connections whenever the application needs them. In the application, we work with sessions. A session obtains a single connection from the pool and should have a short lifespan - usually lasting only for the duration of a single request, or even less. Let's see what existing solutions are available to manage sessions:","title":"The Problem It Solves"},{"location":"problem/#manual-solution","text":"This is how the code ends up being duplicated, and two connections and two transactions are used - even though in many cases only one connection and one transaction are actually needed. @app.post(\"/users/\") async def create_user(name): await insert_user(name) await insert_user_profile(name) async def insert_user(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) async def insert_user_profile(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) You can move the duplicated code to a higher level, which will result in using a single connection and a single transaction. @app.post(\"/users/\") async def create_user(name:): async with get_async_session() as session: async with session.begin(): await insert_user(name, session) await insert_user_profile(name, session) async def insert_user(name, session): await session.execute(stmt) async def insert_user_profile(name, session): await session.execute(stmt) But if you look at it more broadly, the code duplication doesn\u2019t actually go away - you still have to do this in every handler. @app.post(\"/dogs/\") async def create_dog(name): async with get_async_session() as session: async with session.begin(): ... @app.post(\"/cats\") async def create_cat(name): async with get_async_session() as session: async with session.begin(): ... You also have to set everything up yourself. No ready-made integration solutions are used - which means freedom on one hand, but a lot of code on the other.","title":"Manual solution"},{"location":"problem/#dependency","text":"You can use a dependency. For example, in FastAPI, it looks like this: async def get_atomic_session(): async with session_maker() as session: async with session.begin(): yield session @app.post(\"/dogs/\") async def create_dog(name, session=Depends(get_atomic_session)): ... @app.post(\"/cats/\") async def create_cat(name, session=Depends(get_atomic_session)): ... There are two problems here: You can\u2019t close the session or transaction prematurely, because the dependency is responsible for that. The session has to be passed all the way down the stack to the place where it\u2019s actually needed. By the way, there\u2019s no ready-made solution for integrating with the framework - you have to implement the dependency yourself.","title":"Dependency"},{"location":"problem/#wrappers-over-sqlalachemy","text":"There are various wrappers that often provide more convenient integration. Litestar, for example, has the same advantages and disadvantages as using dependencies: config = SQLAlchemyAsyncConfig( connection_string=URL ) sqlalchemy_plugin = SQLAlchemyInitPlugin(config) class UserRepository(SQLAlchemyAsyncRepository[User]): model_type = User @post(\"/users\") async def create_user(data: User, repo: UserRepository): await repo.add(data) # <- insert into User Here\u2019s an example using Ormar: class BaseMeta(ormar.ModelMeta): ... class User(ormar.Model): ... @app.post(\"/users/\") async def create_user(name): await User.objects.create(name=name) The main problem with wrappers is that they require developers to learn something new. They introduce their own syntax - so even if a developer is familiar with SQLAlchemy, it doesn\u2019t mean they\u2019ll understand the wrapper. Wrappers are also often designed for convenience when working with simple CRUD operations, but writing complex SQL queries with them can be very challenging.","title":"Wrappers over sqlalachemy"},{"location":"problem/#solution","text":"And this library solves all of these issues: Easy integration with web frameworks Automatic management of engine, session, and transaction lifecycles Ability to manually close a session at any time, without waiting for the end of process Access to a session from the context only where it\u2019s actually needed","title":"Solution"},{"location":"testing/","text":"Testing When testing with a real database, one important problem needs to be solved - ensuring data isolation between tests. There are basically two approaches: Separate sessions The test has its own session that it uses to prepare data and verify results after execution. The application also has its own session. Data isolation is achieved by clearing all tables at the end of each test (and once before running all tests). Shared session and transaction The test and the application share the same session and transaction. Data isolation is achieved by rolling back the transaction at the end of the test. Personally, I prefer the first option, because it is a more \"honest\" way to test the application. We can verify how it handles sessions and transactions on its own. It\u2019s also convenient to inspect the database state when a test is paused. Sometimes, there are complex session management scenarios (for example, concurrent query execution) where other types of testing are either impossible or very difficult. The main disadvantage of this approach is the slower execution speed. Since we clear all tables after each test, this process takes additional time. This is where the second approach comes in - its main advantage is speed, as rolling back a transaction is very fast. In my projects, I use both approaches at the same time: For most tests with simple or common logic, I use a shared transaction for the test and the application For more complex cases, or ones that cannot be tested this way, I use separate transactions. This combination allows for both good performance and convenient testing. The library provides several utilities that can be used in tests - for example, in fixtures. They help create tests that share a common transaction between the test and the application, so data isolation between tests is achieved through fast transaction rollback. You can see these capabilities in the examples: Here are tests with a common transaction between the application and the tests. And here's an example with different transactions. Create session with autorollback rollback_session creates a session that always rolls back automatically. from context_async_sqlalchemy.test_utils import rollback_session @pytest_asyncio.fixture async def db_session_test() -> AsyncGenerator[AsyncSession, None]: \"\"\"The session that is used inside the test\"\"\" async with rollback_session(connection) as session: yield session Override context set_test_context creates a new context put_savepoint_session_in_ctx puts into context a session that uses the same connection as db_session_test , but if you commit in this session, then the transaction will not be committed, but save point will be released from context_async_sqlalchemy.test_utils import ( put_savepoint_session_in_ctx, set_test_context, ) @pytest_asyncio.fixture(autouse=True) async def db_session_override( db_session_test: AsyncSession, ) -> AsyncGenerator[None, None]: \"\"\" The key thing about these tests is that we override the context in advance. The middleware has a special check that won't initialize the context if it already exists. \"\"\" async with set_test_context(): async with put_savepoint_session_in_ctx(connection, db_session_test): yield","title":"Testing"},{"location":"testing/#testing","text":"When testing with a real database, one important problem needs to be solved - ensuring data isolation between tests. There are basically two approaches: Separate sessions The test has its own session that it uses to prepare data and verify results after execution. The application also has its own session. Data isolation is achieved by clearing all tables at the end of each test (and once before running all tests). Shared session and transaction The test and the application share the same session and transaction. Data isolation is achieved by rolling back the transaction at the end of the test. Personally, I prefer the first option, because it is a more \"honest\" way to test the application. We can verify how it handles sessions and transactions on its own. It\u2019s also convenient to inspect the database state when a test is paused. Sometimes, there are complex session management scenarios (for example, concurrent query execution) where other types of testing are either impossible or very difficult. The main disadvantage of this approach is the slower execution speed. Since we clear all tables after each test, this process takes additional time. This is where the second approach comes in - its main advantage is speed, as rolling back a transaction is very fast. In my projects, I use both approaches at the same time: For most tests with simple or common logic, I use a shared transaction for the test and the application For more complex cases, or ones that cannot be tested this way, I use separate transactions. This combination allows for both good performance and convenient testing. The library provides several utilities that can be used in tests - for example, in fixtures. They help create tests that share a common transaction between the test and the application, so data isolation between tests is achieved through fast transaction rollback. You can see these capabilities in the examples: Here are tests with a common transaction between the application and the tests. And here's an example with different transactions.","title":"Testing"},{"location":"testing/#create-session-with-autorollback","text":"rollback_session creates a session that always rolls back automatically. from context_async_sqlalchemy.test_utils import rollback_session @pytest_asyncio.fixture async def db_session_test() -> AsyncGenerator[AsyncSession, None]: \"\"\"The session that is used inside the test\"\"\" async with rollback_session(connection) as session: yield session","title":"Create session with autorollback"},{"location":"testing/#override-context","text":"set_test_context creates a new context put_savepoint_session_in_ctx puts into context a session that uses the same connection as db_session_test , but if you commit in this session, then the transaction will not be committed, but save point will be released from context_async_sqlalchemy.test_utils import ( put_savepoint_session_in_ctx, set_test_context, ) @pytest_asyncio.fixture(autouse=True) async def db_session_override( db_session_test: AsyncSession, ) -> AsyncGenerator[None, None]: \"\"\" The key thing about these tests is that we override the context in advance. The middleware has a special check that won't initialize the context if it already exists. \"\"\" async with set_test_context(): async with put_savepoint_session_in_ctx(connection, db_session_test): yield","title":"Override context"}]}
\ No newline at end of file
+{"config":{"indexing":"full","lang":["en"],"min_search_length":3,"prebuild_index":false,"separator":"[\\s\\-]+"},"docs":[{"location":"","text":"context-async-sqlalchemy No AI was used in the creation of this library. SOURCE CODE Provides a super convenient way to work with SQLAlchemy in asynchronous applications. It handles the lifecycle management of the engine, sessions, and transactions. The main goal is to provide quick and easy access to a session, without worrying about opening or closing it when it\u2019s not necessary. Key features: Extremely easy to use Automatically manages the lifecycle of the engine, sessions, and transactions (autocommit / autorollback) Does not interfere with manually opening or closing sessions and transactions when needed Framework-agnostic - works with any web framework Not a wrapper around SQLAlchemy Convenient for testing Runtime host switching Supports multiple databases and multiple sessions per database Provides tools for running concurrent SQL queries Fully lazy initialization What does usage look like? from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database from models import ExampleTable # just some model for example async def some_func() -> None: # Created a session (no connection to the database yet) session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # If you call db_session again, it returns the same session # even in child coroutines. session = await db_session(connection) # The second request will use the same connection and the same transaction await session.execute(stmt) # The commit and closing of the session will occur automatically How it works Here is a very simplified diagram of how everything works: Before executing your code, the middleware will prepare a container in which the sessions required by your code will be stored. The container is saved in contextvars Your code accesses the library to create new sessions and retrieve existing ones After your code, middleware will automatically commit or roll back open transactions. Closes open sessions and clears the context. The library also provides the ability to commit, rollback, and close at any time, without waiting for the end of the request, without any problems.","title":"context-async-sqlalchemy"},{"location":"#context-async-sqlalchemy","text":"No AI was used in the creation of this library. SOURCE CODE Provides a super convenient way to work with SQLAlchemy in asynchronous applications. It handles the lifecycle management of the engine, sessions, and transactions. The main goal is to provide quick and easy access to a session, without worrying about opening or closing it when it\u2019s not necessary. Key features: Extremely easy to use Automatically manages the lifecycle of the engine, sessions, and transactions (autocommit / autorollback) Does not interfere with manually opening or closing sessions and transactions when needed Framework-agnostic - works with any web framework Not a wrapper around SQLAlchemy Convenient for testing Runtime host switching Supports multiple databases and multiple sessions per database Provides tools for running concurrent SQL queries Fully lazy initialization","title":"context-async-sqlalchemy"},{"location":"#what-does-usage-look-like","text":"from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database from models import ExampleTable # just some model for example async def some_func() -> None: # Created a session (no connection to the database yet) session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) # The second request will use the same connection and the same transaction await session.execute(stmt) # The commit and closing of the session will occur automatically","title":"What does usage look like?"},{"location":"#how-it-works","text":"Here is a very simplified diagram of how everything works: Before executing your code, the middleware will prepare a container in which the sessions required by your code will be stored. The container is saved in contextvars Your code accesses the library to create new sessions and retrieve existing ones After your code, middleware will automatically commit or roll back open transactions. Closes open sessions and clears the context. The library also provides the ability to commit, rollback, and close at any time, without waiting for the end of the request, without any problems.","title":"How it works"},{"location":"api/","text":"API Reference DBConnect DBConnect is responsible for managing the engine and the session_maker. You need to define two factories. Optionally, you can specify a host to connect to. You can also specify a handler that runs before a session is created - this handler can be used to connect to the host for the first time or to reconnect to a new one. init def __init__( self: DBConnect, engine_creator: EngineCreatorFunc, session_maker_creator: SessionMakerCreatorFunc, host: str | None = None, before_create_session_handler: AsyncFunc | None = None, ) -> None: engine_creator is a factory function for creating engines. It\u2019s an asynchronous callable that takes a host as input and returns an async engine. Example session_maker_creator is a factory function for creating an asynchronous session_maker. It\u2019s an asynchronous callable that takes an async engine as input and returns an async session_maker. Example host is an optional parameter. You can specify only this parameter to make your connection always work with a single host, without dynamic switching. However, you can still change the host in the handler if needed - it won\u2019t cause any issues. before_create_session_handler is a handler that allows you to execute custom logic before creating a session. For example, you can switch the host to another one - this is useful for dynamically determining the master if the previous master fails and a replica takes its place. The handler is an asynchronous callable that takes a DBConnect instance as input and returns nothing. Example: async def renew_master_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the master has changed\"\"\" master_host = await get_master() if master_host != connect.host: await connect.change_host(master_host) connect async def connect(self: DBConnect, host: str) -> None: Establishes a connection to the specified host. This method doesn\u2019t need to be called explicitly. If it isn\u2019t called, the first session request will automatically establish the connection. change_host async def change_host(self: DBConnect, host: str) -> None: Establishes a connection to the specified host, but first checks under a lock that the currently connected host is different from the target host. create_session async def create_session(self: DBConnect) -> AsyncSession: Creates a new session. Used internally by the library - you\u2019ll probably never need to call it directly, but it\u2019s good to know it exists. session_maker async def session_maker(self: DBConnect) -> async_sessionmaker[AsyncSession]: Provides access to the session_maker currently used to create sessions. close async def close(self: DBConnect) -> None: Completely closes and cleans up all resources, freeing the connection pool. This should be called at the end of your application\u2019s lifecycle. Middlewares Most of the work - and the \u201cmagic\u201d - happens inside the middleware. You can check out how it works and implement your own. Fastapi from context_async_sqlalchemy.fastapi_utils import ( fastapi_http_db_session_middleware, add_fastapi_http_db_session_middleware, ) app = FastAPI(...) add_fastapi_http_db_session_middleware(app) # OR app.add_middleware( BaseHTTPMiddleware, dispatch=fastapi_http_db_session_middleware ) Starlette from context_async_sqlalchemy.starlette_utils import ( add_starlette_http_db_session_middleware, starlette_http_db_session_middleware, StarletteHTTPDBSessionMiddleware, ) app = Starlette(...) add_starlette_http_db_session_middleware(app) # OR app.add_middleware( BaseHTTPMiddleware, dispatch=starlette_http_db_session_middleware ) # OR app.add_middleware(StarletteHTTPDBSessionMiddleware) Pure ASGI from context_async_sqlalchemy import ( ASGIHTTPDBSessionMiddleware, ) app = SomeASGIApp(...) app.add_middleware(ASGIHTTPDBSessionMiddleware) Sessions Here are the functions you\u2019ll use most often from the library. They allow you to work with sessions directly from your asynchronous code. db_session async def db_session(connect: DBConnect) -> AsyncSession: The most important function for obtaining a session in your code. When called for the first time, it returns a new session; subsequent calls return the same session. atomic_db_session @asynccontextmanager async def atomic_db_session( connect: DBConnect, current_transaction: Literal[\"commit\", \"rollback\", \"append\", \"raise\"] = \"commit\", ) -> AsyncGenerator[AsyncSession, None]: A context manager that can be used to wrap another function which uses a context session, making that call isolated within its own transaction. There are several options that define how the function will handle an already open transaction. current_transaction: - commit - commits the open transaction and starts a new one - rollback - rolls back the open transaction and starts a new one - append - continues using the current transaction and commits it - raise - raises an InvalidRequestError commit_db_session async def commit_db_session(connect: DBConnect) -> None: Commits the active session, if there is one. rollback_db_session async def rollback_db_session(connect: DBConnect) -> None: Rollbacks the active session, if there is one. close_db_session async def close_db_session(connect: DBConnect) -> None: Closes the current context session. The connection is returned to the pool. If you close an uncommitted transaction, the connection will be rolled back. This is useful if, for example, at the beginning of the handle a database query is needed, and then there is some other long-term work and you don't want to keep the connection opened. new_non_ctx_session @asynccontextmanager async def new_non_ctx_session( connect: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that allows you to create a new session without placing it in a context. It's used for manual session management when you don't want to use a context. new_non_ctx_atomic_session @asynccontextmanager async def new_non_ctx_atomic_session( connect: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that allows you to create a new session with a new transaction that isn't placed in a context. It's used for manual session management when you don't want to use a context. Context run_in_new_ctx async def run_in_new_ctx( callable_func: AsyncCallable[AsyncCallableResult], *args: Any, **kwargs: Any, ) -> AsyncCallableResult: Runs a function in a new context with new sessions that have their own connection. It will commit the transaction automatically if callable_func does not raise exceptions. Otherwise, the transaction will be rolled back. The intended use is to run multiple database queries concurrently. example of use: await asyncio.gather( your_function_with_db_session(...), run_in_new_ctx( your_function_with_db_session, some_arg, some_kwarg=123, ), run_in_new_ctx(your_function_with_db_session, ...), ) Testing You can read more about testing here: Testing rollback_session @asynccontextmanager async def rollback_session( connection: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that creates a session which is automatically rolled back at the end. It\u2019s intended for use in fixtures to execute SQL queries during tests. set_test_context @asynccontextmanager async def set_test_context(auto_close: bool = False) -> AsyncGenerator[None, None]: A context manager that creates a new context in which you can place a dedicated test session. It\u2019s intended for use in tests where the test and the application share a single transaction. Use auto_close=False if you\u2019re using a test session and transaction that you close manually elsewhere in your code. Use auto_close=True if, for example, you want to call a function in a test that uses a context bypassing the middleware, and you want all sessions to be closed automatically. put_savepoint_session_in_ctx async def put_savepoint_session_in_ctx( connection: DBConnect, session: AsyncSession, ) -> AsyncGenerator[None, None]: Sets the context to a session that uses a save point instead of creating a transaction. You need to pass the session you're using inside your tests to attach a new session to the same connection. It is important to use this function inside set_test_context.","title":"API Reference"},{"location":"api/#api-reference","text":"","title":"API Reference"},{"location":"api/#dbconnect","text":"DBConnect is responsible for managing the engine and the session_maker. You need to define two factories. Optionally, you can specify a host to connect to. You can also specify a handler that runs before a session is created - this handler can be used to connect to the host for the first time or to reconnect to a new one.","title":"DBConnect"},{"location":"api/#init","text":"def __init__( self: DBConnect, engine_creator: EngineCreatorFunc, session_maker_creator: SessionMakerCreatorFunc, host: str | None = None, before_create_session_handler: AsyncFunc | None = None, ) -> None: engine_creator is a factory function for creating engines. It\u2019s an asynchronous callable that takes a host as input and returns an async engine. Example session_maker_creator is a factory function for creating an asynchronous session_maker. It\u2019s an asynchronous callable that takes an async engine as input and returns an async session_maker. Example host is an optional parameter. You can specify only this parameter to make your connection always work with a single host, without dynamic switching. However, you can still change the host in the handler if needed - it won\u2019t cause any issues. before_create_session_handler is a handler that allows you to execute custom logic before creating a session. For example, you can switch the host to another one - this is useful for dynamically determining the master if the previous master fails and a replica takes its place. The handler is an asynchronous callable that takes a DBConnect instance as input and returns nothing. Example: async def renew_master_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the master has changed\"\"\" master_host = await get_master() if master_host != connect.host: await connect.change_host(master_host)","title":"init"},{"location":"api/#connect","text":"async def connect(self: DBConnect, host: str) -> None: Establishes a connection to the specified host. This method doesn\u2019t need to be called explicitly. If it isn\u2019t called, the first session request will automatically establish the connection.","title":"connect"},{"location":"api/#change_host","text":"async def change_host(self: DBConnect, host: str) -> None: Establishes a connection to the specified host, but first checks under a lock that the currently connected host is different from the target host.","title":"change_host"},{"location":"api/#create_session","text":"async def create_session(self: DBConnect) -> AsyncSession: Creates a new session. Used internally by the library - you\u2019ll probably never need to call it directly, but it\u2019s good to know it exists.","title":"create_session"},{"location":"api/#session_maker","text":"async def session_maker(self: DBConnect) -> async_sessionmaker[AsyncSession]: Provides access to the session_maker currently used to create sessions.","title":"session_maker"},{"location":"api/#close","text":"async def close(self: DBConnect) -> None: Completely closes and cleans up all resources, freeing the connection pool. This should be called at the end of your application\u2019s lifecycle.","title":"close"},{"location":"api/#middlewares","text":"Most of the work - and the \u201cmagic\u201d - happens inside the middleware. You can check out how it works and implement your own.","title":"Middlewares"},{"location":"api/#fastapi","text":"from context_async_sqlalchemy.fastapi_utils import ( fastapi_http_db_session_middleware, add_fastapi_http_db_session_middleware, ) app = FastAPI(...) add_fastapi_http_db_session_middleware(app) # OR app.add_middleware( BaseHTTPMiddleware, dispatch=fastapi_http_db_session_middleware )","title":"Fastapi"},{"location":"api/#starlette","text":"from context_async_sqlalchemy.starlette_utils import ( add_starlette_http_db_session_middleware, starlette_http_db_session_middleware, StarletteHTTPDBSessionMiddleware, ) app = Starlette(...) add_starlette_http_db_session_middleware(app) # OR app.add_middleware( BaseHTTPMiddleware, dispatch=starlette_http_db_session_middleware ) # OR app.add_middleware(StarletteHTTPDBSessionMiddleware)","title":"Starlette"},{"location":"api/#pure-asgi","text":"from context_async_sqlalchemy import ( ASGIHTTPDBSessionMiddleware, ) app = SomeASGIApp(...) app.add_middleware(ASGIHTTPDBSessionMiddleware)","title":"Pure ASGI"},{"location":"api/#sessions","text":"Here are the functions you\u2019ll use most often from the library. They allow you to work with sessions directly from your asynchronous code.","title":"Sessions"},{"location":"api/#db_session","text":"async def db_session(connect: DBConnect) -> AsyncSession: The most important function for obtaining a session in your code. When called for the first time, it returns a new session; subsequent calls return the same session.","title":"db_session"},{"location":"api/#atomic_db_session","text":"@asynccontextmanager async def atomic_db_session( connect: DBConnect, current_transaction: Literal[\"commit\", \"rollback\", \"append\", \"raise\"] = \"commit\", ) -> AsyncGenerator[AsyncSession, None]: A context manager that can be used to wrap another function which uses a context session, making that call isolated within its own transaction. There are several options that define how the function will handle an already open transaction. current_transaction: - commit - commits the open transaction and starts a new one - rollback - rolls back the open transaction and starts a new one - append - continues using the current transaction and commits it - raise - raises an InvalidRequestError","title":"atomic_db_session"},{"location":"api/#commit_db_session","text":"async def commit_db_session(connect: DBConnect) -> None: Commits the active session, if there is one.","title":"commit_db_session"},{"location":"api/#rollback_db_session","text":"async def rollback_db_session(connect: DBConnect) -> None: Rollbacks the active session, if there is one.","title":"rollback_db_session"},{"location":"api/#close_db_session","text":"async def close_db_session(connect: DBConnect) -> None: Closes the current context session. The connection is returned to the pool. If you close an uncommitted transaction, the connection will be rolled back. This is useful if, for example, at the beginning of the handle a database query is needed, and then there is some other long-term work and you don't want to keep the connection opened.","title":"close_db_session"},{"location":"api/#new_non_ctx_session","text":"@asynccontextmanager async def new_non_ctx_session( connect: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that allows you to create a new session without placing it in a context. It's used for manual session management when you don't want to use a context.","title":"new_non_ctx_session"},{"location":"api/#new_non_ctx_atomic_session","text":"@asynccontextmanager async def new_non_ctx_atomic_session( connect: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that allows you to create a new session with a new transaction that isn't placed in a context. It's used for manual session management when you don't want to use a context.","title":"new_non_ctx_atomic_session"},{"location":"api/#context","text":"","title":"Context"},{"location":"api/#run_in_new_ctx","text":"async def run_in_new_ctx( callable_func: AsyncCallable[AsyncCallableResult], *args: Any, **kwargs: Any, ) -> AsyncCallableResult: Runs a function in a new context with new sessions that have their own connection. It will commit the transaction automatically if callable_func does not raise exceptions. Otherwise, the transaction will be rolled back. The intended use is to run multiple database queries concurrently. example of use: await asyncio.gather( your_function_with_db_session(...), run_in_new_ctx( your_function_with_db_session, some_arg, some_kwarg=123, ), run_in_new_ctx(your_function_with_db_session, ...), )","title":"run_in_new_ctx"},{"location":"api/#testing","text":"You can read more about testing here: Testing","title":"Testing"},{"location":"api/#rollback_session","text":"@asynccontextmanager async def rollback_session( connection: DBConnect, ) -> AsyncGenerator[AsyncSession, None]: A context manager that creates a session which is automatically rolled back at the end. It\u2019s intended for use in fixtures to execute SQL queries during tests.","title":"rollback_session"},{"location":"api/#set_test_context","text":"@asynccontextmanager async def set_test_context(auto_close: bool = False) -> AsyncGenerator[None, None]: A context manager that creates a new context in which you can place a dedicated test session. It\u2019s intended for use in tests where the test and the application share a single transaction. Use auto_close=False if you\u2019re using a test session and transaction that you close manually elsewhere in your code. Use auto_close=True if, for example, you want to call a function in a test that uses a context bypassing the middleware, and you want all sessions to be closed automatically.","title":"set_test_context"},{"location":"api/#put_savepoint_session_in_ctx","text":"async def put_savepoint_session_in_ctx( connection: DBConnect, session: AsyncSession, ) -> AsyncGenerator[None, None]: Sets the context to a session that uses a save point instead of creating a transaction. You need to pass the session you're using inside your tests to attach a new session to the same connection. It is important to use this function inside set_test_context.","title":"put_savepoint_session_in_ctx"},{"location":"concurrent_queries/","text":"Concurrent sql queries Concurrent query execution deserves special attention. In SQLAlchemy, you can\u2019t run multiple queries concurrently within the same session - you need to create a new one. The library provides two simple ways to execute queries concurrently: Run a function in a new context - run_in_new_ctx Create a new session that is completely independent of the current context - new_non_ctx_atomic_session or new_non_ctx_session import asyncio from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, new_non_ctx_atomic_session, new_non_ctx_session, run_in_new_ctx, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_multiple_sessions() -> None: \"\"\" You may need to run multiple sessions simultaneously. For example, to run several queries concurrently. You can also use these same techniques to create new sessions whenever you need them. Not necessarily just because of the concurrent processing. \"\"\" await asyncio.gather( _insert(), # context session run_in_new_ctx(_insert), # new context and session with autocommit run_in_new_ctx( # new context and session with manual commit _insert_manual, \"example_multiple_sessions\", ), _insert_non_ctx(), # new non context session _insert_non_ctx_manual(), # new non context session ) async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_manual(text: str) -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=text) await session.execute(stmt) # manually commit the transaction optional await commit_db_session(connection) # manually close the session optional await close_db_session(connection) async def _insert_non_ctx() -> None: \"\"\" While working with sessions, using context is optional \"\"\" async with new_non_ctx_atomic_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_non_ctx_manual() -> None: \"\"\" While working with sessions, using context is optional \"\"\" async with new_non_ctx_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) await session.commit()","title":"Concurrent SQL Queries"},{"location":"concurrent_queries/#concurrent-sql-queries","text":"Concurrent query execution deserves special attention. In SQLAlchemy, you can\u2019t run multiple queries concurrently within the same session - you need to create a new one. The library provides two simple ways to execute queries concurrently: Run a function in a new context - run_in_new_ctx Create a new session that is completely independent of the current context - new_non_ctx_atomic_session or new_non_ctx_session import asyncio from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, new_non_ctx_atomic_session, new_non_ctx_session, run_in_new_ctx, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_multiple_sessions() -> None: \"\"\" You may need to run multiple sessions simultaneously. For example, to run several queries concurrently. You can also use these same techniques to create new sessions whenever you need them. Not necessarily just because of the concurrent processing. \"\"\" await asyncio.gather( _insert(), # context session run_in_new_ctx(_insert), # new context and session with autocommit run_in_new_ctx( # new context and session with manual commit _insert_manual, \"example_multiple_sessions\", ), _insert_non_ctx(), # new non context session _insert_non_ctx_manual(), # new non context session ) async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_manual(text: str) -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=text) await session.execute(stmt) # manually commit the transaction optional await commit_db_session(connection) # manually close the session optional await close_db_session(connection) async def _insert_non_ctx() -> None: \"\"\" Using the context is option while working with sessions \"\"\" async with new_non_ctx_atomic_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_non_ctx_manual() -> None: \"\"\" Using the context is option while working with sessions \"\"\" async with new_non_ctx_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) await session.commit()","title":"Concurrent sql queries"},{"location":"examples/","text":"Usage examples You can see not only fragments of examples, but also web application examples . Basic usage from sqlalchemy import insert from context_async_sqlalchemy import db_session from ..database import connection from ..models import ExampleTable async def handler_with_db_session() -> None: \"\"\" An example of a typical handle that uses a context session to work with a database. Autocommit or autorollback occurs automatically at the end of a request (in middleware). \"\"\" # Created a session (no connection to the database yet) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # Commit will happen automatically Atomic from context_async_sqlalchemy import atomic_db_session, db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_atomic() -> None: \"\"\" Let's imagine you already have a function that works with a contextual session, and its use case calls autocommit at the end of the request. You want to reuse this function, but you need to commit immediately, rather than wait for the request to complete. \"\"\" # the transaction will be committed or rolled back automatically # using the context manager async with atomic_db_session(connection): await _insert_1() # This is a new transaction in the same connection await _insert_1() async def _insert_1() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_atomic\" ) await session.execute(stmt) Manually close the transaction and session from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_manual_close() -> None: \"\"\" but commits manually but commits manually and closes the session to release the connection. \"\"\" # new connect -> new transaction -> commit await _insert_1() # old connect -> new transaction -> commit -> close connect await _insert_2() # new connect -> new transaction await _insert_3() # same connect -> same transaction await _insert_3() # autocommit async def _insert_1() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) # We closed the transaction await session.commit() # or await commit_db_session() async def _insert_2() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) # We closed the transaction await commit_db_session(connection) # We closed the session and returned the connection to the pool # This is useful if, for example, at the beginning of the handle a # database query is needed, and then there is some other long-term work # and you don't want to keep the connection opened. await close_db_session(connection) async def _insert_3() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) Multiple sessions and concurrent execution import asyncio from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, new_non_ctx_atomic_session, new_non_ctx_session, run_in_new_ctx, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_multiple_sessions() -> None: \"\"\" You may need to run multiple sessions simultaneously. For example, to run several queries concurrently. You can also use these same techniques to create new sessions whenever you need them. Not necessarily just because of the concurrent processing. \"\"\" await asyncio.gather( _insert(), # context session run_in_new_ctx(_insert), # new context and session with autocommit run_in_new_ctx( # new context and session with manual commit _insert_manual, \"example_multiple_sessions\", ), _insert_non_ctx(), # new non context session _insert_non_ctx_manual(), # new non context session ) async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_manual(text: str) -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=text) await session.execute(stmt) # manually commit the transaction (optional) await commit_db_session(connection) # manually close the session (optional) await close_db_session(connection) async def _insert_non_ctx() -> None: \"\"\" Using the context is option while working with sessions \"\"\" async with new_non_ctx_atomic_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_non_ctx_manual() -> None: \"\"\" Using the context is option while working with sessions \"\"\" async with new_non_ctx_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) await session.commit() Rollback from context_async_sqlalchemy import db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_exception() -> None: \"\"\" let's imagine that an exception occurred. \"\"\" session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") await session.execute(stmt) raise Exception(\"Some exception\") # transaction rolls back automatically from fastapi import HTTPException from context_async_sqlalchemy import db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_http_exception() -> None: \"\"\" let's imagine that an http exception occurred. \"\"\" session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") await session.execute(stmt) raise HTTPException(status_code=500) # transaction rolls back automatically by status code from context_async_sqlalchemy import db_session, rollback_db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_manual_rollback() -> None: \"\"\" An example of a handle that uses a rollback \"\"\" # it's convenient this way await _insert() await rollback_db_session(connection) # but it's possible this way too await _insert() session = await db_session(connection) await session.rollback() async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt)","title":"Usage Examples"},{"location":"examples/#usage-examples","text":"You can see not only fragments of examples, but also web application examples .","title":"Usage examples"},{"location":"examples/#basic-usage","text":"from sqlalchemy import insert from context_async_sqlalchemy import db_session from ..database import connection from ..models import ExampleTable async def handler_with_db_session() -> None: \"\"\" An example of a typical handle that uses a context session to work with a database. Autocommit or autorollback occurs automatically at the end of a request (in middleware). \"\"\" # Created a session (no connection to the database yet) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # Commit will happen automatically","title":"Basic usage"},{"location":"examples/#atomic","text":"from context_async_sqlalchemy import atomic_db_session, db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_atomic() -> None: \"\"\" Let's imagine you already have a function that works with a contextual session, and its use case calls autocommit at the end of the request. You want to reuse this function, but you need to commit immediately, rather than wait for the request to complete. \"\"\" # the transaction will be committed or rolled back automatically # using the context manager async with atomic_db_session(connection): await _insert_1() # This is a new transaction in the same connection await _insert_1() async def _insert_1() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_atomic\" ) await session.execute(stmt)","title":"Atomic"},{"location":"examples/#manually-close-the-transaction-and-session","text":"from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_manual_close() -> None: \"\"\" An example of a handle that uses a session in context but commits manually and but commits manually and closes the session to release the connection. \"\"\" # new connect -> new transaction -> commit await _insert_1() # old connect -> new transaction -> commit -> close connect await _insert_2() # new connect -> new transaction await _insert_3() # same connect -> same transaction await _insert_3() # autocommit async def _insert_1() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) # We closed the transaction await session.commit() # or await commit_db_session() async def _insert_2() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt) # We closed the transaction await commit_db_session(connection) # We closed the session returned the connection to the pool # This is useful if, for example, at the beginning of the handle a # database query is needed, and then there is some other long-term work # and you don't want to keep the connection opened. await close_db_session(connection) async def _insert_3() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt)","title":"Manually close the transaction and session"},{"location":"examples/#multiple-sessions-and-concurrent-execution","text":"import asyncio from context_async_sqlalchemy import ( close_db_session, commit_db_session, db_session, new_non_ctx_atomic_session, new_non_ctx_session, run_in_new_ctx, ) from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_multiple_sessions() -> None: \"\"\" You may need to run multiple sessions simultaneously. For example, to run several queries concurrently. You can also use these same techniques to create new sessions whenever you need them. Not necessarily just because of the concurrent processing. \"\"\" await asyncio.gather( _insert(), # context session run_in_new_ctx(_insert), # new context and session with autocommit run_in_new_ctx( # new context and session with manual commit _insert_manual, \"example_multiple_sessions\", ), _insert_non_ctx(), # new non context session _insert_non_ctx_manual(), # new non context session ) async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_manual(text: str) -> None: session = await db_session(connection) stmt = insert(ExampleTable).values(text=text) await session.execute(stmt) # manually commit the transaction (optional) await commit_db_session(connection) # manually close the session (optional)) await close_db_session(connection) async def _insert_non_ctx() -> None: \"\"\" Using the context is option while working with sessions \"\"\" async with new_non_ctx_atomic_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) async def _insert_non_ctx_manual() -> None: \"\"\" Using the context is option while working with sessions \"\"\" async with new_non_ctx_session(connection) as session: stmt = insert(ExampleTable).values(text=\"example_multiple_sessions\") await session.execute(stmt) await session.commit()","title":"Multiple sessions and concurrent execution"},{"location":"examples/#rollback","text":"from context_async_sqlalchemy import db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_exception() -> None: \"\"\" let's imagine that an exception occurred. \"\"\" session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") await session.execute(stmt) raise Exception(\"Some exception\") # transaction rolls back automatically from fastapi import HTTPException from context_async_sqlalchemy import db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_http_exception() -> None: \"\"\" let's imagine that an http exception occurred. \"\"\" session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") await session.execute(stmt) raise HTTPException(status_code=500) # transaction rolls back automatically by status code from context_async_sqlalchemy import db_session, rollback_db_session from sqlalchemy import insert from ..database import connection from ..models import ExampleTable async def handler_with_db_session_and_manual_rollback() -> None: \"\"\" An example of a handle that uses a rollback \"\"\" # it's convenient this way await _insert() await rollback_db_session(connection) # but it's possible this way too await _insert() session = await db_session(connection) await session.rollback() async def _insert() -> None: session = await db_session(connection) stmt = insert(ExampleTable).values( text=\"example_with_db_session_and_manual_close\" ) await session.execute(stmt)","title":"Rollback"},{"location":"getting_started/","text":"Getting started Configure the connection to the database For example, for PostgreSQL - database.py: from sqlalchemy.ext.asyncio import ( async_sessionmaker, AsyncEngine, AsyncSession, create_async_engine, ) from context_async_sqlalchemy import DBConnect def create_engine(host: str) -> AsyncEngine: \"\"\" database connection parameters. \"\"\" # In production code, you will probably take these parameters from env pg_user = \"krylosov-aa\" pg_password = \"\" pg_port = 6432 pg_db = \"test\" return create_async_engine( f\"postgresql+asyncpg://\" f\"{pg_user}:{pg_password}\" f\"@{host}:{pg_port}\" f\"/{pg_db}\", future=True, pool_pre_ping=True, ) def create_session_maker( engine: AsyncEngine, ) -> async_sessionmaker[AsyncSession]: \"\"\"session parameters\"\"\" return async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) connection = DBConnect( engine_creator=create_engine, session_maker_creator=create_session_maker, host=\"127.0.0.1\", # optional ) The host parameter is optional if you use a handler before creating a session - before_create_session_handler . In that case, you can dynamically set the host. Read more in Master/Replica or several databases at the same time Manage Database connection lifecycle Close resources at the end of your application's lifecycle. Example for FastAPI: from contextlib import asynccontextmanager from typing import Any, AsyncGenerator from fastapi import FastAPI from database import connection @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: \"\"\"Database connection lifecycle management\"\"\" yield await connection.close() # Close the engine if it was open Setup middleware Middleware handles the most important and complex part - managing context and sessions. You can use the ready-made middleware components: Pure ASGI from context_async_sqlalchemy import ASGIHTTPDBSessionMiddleware app.add_middleware(ASGIHTTPDBSessionMiddleware) FastAPI from context_async_sqlalchemy.fastapi_utils import ( add_fastapi_http_db_session_middleware, ) app = FastAPI(...) add_fastapi_http_db_session_middleware(app) Starlette from context_async_sqlalchemy.starlette_utils import ( add_starlette_http_db_session_middleware, ) app = Starlette(...) add_starlette_http_db_session_middleware(app) Write own If there\u2019s no ready-made solution that fits your needs - don\u2019t worry! You can check out how it works and implement your own. Use it from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database from models import ExampleTable # just some model for example async def some_func() -> None: # Created a session (no connection to the database yet) session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) # The second request will use the same connection and the same transaction await session.execute(stmt) # The commit and closing of the session will occur automatically Examples The repository includes an example integration with FastAPI, demonstrating various workflows: FastAPI example It also contains two types of test setups that you can use in your own projects. All library tests are included within the examples - because we aim to test the functionality not in isolation, but in the context of a real asynchronous web application. FastAPI tests example","title":"Getting Started"},{"location":"getting_started/#getting-started","text":"","title":"Getting started"},{"location":"getting_started/#configure-the-connection-to-the-database","text":"For example, for PostgreSQL - database.py: from sqlalchemy.ext.asyncio import ( async_sessionmaker, AsyncEngine, AsyncSession, create_async_engine, ) from context_async_sqlalchemy import DBConnect def create_engine(host: str) -> AsyncEngine: \"\"\" database connection parameters. \"\"\" # In production code, you will probably take these parameters from env pg_user = \"krylosov-aa\" pg_password = \"\" pg_port = 6432 pg_db = \"test\" return create_async_engine( f\"postgresql+asyncpg://\" f\"{pg_user}:{pg_password}\" f\"@{host}:{pg_port}\" f\"/{pg_db}\", future=True, pool_pre_ping=True, ) def create_session_maker( engine: AsyncEngine, ) -> async_sessionmaker[AsyncSession]: \"\"\"session parameters\"\"\" return async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) connection = DBConnect( engine_creator=create_engine, session_maker_creator=create_session_maker, host=\"127.0.0.1\", # optional ) The host parameter is optional if you use a handler before creating a session - before_create_session_handler . In that case, you can dynamically set the host. Read more in Master/Replica or several databases at the same time","title":"Configure the connection to the database"},{"location":"getting_started/#manage-database-connection-lifecycle","text":"Close resources at the end of your application's lifecycle. Example for FastAPI: from contextlib import asynccontextmanager from typing import Any, AsyncGenerator from fastapi import FastAPI from database import connection @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: \"\"\"Database connection lifecycle management\"\"\" yield await connection.close() # Close the engine if it was open","title":"Manage Database connection lifecycle"},{"location":"getting_started/#setup-middleware","text":"Middleware handles the most important and complex part - managing context and sessions. You can use the ready-made middleware components:","title":"Setup middleware"},{"location":"getting_started/#pure-asgi","text":"from context_async_sqlalchemy import ASGIHTTPDBSessionMiddleware app.add_middleware(ASGIHTTPDBSessionMiddleware)","title":"Pure ASGI"},{"location":"getting_started/#fastapi","text":"from context_async_sqlalchemy.fastapi_utils import ( add_fastapi_http_db_session_middleware, ) app = FastAPI(...) add_fastapi_http_db_session_middleware(app)","title":"FastAPI"},{"location":"getting_started/#starlette","text":"from context_async_sqlalchemy.starlette_utils import ( add_starlette_http_db_session_middleware, ) app = Starlette(...) add_starlette_http_db_session_middleware(app)","title":"Starlette"},{"location":"getting_started/#write-own","text":"If there\u2019s no ready-made solution that fits your needs - don\u2019t worry! You can check out how it works and implement your own.","title":"Write own"},{"location":"getting_started/#use-it","text":"from context_async_sqlalchemy import db_session from sqlalchemy import insert from database import connection # your configured connection to the database from models import ExampleTable # just some model for example async def some_func() -> None: # Created a session (no connection to the database yet) session = await db_session(connection) stmt = insert(ExampleTable).values(text=\"example_with_db_session\") # On the first request, a connection and transaction were opened await session.execute(stmt) # If you call db_session again, it will return the same session # even in child coroutines. session = await db_session(connection) # The second request will use the same connection and the same transaction await session.execute(stmt) # The commit and closing of the session will occur automatically","title":"Use it"},{"location":"getting_started/#examples","text":"The repository includes an example integration with FastAPI, demonstrating various workflows: FastAPI example It also contains two types of test setups that you can use in your own projects. All library tests are included within the examples - because we aim to test the functionality not in isolation, but in the context of a real asynchronous web application. FastAPI tests example","title":"Examples"},{"location":"how_middleware_works/","text":"How middleware works Most of the work - and the \u201cmagic\u201d - happens inside the middleware. Here is a diagram describing how it works. At the beginning of a request, the middleware initializes a new asynchronous context. This asynchronous context is implemented using a contextvar. It holds a mutable container that stores sessions. A mutable container is used so that any coroutine, at any level, can create, modify, or close sessions, and those changes will affect the execution of the entire request. Whenever your code accesses the library\u2019s functionality, it interacts with this container. Finally, the middleware checks the container for any active sessions and open transactions. If transactions are open, they are either committed when the query execution is successful or rolled back if it fails. After that, all sessions are closed. That\u2019s precisely why you can safely close transactions and sessions early. The middleware simply works with whatever is in the container: if there\u2019s anything left, it will close it properly; if you\u2019ve already handled it yourself, the middleware only needs to reset the context. Code example The library aims to provide ready-made solutions so that you don\u2019t have to worry about these details - but they\u2019re not always available. So, let\u2019s take a look at how Starlette middleware works. You can use this example as a reference to implement your own. CODE from starlette.middleware.base import ( # type: ignore[attr-defined] Request, Response, RequestResponseEndpoint, BaseHTTPMiddleware, ) from context_async_sqlalchemy import ( init_db_session_ctx, is_context_initiated, reset_db_session_ctx, auto_commit_by_status_code, rollback_all_sessions, ) async def starlette_http_db_session_middleware( request: Request, call_next: RequestResponseEndpoint ) -> Response: \"\"\" Database session lifecycle management. The session itself is created on demand in db_session(). Transaction auto-commit is implemented if there is no exception and the response status is < 400. Otherwise, a rollback is performed. But you can commit or rollback manually in the handler. \"\"\" # Tests have different session management rules # so if the context variable is already set, we do nothing if is_context_initiated(): return await call_next(request) # We set the context here, meaning all child coroutines will receive the # same context. And even if a child coroutine requests the # session first, the container itself is shared, and this coroutine will # add the session to container = shared context. token = init_db_session_ctx() try: response = await call_next(request) # using the status code, we decide to commit or rollback all sessions await auto_commit_by_status_code(response.status_code) return response except Exception: # If an exception occurs, we roll all sessions back await rollback_all_sessions() raise finally: # Close all sessions and clear the context await reset_db_session_ctx(token)","title":"How middleware works"},{"location":"how_middleware_works/#how-middleware-works","text":"Most of the work - and the \u201cmagic\u201d - happens inside the middleware. Here is a diagram describing how it works. At the beginning of a request, the middleware initializes a new asynchronous context. This asynchronous context is implemented using a contextvar. It holds a mutable container that stores sessions. A mutable container is used so that any coroutine, at any level, can create, modify, or close sessions, and those changes will affect the execution of the entire request. Whenever your code accesses the library\u2019s functionality, it interacts with this container. Finally, the middleware checks the container for any active sessions and open transactions. If transactions are open, they are either committed when the query execution is successful or rolled back if it fails. After that, all sessions are closed. That\u2019s precisely why you can safely close transactions and sessions early. The middleware simply works with whatever is in the container: if there\u2019s anything left, it will close it properly; if you\u2019ve already handled it yourself, the middleware only needs to reset the context.","title":"How middleware works"},{"location":"how_middleware_works/#code-example","text":"The library aims to provide ready-made solutions so that you don\u2019t have to worry about these details - but they\u2019re not always available. So, let\u2019s take a look at how Starlette middleware works. You can use this example as a reference to implement your own. CODE from starlette.middleware.base import ( # type: ignore[attr-defined] Request, Response, RequestResponseEndpoint, BaseHTTPMiddleware, ) from context_async_sqlalchemy import ( init_db_session_ctx, is_context_initiated, reset_db_session_ctx, auto_commit_by_status_code, rollback_all_sessions, ) async def starlette_http_db_session_middleware( request: Request, call_next: RequestResponseEndpoint ) -> Response: \"\"\" Database session lifecycle management. The session itself is created on demand in db_session(). Transaction auto-commit is implemented if there is no exception and the response status is < 400. Otherwise, a rollback is performed. But you can commit or rollback manually in the handler. \"\"\" # Tests have different session management rules # so if the context variable is already set, we do nothing if is_context_initiated(): return await call_next(request) # We set the context here, meaning all child coroutines will receive the # same context. And even if a child coroutine requests the # session first, the container itself is shared, and this coroutine will # add the session to container = shared context. token = init_db_session_ctx() try: response = await call_next(request) # using the status code, we decide to commit or rollback all sessions await auto_commit_by_status_code(response.status_code) return response except Exception: # If an exception occurs, we roll all sessions back await rollback_all_sessions() raise finally: # Close all sessions and clear the context await reset_db_session_ctx(token)","title":"Code example"},{"location":"master_replica/","text":"Master/Replica or several databases at the same time This is why db_session and other functions accept a DBConnect instance as input. This approach allows you to work with multiple hosts simultaneously - for example, with both a master and a replica. DBConnect can also accept factory functions instead of ready-made objects, making it easy to switch hosts when needed. For example, libpq can detect the master and replica when creating an engine, but it only does this once - at creation time. The before_create_session_handler hook allows you to change the host at runtime if the master or replica changes. You\u2019ll need third-party functionality to determine which host is the master or the replica. Hopefully, I\u2019ll be able to provide a ready-made solution for this soon. . The engine is not created immediately when DBConnect is initialized - it is created only on the first request. The library uses lazy initialization in many places. from context_async_sqlalchemy import DBConnect from master_replica_helper import get_master, get_replica async def renew_master_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the master has changed\"\"\" master_host = await get_master() if master_host != connect.host: await connect.change_host(master_host) master = DBConnect( ..., before_create_session_handler=renew_master_connect, ) async def renew_replica_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the replica has changed\"\"\" replica_host = await get_replica() if replica_host != connect.host: await connect.change_host(replica_host) replica = DBConnect( ..., before_create_session_handler=renew_replica_connect, )","title":"Master/Replica or several databases at the same time"},{"location":"master_replica/#masterreplica-or-several-databases-at-the-same-time","text":"This is why db_session and other functions accept a DBConnect instance as input. This approach allows you to work with multiple hosts simultaneously - for example, with both a master and a replica. DBConnect can also accept factory functions instead of ready-made objects, making it easy to switch hosts when needed. For example, libpq can detect the master and replica when creating an engine, but it only does this once - at creation time. The before_create_session_handler hook allows you to change the host at runtime if the master or replica changes. You\u2019ll need third-party functionality to determine which host is the master or the replica. Hopefully, I\u2019ll be able to provide a ready-made solution for this soon. . The engine is not created immediately when DBConnect is initialized - it is created only on the first request. The library uses lazy initialization in many places. from context_async_sqlalchemy import DBConnect from master_replica_helper import get_master, get_replica async def renew_master_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the master has changed\"\"\" master_host = await get_master() if master_host != connect.host: await connect.change_host(master_host) master = DBConnect( ..., before_create_session_handler=renew_master_connect, ) async def renew_replica_connect(connect: DBConnect) -> None: \"\"\"Updates the host if the replica has changed\"\"\" replica_host = await get_replica() if replica_host != connect.host: await connect.change_host(replica_host) replica = DBConnect( ..., before_create_session_handler=renew_replica_connect, )","title":"Master/Replica or several databases at the same time"},{"location":"problem/","text":"The Problem It Solves SQLAlchemy uses an engine that manages the connection pool. The engine must remain active for as long as the application is running, so it can quickly provide ready-to-use connections whenever the application needs them. In the application, we work with sessions. A session obtains a single connection from the pool and should have a short lifespan - usually lasting only for the duration of a single request, or even less. Let's see what existing solutions are available to manage sessions: Manual solution This is how the code ends up being duplicated, and two connections and two transactions are used - even though in many cases only one connection and one transaction are actually needed. @app.post(\"/users/\") async def create_user(name): await insert_user(name) await insert_user_profile(name) async def insert_user(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) async def insert_user_profile(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) You can move the duplicated code to a higher level, which will result in using a single connection and a single transaction. @app.post(\"/users/\") async def create_user(name:): async with get_async_session() as session: async with session.begin(): await insert_user(name, session) await insert_user_profile(name, session) async def insert_user(name, session): await session.execute(stmt) async def insert_user_profile(name, session): await session.execute(stmt) But if you look at it more broadly, the code duplication doesn\u2019t actually go away - you still have to do this in every handler. @app.post(\"/dogs/\") async def create_dog(name): async with get_async_session() as session: async with session.begin(): ... @app.post(\"/cats\") async def create_cat(name): async with get_async_session() as session: async with session.begin(): ... You also have to set everything up yourself. No ready-made integration solutions are used - which means freedom on one hand, but a lot of code on the other. Dependency You can use a dependency. For example, in FastAPI, it looks like this: async def get_atomic_session(): async with session_maker() as session: async with session.begin(): yield session @app.post(\"/dogs/\") async def create_dog(name, session=Depends(get_atomic_session)): ... @app.post(\"/cats/\") async def create_cat(name, session=Depends(get_atomic_session)): ... There are two problems here: You can\u2019t close the session or transaction prematurely, because the dependency is responsible for that. The session has to be passed all the way down the stack to the place where it\u2019s actually needed. By the way, there\u2019s no ready-made solution for integrating with the framework - you have to implement the dependency yourself. Wrappers over sqlalachemy There are various wrappers that often provide more convenient integration. Litestar, for example, has the same advantages and disadvantages as using dependencies: config = SQLAlchemyAsyncConfig( connection_string=URL ) sqlalchemy_plugin = SQLAlchemyInitPlugin(config) class UserRepository(SQLAlchemyAsyncRepository[User]): model_type = User @post(\"/users\") async def create_user(data: User, repo: UserRepository): await repo.add(data) # <- insert into User Here\u2019s an example using Ormar: class BaseMeta(ormar.ModelMeta): ... class User(ormar.Model): ... @app.post(\"/users/\") async def create_user(name): await User.objects.create(name=name) The main problem with wrappers is that they require developers to learn something new. They introduce their own syntax - so even if a developer is familiar with SQLAlchemy, it doesn\u2019t mean they\u2019ll understand the wrapper. Wrappers are also often designed for convenience when working with simple CRUD operations, but writing complex SQL queries with them can be very challenging. Solution And this library solves all of these issues: Easy integration with web frameworks Automatic management of engine, session, and transaction lifecycles Ability to manually close a session at any time, without waiting for the end of process Access to a session from the context only where it\u2019s actually needed","title":"The Problem It Solves"},{"location":"problem/#the-problem-it-solves","text":"SQLAlchemy uses an engine that manages the connection pool. The engine must remain active for as long as the application is running, so it can quickly provide ready-to-use connections whenever the application needs them. In the application, we work with sessions. A session obtains a single connection from the pool and should have a short lifespan - usually lasting only for the duration of a single request, or even less. Let's see what existing solutions are available to manage sessions:","title":"The Problem It Solves"},{"location":"problem/#manual-solution","text":"This is how the code ends up being duplicated, and two connections and two transactions are used - even though in many cases only one connection and one transaction are actually needed. @app.post(\"/users/\") async def create_user(name): await insert_user(name) await insert_user_profile(name) async def insert_user(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) async def insert_user_profile(name): async with get_async_session() as session: async with session.begin(): await session.execute(stmt) You can move the duplicated code to a higher level, which will result in using a single connection and a single transaction. @app.post(\"/users/\") async def create_user(name:): async with get_async_session() as session: async with session.begin(): await insert_user(name, session) await insert_user_profile(name, session) async def insert_user(name, session): await session.execute(stmt) async def insert_user_profile(name, session): await session.execute(stmt) But if you look at it more broadly, the code duplication doesn\u2019t actually go away - you still have to do this in every handler. @app.post(\"/dogs/\") async def create_dog(name): async with get_async_session() as session: async with session.begin(): ... @app.post(\"/cats\") async def create_cat(name): async with get_async_session() as session: async with session.begin(): ... You also have to set everything up yourself. No ready-made integration solutions are used - which means freedom on one hand, but a lot of code on the other.","title":"Manual solution"},{"location":"problem/#dependency","text":"You can use a dependency. For example, in FastAPI, it looks like this: async def get_atomic_session(): async with session_maker() as session: async with session.begin(): yield session @app.post(\"/dogs/\") async def create_dog(name, session=Depends(get_atomic_session)): ... @app.post(\"/cats/\") async def create_cat(name, session=Depends(get_atomic_session)): ... There are two problems here: You can\u2019t close the session or transaction prematurely, because the dependency is responsible for that. The session has to be passed all the way down the stack to the place where it\u2019s actually needed. By the way, there\u2019s no ready-made solution for integrating with the framework - you have to implement the dependency yourself.","title":"Dependency"},{"location":"problem/#wrappers-over-sqlalachemy","text":"There are various wrappers that often provide more convenient integration. Litestar, for example, has the same advantages and disadvantages as using dependencies: config = SQLAlchemyAsyncConfig( connection_string=URL ) sqlalchemy_plugin = SQLAlchemyInitPlugin(config) class UserRepository(SQLAlchemyAsyncRepository[User]): model_type = User @post(\"/users\") async def create_user(data: User, repo: UserRepository): await repo.add(data) # <- insert into User Here\u2019s an example using Ormar: class BaseMeta(ormar.ModelMeta): ... class User(ormar.Model): ... @app.post(\"/users/\") async def create_user(name): await User.objects.create(name=name) The main problem with wrappers is that they require developers to learn something new. They introduce their own syntax - so even if a developer is familiar with SQLAlchemy, it doesn\u2019t mean they\u2019ll understand the wrapper. Wrappers are also often designed for convenience when working with simple CRUD operations, but writing complex SQL queries with them can be very challenging.","title":"Wrappers over sqlalachemy"},{"location":"problem/#solution","text":"And this library solves all of these issues: Easy integration with web frameworks Automatic management of engine, session, and transaction lifecycles Ability to manually close a session at any time, without waiting for the end of process Access to a session from the context only where it\u2019s actually needed","title":"Solution"},{"location":"testing/","text":"Testing When testing with a real database, one important problem needs to be solved - ensuring data isolation between tests. There are basically two approaches: Separate sessions The test has its own session that it uses to prepare data and verify results after execution. The application also has its own session. Data isolation is achieved by clearing all tables at the end of each test (and once before running all tests). Shared session and transaction The test and the application share the same session and transaction. Data isolation is achieved by rolling back the transaction at the end of the test. Personally, I prefer the first option, because it is a more \"honest\" way to test the application. We can verify how it handles sessions and transactions on its own. It\u2019s also convenient to inspect the database state when a test is paused. Sometimes, there are complex session management scenarios (for example, concurrent query execution) where other types of testing are either impossible or very difficult. The main disadvantage of this approach is the slower execution speed. Since we clear all tables after each test, this process takes additional time. This is where the second approach comes in - its main advantage is speed, as rolling back a transaction is very fast. In my projects, I use both approaches at the same time: For most tests with simple or common logic, I use a shared transaction for the test and the application For more complex cases, or ones that cannot be tested this way, I use separate transactions. This combination allows for both good performance and convenient testing. The library provides several utilities that can be used in tests - for example, in fixtures. They help create tests that share a common transaction between the test and the application, so data isolation between tests is achieved through fast transaction rollback. You can see these capabilities in the examples: Here are tests with a common transaction between the application and the tests. And here's an example with different transactions. Create session with autorollback rollback_session creates a session that always rolls back automatically. from context_async_sqlalchemy.test_utils import rollback_session @pytest_asyncio.fixture async def db_session_test() -> AsyncGenerator[AsyncSession, None]: \"\"\"The session that is used inside the test\"\"\" async with rollback_session(connection) as session: yield session Override context set_test_context creates a new context put_savepoint_session_in_ctx puts into context a session that uses the same connection as db_session_test , but if you commit in this session, then the transaction will not be committed, but save point will be released from context_async_sqlalchemy.test_utils import ( put_savepoint_session_in_ctx, set_test_context, ) @pytest_asyncio.fixture(autouse=True) async def db_session_override( db_session_test: AsyncSession, ) -> AsyncGenerator[None, None]: \"\"\" The key thing about these tests is that we override the context in advance. The middleware has a special check that won't initialize the context if it already exists. \"\"\" async with set_test_context(): async with put_savepoint_session_in_ctx(connection, db_session_test): yield","title":"Testing"},{"location":"testing/#testing","text":"When testing with a real database, one important problem needs to be solved - ensuring data isolation between tests. There are basically two approaches: Separate sessions The test has its own session that it uses to prepare data and verify results after execution. The application also has its own session. Data isolation is achieved by clearing all tables at the end of each test (and once before running all tests). Shared session and transaction The test and the application share the same session and transaction. Data isolation is achieved by rolling back the transaction at the end of the test. Personally, I prefer the first option, because it is a more \"honest\" way to test the application. We can verify how it handles sessions and transactions on its own. It\u2019s also convenient to inspect the database state when a test is paused. Sometimes, there are complex session management scenarios (for example, concurrent query execution) where other types of testing are either impossible or very difficult. The main disadvantage of this approach is the slower execution speed. Since we clear all tables after each test, this process takes additional time. This is where the second approach comes in - its main advantage is speed, as rolling back a transaction is very fast. In my projects, I use both approaches at the same time: For most tests with simple or common logic, I use a shared transaction for the test and the application For more complex cases, or ones that cannot be tested this way, I use separate transactions. This combination allows for both good performance and convenient testing. The library provides several utilities that can be used in tests - for example, in fixtures. They help create tests that share a common transaction between the test and the application, so data isolation between tests is achieved through fast transaction rollback. You can see these capabilities in the examples: Here are tests with a common transaction between the application and the tests. And here's an example with different transactions.","title":"Testing"},{"location":"testing/#create-session-with-autorollback","text":"rollback_session creates a session that always rolls back automatically. from context_async_sqlalchemy.test_utils import rollback_session @pytest_asyncio.fixture async def db_session_test() -> AsyncGenerator[AsyncSession, None]: \"\"\"The session that is used inside the test\"\"\" async with rollback_session(connection) as session: yield session","title":"Create session with autorollback"},{"location":"testing/#override-context","text":"set_test_context creates a new context put_savepoint_session_in_ctx puts into context a session that uses the same connection as db_session_test , but if you commit in this session, then the transaction will not be committed, but save point will be released from context_async_sqlalchemy.test_utils import ( put_savepoint_session_in_ctx, set_test_context, ) @pytest_asyncio.fixture(autouse=True) async def db_session_override( db_session_test: AsyncSession, ) -> AsyncGenerator[None, None]: \"\"\" The key thing about these tests is that we override the context in advance. The middleware has a special check that won't initialize the context if it already exists. \"\"\" async with set_test_context(): async with put_savepoint_session_in_ctx(connection, db_session_test): yield","title":"Override context"}]}
\ No newline at end of file
diff --git a/docs_sources/docs/api.md b/docs_sources/docs/api.md
index f0bd0f9..e32b620 100644
--- a/docs_sources/docs/api.md
+++ b/docs_sources/docs/api.md
@@ -67,9 +67,9 @@ async def renew_master_connect(connect: DBConnect) -> None:
async def connect(self: DBConnect, host: str) -> None:
```
Establishes a connection to the specified host.
-This method doesn’t need to be called explicitly.
-If it isn’t called, the first session request will automatically
-establish the connection.
+It doesn’t need to be called explicitly.
+If you don't use the call, the first session request will automatically
+establishes the connection.
---
@@ -78,8 +78,7 @@ establish the connection.
```python
async def change_host(self: DBConnect, host: str) -> None:
```
-Establishes a connection to the specified host, but first
-checks under a lock that the currently connected host is different
+Establishes a connection to the specified host. It then validates that the currently connected host is different
from the target host.
---
@@ -89,10 +88,7 @@ from the target host.
```python
async def create_session(self: DBConnect) -> AsyncSession:
```
-Creates a new session. Used internally by the library -
-you’ll probably never need to call it directly, but it’s
-good to know it exists.
-
+Creates a new session. Used internally by the library. You may never need to call it directly.
---
### session_maker
@@ -177,7 +173,7 @@ app.add_middleware(ASGIHTTPDBSessionMiddleware)
## Sessions
-Here are the functions you’ll use most often from the library.
+Here are the library functions you will use most often.
They allow you to work with sessions directly from your asynchronous code.
### db_session
@@ -185,7 +181,7 @@ They allow you to work with sessions directly from your asynchronous code.
async def db_session(connect: DBConnect) -> AsyncSession:
```
The most important function for obtaining a session in your code.
-When called for the first time, it returns a new session; subsequent
+Returns a new session when you call it for the first time; subsequent
calls return the same session.
---
@@ -198,11 +194,11 @@ async def atomic_db_session(
current_transaction: Literal["commit", "rollback", "append", "raise"] = "commit",
) -> AsyncGenerator[AsyncSession, None]:
```
-A context manager that can be used to wrap another function which
+A context manager you can use to wrap another function which
uses a context session, making that call isolated within its own transaction.
There are several options that define how the function will handle
-an already open transaction.
+an open transaction.
current_transaction:
@@ -217,7 +213,7 @@ current_transaction:
```python
async def commit_db_session(connect: DBConnect) -> None:
```
-Commits the active session, if there is one.
+Commits the active session.
---
@@ -225,7 +221,7 @@ Commits the active session, if there is one.
```python
async def rollback_db_session(connect: DBConnect) -> None:
```
-Rollbacks the active session, if there is one.
+Rollbacks the active session.
---
@@ -234,11 +230,9 @@ Rollbacks the active session, if there is one.
async def close_db_session(connect: DBConnect) -> None:
```
Closes the current context session. The connection is returned to the pool.
-If you close an uncommitted transaction, the connection will be rolled back.
+If you close an uncommitted transaction, the connection rolls back.
-This is useful if, for example, at the beginning of the handle a
- database query is needed, and then there is some other long-term work
- and you don't want to keep the connection opened.
+Use if you have more work you need to complete without keeping the connection open.
---
diff --git a/docs_sources/docs/concurrent_queries.md b/docs_sources/docs/concurrent_queries.md
index 7104b9f..b729962 100644
--- a/docs_sources/docs/concurrent_queries.md
+++ b/docs_sources/docs/concurrent_queries.md
@@ -31,11 +31,10 @@ from ..models import ExampleTable
async def handler_multiple_sessions() -> None:
"""
- In some situations, you need to have multiple sessions running
- simultaneously. For example, to run several queries concurrently.
+ You may need to to run multiple sessions. For example, to run several queries concurrently.
- You can also use these same techniques to create new sessions whenever you
- need them. Not necessarily just because of the concurrent processing.
+ You can also use the same techniques to create new sessions whenever you
+ need them, ot necessarily because of the concurrent processing.
"""
await asyncio.gather(
_insert(), # context session
@@ -59,17 +58,16 @@ async def _insert_manual(text: str) -> None:
stmt = insert(ExampleTable).values(text=text)
await session.execute(stmt)
- # You can manually commit the transaction if you want, but it is not
- # necessary
+ # manually commits the transaction (optional)
await commit_db_session(connection)
- # You can manually close the session if you want, but it is not necessary
+ # manually closes the session (optional)
await close_db_session(connection)
async def _insert_non_ctx() -> None:
"""
- You don't have to use the context to work with sessions at all
+ Using context to work with sessions is optional.
"""
async with new_non_ctx_atomic_session(connection) as session:
stmt = insert(ExampleTable).values(text="example_multiple_sessions")
@@ -78,7 +76,7 @@ async def _insert_non_ctx() -> None:
async def _insert_non_ctx_manual() -> None:
"""
- You don't have to use the context to work with sessions at all
+ Using context to work with sessions is optional.
"""
async with new_non_ctx_session(connection) as session:
stmt = insert(ExampleTable).values(text="example_multiple_sessions")
diff --git a/docs_sources/docs/examples.md b/docs_sources/docs/examples.md
index f1dd4a8..7677e17 100644
--- a/docs_sources/docs/examples.md
+++ b/docs_sources/docs/examples.md
@@ -18,22 +18,22 @@ from ..models import ExampleTable
async def handler_with_db_session() -> None:
"""
- An example of a typical handle that uses a context session to work with
+ A typical handle that uses a context session to work with
a database.
Autocommit or autorollback occurs automatically at the end of a request
- (in middleware).
+ in middleware.
"""
- # Created a session (no connection to the database yet)
- # If you call db_session again, it will return the same session
+ # Creates a session with no connection to the database yet
+ # If you call db_session again, it returns the same session
# even in child coroutines.
session = await db_session(connection)
stmt = insert(ExampleTable).values(text="example_with_db_session")
- # On the first request, a connection and transaction were opened
+ # On the first request, a connection and transaction are opened
await session.execute(stmt)
- # Commit will happen automatically
+ # Commits automatically
```
### Atomic
@@ -48,17 +48,17 @@ from ..models import ExampleTable
async def handler_with_db_session_and_atomic() -> None:
"""
- Let's imagine you already have a function that works with a contextual
+ You have a function that works with a contextual
session, and its use case calls autocommit at the end of the request.
You want to reuse this function, but you need to commit immediately,
- rather than wait for the request to complete.
+ instead of wait for the request to complete.
"""
- # the transaction will be committed or rolled back automatically
+ # the transaction commits or rolls back automatically
# using the context manager
async with atomic_db_session(connection):
await _insert_1()
- # This is a new transaction in the same connection
+ # a new transaction in the same connection
await _insert_1()
@@ -87,8 +87,7 @@ from ..models import ExampleTable
async def handler_with_db_session_and_manual_close() -> None:
"""
An example of a handle that uses a session in context,
- but commits manually and even closes the session to release the
- connection.
+ but commits manually and closes the session to release the connection.
"""
# new connect -> new transaction -> commit
await _insert_1()
@@ -108,7 +107,7 @@ async def _insert_1() -> None:
)
await session.execute(stmt)
- # Here we closed the transaction
+ # We closed the transaction
await session.commit() # or await commit_db_session()
@@ -119,13 +118,11 @@ async def _insert_2() -> None:
)
await session.execute(stmt)
- # Here we closed the transaction
+ # We closed the transaction
await commit_db_session(connection)
- # And here we closed the session = returned the connection to the pool
- # This is useful if, for example, at the beginning of the handle a
- # database query is needed, and then there is some other long-term work
- # and you don't want to keep the connection opened.
+ # We closed the session and returned the connection to the pool.
+ # Use if you have more work you need to complete without keeping the connection open.
await close_db_session(connection)
@@ -158,11 +155,10 @@ from ..models import ExampleTable
async def handler_multiple_sessions() -> None:
"""
- In some situations, you need to have multiple sessions running
- simultaneously. For example, to run several queries concurrently.
+ You may need to to run multiple sessions. For example, to run several queries concurrently.
- You can also use these same techniques to create new sessions whenever you
- need them. Not necessarily just because of the concurrent processing.
+ You can also use the same techniques to create new sessions whenever you
+ need them, ot necessarily because of the concurrent processing.
"""
await asyncio.gather(
_insert(), # context session
@@ -186,17 +182,16 @@ async def _insert_manual(text: str) -> None:
stmt = insert(ExampleTable).values(text=text)
await session.execute(stmt)
- # You can manually commit the transaction if you want, but it is not
- # necessary
+ # manually commits the transaction (optional)
await commit_db_session(connection)
- # You can manually close the session if you want, but it is not necessary
+ # manually closes the session (optional)
await close_db_session(connection)
async def _insert_non_ctx() -> None:
"""
- You don't have to use the context to work with sessions at all
+ Using context to work with sessions is optional.
"""
async with new_non_ctx_atomic_session(connection) as session:
stmt = insert(ExampleTable).values(text="example_multiple_sessions")
@@ -205,7 +200,7 @@ async def _insert_non_ctx() -> None:
async def _insert_non_ctx_manual() -> None:
"""
- You don't have to use the context to work with sessions at all
+ Using context to work with sessions is optional.
"""
async with new_non_ctx_session(connection) as session:
stmt = insert(ExampleTable).values(text="example_multiple_sessions")
@@ -232,7 +227,7 @@ async def handler_with_db_session_and_exception() -> None:
await session.execute(stmt)
raise Exception("Some exception")
- # transaction will be automatically rolled back
+ # transaction rolls back automatically
```
```python
@@ -254,7 +249,7 @@ async def handler_with_db_session_and_http_exception() -> None:
await session.execute(stmt)
raise HTTPException(status_code=500)
- # transaction will be automatically rolled back by status code
+ # transaction rolls back automatically by status code
```
```python
diff --git a/docs_sources/docs/problem.md b/docs_sources/docs/problem.md
index b1a8da3..bd44870 100644
--- a/docs_sources/docs/problem.md
+++ b/docs_sources/docs/problem.md
@@ -171,6 +171,6 @@ And this library solves all of these issues:
- Easy integration with web frameworks
- Automatic management of engine, session, and transaction lifecycles
-- Ability to manually close a session at any time, without waiting for the
+- Manually close a session at any time without waiting for the
end of process
-- Access to a session from the context only where it’s actually needed
+- Access a session from the context only where it’s actually needed
diff --git a/examples/fastapi_example/routes/auto_rollback_by_exception.py b/examples/fastapi_example/routes/auto_rollback_by_exception.py
index 3a3c736..525873f 100644
--- a/examples/fastapi_example/routes/auto_rollback_by_exception.py
+++ b/examples/fastapi_example/routes/auto_rollback_by_exception.py
@@ -14,4 +14,4 @@ async def auto_rollback_by_exception() -> None:
await session.execute(stmt)
raise Exception("Some exception")
- # transaction will be automatically rolled back
+ # transaction rolls back automatically
\ No newline at end of file
diff --git a/examples/fastapi_example/routes/auto_rollback_by_status_code.py b/examples/fastapi_example/routes/auto_rollback_by_status_code.py
index 5eb83bf..baf081e 100644
--- a/examples/fastapi_example/routes/auto_rollback_by_status_code.py
+++ b/examples/fastapi_example/routes/auto_rollback_by_status_code.py
@@ -16,4 +16,4 @@ async def auto_rollback_by_status_code() -> None:
await session.execute(stmt)
raise HTTPException(status_code=500)
- # transaction will be automatically rolled back by status code
+ # transaction rolls back automatically by status code
diff --git a/examples/fastapi_example/routes/councurrent_queries.py b/examples/fastapi_example/routes/councurrent_queries.py
index 79ffa07..bb007b4 100644
--- a/examples/fastapi_example/routes/councurrent_queries.py
+++ b/examples/fastapi_example/routes/councurrent_queries.py
@@ -16,11 +16,10 @@
async def concurrent_queries() -> None:
"""
- In some situations, you need to have multiple sessions running
- simultaneously. For example, to run several queries concurrently.
+ You may need to to run multiple sessions multiple sessions
You can also use these same techniques to create new sessions whenever you
- need them. Not necessarily just because of the concurrent processing.
+ need themm, not necessarily because of the concurrent processing.
"""
await asyncio.gather(
_insert(), # context session
@@ -49,17 +48,16 @@ async def _insert_manual(text: str) -> None:
stmt = insert(ExampleTable).values(text=text)
await session.execute(stmt)
- # You can manually commit the transaction if you want, but it is not
- # necessary
+ # manually commit the transaction (optional)
await commit_db_session(connection)
- # You can manually close the session if you want, but it is not necessary
+ # manually close the session (optional)
await close_db_session(connection)
async def _insert_non_ctx() -> None:
"""
- You don't have to use the context to work with sessions at all
+ Using context to work with sessions is optional.
"""
async with new_non_ctx_atomic_session(connection) as session:
stmt = insert(ExampleTable)
@@ -68,7 +66,7 @@ async def _insert_non_ctx() -> None:
async def _insert_non_ctx_manual() -> None:
"""
- You don't have to use the context to work with sessions at all
+ Using context to work with sessions is optional.
"""
async with new_non_ctx_session(connection) as session:
stmt = insert(ExampleTable)
diff --git a/examples/fastapi_example/routes/early_commit.py b/examples/fastapi_example/routes/early_commit.py
index 181e8c1..e34c107 100644
--- a/examples/fastapi_example/routes/early_commit.py
+++ b/examples/fastapi_example/routes/early_commit.py
@@ -11,8 +11,8 @@
async def early_commit() -> None:
"""
- An example of a handle that uses a session in context,
- but commits manually and even closes the session to release the
+ A handle that uses a session in context,
+ but commits manually and the session to release the
connection.
"""
# new connect -> new transaction -> commit
@@ -31,7 +31,7 @@ async def _insert_1() -> None:
stmt = insert(ExampleTable)
await session.execute(stmt)
- # Here we closed the transaction
+ # We closed the transaction
await session.commit() # or await commit_db_session()
@@ -40,13 +40,11 @@ async def _insert_2() -> None:
stmt = insert(ExampleTable)
await session.execute(stmt)
- # Here we closed the transaction
+ # We closed the transaction
await commit_db_session(connection)
- # And here we closed the session = returned the connection to the pool
- # This is useful if, for example, at the beginning of the handle a
- # database query is needed, and then there is some other long-term work
- # and you don't want to keep the connection opened.
+ # We closed the session and returned the connection to the pool
+ # Use if you have more work you need to complete without keeping the connection open.
await close_db_session(connection)
diff --git a/examples/fastapi_with_pure_asgi_example/routes/auto_rollback_by_exception.py b/examples/fastapi_with_pure_asgi_example/routes/auto_rollback_by_exception.py
index 3a3c736..17bf774 100644
--- a/examples/fastapi_with_pure_asgi_example/routes/auto_rollback_by_exception.py
+++ b/examples/fastapi_with_pure_asgi_example/routes/auto_rollback_by_exception.py
@@ -14,4 +14,4 @@ async def auto_rollback_by_exception() -> None:
await session.execute(stmt)
raise Exception("Some exception")
- # transaction will be automatically rolled back
+ # transaction rolls back automatically
diff --git a/examples/fastapi_with_pure_asgi_example/routes/auto_rollback_by_status_code.py b/examples/fastapi_with_pure_asgi_example/routes/auto_rollback_by_status_code.py
index 230572e..d3a7a14 100644
--- a/examples/fastapi_with_pure_asgi_example/routes/auto_rollback_by_status_code.py
+++ b/examples/fastapi_with_pure_asgi_example/routes/auto_rollback_by_status_code.py
@@ -16,4 +16,4 @@ async def auto_rollback_by_status_code() -> None:
await session.execute(stmt)
raise HTTPException(status_code=500)
- # transaction will be automatically rolled back by status code
+ # transaction rolls back automatically by status code
diff --git a/examples/fastapi_with_pure_asgi_example/routes/councurrent_queries.py b/examples/fastapi_with_pure_asgi_example/routes/councurrent_queries.py
index 79ffa07..b8a21ac 100644
--- a/examples/fastapi_with_pure_asgi_example/routes/councurrent_queries.py
+++ b/examples/fastapi_with_pure_asgi_example/routes/councurrent_queries.py
@@ -16,11 +16,10 @@
async def concurrent_queries() -> None:
"""
- In some situations, you need to have multiple sessions running
- simultaneously. For example, to run several queries concurrently.
+ You may need to to run multiple sessions. For example, to run several queries concurrently.
- You can also use these same techniques to create new sessions whenever you
- need them. Not necessarily just because of the concurrent processing.
+ You can also use the same techniques to create new sessions whenever you
+ need them, ot necessarily because of the concurrent processing.
"""
await asyncio.gather(
_insert(), # context session
@@ -49,17 +48,16 @@ async def _insert_manual(text: str) -> None:
stmt = insert(ExampleTable).values(text=text)
await session.execute(stmt)
- # You can manually commit the transaction if you want, but it is not
- # necessary
+ # commits the transaction (optional)
await commit_db_session(connection)
- # You can manually close the session if you want, but it is not necessary
+ # manually closes session (optional)
await close_db_session(connection)
async def _insert_non_ctx() -> None:
"""
- You don't have to use the context to work with sessions at all
+ Using context to work with sessions is optional.
"""
async with new_non_ctx_atomic_session(connection) as session:
stmt = insert(ExampleTable)
@@ -68,7 +66,7 @@ async def _insert_non_ctx() -> None:
async def _insert_non_ctx_manual() -> None:
"""
- You don't have to use the context to work with sessions at all
+ Using context to work with sessions is optional.
"""
async with new_non_ctx_session(connection) as session:
stmt = insert(ExampleTable)
diff --git a/examples/fastapi_with_pure_asgi_example/routes/early_commit.py b/examples/fastapi_with_pure_asgi_example/routes/early_commit.py
index 181e8c1..0661e23 100644
--- a/examples/fastapi_with_pure_asgi_example/routes/early_commit.py
+++ b/examples/fastapi_with_pure_asgi_example/routes/early_commit.py
@@ -11,8 +11,8 @@
async def early_commit() -> None:
"""
- An example of a handle that uses a session in context,
- but commits manually and even closes the session to release the
+ A handle that uses a session in context,
+ but commits manually and closes the session to release the
connection.
"""
# new connect -> new transaction -> commit
@@ -31,7 +31,7 @@ async def _insert_1() -> None:
stmt = insert(ExampleTable)
await session.execute(stmt)
- # Here we closed the transaction
+ # We closed the transaction
await session.commit() # or await commit_db_session()
@@ -40,13 +40,11 @@ async def _insert_2() -> None:
stmt = insert(ExampleTable)
await session.execute(stmt)
- # Here we closed the transaction
+ # We closed the transaction
await commit_db_session(connection)
- # And here we closed the session = returned the connection to the pool
- # This is useful if, for example, at the beginning of the handle a
- # database query is needed, and then there is some other long-term work
- # and you don't want to keep the connection opened.
+ # We closed the session and returned the connection to the pool
+ # Use if you have more work you need to complete without keeping the connection open.
await close_db_session(connection)
diff --git a/examples/starlette_example/routes/auto_rollback_by_exception.py b/examples/starlette_example/routes/auto_rollback_by_exception.py
index a5ab35b..58061f5 100644
--- a/examples/starlette_example/routes/auto_rollback_by_exception.py
+++ b/examples/starlette_example/routes/auto_rollback_by_exception.py
@@ -15,4 +15,4 @@ async def auto_rollback_by_exception(_: Request) -> None:
await session.execute(stmt)
raise Exception("Some exception")
- # transaction will be automatically rolled back
+ # transaction rolls back automatically
diff --git a/examples/starlette_example/routes/auto_rollback_by_status_code.py b/examples/starlette_example/routes/auto_rollback_by_status_code.py
index 15eee5b..ab60b6c 100644
--- a/examples/starlette_example/routes/auto_rollback_by_status_code.py
+++ b/examples/starlette_example/routes/auto_rollback_by_status_code.py
@@ -17,4 +17,4 @@ async def auto_rollback_by_status_code(_: Request) -> None:
await session.execute(stmt)
raise HTTPException(status_code=500)
- # transaction will be automatically rolled back by status code
+ # transaction rolls back automatically by status code
diff --git a/examples/starlette_example/routes/councurrent_queries.py b/examples/starlette_example/routes/councurrent_queries.py
index a107995..5d04f81 100644
--- a/examples/starlette_example/routes/councurrent_queries.py
+++ b/examples/starlette_example/routes/councurrent_queries.py
@@ -17,8 +17,7 @@
async def concurrent_queries(_: Request) -> JSONResponse:
"""
- In some situations, you need to have multiple sessions running
- simultaneously. For example, to run several queries concurrently.
+ You may need to run multiple sessions simultaneously For example, to run several queries concurrently.
You can also use these same techniques to create new sessions whenever you
need them. Not necessarily just because of the concurrent processing.
@@ -51,17 +50,16 @@ async def _insert_manual(text: str) -> None:
stmt = insert(ExampleTable).values(text=text)
await session.execute(stmt)
- # You can manually commit the transaction if you want, but it is not
- # necessary
+ # commits the transaction (optional)
await commit_db_session(connection)
- # You can manually close the session if you want, but it is not necessary
+ # manually closes the session (optional)
await close_db_session(connection)
async def _insert_non_ctx() -> None:
"""
- You don't have to use the context to work with sessions at all
+ While working with sessions, using context is optional
"""
async with new_non_ctx_atomic_session(connection) as session:
stmt = insert(ExampleTable)
@@ -70,7 +68,7 @@ async def _insert_non_ctx() -> None:
async def _insert_non_ctx_manual() -> None:
"""
- You don't have to use the context to work with sessions at all
+ While working with sessions, using context is optional
"""
async with new_non_ctx_session(connection) as session:
stmt = insert(ExampleTable)
diff --git a/examples/starlette_example/routes/early_commit.py b/examples/starlette_example/routes/early_commit.py
index 06d99da..1baf17d 100644
--- a/examples/starlette_example/routes/early_commit.py
+++ b/examples/starlette_example/routes/early_commit.py
@@ -13,8 +13,8 @@
async def early_commit(_: Request) -> JSONResponse:
"""
- An example of a handle that uses a session in context,
- but commits manually and even closes the session to release the
+ A handle that uses a session in context,
+ but commits manually and closes the session to release the
connection.
"""
# new connect -> new transaction -> commit
@@ -35,7 +35,7 @@ async def _insert_1() -> None:
stmt = insert(ExampleTable)
await session.execute(stmt)
- # Here we closed the transaction
+ # We closed the transaction
await session.commit() # or await commit_db_session()
@@ -44,13 +44,11 @@ async def _insert_2() -> None:
stmt = insert(ExampleTable)
await session.execute(stmt)
- # Here we closed the transaction
+ # We closed the transaction
await commit_db_session(connection)
- # And here we closed the session = returned the connection to the pool
- # This is useful if, for example, at the beginning of the handle a
- # database query is needed, and then there is some other long-term work
- # and you don't want to keep the connection opened.
+ # We closed the session and returned the connection to the pool
+ # Use if you have more work you need to complete without keeping the connection open.
await close_db_session(connection)