Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ jobs:
with:
enable-cache: true
# Keep in sync with pyproject.toml's `tool.uv.required-version`.
version: "0.8.13"
version: "0.8.22"
# Keep in sync with pyproject.toml's `project.requires-python`.
- run: uv python install 3.10
- run: uv python install 3.11
- run: uv sync --locked
- run: uv run -m skeleton
- run: uv run ruff format --check
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# Keep Python version in sync with:
# - pyproject.toml's `project.requires-python`.
# - the main stage below.
FROM ghcr.io/astral-sh/uv:0.8.13-python3.10-bookworm-slim AS builder
FROM ghcr.io/astral-sh/uv:0.8.22-python3.11-bookworm-slim AS builder

ENV UV_COMPILE_BYTECODE=1 UV_LINK_MODE=copy

Expand All @@ -21,7 +21,7 @@ RUN --mount=type=bind,source=skeleton,target=skeleton_tmp \
rm -r app

# Keep this synced with the `builder` stage above.
FROM python:3.10-slim-bookworm
FROM python:3.11-slim-bookworm

COPY --from=builder /venv app

Expand Down
6 changes: 3 additions & 3 deletions app/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import asyncio
from asyncio import run, to_thread
from urllib.parse import urlparse

from . import Config, start_app
Expand All @@ -8,7 +8,7 @@ async def main() -> None:
async with start_app(config=Config()) as session:
port = urlparse(session.url).port or 80
print(f"Session listening on port {port}") # noqa: T201
await asyncio.to_thread(session.wait)
await to_thread(session.wait)


asyncio.run(main())
run(main())
10 changes: 5 additions & 5 deletions app/create_and_join_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def create_station_status_table(session: tt.Session, /) -> None:
)


def create_station_details_table(session: tt.Session, /) -> None:
skeleton = Skeleton.tables.STATION_DETAILS
def create_station_information_table(session: tt.Session, /) -> None:
skeleton = Skeleton.tables.STATION_INFORMATION
session.create_table(
skeleton.name,
data_types={
Expand All @@ -46,13 +46,13 @@ def create_station_details_table(session: tt.Session, /) -> None:
def join_tables(session: tt.Session, /) -> None:
skeleton = Skeleton.tables
session.tables[skeleton.STATION_STATUS.name].join(
session.tables[skeleton.STATION_DETAILS.name],
session.tables[skeleton.STATION_INFORMATION.name],
column(session, skeleton.STATION_STATUS.STATION_ID)
== column(session, skeleton.STATION_DETAILS.ID),
== column(session, skeleton.STATION_INFORMATION.ID),
)


def create_and_join_tables(session: tt.Session, /) -> None:
create_station_status_table(session)
create_station_details_table(session)
create_station_information_table(session)
join_tables(session)
22 changes: 11 additions & 11 deletions app/create_cubes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ def create_station_cube(session: tt.Session, /) -> None:
[
fact_based_hierarchy(
session,
skeleton.dimensions.STATION_DETAILS.LOCATION,
skeleton.dimensions.STATION_INFORMATION.LOCATION,
lambda hierarchy: {
hierarchy.DEPARTMENT: Skeleton.tables.STATION_DETAILS.DEPARTMENT,
hierarchy.CITY: Skeleton.tables.STATION_DETAILS.CITY,
hierarchy.POSTCODE: Skeleton.tables.STATION_DETAILS.POSTCODE,
hierarchy.STREET: Skeleton.tables.STATION_DETAILS.STREET,
hierarchy.HOUSE_NUMBER: Skeleton.tables.STATION_DETAILS.HOUSE_NUMBER,
hierarchy.DEPARTMENT: Skeleton.tables.STATION_INFORMATION.DEPARTMENT,
hierarchy.CITY: Skeleton.tables.STATION_INFORMATION.CITY,
hierarchy.POSTCODE: Skeleton.tables.STATION_INFORMATION.POSTCODE,
hierarchy.STREET: Skeleton.tables.STATION_INFORMATION.STREET,
hierarchy.HOUSE_NUMBER: Skeleton.tables.STATION_INFORMATION.HOUSE_NUMBER,
},
),
fact_based_hierarchy(
session,
skeleton.dimensions.STATION_DETAILS.STATION,
skeleton.dimensions.STATION_INFORMATION.STATION,
lambda hierarchy: {
hierarchy.NAME: Skeleton.tables.STATION_DETAILS.NAME,
hierarchy.ID: Skeleton.tables.STATION_DETAILS.ID,
hierarchy.NAME: Skeleton.tables.STATION_INFORMATION.NAME,
hierarchy.ID: Skeleton.tables.STATION_INFORMATION.ID,
},
),
fact_based_hierarchy(
Expand All @@ -51,10 +51,10 @@ def create_station_cube(session: tt.Session, /) -> None:
)
m[skeleton.measures.CAPACITY.name] = tt.agg.sum(
tt.agg.single_value(
column(session, Skeleton.tables.STATION_DETAILS.CAPACITY)
column(session, Skeleton.tables.STATION_INFORMATION.CAPACITY)
),
scope=tt.OriginScope(
{l[skeleton.dimensions.STATION_DETAILS.STATION.ID.key]}
{l[skeleton.dimensions.STATION_INFORMATION.STATION.ID.key]}
),
)

Expand Down
70 changes: 39 additions & 31 deletions app/load_tables.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from asyncio import gather, to_thread
from collections.abc import Iterable, Mapping
from logging import getLogger
from pathlib import Path
from typing import Any, cast

Expand All @@ -13,24 +14,25 @@
from .skeleton import Skeleton
from .util import read_json, reverse_geocode

_LOGGER = getLogger(__name__)

async def read_station_details(

async def read_station_information(
*,
http_client: httpx.AsyncClient,
reverse_geocoding_path: HttpUrl | Path,
velib_data_base_path: HttpUrl | Path,
) -> pd.DataFrame:
skeleton = Skeleton.tables.STATION_DETAILS
skeleton = Skeleton.tables.STATION_INFORMATION

stations_data: Any = cast(
Any,
await read_json(
velib_data_base_path,
Path("station_information.json"),
http_client=http_client,
),
)["data"]["stations"]
station_information_df = pd.DataFrame(stations_data)[ # ty: ignore[no-matching-overload]
station_information_data = await read_json(
velib_data_base_path,
Path("station_information.json"),
http_client=http_client,
)
station_information_data = cast(Any, station_information_data)["data"]["stations"]

station_information_df = pd.DataFrame(station_information_data)[ # ty: ignore[no-matching-overload]
["station_id", "name", "capacity", "lat", "lon"]
].rename(
columns={
Expand All @@ -51,9 +53,12 @@ async def read_station_details(
),
)

reverse_geocoded_df = reverse_geocode(
coordinates, reverse_geocoding_path=reverse_geocoding_path
).rename(
reverse_geocoded_df = await reverse_geocode(
coordinates,
http_client=http_client,
reverse_geocoding_path=reverse_geocoding_path,
)
reverse_geocoded_df = reverse_geocoded_df.rename(
columns={
"department": skeleton.DEPARTMENT.name,
"city": skeleton.CITY.name,
Expand All @@ -76,16 +81,15 @@ async def read_station_status(
) -> pd.DataFrame:
skeleton = Skeleton.tables.STATION_STATUS

stations_data = cast(
Any,
await read_json(
velib_data_base_path,
Path("station_status.json"),
http_client=http_client,
),
)["data"]["stations"]
stations_status_data = await read_json(
velib_data_base_path,
Path("station_status.json"),
http_client=http_client,
)
stations_status_data = cast(Any, stations_status_data)["data"]["stations"]

station_statuses: list[Mapping[str, Any]] = []
for station_status in stations_data:
for station_status in stations_status_data:
for num_bikes_available_types in station_status["num_bikes_available_types"]:
if len(num_bikes_available_types) != 1:
raise ValueError(
Expand All @@ -109,6 +113,7 @@ async def load_tables(
config: Config,
http_client: httpx.AsyncClient,
) -> None:
_LOGGER.info("Loading tables.")
if config.data_refresh_period is None:
reverse_geocoding_path: HttpUrl | FilePath = (
RESOURCES_DIRECTORY / "station_location.csv"
Expand All @@ -122,8 +127,8 @@ async def load_tables(
"https://velib-metropole-opendata.smovengo.cloud/opendata/Velib_Metropole"
)

station_details_df, station_status_df = await asyncio.gather(
read_station_details(
station_information_df, station_status_df = await gather(
read_station_information(
http_client=http_client,
reverse_geocoding_path=reverse_geocoding_path,
velib_data_base_path=velib_data_base_path,
Expand All @@ -138,11 +143,14 @@ async def load_tables(
tt.mapping_lookup(check=config.check_mapping_lookups),
session.tables.data_transaction(),
):
await asyncio.gather(
session.tables[Skeleton.tables.STATION_DETAILS.name].load_async(
station_details_df
await gather(
to_thread(
session.tables[Skeleton.tables.STATION_INFORMATION.name].load,
station_information_df,
),
session.tables[Skeleton.tables.STATION_STATUS.name].load_async(
station_status_df
to_thread(
session.tables[Skeleton.tables.STATION_STATUS.name].load,
station_status_df,
),
)
_LOGGER.info("Tables loaded.")
25 changes: 20 additions & 5 deletions app/start_session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import sys
from asyncio import to_thread
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from logging import getLogger
from pathlib import Path

import atoti as tt
Expand All @@ -12,8 +14,10 @@
from .create_cubes import create_cubes
from .load_tables import load_tables

_LOGGER = getLogger(__name__)

def get_session_config(config: Config, /) -> tt.SessionConfig:

def _get_session_config(config: Config, /) -> tt.SessionConfig:
user_content_storage: Path | UserContentStorageConfig | None = None

if config.user_content_storage is not None:
Expand All @@ -30,17 +34,28 @@ def get_session_config(config: Config, /) -> tt.SessionConfig:
)


def _create_data_model(session: tt.Session, /) -> None:
    """Declare the data model on ``session``.

    The tables are created and joined first, then the cubes are created.

    This is a plain synchronous function so that ``start_session`` can pass
    it to ``asyncio.to_thread``.
    """
    # Step 1: tables and their joins.
    create_and_join_tables(session)
    # Step 2: cubes.
    create_cubes(session)


@asynccontextmanager
async def start_session(
*,
config: Config,
http_client: httpx.AsyncClient,
) -> AsyncGenerator[tt.Session]:
"""Start the session, declare the data model and load the initial data."""
session_config = get_session_config(config)
with tt.Session.start(session_config) as session:
session_config = _get_session_config(config)
_LOGGER.info("Starting Atoti Session.")
session = await to_thread(tt.Session.start, session_config)
_LOGGER.info("Atoti Session started.")
try:
with tt.mapping_lookup(check=config.check_mapping_lookups):
create_and_join_tables(session)
create_cubes(session)
_LOGGER.info("Creating data model.")
await to_thread(_create_data_model, session)
_LOGGER.info("Data model created.")
await load_tables(session, config=config, http_client=http_client)
yield session
finally:
await to_thread(session.close)
14 changes: 9 additions & 5 deletions app/util/read_json.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
from asyncio import to_thread
from pathlib import Path

import anyio
import httpx
from pydantic import HttpUrl

Expand All @@ -13,8 +15,10 @@ async def read_json(
http_client: httpx.AsyncClient,
) -> object:
if isinstance(base_path, Path):
return json.loads((base_path / file_path).read_bytes())

url = f"{base_path}/{file_path.as_posix()}"
response = await http_client.get(url)
return response.raise_for_status().json()
json_bytes = await anyio.Path(base_path / file_path).read_bytes()
else:
url = f"{base_path}/{file_path.as_posix()}"
response = await http_client.get(url)
response.raise_for_status()
json_bytes = await response.aread()
return await to_thread(json.loads, json_bytes)
Loading
Loading