Feat: more cores for loading #4427

Open: wants to merge 12 commits into base: main

themisvaltinos
Contributor

This update adds a ProcessPoolExecutor for parallel loading of a project’s models, plus a mock executor for single-process scenarios, such as when the system doesn’t support fork. It also refactors optimized_query_cache_pool to use this unified execution logic, eliminating the need for separate sequential and parallel code paths.

Makefile Outdated
@@ -64,7 +64,7 @@ engine-up: engine-clickhouse-up engine-mssql-up engine-mysql-up engine-postgres-
engine-down: engine-clickhouse-down engine-mssql-down engine-mysql-down engine-postgres-down engine-spark-down engine-trino-down

fast-test:
- pytest -n auto -m "fast and not cicdonly" && pytest -m "isolated"
+ pytest -n auto -m "fast and not cicdonly" && pytest -m "isolated" && pytest -m "isolated2"
Member

This label name is not very descriptive. Can we be more precise?

for row in YAML().load(file.read())
]

cache.put(external_models, path)
Member

I don't like the fact that we now need to manually manage the cache everywhere by splitting the load from the get. Any way to preserve the previous declarative API?

Contributor

I’m not sure. This is the simplest approach because all the loading happens in separate processes, which must be completely separate; we only communicate through the file system.

models[model.fqn] = model

if paths:
defaults = dict(
Member

This overlaps with the defaults that is in the dict itself. Can we have a more descriptive name for this?

Contributor Author

Renamed to model_loading_defaults for clarity; I couldn’t think of a better alternative.

_selected_gateway = selected_gateway


def load_sql_models(path: Path) -> t.Tuple[Path, list[Model]]:
Member

I don't see the 1st item of the return value being used anywhere. Is it used?

Contributor Author

No, this was left over from before I added the futures_to_paths dictionary. The path is already accessible, so it’s no longer needed. I removed it and updated _load_sql_models to use the simplified return value: loaded = future.result() instead of unpacking with _.
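
For readers following along, here is a minimal sketch of the futures-to-paths pattern described in this reply. It is not the PR's code: the loader body, the load_all helper, and the use of strings in place of Model objects are assumptions for illustration. Only the names futures_to_paths and load_sql_models come from the thread. The helper accepts any concurrent.futures.Executor, so it would work with a ProcessPoolExecutor as in the PR, provided the loader is picklable.

```python
from concurrent.futures import Executor, as_completed
from pathlib import Path
from typing import Dict, List


def load_sql_models(path: Path) -> List[str]:
    """Hypothetical loader: returns only the loaded models (strings here),
    not a (path, models) tuple."""
    return [f"{path.stem}_model"]


def load_all(executor: Executor, paths: List[Path]) -> Dict[Path, List[str]]:
    """Hypothetical helper: submit one task per path and collect the results."""
    # The dictionary remembers which path each future belongs to,
    # so the worker does not need to echo the path back.
    futures_to_paths = {executor.submit(load_sql_models, p): p for p in paths}

    results: Dict[Path, List[str]] = {}
    for future in as_completed(futures_to_paths):
        path = futures_to_paths[future]
        loaded = future.result()  # no tuple unpacking needed
        results[path] = loaded
    return results
```

Because the mapping lives in the parent process, the worker's return type stays simple.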

self._track_file(seed_path)
if errors:
error_string = "\n".join(errors)
raise ConfigError(f"Failed to load models\n\n{error_string}")
Member

[Nit] I believe Failed to load models\n\n is redundant

Contributor Author

removed it

pytest.ini Outdated
@@ -8,6 +8,7 @@ markers =
remote: test that involves interacting with a remote DB
cicdonly: test that only runs on CI/CD
isolated: tests that need to run sequentially usually because they use fork
+ isolated2: tests that need to run isolated because they interfere
Member

Why can't we use isolated here, btw?

Contributor Author

I’ve renamed the marker to registry_isolation and updated all references to it, to indicate that this test needs isolation because of an issue with the registry. This particular test, test_duplicate_python_model_names_raise_error (added in #3945), succeeds when run in isolation but breaks when grouped with the rest of the tests, which is why it was originally marked isolated. But keeping it under isolated, which covers the forking tests, causes those tests to fail as well, since this test is designed to raise an error.

except ConfigError:
from sqlmesh.core.console import get_console

get_console().log_warning(
Member

Does this respect --ignore-warnings when this runs in a worker process?

Contributor Author

Good catch. I tested it and your suspicion is correct: it wasn’t working because the console wasn’t shared across processes. I added a console parameter to _init_model_defaults that passes self._console to the worker processes when the process pool is created. Each worker now calls set_console(console) to match the parent’s console, which preserves ignore_warnings along with all other console settings.
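
The mechanism described here can be sketched with the standard initializer and initargs parameters of concurrent.futures executors. This is an illustration under assumptions, not the PR's implementation: the Console class, the module-level set_console and get_console stand-ins, and the _init_worker, _load, and load_paths helpers are all hypothetical, and the real _init_model_defaults takes more than a console.

```python
from concurrent.futures import Executor, ProcessPoolExecutor
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Console:
    """Hypothetical, picklable stand-in for the real console object."""

    ignore_warnings: bool = False

    def log_warning(self, message: str) -> Optional[str]:
        # Return the text instead of printing so the effect is observable.
        if self.ignore_warnings:
            return None
        return f"WARNING: {message}"


_console = Console()  # per-process global; each worker starts with its own


def set_console(console: Console) -> None:
    global _console
    _console = console


def get_console() -> Console:
    return _console


def _init_worker(console: Console) -> None:
    # Runs once in every worker before it accepts tasks.
    set_console(console)


def _load(path: str) -> Optional[str]:
    # Simulates a load that falls back to emitting a warning.
    return get_console().log_warning(f"could not load {path}")


def load_paths(
    paths: List[str],
    console: Console,
    executor_cls: Callable[..., Executor] = ProcessPoolExecutor,
) -> List[Optional[str]]:
    # The parent's console is shipped to the workers via initargs.
    with executor_cls(initializer=_init_worker, initargs=(console,)) as pool:
        return list(pool.map(_load, paths))
```

Without the initializer, a worker would see its own default console, so a setting such as ignore_warnings made in the parent would be lost.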

pass

@abc.abstractmethod
def get(self, path: Path) -> t.List[Model]:
Member

@izeigerman izeigerman May 19, 2025

This interface is only needed in one place but impacts all other places. Can we keep both interfaces (get + put AND get_or_load_models) and revert unrelated changes?

Contributor Author

Reverted: everywhere except SQL model loading now uses the previous interface again.
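
To make the two interfaces in this thread concrete, here is a rough sketch of a file-backed cache that offers both the explicit get + put pair and a declarative get-or-load call built on top of it. Everything here is hypothetical (the FileCache class, the get_or_load name, the pickle-per-path layout), and it ignores cache invalidation entirely. It only illustrates how the two styles can coexist.

```python
import hashlib
import pickle
from pathlib import Path
from typing import Callable, List, Optional


class FileCache:
    """Hypothetical cache that stores one pickle file per source path."""

    def __init__(self, cache_dir: Path) -> None:
        self._dir = cache_dir
        self._dir.mkdir(parents=True, exist_ok=True)

    def _entry(self, path: Path) -> Path:
        # Key entries by a hash of the source path (an assumption for this sketch).
        digest = hashlib.sha256(str(path).encode()).hexdigest()
        return self._dir / f"{digest}.pkl"

    def get(self, path: Path) -> Optional[List[str]]:
        entry = self._entry(path)
        if not entry.exists():
            return None
        with entry.open("rb") as f:
            return pickle.load(f)

    def put(self, models: List[str], path: Path) -> None:
        # A worker process can call this; the parent reads it back with get().
        with self._entry(path).open("wb") as f:
            pickle.dump(models, f)

    def get_or_load(self, path: Path, loader: Callable[[], List[str]]) -> List[str]:
        # Declarative form: callers never manage the cache themselves.
        cached = self.get(path)
        if cached is None:
            cached = loader()
            self.put(cached, path)
        return cached
```

In this sketch, single-process call sites would use get_or_load, while the multi-process path has workers call put and the parent call get, so the file system is the only channel between them.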

future.set_exception(e)
return future

def map(self, fn, *iterables, timeout=None, chunksize=1):
Member

Don't you need a shutdown method too?

Contributor Author

Since shutdown was called in __exit__ and the synchronous executor runs everything in the main process, there are no resources to release or clean up, and futures are complete from the start, which is why I didn’t add it. But I’ve revised it to include one, to keep the API similar to Python’s ProcessPoolExecutor.
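
For context, a synchronous executor along these lines might look like the sketch below. The class name SynchronousExecutor and the method bodies are assumptions. Only the map signature and the set_exception / return future lines are taken from the diff snippet above. It subclasses concurrent.futures.Executor, whose context-manager support calls shutdown on exit.

```python
from concurrent.futures import Executor, Future


class SynchronousExecutor(Executor):
    """Hypothetical in-process executor: every task runs immediately in the caller."""

    def submit(self, fn, /, *args, **kwargs):
        future: Future = Future()
        try:
            # The work happens right here, so the future is done on return.
            future.set_result(fn(*args, **kwargs))
        except Exception as e:
            future.set_exception(e)
        return future

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        # timeout and chunksize are accepted for API parity and ignored.
        return map(fn, *iterables)

    def shutdown(self, wait=True, *, cancel_futures=False):
        # Nothing to release: no worker processes or threads were created.
        pass
```

With a matching shutdown, calling code can treat this and ProcessPoolExecutor interchangeably, including inside a with block.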

3 participants