diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9a16ccc6..b1aa3e59 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -120,7 +120,7 @@ jobs: docker-services: | zoo minio - pytest-arguments: --ibis --pdtransform --no-postgres + pytest-arguments: --ibis --pdtransform --no-postgres --no-lock_tests s3_test: name: S3 Tests diff --git a/docs/source/changelog.md b/docs/source/changelog.md index 6113284a..e5519f85 100644 --- a/docs/source/changelog.md +++ b/docs/source/changelog.md @@ -1,6 +1,9 @@ # Changelog -## 0.12.1 (2025-10-DD) +## 0.12.2 (2025-10-DD) +- Support datetime.time and timedelta as task input and output (or anywhere where JSON serialization is needed). + +## 0.12.1 (2025-10-10) - Create all metadata tables even if some metadata tables already exist. This fixes problems with conditional need for sync_views table. - Make table hooks work even without ConfigContext (see example_mssql/download_parquet_files.py) diff --git a/docs/source/reference/api.rst b/docs/source/reference/api.rst index 15445799..f6221cd3 100644 --- a/docs/source/reference/api.rst +++ b/docs/source/reference/api.rst @@ -97,7 +97,7 @@ IBM DB2 .. autoclass:: pydiverse.pipedag.backend.table.sql.dialects.IBMDB2TableStore .. autoclass:: pydiverse.pipedag.backend.table.sql.dialects.ibm_db2.IBMDB2MaterializationDetails -.. autoclass:: pydiverse.pipedag.backend.table.sql.dialects.ibm_db2::IBMDB2CompressionTypes +.. autoclass:: pydiverse.pipedag.backend.table.sql.dialects.ibm_db2.IBMDB2CompressionTypes :members: :undoc-members: diff --git a/src/pydiverse/pipedag/util/json.py b/src/pydiverse/pipedag/util/json.py index 50175913..f8f6a312 100644 --- a/src/pydiverse/pipedag/util/json.py +++ b/src/pydiverse/pipedag/util/json.py @@ -44,6 +44,8 @@ class Type(str, Enum): PATHLIB_PATH = "pathlib:path" DT_DATE = "dt:date" DT_DATETIME = "dt:datetime" + DT_TIME = "dt:time" + DT_TIMEDELTA = "dt:timedelta" def __str__(self): return self.value @@ -142,6 +144,19 @@ def json_default(o): TYPE_KEY: Type.DT_DATE, "date": o.isoformat(), } + if isinstance(o, dt.time): + return { + TYPE_KEY: Type.DT_TIME, + "time": o.isoformat(), + } + if isinstance(o, dt.timedelta): + return { + TYPE_KEY: Type.DT_TIMEDELTA, + "days": o.days, + "seconds": o.seconds, + "microseconds": o.microseconds, + # docstring of timedelta says: Representation: (days, seconds, microseconds). + } if get_origin(o) is not None: # must be GenericAlias # somehow isinstance(o, GenericAlias) did not work reliably @@ -270,6 +285,10 @@ def get_stage(name: str | None): return Path(d["path"]) if type_ == Type.DT_DATE: return dt.date.fromisoformat(d["date"]) + if type_ == Type.DT_TIME: + return dt.time.fromisoformat(d["time"]) + if type_ == Type.DT_TIMEDELTA: + return dt.timedelta(d["days"], d["seconds"], d["microseconds"]) if type_ == Type.DT_DATETIME: return dt.datetime.fromisoformat(d["datetime"]) if type_ == Type.DATA_CLASS: diff --git a/tests/parallelize/session.py b/tests/parallelize/session.py index 6d35acbf..e3afed12 100644 --- a/tests/parallelize/session.py +++ b/tests/parallelize/session.py @@ -2,10 +2,10 @@ # SPDX-License-Identifier: BSD-3-Clause import itertools +import multiprocessing as mp import os import signal import time -from multiprocessing import Process, Queue from queue import Empty from threading import Thread @@ -19,8 +19,9 @@ class Session: def __init__(self, config: Config): self.config = config - self.msg_queue = Queue() - self.work_queue = Queue() + self.ctx = mp.get_context("spawn") # or "forkserver" + self.msg_queue = self.ctx.Queue() + self.work_queue = self.ctx.Queue() self.workers = [] self.worker_id_counter = itertools.count() @@ -148,7 +149,7 @@ def start_workers(self, num_workers): for _ in range(num_workers): worker_id = next(self.worker_id_counter) - worker = Process( + worker = self.ctx.Process( target=start_worker, name=f"pytest-worker-{worker_id:03}", args=( diff --git a/tests/test_annotation_integrations/test_colspec_polars.py b/tests/test_annotation_integrations/test_colspec_polars.py index 9692aec5..57dad31b 100644 --- a/tests/test_annotation_integrations/test_colspec_polars.py +++ b/tests/test_annotation_integrations/test_colspec_polars.py @@ -512,7 +512,9 @@ def consumer_collection(coll: CollectionType): if with_violation: with pytest.raises(cs.exc.MemberValidationError, match="2 members failed validation"): - coll.validate_polars(cast=True) + with structlog.testing.capture_logs() as logs: + coll.validate_polars(cast=True) + assert logs == [{"exc_info": True, "event": "Dataframely validation failed", "log_level": "error"}] else: if with_filter: # it is not really without violation