Skip to content

AiiDA WorkGraph engine#234

Merged
GeigerJ2 merged 129 commits into main from
workgraph
Feb 26, 2026
Merged

AiiDA WorkGraph engine#234
GeigerJ2 merged 129 commits into main from
workgraph

Conversation

@GeigerJ2
Copy link
Collaborator

@GeigerJ2 GeigerJ2 commented Oct 31, 2025

Implements a complete AiiDA-based execution engine that translates Sirocco workflows into WorkGraph structures for submission with AiiDA. The engine handles dependency resolution, SLURM job chaining, and rolling window execution.

A full design document can be found in ADR/AiiDA_Engine_Architecture.md, rendered here.

Compiled stubs from the `src/sirocco/engines/aiida` module for API and design discussion

__init__

# Public API of the AiiDA engine package: the WorkGraph builder class plus
# the build/run/submit entry points re-exported from the submodules below.
__all__ = [
    "WorkGraphBuilder",
    "build_sirocco_workgraph",
    "run_sirocco_workgraph",
    "submit_sirocco_workgraph",
]

adapter

# Public API of the adapter module.
__all__ = ["AiidaAdapter"]


class AiidaAdapter:
    """Stateless namespace of static helpers translating Sirocco core
    objects (workflow, tasks, data) into AiiDA-side values.

    Bodies are stubbed here (compiled stubs for API discussion); the notes
    below are inferred from the signatures — confirm against the
    implementation module.
    """

    # Presumably raises if the core workflow cannot be run by this engine.
    @staticmethod
    def validate_workflow(workflow: core.Workflow) -> None: ...
    # Normalize a label to something AiiDA accepts — TODO confirm the rules.
    @staticmethod
    def sanitize_label(label: str) -> str: ...
    # Derive a (sanitized) label for any graph item (task or data).
    @staticmethod
    def build_label_from_graph_item(graph_item: core.GraphItem) -> str: ...
    # Wrap already-available input data in an AiiDA data node (FileNode is
    # RemoteData | SinglefileData | FolderData, see the types module).
    @staticmethod
    def create_input_data_node(
        core_data: core.AvailableData, *, used_by_icon: bool = False
    ) -> FileNode: ...
    # Map a task's scheduler settings onto AiiDA metadata options/metadata.
    @staticmethod
    def build_scheduler_options(task: core.Task) -> AiidaMetadataOptions: ...
    @staticmethod
    def build_metadata(task: core.Task) -> AiidaMetadata: ...
    # Translate a single MPI command placeholder to its string form;
    # parse_mpi_cmd presumably applies this over a whole command template.
    @staticmethod
    def translate_mpi_placeholder(
        placeholder: core.MpiCmdPlaceholder,
    ) -> str: ...
    @staticmethod
    def parse_mpi_cmd(mpi_cmd: str) -> str: ...
    # Expand placeholders in an arguments template into the final argv list,
    # using the placeholder -> node-key mapping.
    @staticmethod
    def substitute_argument_placeholders(
        arguments_template: str | None, placeholder_to_node_key: dict
    ) -> list[str]: ...
    # Wrapper-script helpers; `task` is unannotated here — presumably a
    # core.Task carrying an optional wrapper-script path (see the
    # "wrapper script" config field discussed in the PR conversation).
    @staticmethod
    def get_default_wrapper_script() -> aiida.orm.SinglefileData: ...
    @staticmethod
    def get_wrapper_script_data(task) -> aiida.orm.SinglefileData: ...

builder

# Public API of the builder module.
__all__ = ["WorkGraphBuilder", "build_sirocco_workgraph"]


# Convenience entry point: construct a WorkGraph from a core workflow
# (presumably a thin wrapper around WorkGraphBuilder(...).build()).
def build_sirocco_workgraph(core_workflow: core.Workflow) -> WorkGraph: ...


class WorkGraphBuilder:
    """Translates a Sirocco core workflow into an aiida-workgraph WorkGraph.

    The attributes below are populated during build (per the stub types);
    `Incomplete` marks types not captured by the stub generator.
    """

    workflow: Incomplete
    resolved_config_path: Incomplete
    # Input data nodes keyed by label (FileNode: Remote/Singlefile/FolderData).
    data_nodes: dict[str, FileNode]
    # Serialized task specs keyed by task label, split per task plugin.
    shell_specs: dict[str, AiidaShellTaskSpec]
    icon_specs: dict[str, AiidaIconTaskSpec]
    # Launcher-task parent lists, keyed by task label (see task_pairs module).
    launcher_parents: dict[str, list[str]]

    def __init__(self, core_workflow: core.Workflow) -> None: ...
    def build(self) -> WorkGraph: ...

calcjob_builders

# Public API of the calcjob_builders module.
__all__ = [
    "SlurmDirectiveBuilder",
    "add_slurm_dependencies_to_metadata",
    "build_icon_calcjob_inputs",
]


class SlurmDirectiveBuilder:
    """Fluent builder for a block of SLURM scheduler directives.

    Each ``add_*`` method returns the builder itself (per the signatures),
    so calls can be chained; ``build()`` returns the assembled directive
    string, or None — presumably when nothing was added.
    """

    def __init__(self) -> None: ...
    # Append one raw directive line verbatim.
    def add_directive(self, directive: str) -> SlurmDirectiveBuilder: ...
    # Optional uenv / view selectors; None is accepted, presumably a no-op.
    def add_uenv(self, uenv: str | None) -> SlurmDirectiveBuilder: ...
    def add_view(self, view: str | None) -> SlurmDirectiveBuilder: ...
    # Add a "--dependency=afterok:..." directive for the given job ids;
    # kill_on_invalid presumably maps to SLURM's kill-on-invalid-dep flag.
    def add_dependency_afterok(
        self,
        job_ids: list[int] | TaskJobIdMapping,
        *,
        kill_on_invalid: bool = True,
    ) -> SlurmDirectiveBuilder: ...
    # Parse an existing (possibly multi-line) directive string and append
    # its entries; None is accepted.
    def extend_from_string(
        self, commands: str | None
    ) -> SlurmDirectiveBuilder: ...
    def build(self) -> str | None: ...


# Assemble the CalcJob input dictionary for an ICON task from its spec,
# the resolved input data nodes, and the scheduler metadata.
def build_icon_calcjob_inputs(
    task_spec: AiidaIconTaskSpec,
    input_data_nodes: dict,
    aiida_metadata: AiidaMetadata,
) -> dict: ...
# Extend metadata with SLURM afterok dependencies on the given job ids.
# Signature suggests it returns a new AiidaMetadata rather than mutating
# base_metadata — confirm against the implementation.
def add_slurm_dependencies_to_metadata(
    base_metadata: AiidaMetadata,
    job_ids: TaskJobIdMapping | None,
    computer: aiida.orm.Computer | None,
    label: str | None = None,
) -> AiidaMetadata: ...

code_factory

# Public API of the code_factory module.
__all__ = ["CodeFactory"]


class CodeFactory:
    """Static helpers for creating AiiDA Code nodes for shell tasks."""

    # Strip the script's file extension — presumably to derive a code label.
    @staticmethod
    def remove_script_extension(filename: str) -> str: ...
    # Create (or per the sibling builders' `create_or_load_code`, possibly
    # load) an aiida.orm.Code for a shell task on the given computer.
    @staticmethod
    def create_shell_code(
        task: core.ShellTask, computer: aiida.orm.Computer
    ) -> aiida.orm.Code: ...

dependency_resolvers

# Public API of the dependency_resolvers module.
__all__ = [
    "build_dependency_mapping",
    "resolve_available_data_inputs",
    "resolve_icon_dependency_mapping",
    "resolve_shell_dependency_mappings",
]


# Resolve ICON-task dependencies (parent remote folders plus per-port
# dependency info and namelist node pks) into RemoteData inputs per port.
def resolve_icon_dependency_mapping(
    parent_folders: TaskFolderMapping | None,
    port_dependency_mapping: PortDependencyMapping,
    master_namelist_pk: int,
    model_namelist_pks: dict,
) -> dict[str, aiida.orm.RemoteData]: ...
# Shell-task variant: returns nodes, placeholder substitutions and
# filenames bundled in a ShellDependencyMappings (see the models module).
def resolve_shell_dependency_mappings(
    parent_folders: TaskFolderMapping,
    port_dependency_mapping: PortDependencyMapping,
    original_filenames: dict,
) -> ShellDependencyMappings: ...
# Select the already-available data nodes a task consumes from the global
# label -> node mapping (presumably filtered by the task's inputs).
def resolve_available_data_inputs(
    task: core.Task, aiida_data_nodes: PortDataMapping
) -> PortDataMapping: ...
# Build the full DependencyMapping (port deps, parent task folders, parent
# job ids) for a task from the core workflow and the monitor outputs of
# already-added tasks.
def build_dependency_mapping(
    task: core.Task,
    core_workflow: core.Workflow,
    task_output_mapping: TaskMonitorOutputsMapping,
) -> DependencyMapping: ...

execute

# Public API of the execute module.
__all__ = ["run_sirocco_workgraph", "submit_sirocco_workgraph"]


# Submit the workflow's WorkGraph to the AiiDA daemon (non-blocking by
# default); with wait=True, presumably blocks up to `timeout` seconds.
def submit_sirocco_workgraph(
    core_workflow: core.Workflow,
    *,
    inputs: None | dict[str, Any] = None,
    wait: bool = False,
    timeout: int = 60,
    metadata: None | dict[str, Any] = None,
) -> aiida.orm.Node: ...
# Run the workflow's WorkGraph in the current process (blocking) and
# return the resulting process node.
def run_sirocco_workgraph(
    core_workflow: core.Workflow,
    inputs: None | dict[str, Any] = None,
    metadata: None | dict[str, Any] = None,
) -> aiida.orm.Node: ...

models

# Public API of the models module: typed data containers passed between the
# builder, resolvers and task-pair layers.
__all__ = [
    "AiidaIconTaskSpec",
    "AiidaMetadata",
    "AiidaMetadataOptions",
    "AiidaResources",
    "AiidaShellTaskSpec",
    "BaseDataInfo",
    "DependencyInfo",
    "DependencyMapping",
    "InputDataInfo",
    "OutputDataInfo",
    "ShellDependencyMappings",
]


class DependencyMapping(NamedTuple):
    """Bundle of a task's resolved dependencies (see build_dependency_mapping)."""

    port_mapping: PortDependencyMapping
    task_folders: TaskFolderMapping
    task_job_ids: TaskJobIdMapping


@dataclass
class ShellDependencyMappings:
    """Resolved shell-task dependencies (see resolve_shell_dependency_mappings)."""

    # Dependency label -> RemoteData node for the parent's output.
    nodes: dict[str, aiida.orm.RemoteData]
    placeholders: dict[str, str]
    filenames: dict[str, str]


class DependencyInfo(BaseModel):
    """One dependency edge: which parent, which file, which data label.

    Replaces the previous 3-element tuples (per the refactor commit message).
    """

    model_config: Incomplete
    dep_label: str
    filename: str | None
    data_label: str


class BaseDataInfo(BaseModel):
    """Common metadata for a workflow data item."""

    model_config: Incomplete
    name: str
    # Cycle coordinates (e.g. dates) identifying this item's instance.
    coordinates: dict[str, Any]
    label: str
    path: str


class InputDataInfo(BaseDataInfo):
    """Data consumed by a task; `is_available` marks pre-existing data."""

    port: str
    is_available: bool


class OutputDataInfo(BaseDataInfo):
    """Data produced by a task; port may be None — TODO confirm when."""

    port: str | None


class AiidaResources(BaseModel):
    """Scheduler resource request (mirrors AiiDA's `resources` options)."""

    model_config: Incomplete
    num_machines: int | None
    num_mpiprocs_per_machine: int | None
    num_cores_per_mpiproc: int | None


class AiidaMetadataOptions(BaseModel):
    """Subset of AiiDA CalcJob `metadata.options` used by this engine."""

    model_config: Incomplete
    account: str | None
    prepend_text: str | None
    # Extra "#SBATCH ..." lines, e.g. dependency directives (see
    # SlurmDirectiveBuilder / add_slurm_dependencies_to_metadata).
    custom_scheduler_commands: str | None
    use_symlinks: bool | None
    resources: AiidaResources | None
    max_wallclock_seconds: int | None
    max_memory_kb: int | None
    queue_name: str | None
    withmpi: bool | None
    additional_retrieve_list: list[str] | None


class AiidaMetadata(BaseModel):
    """CalcJob metadata plus the target computer (node or label)."""

    model_config: Incomplete
    options: AiidaMetadataOptions | None
    computer: aiida.orm.Computer | None
    computer_label: str | None
    call_link_label: str | None


class AiidaShellTaskSpec(BaseModel):
    """JSON-serializable spec for a shell task.

    Stores node references as pks (code_pk, node_pks) so the spec can cross
    process boundaries; the resolvers re-load the nodes from these pks.
    """

    model_config: Incomplete
    label: str
    code_pk: int
    node_pks: dict[str, int]
    metadata: AiidaMetadata
    arguments_template: str
    filenames: dict[str, str]
    outputs: list[str]
    input_data_info: list[SerializedInputDataInfo]
    output_data_info: list[SerializedOutputDataInfo]
    output_port_mapping: dict[str, str]
    port_dependency_mapping: dict[str, list[SerializedDependencyInfo]] | None

    # Presumably a pydantic field validator rejecting a missing code pk at
    # spec-construction time — confirm decorator in the implementation.
    @classmethod
    def validate_code_pk_not_none(cls, v: int | None) -> int: ...


class AiidaIconTaskSpec(BaseModel):
    """JSON-serializable spec for an ICON task (pk-based, like the shell spec)."""

    model_config: Incomplete
    label: str
    code_pk: int
    master_namelist_pk: int
    model_namelist_pks: dict[str, int]
    wrapper_script_pk: int | None
    metadata: AiidaMetadata
    output_port_mapping: dict[str, str]
    port_dependency_mapping: dict[str, list[dict[str, Any]]] | None

    # Validators guarding against missing pks — presumably pydantic field
    # validators; confirm in the implementation.
    @classmethod
    def validate_pk_not_none(cls, v: int | None) -> int: ...
    @classmethod
    def validate_model_pks_not_none(
        cls, v: dict[str, int | None]
    ) -> dict[str, int]: ...

monitoring

# Public API of the monitoring module.
__all__ = ["get_job_data"]


# Monitor coroutine: poll every `interval` seconds (up to timeout_seconds)
# for the job data of `task_name` inside the workgraph with the given name.
# Per the PR notes, the lookup goes through a QueryBuilder query on the
# workgraph label, because the WorkGraph node does not exist yet when the
# monitor is wired up. Return type is unannotated — TODO confirm shape.
async def get_job_data(
    workgraph_name: str,
    task_name: str,
    timeout_seconds: int,
    interval: int = 10,
): ...

patches.__init__

# Public API of the patches package: monkey-patches applied to third-party
# packages (aiida-firecrest, aiida-core, aiida-workgraph respectively —
# per the PR description's "patches" section).
__all__ = [
    "patch_firecrest_symlink",
    "patch_slurm_dependency_handling",
    "patch_workgraph_window",
]

patches.firecrest_symlink

# Module logger (type not captured by the stub generator).
logger: Incomplete


# Monkey-patch aiida-firecrest to allow symlinking with missing targets
# (per the PR description's "patches" section).
def patch_firecrest_symlink() -> None: ...

patches.slurm_dependencies

# Module logger (type not captured by the stub generator).
logger: Incomplete


# Monkey-patch aiida-core to handle SLURM job-dependency errors
# (per the PR description's "patches" section).
def patch_slurm_dependency_handling(): ...

patches.workgraph_window

# Module logger (type not captured by the stub generator).
logger: Incomplete


# Monkey-patch aiida-workgraph for dynamic (rolling) window recomputation
# (per the PR description's "patches" section).
def patch_workgraph_window(): ...

spec_builders

# Public API of the spec_builders module (the abstract base is internal).
__all__ = ["IconTaskSpecBuilder", "ShellTaskSpecBuilder"]


class TaskSpecBuilder(ABC, metaclass=abc.ABCMeta):
    """Abstract builder turning a core.Task into a serializable task spec.

    Subclasses provide code creation/loading, output-port mapping and the
    final spec assembly for their task type.
    """

    task: Incomplete
    label: Incomplete
    computer: Incomplete
    metadata: Incomplete

    def __init__(self, task: core.Task) -> None: ...
    # Create a new AiiDA Code or load an existing one for this task.
    @abstractmethod
    def create_or_load_code(self) -> aiida.orm.Code: ...
    # Map output data labels to WorkGraph/CalcJob port names (direction of
    # the mapping inferred from usage — confirm in implementation).
    @abstractmethod
    def build_output_port_mapping(self) -> dict[str, str]: ...
    @abstractmethod
    def build_spec(self) -> AiidaIconTaskSpec | AiidaShellTaskSpec: ...


class ShellTaskSpecBuilder(TaskSpecBuilder):
    """Builds an AiidaShellTaskSpec from a core.ShellTask."""

    task: core.ShellTask
    metadata: Incomplete

    def __init__(self, task: core.ShellTask) -> None: ...
    def create_or_load_code(self) -> aiida.orm.Code: ...
    # Typed input/output descriptions embedded in the spec (see models).
    def build_input_data_info(self) -> list[InputDataInfo]: ...
    def build_output_data_info(self) -> list[OutputDataInfo]: ...
    def build_output_port_mapping(self) -> dict[str, str]: ...
    def build_spec(self) -> AiidaShellTaskSpec: ...


class IconTaskSpecBuilder(TaskSpecBuilder):
    """Builds an AiidaIconTaskSpec from a core.IconTask."""

    task: core.IconTask

    def __init__(self, task: core.IconTask) -> None: ...
    def create_or_load_code(self) -> aiida.orm.Code: ...
    def build_output_port_mapping(self) -> dict[str, str]: ...
    def build_spec(self) -> AiidaIconTaskSpec: ...

spec_resolvers

# Public API of the spec_resolvers module; the free functions are the
# launcher-task entry points, the resolver classes are internal.
__all__ = [
    "build_icon_task_with_dependencies",
    "build_shell_task_with_dependencies",
]


class TaskSpecResolver(ABC, metaclass=abc.ABCMeta):
    """Rehydrates a serialized task spec dict and resolves its runtime inputs.

    execute() combines the spec with dynamically available data: input data
    nodes, parent task remote folders, and parent SLURM job ids.
    """

    spec: Incomplete

    def __init__(self, task_spec_dict: dict) -> None: ...
    @abstractmethod
    def execute(
        self,
        input_data_nodes: dict | None = None,
        task_folders: dict | None = None,
        task_job_ids: dict | None = None,
    ) -> Annotated[dict, None]: ...


class ShellTaskSpecResolver(TaskSpecResolver):
    """Resolver for shell-task specs (AiidaShellTaskSpec)."""

    spec: AiidaShellTaskSpec

    def execute(
        self,
        input_data_nodes: dict | None = None,
        task_folders: dict | None = None,
        task_job_ids: dict | None = None,
    ) -> Annotated[dict, None]: ...


class IconTaskSpecResolver(TaskSpecResolver):
    """Resolver for ICON-task specs (AiidaIconTaskSpec)."""

    spec: AiidaIconTaskSpec

    def execute(
        self,
        input_data_nodes: dict | None = None,
        task_folders: dict | None = None,
        task_job_ids: dict | None = None,
    ) -> Annotated[dict, None]: ...


# Functional entry points delegating to the resolvers above; presumably
# decorated as WorkGraph tasks in the implementation (the Annotated[dict,
# None] annotations suggest socket metadata) — confirm.
def build_shell_task_with_dependencies(
    task_spec: dict,
    input_data_nodes: Annotated[dict, None] | None = None,
    task_folders: Annotated[dict, None] | None = None,
    task_job_ids: Annotated[dict, None] | None = None,
) -> Annotated[dict, None]: ...
def build_icon_task_with_dependencies(
    task_spec: dict,
    input_data_nodes: Annotated[dict, None] | None = None,
    task_folders: Annotated[dict, None] | None = None,
    task_job_ids: Annotated[dict, None] | None = None,
) -> Annotated[dict, None]: ...

task_pairs

# Public API of the task_pairs module.
__all__ = ["LAUNCHER_PREFIX", "MONITOR_PREFIX", "TaskPairContext"]

# Name prefixes distinguishing the two halves of each task pair: a launcher
# (submits the SLURM job) and a monitor (watches it) — a pairing that
# enables the SLURM pre-submission / rolling-window execution described in
# the PR. Values not shown in the stub.
LAUNCHER_PREFIX: str
MONITOR_PREFIX: str


class TaskPairContext:
    """Adds launcher/monitor task pairs to a WorkGraph and tracks them."""

    def __init__(self, workgraph: WorkGraph, workflow_name: str) -> None: ...
    # Per-task monitor outputs (remote_folder, job_id sockets; see types).
    def get_dependency_outputs(self) -> TaskMonitorOutputsMapping: ...
    # Per-task WorkGraph task handles used to wire ordering dependencies.
    def get_dependency_tasks(self) -> TaskDependencyMapping: ...
    def add_icon_task_pair(
        self,
        task_label: str,
        task_spec: AiidaIconTaskSpec,
        input_data_for_task: PortDataMapping,
        dependencies: DependencyMapping,
    ) -> None: ...
    def add_shell_task_pair(
        self,
        task_label: str,
        task_spec: AiidaShellTaskSpec,
        input_data_for_task: PortDataMapping,
        dependencies: DependencyMapping,
    ) -> None: ...

topology

# Public API of the topology module.
__all__ = ["compute_topological_levels"]


# Assign each task a topological level from the task -> parents (or
# children — direction not visible in the stub; confirm) adjacency map.
# Presumably used to drive the rolling-window front (front_depth).
def compute_topological_levels(
    task_deps: dict[str, list[str]],
) -> dict[str, int]: ...

types

# Public API of the types module: aliases, protocols and TypedDicts shared
# across the engine. Uses PEP 695 `type` aliases (Python 3.12+).
__all__ = [
    "FileNode",
    "LauncherParentsMapping",
    "PortDataMapping",
    "PortDependencyMapping",
    "SerializedDependencyInfo",
    "SerializedInputDataInfo",
    "SerializedOutputDataInfo",
    "TaskDependencyMapping",
    "TaskFolderMapping",
    "TaskJobIdMapping",
    "TaskMonitorOutputsMapping",
    "WgMonitorOutputs",
    "WgSocketValue",
    "WgTaskProtocol",
]

# Any AiiDA data node this engine treats as file-like input.
type FileNode = (
    aiida.orm.RemoteData | aiida.orm.SinglefileData | aiida.orm.FolderData
)
# Mappings keyed by port name or task label, respectively.
type PortDataMapping = dict[str, FileNode]
type TaskMonitorOutputsMapping = dict[str, WgMonitorOutputs]
type TaskDependencyMapping = dict[str, WgTaskProtocol]
type PortDependencyMapping = dict[str, list[DependencyInfo]]
type TaskFolderMapping = dict[str, WgSocketValue]
type TaskJobIdMapping = dict[str, WgSocketValue]
type LauncherParentsMapping = dict[str, list[str]]


class WgSocketValue(Protocol):
    """Structural type for a WorkGraph socket exposing a `.value`."""

    @property
    def value(self) -> Any: ...


class WgTaskProtocol(Protocol):
    """Structural type for a WorkGraph task (supports `a >> b` ordering)."""

    outputs: WgMonitorOutputs

    def __rshift__(self, other: WgTaskProtocol) -> WgTaskProtocol: ...


class WgMonitorOutputs(Protocol):
    """Outputs of a monitor task: the job's remote folder and SLURM job id."""

    remote_folder: WgSocketValue
    job_id: WgSocketValue


class SerializedInputDataInfo(TypedDict):
    """JSON-safe form of InputDataInfo (see the models module)."""

    port: str
    name: str
    coordinates: dict[str, Any]
    label: str
    is_available: bool
    path: str


class SerializedOutputDataInfo(TypedDict):
    """JSON-safe form of OutputDataInfo (see the models module)."""

    port: str | None
    name: str
    coordinates: dict[str, Any]
    label: str
    path: str


class SerializedDependencyInfo(TypedDict):
    """JSON-safe form of DependencyInfo (see the models module)."""

    dep_label: str
    filename: str | None
    data_label: str

utils

# Public API of the utils module.
__all__ = ["PortLabelMapper", "serialize_coordinates", "split_cmd_arg"]


# Split a command line into (command, arguments); script_name presumably
# anchors where the split happens — confirm against the implementation.
def split_cmd_arg(
    command_line: str, script_name: str | None = None
) -> tuple[str, str]: ...
# Convert cycle coordinates into JSON-serializable values (presumably
# dates to ISO strings — confirm).
def serialize_coordinates(coordinates: dict) -> dict: ...


class PortLabelMapper:
    """Bidirectional port <-> label registry.

    A port may carry several labels (add_many / get_labels_for_port) while
    a label maps back to at most one port (get_port_for_label). to_dict()
    returns dict[str, str] — presumably one entry per label; confirm the
    key direction in the implementation.
    """

    def __init__(self) -> None: ...
    def add(self, port: str, label: str) -> None: ...
    def add_many(self, port: str, labels: list[str]) -> None: ...
    # Bulk-load; returns self per the signature, allowing chaining.
    def from_dict(self, mapping: dict[str, str]) -> PortLabelMapper: ...
    def get_port_for_label(self, label: str) -> str | None: ...
    def get_labels_for_port(self, port: str) -> list[str]: ...
    def has_label(self, label: str) -> bool: ...
    def has_port(self, port: str) -> bool: ...
    def to_dict(self) -> dict[str, str]: ...
    def get_all_ports(self) -> list[str]: ...
    def get_all_labels(self) -> list[str]: ...
    def __len__(self) -> int: ...

Remaining TODOs (within this PR)

  • Add engine field to config.yml
  • Templating should output config file with jinja2 template variables replaced
  • Align front_depth meaning (has to be strictly positive integer -> 1 == what I used to call sequential - no pre-submission)
  • Possibly, cleanup files?
  • Verify tests
  • Align tests/cases config files

Remaining TODOs (after this PR)

  • CLI changes (already issue exists for that)
    • Make CLI work with both engines (currently commands either AiiDA or standalone specific)
      • Properly have import checks, if-else, depending on the engine selection
      • For now, just merge without refactoring; otherwise, commands might break for both
    • Drop hard-coding of vars.yml, but expose template_file.yml (or similar) to the user
    • (possibly) instead of exposing the workflow_file, instead point to config_directory (config dir is one entity, e.g., zip it and share with others)
  • Make sure tests cases work with standalone (@leclairm)
  • Align ShellTask Code creation / command selection
    • We use uenv via uenv run, not via SLURM header, to allow for multiple uenvs being used in one run
    • Probably we can bash everything, always prepend bash
    • Use the existence of `path:` also to verify that. If no path is given -> we don't have to copy anything.

Notes from the meeting

  • Make config.yml only source of truth. Don't expose engine, scheduler, front_depth via CLI
  • Can probably drop run.sh scripts at this point

Top-level list of features

  • full refactor of workgraph.py to enable SLURM pre-submission with dynamic windowing (using a functional approach)

scope creeps

  • simple and complex dummy workflows with shell scripts that actually do arithmetic, such that workflow runs can be validated (i.e., branch independence, waiting, execution order, etc.)
  • jinja2-templating in config.yml files (template values are given via vars.yml; running test cases as examples via run.sh scripts)
  • patches:
    • to handle slurm job dependency errors in aiida-core
    • dynamic window recomputation in aiida-workgraph
    • symlinking with missing targets via aiida-firecrest

WIP

  • DYAMOND workflow with aiida engine added to tests cases

previously noted TODOs

  • Fix get_job_data: no QB, but pass WG pk directly -> this does not work, as the WG does not exist at this stage yet, only its future label, hence the QB is necessary
  • Rolling task front
  • Fix verdi process dump for Sirocco WG (has to be done in aiida-core ... TODO: add as patch)
  • A bit simpler large workflow (real ICON, can be sleep for pre- and post-proc)
  • next and previous symlinks for symlink tree on HPC

GeigerJ2 and others added 23 commits October 31, 2025 15:13
* refactor workgraph.py

Apply comprehensive refactoring patterns to workgraph.py to reduce
cognitive complexity, improve type safety, and enhance maintainability.

STRUCTURAL CHANGES

- Introduce frozen dataclasses to replace fragile tuple structures:
  * DependencyInfo(dep_label, filename, data_label): replaces 3-element
    tuples used throughout dependency tracking chain
  * InputDataInfo: typed metadata for input data items (7 fields)
  * OutputDataInfo: typed metadata for output data items (5 fields)

- Add type aliases for complex nested structures:
  * PortToDependencies: dict[str, list[DependencyInfo]]
  * ParentFolders, JobIds, TaskDepInfo, LauncherDependencies

COMPLEXITY REDUCTION

- Extract dependency processing helpers to reduce nesting:
  * _resolve_icon_dependency(): encapsulates 3-case logic (filename
    known, namelist resolution, workdir fallback)
  * _create_shell_remote_data(): creates RemoteData with consistent
    patterns
  * Reduces maximum indentation from 5+ levels to 2-3 levels

- Apply early-continue pattern in dependency processing loops:
  * load_icon_dependencies(): flattened control flow
  * load_and_process_shell_dependencies(): reduced nested conditionals

DRY PRINCIPLE APPLICATION

- Consolidate SLURM dependency directive building:
  * _build_slurm_dependency_directive(): single source of truth for
    "#SBATCH --dependency=afterok:..." construction
  * _add_custom_scheduler_command(): unified command appending logic
  * Eliminates duplication between
    build_icon_metadata_with_slurm_dependencies() and
    build_shell_metadata_with_slurm_dependencies()

UTILITY FUNCTIONS

- Add mapping helpers to reduce boilerplate:
  * _map_list_append(): replaces repeated setdefault().append() pattern
  * _map_unique_set(): conditional insertion with existence check
  * Applied in build_dependency_mapping() for cleaner intent

- Add logging wrappers to separate concerns:
  * _log_dependency_processing(): consistent dependency logging
  * _log_remote_data_details(): structured RemoteData creation logging

TYPE SAFETY IMPROVEMENTS

- Update build_shell_task_spec() to use dataclasses:
  * Change input_data_info from list[dict] to list[InputDataInfo]
  * Change output_data_info from list[dict] to list[OutputDataInfo]
  * Replace dict access patterns (info["field"]) with attribute access
    (info.field)
  * Enables IDE autocomplete and static type checking

AFFECTED FUNCTIONS

Modified:
- build_dependency_mapping(): use DependencyInfo, mapping helpers
- load_icon_dependencies(): use helper, type aliases, early-continue
- load_and_process_shell_dependencies(): use helper, type aliases,
  early-continue
- build_icon_metadata_with_slurm_dependencies(): use SLURM helpers
- build_shell_metadata_with_slurm_dependencies(): use SLURM helpers
- build_shell_task_spec(): use InputDataInfo/OutputDataInfo dataclasses

Added:
- _resolve_icon_dependency()
- _create_shell_remote_data()
- _build_slurm_dependency_directive()
- _add_custom_scheduler_command()
- _map_list_append()
- _map_unique_set()
- _log_dependency_processing()
- _log_remote_data_details()

REFACTORING PATTERNS APPLIED

1. Replace tuples with typed dataclasses for self-documentation
2. Extract per-dependency logic into focused helper functions
3. Precompute lookup tables to eliminate nested scanning (already
   optimized)
4. Apply early-continue pattern to reduce indentation depth
5. Consolidate repeated logic following DRY principle
6. Separate logging concerns from business logic
7. Use type aliases to clarify complex nested structures
8. Add utility functions for common mapping operations

* Fix JSON-serialization and upgrade aiida-core

* add branch-independence test case

* nicer test-setup with env var substitution and `run_via_cli.sh`

* delete `large-local` directory

* wip; pre-submission seems to work nicely for the simpler branch-independence workflow
from sirocco.parsing._utils import TimeUtils, convert_to_date, convert_to_duration

if TYPE_CHECKING:
from collections.abc import Iterator
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this seems to be fine for me now in TYPE_CHECKING block

Move pytest marker filter from extra-args to default-args in hatch
config so CI workflows can override it. Include slow tests in
integration CI. Merge redundant test_branch_independence_execution into
the parametrized test_branch_independence_with_front_depths.
Also:
- Add pytest-xdist for parallel test execution (-n auto)
- Consolidate duplicate Shell/Icon tests with parametrization
- More pytest.param
repr=False,
metadata={"description": "Path to wrapper script file relative to the config directory or absolute."},
)
# NOTE: This is hard-coded still to CSCS...
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is on purpose, because these are the only implemented targets for both standalone and aiida-icon, even though target currently only makes sense for standalone. It will change, since we want to unify both specifications with:

tasks:
  - icon
      plugin: icon
      computer: santis
      target: gpu

Then we can decide where the check happens for supported targets. Maybe that can be split between standalone and aiida-icon to allow some divergence in the support status. Still if we want to keep it centralized at the parsing level, we can make use of the new engine field in a new @model_validator(mode="after") decorator on the ConfigWorkflow class.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a general note: I think we should be completely agnostic about this in the development of the tool. Everything center-specific should be configurable by users. Otherwise we just create a single center-specific tool, which is how we ended up with 300 different workflow managers in the first place ^^

tasks:
  - icon
      plugin: icon
      computer: santis
      target: gpu

I think this is good. We could even change computer to hostname, as computer is more AiiDA terminology, and hostname, as it is in the SSH config, is more known to people (and AiiDA now also supports OpenSSH directly).

Comment on lines +196 to +207
# NOTE(review): quoted from the diff (lines +196 to +207); the page
# extraction flattened the method-body indentation — code left untouched.
@staticmethod
def _add_sirocco_time_prepend_text(options: AiidaMetadataOptions, task: core.Task) -> AiidaMetadataOptions:
"""Append chunk start/stop exports to prepend_text when date cycling is available."""
# The type: ignore markers show cycle_point's chunk dates are not part of
# its declared type — assumes date cycling is active; TODO confirm guard.
start_date = task.cycle_point.chunk_start_date.isoformat() # type: ignore[attr-defined]
stop_date = task.cycle_point.chunk_stop_date.isoformat() # type: ignore[attr-defined]

# NOTE(review): per the discussion, these may be renamed to
# SIROCCO_CHUNK_START_DATE/SIROCCO_CHUNK_STOP_DATE if overall dates are
# also exported later.
exports = f"export SIROCCO_START_DATE={start_date}\nexport SIROCCO_STOP_DATE={stop_date}\n"

current_prepend = options.prepend_text or ""
new_prepend = f"{current_prepend}\n{exports}" if current_prepend else exports

# model_copy returns a new options object; the input is not mutated.
return options.model_copy(update={"prepend_text": new_prepend})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the way it's used at the moment. Yet Christian asked me in the meantime if we could also export the overall dates in the runscript. Either we keep it for later, or we already export SIROCCO_CHUNK_START_DATE, SIROCCO_CHUNK_STOP_DATE, SIROCCO_START_DATE, and SIROCCO_STOP_DATE (and adapt the corresponding variable names in the tests) in this PR. I don't have a strong feeling, I just need to remember to change it in both engines if I do it later.

@GeigerJ2 GeigerJ2 merged commit 48d08f6 into main Feb 26, 2026
11 checks passed
@GeigerJ2 GeigerJ2 deleted the workgraph branch February 26, 2026 10:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants