Move warm-up from session to runner #4262

Merged
merged 22 commits into from
Nov 5, 2024
Changes from 13 commits
Commits
7c794f4
Move warm-up to runner
ElenaKhaustova Oct 28, 2024
208a24b
Implemented test for running thread runner with patterns
ElenaKhaustova Oct 28, 2024
7c6729a
Added test for new catalog
ElenaKhaustova Oct 28, 2024
9d5b37d
Add line separator to file
ElenaKhaustova Oct 28, 2024
c3229c0
Replaced writing csv manually to writing with pandas
ElenaKhaustova Oct 28, 2024
6c509d9
Merge branch 'main' into fix/4250-move-warm-up-to-runner
ElenaKhaustova Oct 28, 2024
bd878c9
Fixed fixture
ElenaKhaustova Oct 28, 2024
68010aa
Removed new catalog from test
ElenaKhaustova Oct 28, 2024
29d373f
Made catalog type a parameter
ElenaKhaustova Oct 28, 2024
e90cfd7
Removed old catalog from test
ElenaKhaustova Oct 28, 2024
3f1dbe0
Removed new catalog from test
ElenaKhaustova Oct 28, 2024
892cda4
Removed data creation/loading
ElenaKhaustova Oct 29, 2024
e7f2632
Fixed test docstring
ElenaKhaustova Oct 29, 2024
429ca13
Removed extra loop
ElenaKhaustova Oct 29, 2024
3ffd538
Renamed variable for clarity
ElenaKhaustova Oct 29, 2024
681d3f1
Merge branch 'main' into fix/4250-move-warm-up-to-runner
ElenaKhaustova Oct 29, 2024
01f9b62
Moved warm-up to the top
ElenaKhaustova Oct 29, 2024
069dff4
Moved warm-up to the top
ElenaKhaustova Oct 29, 2024
9d0f579
Merge branch 'main' into fix/4250-move-warm-up-to-runner
ElenaKhaustova Nov 1, 2024
5f6ef85
Updated release notes
ElenaKhaustova Nov 1, 2024
b0f9b0f
Merge branch 'main' into fix/4250-move-warm-up-to-runner
ElenaKhaustova Nov 5, 2024
e481c72
Renamed variable
ElenaKhaustova Nov 5, 2024
7 changes: 1 addition & 6 deletions kedro/framework/session/session.py
@@ -24,7 +24,7 @@
     validate_settings,
 )
 from kedro.io.core import generate_timestamp
-from kedro.runner import AbstractRunner, SequentialRunner, ThreadRunner
+from kedro.runner import AbstractRunner, SequentialRunner
 from kedro.utils import _find_kedro_project

 if TYPE_CHECKING:
@@ -395,11 +395,6 @@ def run(  # noqa: PLR0913
         hook_manager.hook.before_pipeline_run(
             run_params=record_data, pipeline=filtered_pipeline, catalog=catalog
         )
-
-        if isinstance(runner, ThreadRunner):
-            for ds in filtered_pipeline.datasets():
-                if catalog.config_resolver.match_pattern(ds):
-                    _ = catalog._get_dataset(ds)
         try:
             run_result = runner.run(
                 filtered_pipeline, catalog, hook_manager, session_id
4 changes: 4 additions & 0 deletions kedro/runner/runner.py
@@ -112,6 +112,10 @@ def run(
             self._logger.info(
                 "Asynchronous mode is enabled for loading and saving data"
             )
+
Contributor

Can we add a comment about this? I also notice this line is duplicated:
registered_ds = [ds for ds in pipeline.datasets() if ds in catalog]

Maybe we can refactor this into one loop so we only loop pipeline.datasets() once?

Contributor Author

  1. Kept one loop and added a comment about the warm-up.
  2. The line is duplicated on purpose. There is a comment explaining the difference, but I also renamed the variable to avoid confusion.

Member

registered_ds_no_runtime_patterns is a bit of a mouthful. Could we come up with a shorter name, e.g. warm_concrete_ds (not necessarily the best option, just an example)?

Tbh, I don't mind the two loops - they show the purpose better without too much of a slowdown, but the current refactor is also ok.

+        for ds in pipeline.datasets():
+            _ = catalog._get_dataset(ds)
+
         self._run(pipeline, catalog, hook_or_null_manager, session_id)  # type: ignore[arg-type]

         self._logger.info("Pipeline execution completed successfully.")
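
The single-loop option raised in the review thread can be illustrated with a small, self-contained sketch. Everything here is hypothetical and is not Kedro's API: `ToyCatalog`, `warm_up`, and the glob-style patterns matched with `fnmatch` are stand-ins (Kedro's dataset patterns use `{placeholder}` syntax instead). The sketch only shows the idea of one pass that records which names the catalog knows and materializes each of them before any worker thread starts.

```python
from concurrent.futures import ThreadPoolExecutor
from fnmatch import fnmatch


class ToyCatalog:
    """Hypothetical stand-in for a catalog that resolves patterns lazily."""

    def __init__(self, explicit, patterns):
        self._datasets = dict(explicit)  # entries that are already materialized
        self._patterns = list(patterns)  # glob-style patterns (an assumption)

    def __contains__(self, name):
        # A name is "in" the catalog if it is registered or matches a pattern.
        return name in self._datasets or any(
            fnmatch(name, pattern) for pattern in self._patterns
        )

    def get_dataset(self, name):
        # Materialize a pattern-matched entry on first access.
        if name not in self._datasets:
            if name not in self:
                raise KeyError(name)
            self._datasets[name] = {"name": name, "data": None}
        return self._datasets[name]

    def materialized(self):
        return set(self._datasets)


def warm_up(dataset_names, catalog):
    """One pass: record the names the catalog knows and materialize each one."""
    known = []
    for name in sorted(dataset_names):
        if name in catalog:
            known.append(name)
            catalog.get_dataset(name)
    return known


catalog = ToyCatalog(
    explicit={"raw": {"name": "raw", "data": [1, 2]}},
    patterns=["dummy_*"],
)
known = warm_up({"raw", "dummy_1", "dummy_2", "free_output"}, catalog)

# After the warm-up, concurrent lookups only read existing entries.
before = catalog.materialized()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(catalog.get_dataset, known * 10))
after = catalog.materialized()
```

In this toy version, names the catalog cannot resolve (`free_output`) are skipped rather than materialized. Whether the real runner needs that guard depends on the catalog state at the point where the warm-up runs, which this sketch does not model.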
34 changes: 33 additions & 1 deletion tests/runner/test_thread_runner.py
@@ -6,8 +6,15 @@
 import pytest

 from kedro.framework.hooks import _create_hook_manager
-from kedro.io import AbstractDataset, DataCatalog, DatasetError, MemoryDataset
+from kedro.io import (
+    AbstractDataset,
+    DataCatalog,
+    DatasetError,
+    KedroDataCatalog,
+    MemoryDataset,
+)
 from kedro.pipeline import node
+from kedro.pipeline.modular_pipeline import pipeline
 from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
 from kedro.runner import ThreadRunner
 from tests.runner.conftest import exception_fn, identity, return_none, sink, source
@@ -39,6 +46,31 @@ def test_does_not_log_not_using_async(self, fan_out_fan_in, catalog, caplog):
         ThreadRunner().run(fan_out_fan_in, catalog)
         assert "Using synchronous mode for loading and saving data." not in caplog.text

+    @pytest.mark.parametrize("catalog_type", [DataCatalog, KedroDataCatalog])
+    def test_thread_run_with_patterns(self, catalog_type):
+        """Test warm-up is done and patterns are resolved before running pipeline.
+
+        Without the warm-up "Dataset 'dummy_1' has already been registered" error
+        would be raised for this test. We check that the dataset was registered at the
+        warm-up, and we successfully passed to loading it.
+        """
+        catalog_conf = {"{catch_all}": {"type": "MemoryDataset"}}
+
+        catalog = catalog_type.from_config(catalog_conf)
+
+        test_pipeline = pipeline(
+            [
+                node(identity, inputs="dummy_1", outputs="output_1", name="node_1"),
+                node(identity, inputs="dummy_2", outputs="output_2", name="node_2"),
+                node(identity, inputs="dummy_1", outputs="output_3", name="node_3"),
+            ]
+        )
+
+        with pytest.raises(
+            Exception, match="Data for MemoryDataset has not been saved yet"
+        ):
+            ThreadRunner().run(test_pipeline, catalog)
+

 class TestMaxWorkers:
     @pytest.mark.parametrize(
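
The failure mode named in the test docstring can be reproduced deterministically without threads. The sketch below is an assumption about one plausible interleaving, not a trace of Kedro's internals: `ToyRegistry`, `AlreadyRegistered`, and `interleaved_lazy_resolution` are hypothetical names. Two simulated workers both check whether `dummy_1` is registered before either one registers it, so the second registration fails. When the name is registered up front, neither worker attempts a registration.

```python
class AlreadyRegistered(Exception):
    """Hypothetical error type standing in for the catalog's own exception."""


class ToyRegistry:
    """Hypothetical registry that refuses to register the same name twice."""

    def __init__(self):
        self._datasets = {}

    def is_registered(self, name):
        return name in self._datasets

    def register(self, name):
        if name in self._datasets:
            raise AlreadyRegistered(f"Dataset '{name}' has already been registered")
        self._datasets[name] = object()


def interleaved_lazy_resolution(registry, name):
    """Simulate two workers that both check before either one registers."""
    worker_a_needs_registration = not registry.is_registered(name)
    worker_b_needs_registration = not registry.is_registered(name)

    errors = []
    for needs_registration in (worker_a_needs_registration, worker_b_needs_registration):
        if needs_registration:
            try:
                registry.register(name)
            except AlreadyRegistered as exc:
                errors.append(str(exc))
    return errors


# No warm-up: both workers see the name as missing, and the second one fails.
cold = ToyRegistry()
cold_errors = interleaved_lazy_resolution(cold, "dummy_1")

# Warm-up: the name is registered before the workers start.
warm = ToyRegistry()
warm.register("dummy_1")
warm_errors = interleaved_lazy_resolution(warm, "dummy_1")
```

This also suggests why the test above expects a "has not been saved yet" error rather than a clean run: once registration no longer collides, the run proceeds to loading, and the pattern-resolved memory datasets hold no data.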