
Commit

Merge pull request #729 from roboflow/feature/caching_compilation_results

Caching compilation results
PawelPeczek-Roboflow authored Oct 9, 2024
2 parents 3f99310 + 8fb0883 commit e7c2508
Showing 5 changed files with 256 additions and 62 deletions.
58 changes: 58 additions & 0 deletions inference/core/workflows/execution_engine/v1/compiler/cache.py
@@ -0,0 +1,58 @@
import hashlib
from collections import deque
from threading import Lock
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, TypeVar

from inference.core.workflows.errors import WorkflowEnvironmentConfigurationError

V = TypeVar("V")


class BasicWorkflowsCache(Generic[V]):
"""
Basic cache capable of hashing compound payloads with a list of injected
hash functions; each function must produce a stable hash string.
Every function is invoked on the matching `get_hash_key(...)` kwarg
(use named arguments only!), the resulting strings are concatenated,
and an MD5 digest of the concatenation becomes the key.
The cache is size-bounded: each entry lives until `cache_size` newer
entries appear. Raises `WorkflowEnvironmentConfigurationError` when
`get_hash_key(...)` is not provided with parameters matching all hash
functions. Thread-safe thanks to a lock around `get(...)` and `cache(...)`.
"""

def __init__(
self,
cache_size: int,
hash_functions: List[Tuple[str, Callable[[Any], str]]],
):
self._keys_buffer = deque(maxlen=max(cache_size, 1))
self._cache: Dict[str, V] = {}
self._hash_functions = hash_functions
self._cache_lock = Lock()

def get_hash_key(self, **kwargs) -> str:
hash_chunks = []
for key_name, hashing_function in self._hash_functions:
if key_name not in kwargs:
raise WorkflowEnvironmentConfigurationError(
public_message=f"Cache is miss configured.",
context="workflows_cache | hash_key_generation",
)
hash_value = hashing_function(kwargs[key_name])
hash_chunks.append(hash_value)
return hashlib.md5("<|>".join(hash_chunks).encode("utf-8")).hexdigest()

def get(self, key: str) -> Optional[V]:
with self._cache_lock:
return self._cache.get(key)

def cache(self, key: str, value: V) -> None:
with self._cache_lock:
if len(self._keys_buffer) == self._keys_buffer.maxlen:
to_pop = self._keys_buffer.popleft()
del self._cache[to_pop]
self._keys_buffer.append(key)
self._cache[key] = value
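
A minimal usage sketch of the class above (not part of the diff; the toy `definition` payloads and the tiny `cache_size` are illustrative assumptions):

import json

from inference.core.workflows.execution_engine.v1.compiler.cache import (
    BasicWorkflowsCache,
)

# Hash a JSON-serialisable payload; sort_keys=True keeps the key stable
# regardless of dict key ordering.
cache = BasicWorkflowsCache[dict](
    cache_size=2,
    hash_functions=[("definition", lambda d: json.dumps(d, sort_keys=True))],
)

key = cache.get_hash_key(definition={"steps": []})  # named arguments only!
assert cache.get(key=key) is None  # first lookup misses
cache.cache(key=key, value={"compiled": True})
assert cache.get(key=key) == {"compiled": True}  # subsequent lookups hit

# Entries are evicted FIFO once `cache_size` newer keys arrive.
cache.cache(key=cache.get_hash_key(definition={"a": 1}), value={})
cache.cache(key=cache.get_hash_key(definition={"b": 2}), value={})
assert cache.get(key=key) is None  # the oldest entry was dropped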
96 changes: 79 additions & 17 deletions inference/core/workflows/execution_engine/v1/compiler/core.py
@@ -1,5 +1,9 @@
import json
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Dict, List, Optional, Union

import networkx as nx
from packaging.version import Version

from inference.core.workflows.execution_engine.entities.base import WorkflowParameter
@@ -11,7 +15,11 @@
WorkflowsProfiler,
execution_phase,
)
from inference.core.workflows.execution_engine.v1.compiler.cache import (
BasicWorkflowsCache,
)
from inference.core.workflows.execution_engine.v1.compiler.entities import (
BlockSpecification,
CompiledWorkflow,
InputSubstitution,
ParsedWorkflowDefinition,
@@ -36,10 +44,31 @@
)
from inference.core.workflows.execution_engine.v1.dynamic_blocks.block_assembler import (
compile_dynamic_blocks,
ensure_dynamic_blocks_allowed,
)
from inference.core.workflows.prototypes.block import WorkflowBlockManifest


@dataclass(frozen=True)
class GraphCompilationResult:
execution_graph: nx.DiGraph
parsed_workflow_definition: ParsedWorkflowDefinition
available_blocks: List[BlockSpecification]
initializers: Dict[str, Union[Any, Callable[[None], Any]]]


COMPILATION_CACHE = BasicWorkflowsCache[GraphCompilationResult](
cache_size=256,
hash_functions=[
(
"workflow_definition",
partial(json.dumps, sort_keys=True),
),
("execution_engine_version", lambda version: str(version)),
],
)
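
A hedged illustration (not part of the diff) of how the two injected hash functions combine; the definitions below are toy payloads, not real workflow schemas:

from packaging.version import Version

# Key ordering inside the definition is irrelevant thanks to sort_keys=True...
k1 = COMPILATION_CACHE.get_hash_key(
    workflow_definition={"version": "1.0", "steps": []},
    execution_engine_version=Version("1.3.0"),
)
k2 = COMPILATION_CACHE.get_hash_key(
    workflow_definition={"steps": [], "version": "1.0"},
    execution_engine_version=Version("1.3.0"),
)
assert k1 == k2
# ...while a different Execution Engine version produces a different key.
k3 = COMPILATION_CACHE.get_hash_key(
    workflow_definition={"version": "1.0", "steps": []},
    execution_engine_version=Version("1.4.0"),
)
assert k1 != k3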


@execution_phase(
name="workflow_compilation",
categories=["execution_engine_operation"],
@@ -50,6 +79,51 @@ def compile_workflow(
execution_engine_version: Optional[Version] = None,
profiler: Optional[WorkflowsProfiler] = None,
) -> CompiledWorkflow:
graph_compilation_results = compile_workflow_graph(
workflow_definition=workflow_definition,
execution_engine_version=execution_engine_version,
profiler=profiler,
)
steps = initialise_steps(
steps_manifest=graph_compilation_results.parsed_workflow_definition.steps,
available_blocks=graph_compilation_results.available_blocks,
explicit_init_parameters=init_parameters,
initializers=graph_compilation_results.initializers,
profiler=profiler,
)
input_substitutions = collect_input_substitutions(
workflow_definition=graph_compilation_results.parsed_workflow_definition,
)
steps_by_name = {step.manifest.name: step for step in steps}
dump_execution_graph(execution_graph=graph_compilation_results.execution_graph)
return CompiledWorkflow(
workflow_definition=graph_compilation_results.parsed_workflow_definition,
workflow_json=workflow_definition,
init_parameters=init_parameters,
execution_graph=graph_compilation_results.execution_graph,
steps=steps_by_name,
input_substitutions=input_substitutions,
)


def compile_workflow_graph(
workflow_definition: dict,
execution_engine_version: Optional[Version] = None,
profiler: Optional[WorkflowsProfiler] = None,
) -> GraphCompilationResult:
key = COMPILATION_CACHE.get_hash_key(
workflow_definition=workflow_definition,
execution_engine_version=execution_engine_version,
)
cached_value = COMPILATION_CACHE.get(key=key)
if cached_value is not None:
dynamic_blocks_definitions = workflow_definition.get(
"dynamic_blocks_definitions", []
)
ensure_dynamic_blocks_allowed(
dynamic_blocks_definitions=dynamic_blocks_definitions
)
return cached_value
statically_defined_blocks = load_workflow_blocks(
execution_engine_version=execution_engine_version,
profiler=profiler,
@@ -75,26 +149,14 @@ def compile_workflow(
workflow_definition=parsed_workflow_definition,
profiler=profiler,
)
steps = initialise_steps(
steps_manifest=parsed_workflow_definition.steps,
result = GraphCompilationResult(
execution_graph=execution_graph,
parsed_workflow_definition=parsed_workflow_definition,
available_blocks=available_blocks,
explicit_init_parameters=init_parameters,
initializers=initializers,
profiler=profiler,
)
input_substitutions = collect_input_substitutions(
workflow_definition=parsed_workflow_definition,
)
steps_by_name = {step.manifest.name: step for step in steps}
dump_execution_graph(execution_graph=execution_graph)
return CompiledWorkflow(
workflow_definition=parsed_workflow_definition,
workflow_json=workflow_definition,
init_parameters=init_parameters,
execution_graph=execution_graph,
steps=steps_by_name,
input_substitutions=input_substitutions,
)
COMPILATION_CACHE.cache(key=key, value=result)
return result


def collect_input_substitutions(
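A hedged sketch (not part of the diff) of the behaviour the change introduces, assuming the toy definition compiles; repeated calls with an identical definition are served from `COMPILATION_CACHE`, and the dynamic-blocks guard still runs on every hit:

definition = {"version": "1.0", "inputs": [], "steps": [], "outputs": []}

first = compile_workflow_graph(workflow_definition=definition)   # compiles and caches
second = compile_workflow_graph(workflow_definition=definition)  # cache hit
assert first is second  # the very same GraphCompilationResult instance

# Even on a hit, ensure_dynamic_blocks_allowed(...) is re-checked, so a warm
# cache cannot bypass ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS.
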
@@ -1,7 +1,6 @@
from typing import Dict, List, Optional, Type, Union

import pydantic
from packaging.version import Version
from pydantic import BaseModel, Field, create_model
from typing_extensions import Annotated

@@ -15,43 +14,25 @@
WorkflowsProfiler,
execution_phase,
)
from inference.core.workflows.execution_engine.v1.compiler.cache import (
BasicWorkflowsCache,
)
from inference.core.workflows.execution_engine.v1.compiler.entities import (
BlockSpecification,
ParsedWorkflowDefinition,
)


class WorkflowDefinitionEntitiesCache:

def __init__(self, cache_size: int):
self._cache_size = max(1, cache_size)
self._entries: Dict[str, Type[BaseModel]] = {}

def add_entry(
self, blocks: List[BlockSpecification], entry: Type[BaseModel]
) -> None:
hash_value = self._hash_blocks(blocks=blocks)
if hash_value in self._entries:
self._entries[hash_value] = entry
return None
if len(self._entries) == self._cache_size:
key_to_drop = next(self._entries.__iter__())
self._entries.pop(key_to_drop)
self._entries[hash_value] = entry

def cache_hit(self, blocks: List[BlockSpecification]) -> bool:
hash_value = self._hash_blocks(blocks=blocks)
return hash_value in self._entries

def get(self, blocks: List[BlockSpecification]) -> Type[BaseModel]:
hash_value = self._hash_blocks(blocks=blocks)
return self._entries[hash_value]

def _hash_blocks(self, blocks: List[BlockSpecification]) -> str:
return "<|>".join(block.block_source + block.identifier for block in blocks)


WORKFLOW_DEFINITION_ENTITIES_CACHE = WorkflowDefinitionEntitiesCache(cache_size=64)
WORKFLOW_DEFINITION_ENTITIES_CACHE = BasicWorkflowsCache[Type[BaseModel]](
cache_size=64,
hash_functions=[
(
"available_blocks",
lambda blocks: "<|>".join(
block.block_source + block.identifier for block in blocks
),
)
],
)


@execution_phase(
@@ -87,8 +68,12 @@ def parse_workflow_definition(
def build_workflow_definition_entity(
available_blocks: List[BlockSpecification],
) -> Type[BaseModel]:
if WORKFLOW_DEFINITION_ENTITIES_CACHE.cache_hit(blocks=available_blocks):
return WORKFLOW_DEFINITION_ENTITIES_CACHE.get(blocks=available_blocks)
cache_key = WORKFLOW_DEFINITION_ENTITIES_CACHE.get_hash_key(
available_blocks=available_blocks
)
cached_value = WORKFLOW_DEFINITION_ENTITIES_CACHE.get(key=cache_key)
if cached_value is not None:
return cached_value
steps_manifests = tuple(block.manifest_class for block in available_blocks)
block_manifest_types_union = Union[steps_manifests]
block_type = Annotated[block_manifest_types_union, Field(discriminator="type")]
@@ -99,9 +84,9 @@ def build_workflow_definition_entity(
steps=(List[block_type], ...),
outputs=(List[JsonField], ...),
)
WORKFLOW_DEFINITION_ENTITIES_CACHE.add_entry(
blocks=available_blocks,
entry=entity,
WORKFLOW_DEFINITION_ENTITIES_CACHE.cache(
key=cache_key,
value=entity,
)
return entity
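
A hedged sketch of the resulting cache hit (not part of the diff); `blocks` stands for an assumed, already-loaded list of `BlockSpecification` objects:

# `blocks` is an assumed, already-loaded list of BlockSpecification objects.
entity_a = build_workflow_definition_entity(available_blocks=blocks)
entity_b = build_workflow_definition_entity(available_blocks=blocks)
assert entity_a is entity_b  # second call reuses the cached pydantic model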

inference/core/workflows/execution_engine/v1/dynamic_blocks/block_assembler.py

@@ -56,13 +56,7 @@ def compile_dynamic_blocks(
) -> List[BlockSpecification]:
if not dynamic_blocks_definitions:
return []
if not ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS:
raise WorkflowEnvironmentConfigurationError(
public_message="Cannot use dynamic blocks with custom Python code in this installation of `workflows`. "
"This can be changed by setting environmental variable "
"`ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS=True`",
context="workflow_compilation | dynamic_blocks_compilation",
)
ensure_dynamic_blocks_allowed(dynamic_blocks_definitions=dynamic_blocks_definitions)
all_defined_kinds = load_all_defined_kinds()
kinds_lookup = {kind.name: kind for kind in all_defined_kinds}
dynamic_blocks = [
@@ -79,6 +73,16 @@
return compiled_blocks


def ensure_dynamic_blocks_allowed(dynamic_blocks_definitions: List[dict]) -> None:
if dynamic_blocks_definitions and not ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS:
raise WorkflowEnvironmentConfigurationError(
public_message="Cannot use dynamic blocks with custom Python code in this installation of `workflows`. "
"This can be changed by setting environmental variable "
"`ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS=True`",
context="workflow_compilation | dynamic_blocks_compilation",
)
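
A hedged sketch of the extracted guard (not part of the diff), assuming `ALLOW_CUSTOM_PYTHON_EXECUTION_IN_WORKFLOWS` is disabled in this installation:

from inference.core.workflows.errors import WorkflowEnvironmentConfigurationError

ensure_dynamic_blocks_allowed(dynamic_blocks_definitions=[])  # empty list: no-op

try:
    # Any non-empty list triggers the check when the flag is disabled.
    ensure_dynamic_blocks_allowed(dynamic_blocks_definitions=[{"type": "dummy"}])
except WorkflowEnvironmentConfigurationError:
    print("custom Python blocks are disabled in this installation")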


def create_dynamic_block_specification(
dynamic_block_definition: DynamicBlockDefinition,
kinds_lookup: Dict[str, Kind],
