diff --git a/docs/source/changelog.md b/docs/source/changelog.md
index c7e375ea..218f677d 100644
--- a/docs/source/changelog.md
+++ b/docs/source/changelog.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.10.12 (2025-XX-XX)
+- Feat: Automatically check cache-validity of polars DataFrame tasks marked as lazy
+
 ## 0.10.11 (2025-09-08)
 - Fix: Late initialization of ParquetTableCache instance_id allows use of multi-config `@input_stage_versions`
 
diff --git a/docs/source/examples/realistic_pipeline.md b/docs/source/examples/realistic_pipeline.md
index d4142839..9620f0a9 100644
--- a/docs/source/examples/realistic_pipeline.md
+++ b/docs/source/examples/realistic_pipeline.md
@@ -323,14 +323,18 @@
 are `sqlalchemy.Table`, `pandas.DataFrame`, `polars.DataFrame`, or `polars.LazyFrame`.
 
 ### Controlling automatic cache invalidation
 
-For input_type `sa.Table`, and `pdt.SqlAlchemy`, in general, it is best to set lazy=True. This means the task is always
+For input_type `sa.Table` and `pdt.SqlAlchemy`, it is generally best to set `lazy=True`. This means the task is always
 executed because producing a query is fast, but the query is only executed when it is actually needed.
 
 For `pl.LazyFrame`, `version=AUTO_VERSION` is a good choice, because then the task is executed once with empty input
-dataframes and only if resulting LazyFrame expressions change, the task is executed again with full input data. For
-`pd.DataFrame` and `pl.DataFrame`, we don't try to guess which changes of the code are actually meaningful. Thus the
-user needs to help manually bumpig a version number like `version="1.0.0"`. For development, `version=None` simply
+dataframes, and it is only executed again with full input data if the resulting LazyFrame expressions change.
+
+For `pd.DataFrame` and `pl.DataFrame`, we don't try to guess which changes of the code are actually meaningful. Thus, the
+user needs to help by manually bumping a version number like `version="1.0.0"`. For development, `version=None` simply
 deactivates caching until the code is more stable. It is recommended to always develop with small pipeline instances
-anyways to achieve high iteration speed (see [multi_instance_pipeline.md](multi_instance_pipeline.md)).
+anyway to achieve high iteration speed (see [multi_instance_pipeline.md](multi_instance_pipeline.md)). Setting `lazy=True` and `version=None`
+for `pl.DataFrame` always executes the task, but hashes the result to determine the cache-validity of the task output and hence
+the cache invalidation of downstream tasks. This is a good choice for small tasks that are quick to compute and
+where bumping the version number adds unwanted complexity to the development process.
 
 ### Integration with pydiverse colspec (same as dataframely but with pydiverse transform based SQL support)
 
diff --git a/docs/source/quickstart.md b/docs/source/quickstart.md
index 7731d993..b48650dd 100644
--- a/docs/source/quickstart.md
+++ b/docs/source/quickstart.md
@@ -187,9 +187,12 @@
 In this case, the task must produce a SQLAlchemy expression for all tabular
 outputs without executing them. Pipedag can render the query and will only
 produce a table based on this query expression if the query changed or one of
 the inputs to the task changed.
 
+For tasks returning a Polars DataFrame, the hash of the resulting DataFrame is used to determine whether to
+cache-invalidate downstream tasks.
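+
+As a minimal sketch (the task, table, and column names below are purely illustrative), such a lazy task could look
+like this:
+
+```python
+import polars as pl
+
+from pydiverse.pipedag import Table, materialize
+
+
+@materialize(lazy=True)
+def small_lookup():
+    # This task runs on every flow execution, but pipedag hashes the returned
+    # DataFrame and only cache-invalidates downstream tasks if the hash changed.
+    return Table(pl.DataFrame({"code": [1, 2], "label": ["a", "b"]}), name="lookup")
+```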
+
 ### Manual cache invalidation with `version` parameter
 
-For non-SQL tasks, the `version` parameter of the {py:func}`@materialize ` decorator must
+For non-lazy tasks, the `version` parameter of the {py:func}`@materialize ` decorator must
 be used for manual cache invalidation. As long as the version stays the same, it is assumed that the code of the task
 did not materially change and will produce the same outputs given the same inputs. We refrained from automatically
 inspecting any python code changes since this would break at shared code changes where it is very hard to distinguish
 
diff --git a/src/pydiverse/pipedag/backend/table/sql/hooks.py b/src/pydiverse/pipedag/backend/table/sql/hooks.py
index 32b3667b..ead13453 100644
--- a/src/pydiverse/pipedag/backend/table/sql/hooks.py
+++ b/src/pydiverse/pipedag/backend/table/sql/hooks.py
@@ -19,7 +19,7 @@
 from pydiverse.common import Date, Dtype, PandasBackend
 from pydiverse.common.util.computation_tracing import ComputationTracer
-from pydiverse.common.util.hashing import stable_hash
+from pydiverse.common.util.hashing import hash_polars_dataframe, stable_hash
 from pydiverse.pipedag import ConfigContext
 from pydiverse.pipedag._typing import T
 from pydiverse.pipedag.backend.table.sql.ddl import (
@@ -568,6 +568,11 @@ def materialize(
     def retrieve(cls, *args, **kwargs):
         raise RuntimeError("This should never get called.")
 
+    @classmethod
+    def lazy_query_str(cls, store: SQLTableStore, obj: ExternalTableReference) -> str:
+        obj_hash = stable_hash(obj.name, obj.schema)
+        return obj_hash
+
     # endregion
 
 
@@ -1239,6 +1244,12 @@ def dialect_supports_polars_native_read(cls):
         # for most dialects we find a way
         return True
 
+    @classmethod
+    def lazy_query_str(cls, store: SQLTableStore, obj: pl.DataFrame) -> str:
+        _ = store
+        obj_hash = hash_polars_dataframe(obj)
+        return obj_hash
+
 
 @SQLTableStore.register_table(pl)
 class LazyPolarsTableHook(TableHook[SQLTableStore]):
 
diff --git a/src/pydiverse/pipedag/materialize/core.py b/src/pydiverse/pipedag/materialize/core.py
index 0e70c711..95161bf7 100644
--- a/src/pydiverse/pipedag/materialize/core.py
+++ b/src/pydiverse/pipedag/materialize/core.py
@@ -111,14 +111,40 @@
     :param lazy:
         Whether this task is lazy or not.
 
-        Unlike a normal task, lazy tasks always get executed. However, if a lazy
-        task produces a lazy table (e.g. a SQL query), the table store checks if
-        the same query has been executed before. If this is the case, then the
-        query doesn't get executed, and instead, the table gets copied from the cache.
+        Unlike a normal task, lazy tasks always get executed. However, before a table
+        returned by a lazy task gets materialized, the table store checks if
+        the same table has been materialized before. If this is the case, then the
+        table doesn't get materialized; instead, it gets copied from the cache.
+
+        This is efficient for tasks that return SQL queries, because only the query
+        gets generated; it is not executed again if the resulting table is cache-valid.
+
+        The same mechanism also works for :py:class:`ExternalTableReference `,
+        where the "query" is just the identifier of the table in the store.
+
+        .. Note:: For tasks returning an ``ExternalTableReference``, pipedag cannot automatically
+            know whether the external table has changed. This should be controlled via a cache
+            function passed to the ``cache`` argument of ``materialize``.
+            See :py:class:`ExternalTableReference ` for an example.
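+
+            For illustration only (the table name, the schema, and the daily
+            cache function are hypothetical assumptions, not pipedag defaults):
+
+            .. code-block:: python
+
+                import datetime
+
+                @materialize(lazy=True, cache=lambda: datetime.date.today())
+                def external_input():
+                    # The cache function's return value is fingerprinted; the task
+                    # output is only considered cache-invalid when the date changes.
+                    return Table(ExternalTableReference("events", schema="raw"))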
+
+        For tasks returning a Polars DataFrame, the output is deemed cache-valid
+        if the hash of the resulting DataFrame is the same as the hash of the previous run.
+        So, even though the task always gets executed, downstream tasks can remain cache-valid
+        if the DataFrame is the same as before. This is useful for small tasks that are hard to
+        implement using only LazyFrames, but where the DataFrame generation is cheap.
+
+        In both cases, you don't need to manually bump the ``version`` of a lazy task.
+
+        .. Warning:: A task returning a Polars LazyFrame should `not` be marked as lazy.
+            Use ``version=AUTO_VERSION`` instead. See :py:class:`AUTO_VERSION`.
+
+        .. Warning:: A task returning a Pandas DataFrame should `not` be marked as lazy.
+            No hashing is implemented for Pandas DataFrames, so the task will always
+            be deemed cache-invalid, and thus, cache-invalidate all downstream tasks.
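+
+        For illustration, a sketch of the recommended LazyFrame pattern (the task
+        and column names are hypothetical):
+
+        .. code-block:: python
+
+            @materialize(input_type=pl.LazyFrame, version=AUTO_VERSION)
+            def filter_events(events: pl.LazyFrame):
+                # Runs once on empty input frames to fingerprint the LazyFrame
+                # expressions; runs again on full data only if they change.
+                return Table(events.filter(pl.col("is_valid")))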
-
-        This behaviour is very useful, because you don't need to manually bump
-        the `version` of a lazy task. This only works because for lazy tables
-        generating the query is very cheap compared to executing it.
 
     :param group_node_tag:
         Set a tag that may add this task to a configuration based group node.
     :param nout:
diff --git a/tests/test_cache/test_basic_cache_invalidation.py b/tests/test_cache/test_basic_cache_invalidation.py
index 4f17d6c6..5df61aa9 100644
--- a/tests/test_cache/test_basic_cache_invalidation.py
+++ b/tests/test_cache/test_basic_cache_invalidation.py
@@ -280,12 +280,22 @@
     child_spy.assert_called_once()
 
 
-def test_change_lazy_query(mocker):
+@pytest.mark.parametrize(
+    "get_tbl_obj",
+    [
+        lambda query_value: select_as(query_value, "x"),
+        (lambda query_value: pl.DataFrame({"x": [query_value]})) if pl else None,
+    ],
+    ids=["sql", "polars"],
+)
+def test_change_lazy_query(mocker, get_tbl_obj):
+    if get_tbl_obj is None:
+        pytest.skip("Polars is not installed, skipping Polars test.")
     query_value = 1
 
     @materialize(lazy=True, nout=2)
     def lazy_task():
-        return 0, Table(select_as(query_value, "x"), name="lazy_table")
+        return 0, Table(get_tbl_obj(query_value), name="lazy_table")
 
     @materialize(input_type=pd.DataFrame, version="1.0")
     def get_first(table, col):
@@ -991,11 +1001,10 @@ def test_ignore_task_version(mocker):
 def test_lazy_table_without_query_string(mocker):
     value = None
 
-    @materialize(lazy=True, nout=5)
+    @materialize(lazy=True, nout=4)
     def falsely_lazy_task():
         return (
             Table(pd.DataFrame({"x": [value]}), name="pd_table"),
-            Table(pl.DataFrame({"x": [value]}), name="pl_table"),
             Table(pl.DataFrame({"x": [value]}).lazy(), name="pl_lazy_table"),
             select_as(2, "y"),
             3,
@@ -1004,42 +1013,34 @@ def falsely_lazy_task():
     def get_flow():
         with Flow() as flow:
             with Stage("stage_1"):
-                pd_tbl, pl_tbl, pl_lazy_tbl, select_tbl, constant = falsely_lazy_task()
+                pd_tbl, pl_lazy_tbl, select_tbl, constant = falsely_lazy_task()
                 res_pd = m.take_first(pd_tbl, as_int=True)
-                res_pl = m.take_first(pl_tbl, as_int=True)
                 res_pl_lazy = m.take_first(pl_lazy_tbl, as_int=True)
                 res_select = m.noop(select_tbl)
                 res_constant = m.noop(constant)
-        return flow, res_pd, res_pl, res_pl_lazy, res_select, res_constant
+        return flow, res_pd, res_pl_lazy, res_select, res_constant
 
     value = 0
-    flow, res_pd, res_pl, res_pl_lazy, res_select, res_constant = get_flow()
+    flow, res_pd, res_pl_lazy, res_select, res_constant = get_flow()
     with StageLockContext():
         result = flow.run()
         assert result.get(res_pd) == 0
-        assert result.get(res_pl) == 0
         assert result.get(res_pl_lazy) == 0
 
     value = 1
-    flow, res_pd, res_pl, res_pl_lazy, res_select, res_constant = get_flow()
+    flow, res_pd, res_pl_lazy, res_select, res_constant = get_flow()
     res_pd_spy = spy_task(mocker, res_pd)
-    res_pl_spy = spy_task(mocker, res_pl)
     res_pl_lazy_spy = spy_task(mocker, res_pl_lazy)
     select_spy = spy_task(mocker, res_select)
     constant_spy = spy_task(mocker, res_constant)
     with StageLockContext():
         result = flow.run()
         assert result.get(res_pd) == 1
-        assert result.get(res_pl) == 1
         assert result.get(res_pl_lazy) == 1
 
     # res_pd is downstream of a pd.DataFrame from a lazy task,
     # which should always be cache invalid. Hence, it should always be called.
     res_pd_spy.assert_called_once()
-    # res_pd is downstream of a pl.DataFrame from a lazy task,
-    # which should always be cache invalid. Hence, it should always be called.
-    res_pl_spy.assert_called_once()
-
     # res_pd is downstream of a pl.LazyFrame from a lazy task,
     # which should always be cache invalid. Hence, it should always be called.
     # To avoid cache-invalidating the LazyFrame, we should use AUTOVERSION.