3 changes: 2 additions & 1 deletion .gitignore
@@ -164,4 +164,5 @@ cython_debug/
.DS_Store
.claude/
CLAUDE.md
-AGENTS.md
+AGENTS.md
+.vscode
10 changes: 10 additions & 0 deletions README.md
@@ -330,6 +330,16 @@ results = conn.query("search index=_internal | head 5")
pytest -m integration
```


### Manual Testing
- Manual tests live in `dev_env`
- They do not use pytest or mocking
- Run the repo's `docker-compose.yml` to stand up services, then change into the folder under `dev_env` you wish to test
- Execute the following to test your module (a minimal example file is sketched below):
```
poetry run python <your test file here>.py
```
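
For reference, a minimal manual test file might look like the sketch below. It assumes, as the scripts in `dev_env/postgres` do, that `combine_env_configs()` exposes a `PGSQL_DSN` entry for the running services:

```python
from pyapiary.dbms_connectors.postgres import PostgresConnector
from pyapiary.helpers import combine_env_configs, setup_logger

# Pull connection settings from the environment (expects a PGSQL_DSN entry)
env_config = combine_env_configs()
logger = setup_logger("manual test")

# The context manager closes the connection pool on exit
with PostgresConnector(conn_str=env_config["PGSQL_DSN"], logger=logger) as conn:
    for row in conn.query("SELECT 1 AS ok"):
        print(row)
```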

### 🧼 Suppress warnings

Add this to `pytest.ini`:
32 changes: 32 additions & 0 deletions dev_env/postgres/test_asyncpostgres.py
@@ -0,0 +1,32 @@
import asyncio
from pyapiary.dbms_connectors.postgres import AsyncPostgresConnector
from pyapiary.helpers import combine_env_configs, setup_logger
from typing import Dict, Any

async def test_async_db():
    # Load config and set up logging
    env_config: Dict[str, Any] = combine_env_configs()
    logger = setup_logger("pg logger")

    # Use 'async with' to manage the connection pool and its background workers
    async with AsyncPostgresConnector(conn_str=env_config["PGSQL_DSN"], logger=logger) as conn:
        logger.info("inserting one row (async)")
        # Await the execution of the insert
        await conn.async_bulk_insert("employees", [{"name": "rob", "department": "hr"}])

        base_query = "SELECT * FROM employees"
        logger.info("Querying employees (async); printing the first 10 rows:")

        # Await the coroutine to get the actual list of results
        rows = await conn.async_query(base_query)

        # Standard loop through the returned list
        for i, row in enumerate(rows):
            print(row)
            if i >= 9:
                break

if __name__ == "__main__":
    # Standard entry point for async scripts
    asyncio.run(test_async_db())
20 changes: 20 additions & 0 deletions dev_env/postgres/test_postgres.py
@@ -0,0 +1,20 @@
from pyapiary.dbms_connectors.postgres import PostgresConnector
from pyapiary.helpers import combine_env_configs, setup_logger
from typing import Dict, Any

env_config: Dict[str, Any] = combine_env_configs()

logger = setup_logger("pg logger")
with PostgresConnector(conn_str=env_config["PGSQL_DSN"], logger=logger) as conn:
    # Optional insert test
    logger.info("inserting one row")
    conn.bulk_insert("employees", [{"name": "rob", "department": "hr"}])

    base_query = "SELECT * FROM employees"
    logger.info("Querying employees; printing the first 10 rows:")
    for i, row in enumerate(conn.query(base_query)):
        print(row)
        if i >= 9:
            break
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -47,6 +47,7 @@ services:
- ./dev_env/splunk/init_app.sh:/init_app.sh
entrypoint: [ "/bin/bash", "-c", "/init_app.sh & /sbin/entrypoint.sh start-service" ]

# Both the PG and ODBC connectors use this data source, which is why dev_env/postgres does not have its own init.sql
odbc_db:
image: postgres:15-alpine
container_name: odbc_db
869 changes: 513 additions & 356 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pyproject.toml
@@ -17,6 +17,9 @@ splunk-sdk = "^2.1.1"
httpx = "^0.28.1"
tenacity = "^9.1.2"
pyodbc = "^5.3.0"
psycopg-pool = "^3.3.1"
psycopg = {extras = ["pool"], version = "^3.3.4"}
psycopg-binary = "^3.3.4"

[tool.poetry.extras]
odbc = ["pyodbc"]
104 changes: 104 additions & 0 deletions src/pyapiary/dbms_connectors/postgres.py
@@ -0,0 +1,104 @@
from typing import List, Dict, Any, Optional, Type
from types import TracebackType
from pyapiary.helpers import setup_logger
from psycopg_pool import AsyncConnectionPool, ConnectionPool

class PostgresConnector:
    def __init__(self, conn_str, logger=None, min_size=5, max_size=30):
        self.dsn = conn_str
        self.min_size = min_size
        self.max_size = max_size
        self.connection_pool = ConnectionPool(
            self.dsn, kwargs={"autocommit": True}, min_size=self.min_size, max_size=self.max_size
        )
        self.logger = logger if logger else setup_logger(__name__)

    def __enter__(self):
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ):
        self.close()

    def close(self):
        """Close the PG connection pool."""
        if self.connection_pool:
            self.connection_pool.close()
            self._log("PG connection pool closed")

    def _log(self, msg: str, level: str = "info"):
        if self.logger:
            log_method = getattr(self.logger, level, self.logger.info)
            log_method(msg)

    def query(self, query: str, params=None):
        """
        query - SQL string representing the work the user wants done
        params - must be legal for psycopg's execute(); see
        https://www.psycopg.org/psycopg3/docs/api/pool.html#module-psycopg_pool
        """
        with self.connection_pool.connection() as conn:
            with conn.transaction():
                # run the statement inside an explicit transaction so it is
                # atomic even though pool connections are autocommit
                return conn.execute(query, params).fetchall()

    def bulk_insert(self, table: str, data: List[Dict[str, Any]]):
        if not data:
            return

        self._log(f"Inserting {len(data)} rows into table {table}")

        columns = list(data[0].keys())
        copy_query = f"COPY {table} ({', '.join(columns)}) FROM STDIN"

        with self.connection_pool.connection() as conn:
            with conn.cursor() as cur:
                with cur.copy(copy_query) as copy:
                    for row in data:
                        copy.write_row(tuple(row[col] for col in columns))
        # Note: the pool is configured for autocommit, so the COPY is
        # committed before the connection is returned to the pool

# Async version
class AsyncPostgresConnector:
    def __init__(self, conn_str, min_size=5, max_size=30, logger=None):
        self.dsn = conn_str
        self.min_size = min_size
        self.max_size = max_size
        self.connection_pool = AsyncConnectionPool(
            self.dsn, kwargs={"autocommit": True}, min_size=self.min_size, max_size=self.max_size, open=False
        )
        self.logger = logger if logger else setup_logger(__name__)

    async def __aenter__(self):
        # for 'async with' calls: the pool is created closed and opened here
        await self.connection_pool.open()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # for 'async with' calls
        await self.connection_pool.close()

    async def async_query(self, query: str, params=None):
        """
        query - SQL string representing the work the user wants done
        params - must be legal for psycopg's execute(); see
        https://www.psycopg.org/psycopg3/docs/api/pool.html#module-psycopg_pool
        """
        async with self.connection_pool.connection() as conn:
            cur = await conn.execute(query, params)
            return await cur.fetchall()

    async def async_bulk_insert(self, table_name: str, data: List[Dict[str, Any]]):
        if not data:
            return

        columns = list(data[0].keys())

        # COPY streams every row in a single statement, which is more efficient
        # over a high volume of rows than the ODBC write/executemany paradigm
        async with self.connection_pool.connection() as aconn:
            async with aconn.cursor() as acur:
                copy_query = f"COPY {table_name} ({', '.join(columns)}) FROM STDIN"

                async with acur.copy(copy_query) as copy:
                    for record in data:
                        row = tuple(record[col] for col in columns)
                        await copy.write_row(row)
        # Note: the pool passes autocommit=True to its connections, so the
        # COPY is committed before the connection is returned to the pool
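
For reference, a hedged sketch of how the two connectors are driven end to end (the DSN below is a hypothetical placeholder; params pass through to psycopg's `execute()`, so the usual `%s` placeholders apply, and the `employees` table is assumed to exist in the shared dev_env database):

```python
import asyncio
from pyapiary.dbms_connectors.postgres import PostgresConnector, AsyncPostgresConnector

DSN = "postgresql://user:pass@localhost:5432/postgres"  # hypothetical placeholder

# Sync: the pool opens in __init__ and closes when the 'with' block exits
with PostgresConnector(conn_str=DSN) as conn:
    rows = conn.query("SELECT * FROM employees WHERE department = %s", ("hr",))
    print(rows)

# Async: the pool is created closed and opened in __aenter__, so 'async with' is required
async def main():
    async with AsyncPostgresConnector(conn_str=DSN) as conn:
        rows = await conn.async_query("SELECT * FROM employees WHERE department = %s", ("hr",))
        print(rows)

asyncio.run(main())
```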