Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ jobs:
docker-services: |
zoo
minio
pytest-arguments: --ibis --pdtransform --no-postgres
pytest-arguments: --ibis --pdtransform --no-postgres --no-lock_tests

s3_test:
name: S3 Tests
Expand Down
5 changes: 4 additions & 1 deletion docs/source/changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Changelog

## 0.12.1 (2025-10-DD)
## 0.12.2 (2025-10-DD)
- Support datetime.time and timedelta as task input and output (or anywhere where JSON serialization is needed).

## 0.12.1 (2025-10-10)
- Create all metadata tables even if some metadata tables already exist.
This fixes problems with conditional need for sync_views table.
- Make table hooks work even without ConfigContext (see example_mssql/download_parquet_files.py)
Expand Down
2 changes: 1 addition & 1 deletion docs/source/reference/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ IBM DB2

.. autoclass:: pydiverse.pipedag.backend.table.sql.dialects.IBMDB2TableStore
.. autoclass:: pydiverse.pipedag.backend.table.sql.dialects.ibm_db2.IBMDB2MaterializationDetails
.. autoclass:: pydiverse.pipedag.backend.table.sql.dialects.ibm_db2::IBMDB2CompressionTypes
.. autoclass:: pydiverse.pipedag.backend.table.sql.dialects.ibm_db2.IBMDB2CompressionTypes
:members:
:undoc-members:

Expand Down
19 changes: 19 additions & 0 deletions src/pydiverse/pipedag/util/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class Type(str, Enum):
PATHLIB_PATH = "pathlib:path"
DT_DATE = "dt:date"
DT_DATETIME = "dt:datetime"
DT_TIME = "dt:time"
DT_TIMEDELTA = "dt:timedelta"

def __str__(self):
return self.value
Expand Down Expand Up @@ -142,6 +144,19 @@ def json_default(o):
TYPE_KEY: Type.DT_DATE,
"date": o.isoformat(),
}
if isinstance(o, dt.time):
return {
TYPE_KEY: Type.DT_TIME,
"time": o.isoformat(),
}
if isinstance(o, dt.timedelta):
return {
TYPE_KEY: Type.DT_TIMEDELTA,
"days": o.days,
"seconds": o.seconds,
"microseconds": o.microseconds,
# docstring of timedelta says: Representation: (days, seconds, microseconds).
}
if get_origin(o) is not None:
# must be GenericAlias
# somehow isinstance(o, GenericAlias) did not work reliably
Expand Down Expand Up @@ -270,6 +285,10 @@ def get_stage(name: str | None):
return Path(d["path"])
if type_ == Type.DT_DATE:
return dt.date.fromisoformat(d["date"])
if type_ == Type.DT_TIME:
return dt.time.fromisoformat(d["time"])
if type_ == Type.DT_TIMEDELTA:
return dt.timedelta(d["days"], d["seconds"], d["microseconds"])
if type_ == Type.DT_DATETIME:
return dt.datetime.fromisoformat(d["datetime"])
if type_ == Type.DATA_CLASS:
Expand Down
9 changes: 5 additions & 4 deletions tests/parallelize/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
# SPDX-License-Identifier: BSD-3-Clause

import itertools
import multiprocessing as mp
import os
import signal
import time
from multiprocessing import Process, Queue
from queue import Empty
from threading import Thread

Expand All @@ -19,8 +19,9 @@
class Session:
def __init__(self, config: Config):
self.config = config
self.msg_queue = Queue()
self.work_queue = Queue()
self.ctx = mp.get_context("spawn") # or "forkserver"
self.msg_queue = self.ctx.Queue()
self.work_queue = self.ctx.Queue()

self.workers = []
self.worker_id_counter = itertools.count()
Expand Down Expand Up @@ -148,7 +149,7 @@ def start_workers(self, num_workers):

for _ in range(num_workers):
worker_id = next(self.worker_id_counter)
worker = Process(
worker = self.ctx.Process(
target=start_worker,
name=f"pytest-worker-{worker_id:03}",
args=(
Expand Down
4 changes: 3 additions & 1 deletion tests/test_annotation_integrations/test_colspec_polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,9 @@ def consumer_collection(coll: CollectionType):

if with_violation:
with pytest.raises(cs.exc.MemberValidationError, match="2 members failed validation"):
coll.validate_polars(cast=True)
with structlog.testing.capture_logs() as logs:
coll.validate_polars(cast=True)
assert logs == [{"exc_info": True, "event": "Dataframely validation failed", "log_level": "error"}]
else:
if with_filter:
# it is not really without violation
Expand Down