From 1b78264de97b29415f1f5074a625655731bac129 Mon Sep 17 00:00:00 2001 From: Ishant Thakare Date: Thu, 8 May 2025 18:21:49 +0530 Subject: [PATCH 1/8] director and envoy recovery v1 Signed-off-by: Ishant Thakare --- .../workflow/component/director/director.py | 7 +++--- .../workflow/component/director/experiment.py | 13 +++++++---- .../workflow/component/envoy/envoy.py | 5 +++-- .../workflow/interface/fl_spec.py | 22 +++++++++++++------ .../workflow/protocols/director.proto | 1 + .../workflow/runtime/federated_runtime.py | 4 ++-- .../transport/grpc/director_client.py | 3 +-- .../transport/grpc/director_server.py | 6 +++-- 8 files changed, 39 insertions(+), 22 deletions(-) diff --git a/openfl/experimental/workflow/component/director/director.py b/openfl/experimental/workflow/component/director/director.py index 07b44f7a18..6c0735ad69 100644 --- a/openfl/experimental/workflow/component/director/director.py +++ b/openfl/experimental/workflow/component/director/director.py @@ -96,6 +96,7 @@ async def start_experiment_execution_loop(self) -> None: loop = asyncio.get_event_loop() while True: try: + logger.info("Waiting for experiment...") async with self.experiments_registry.get_next_experiment() as experiment: await self._wait_for_authorized_envoys() run_aggregator_future = loop.create_task( @@ -117,7 +118,7 @@ async def start_experiment_execution_loop(self) -> None: await self._flow_status.put(flow_status) except Exception as e: logger.error(f"Error while executing experiment: {e}") - raise + continue async def _wait_for_authorized_envoys(self) -> None: """Wait until all authorized envoys are connected""" @@ -139,8 +140,8 @@ async def get_flow_state(self) -> Tuple[bool, bytes]: status (bool): The flow status. flspec_obj (bytes): A serialized FLSpec object (in bytes) using dill. 
""" - status, flspec_obj = await self._flow_status.get() - return status, dill.dumps(flspec_obj) + status, flspec_obj, exception = await self._flow_status.get() + return status, dill.dumps(flspec_obj), exception async def wait_experiment(self, envoy_name: str) -> str: """Waits for an experiment to be ready for a given envoy. diff --git a/openfl/experimental/workflow/component/director/experiment.py b/openfl/experimental/workflow/component/director/experiment.py index 044211de20..b8dadfbb82 100644 --- a/openfl/experimental/workflow/component/director/experiment.py +++ b/openfl/experimental/workflow/component/director/experiment.py @@ -6,6 +6,7 @@ import asyncio import logging +import traceback from contextlib import asynccontextmanager from enum import Enum, auto from pathlib import Path @@ -79,6 +80,7 @@ def __init__( self.status = Status.PENDING self.aggregator = None self.updated_flow = None + self.experiment_exception_trace = None async def start( self, @@ -135,12 +137,15 @@ async def start( ) self.status = Status.FINISHED logger.info("Experiment %s was finished successfully.", self.name) - except Exception as e: + except Exception: + self.experiment_exception_trace = traceback.format_exc() self.status = Status.FAILED - logger.error("Experiment %s failed with error: %s.", self.name, e) - raise + self.aggregator.quit_job_sent_to = self.collaborators + logger.error( + f"Experiment {self.name} failed with error: {self.experiment_exception_trace}" + ) - return self.status == Status.FINISHED, self.updated_flow + return self.status == Status.FINISHED, self.updated_flow, self.experiment_exception_trace def _create_aggregator_grpc_server( self, diff --git a/openfl/experimental/workflow/component/envoy/envoy.py b/openfl/experimental/workflow/component/envoy/envoy.py index 1d27d48c79..f589e1b826 100644 --- a/openfl/experimental/workflow/component/envoy/envoy.py +++ b/openfl/experimental/workflow/component/envoy/envoy.py @@ -136,14 +136,14 @@ def _run(self) -> None: while 
True: try: # Wait for experiment from Director server + logger.info("Waiting for an experiment to run...") experiment_name = self._envoy_dir_client.wait_experiment() data_stream = self._envoy_dir_client.get_experiment_data(experiment_name) + data_file_path = self._save_data_stream_to_file(data_stream) except Exception as exc: logger.exception("Failed to get experiment: %s", exc) time.sleep(self.DEFAULT_RETRY_TIMEOUT_IN_SECONDS) continue - data_file_path = self._save_data_stream_to_file(data_stream) - try: with ExperimentWorkspace( experiment_name=f"{self.name}_{experiment_name}", @@ -154,6 +154,7 @@ def _run(self) -> None: self._run_collaborator() except Exception as exc: logger.exception("Collaborator failed with error: %s:", exc) + continue finally: self.is_experiment_running = False diff --git a/openfl/experimental/workflow/interface/fl_spec.py b/openfl/experimental/workflow/interface/fl_spec.py index f0713a8497..f026a929c7 100644 --- a/openfl/experimental/workflow/interface/fl_spec.py +++ b/openfl/experimental/workflow/interface/fl_spec.py @@ -182,6 +182,7 @@ def _setup_initial_state(self) -> None: def _run_federated(self) -> None: """Executes the flow using FederatedRuntime.""" + exp_name = None try: # Prepare workspace and submit it for the FederatedRuntime archive_path, exp_name = self.runtime.prepare_workspace_archive() @@ -192,11 +193,15 @@ def _run_federated(self) -> None: # Retrieve the flspec object to update the experiment state flspec_obj = self._get_flow_state() # Update state of self - self._update_from_flspec_obj(flspec_obj) + if flspec_obj: + self._update_from_flspec_obj(flspec_obj) except Exception as e: - raise Exception( - f"FederatedRuntime: Experiment {exp_name} failed to run due to error: {e}" + error_msg = ( + "FederatedRuntime: Failed to prepare workspace archive" + if exp_name is None + else f"FederatedRuntime: Experiment {exp_name} failed" ) + raise Exception(f"{error_msg} due to error: {e}") def _update_from_flspec_obj(self, flspec_obj: 
FLSpec) -> None: """Update self with attributes from the updated flspec instance. @@ -218,12 +223,15 @@ def _get_flow_state(self) -> Union[FLSpec, None]: flspec_obj (Union[FLSpec, None]): An updated FLSpec instance if the experiment runs successfully. None if the experiment could not run. """ - status, flspec_obj = self.runtime.get_flow_state() - if status: - print("Experiment ran successfully") + status, flspec_obj, exception = self.runtime.get_flow_state() + if status and flspec_obj: + print("\033[92mExperiment ran successfully\033[0m") return flspec_obj else: - print("Experiment could not run") + print( + "\033[91m Experiment could not run due to error:\033[0m", + f"\033[91m{exception}\033[0m", + ) return None def _capture_instance_snapshot(self, kwargs) -> List: diff --git a/openfl/experimental/workflow/protocols/director.proto b/openfl/experimental/workflow/protocols/director.proto index 0e4a16e548..fed654fa1f 100644 --- a/openfl/experimental/workflow/protocols/director.proto +++ b/openfl/experimental/workflow/protocols/director.proto @@ -88,6 +88,7 @@ message GetFlowStateRequest {} message GetFlowStateResponse { bool completed = 1; bytes flspec_obj = 2; + string exception = 3; } message SendRuntimeRequest {} diff --git a/openfl/experimental/workflow/runtime/federated_runtime.py b/openfl/experimental/workflow/runtime/federated_runtime.py index 4bda382765..40bd5032c5 100644 --- a/openfl/experimental/workflow/runtime/federated_runtime.py +++ b/openfl/experimental/workflow/runtime/federated_runtime.py @@ -187,14 +187,14 @@ def get_flow_state(self) -> Tuple[bool, Any]: status (bool): The flow status. flow_object: The deserialized flow object. 
""" - status, flspec_obj = self._runtime_dir_client.get_flow_state() + status, flspec_obj, exception = self._runtime_dir_client.get_flow_state() # Append generated workspace path to sys.path # to allow unpickling of flspec_obj sys.path.append(str(self.generated_workspace_path)) flow_object = dill.loads(flspec_obj) - return status, flow_object + return status, flow_object, exception def get_envoys(self) -> List[str]: """ diff --git a/openfl/experimental/workflow/transport/grpc/director_client.py b/openfl/experimental/workflow/transport/grpc/director_client.py index 189ab00937..375ad873d1 100644 --- a/openfl/experimental/workflow/transport/grpc/director_client.py +++ b/openfl/experimental/workflow/transport/grpc/director_client.py @@ -105,7 +105,6 @@ def wait_experiment(self) -> str: Returns: experiment_name (str): The name of the experiment. """ - logger.info("Waiting for an experiment to run...") response = self.stub.WaitExperiment(self._get_experiment_data()) logger.info("New experiment received: %s", response) if not response.experiment_name: @@ -315,7 +314,7 @@ def get_flow_state(self) -> Tuple: """ response = self.stub.GetFlowState(director_pb2.GetFlowStateRequest()) - return response.completed, response.flspec_obj + return response.completed, response.flspec_obj, response.exception def stream_experiment_stdout(self, experiment_name) -> Iterator[Dict[str, Any]]: """Stream experiment stdout RPC. diff --git a/openfl/experimental/workflow/transport/grpc/director_server.py b/openfl/experimental/workflow/transport/grpc/director_server.py index 608d2a7743..9d415095d7 100644 --- a/openfl/experimental/workflow/transport/grpc/director_server.py +++ b/openfl/experimental/workflow/transport/grpc/director_server.py @@ -325,8 +325,10 @@ async def GetFlowState(self, request, context) -> director_pb2.GetFlowStateRespo Returns: director_pb2.GetFlowStateResponse: The response to the request. 
""" - status, flspec_obj = await self.director.get_flow_state() - return director_pb2.GetFlowStateResponse(completed=status, flspec_obj=flspec_obj) + status, flspec_obj, exception = await self.director.get_flow_state() + return director_pb2.GetFlowStateResponse( + completed=status, flspec_obj=flspec_obj, exception=exception + ) async def GetExperimentStdout( self, request, context From 1ba83eaf14cf74f334eef3b03393e35974f1ebca Mon Sep 17 00:00:00 2001 From: Ishant Thakare Date: Fri, 9 May 2025 11:39:56 +0530 Subject: [PATCH 2/8] optimized director recovery Signed-off-by: Ishant Thakare --- .../component/aggregator/aggregator.py | 107 +++++++++++------- .../workflow/component/director/director.py | 19 ++-- .../workflow/component/director/experiment.py | 77 +++++++++++-- .../workflow/interface/fl_spec.py | 8 +- .../transport/grpc/director_server.py | 29 +++-- 5 files changed, 162 insertions(+), 78 deletions(-) diff --git a/openfl/experimental/workflow/component/aggregator/aggregator.py b/openfl/experimental/workflow/component/aggregator/aggregator.py index 8ff1b0779b..ec42f1b4a0 100644 --- a/openfl/experimental/workflow/component/aggregator/aggregator.py +++ b/openfl/experimental/workflow/component/aggregator/aggregator.py @@ -188,6 +188,29 @@ def _log_big_warning(self) -> None: f" WARNED!!!" ) + def _initialize_flow(self) -> None: + """Initialize flow by resetting and creating clones.""" + FLSpec._reset_clones() + FLSpec._create_clones(self.flow, self.flow.runtime.collaborators) + + def _prepare_collaborator_queues(self, next_step) -> None: + """Prepare task queues for collaborators with clones. 
+ + Args: + next_step (str): Next step in the flow + """ + for k, v in self.__collaborator_tasks_queue.items(): + if k in self.selected_collaborators: + v.put((next_step, self.clones_dict[k])) + else: + logger.info(f"Tasks will not be sent to {k}") + + def _restore_instance_snapshot(self) -> None: + """Restore instance snapshot if it exists.""" + if hasattr(self, "instance_snapshot"): + self.flow.restore_instance_snapshot(self.flow, list(self.instance_snapshot)) + delattr(self, "instance_snapshot") + def _update_final_flow(self) -> None: """Update the final flow state with current flow artifacts.""" artifacts_iter, _ = generate_artifacts(ctx=self.flow) @@ -203,6 +226,33 @@ def _get_sleep_time() -> int: """ return 10 + async def _track_collaborator_status(self) -> None: + """Wait for selected collaborators to connect, request tasks, and submit results.""" + while not self.collaborator_task_results.is_set(): + len_sel_collabs = len(self.selected_collaborators) + len_connected_collabs = len(self.connected_collaborators) + if len_connected_collabs < len_sel_collabs: + # Waiting for collaborators to connect. + logger.info( + "Waiting for " + + f"{len_sel_collabs - len_connected_collabs}/{len_sel_collabs}" + + " collaborators to connect..." + ) + elif self.tasks_sent_to_collaborators != len_sel_collabs: + logger.info( + "Waiting for " + + f"{len_sel_collabs - self.tasks_sent_to_collaborators}/{len_sel_collabs}" + + " to make requests for tasks..." + ) + else: + # Waiting for selected collaborators to send the results. + logger.info( + "Waiting for " + + f"{len_sel_collabs - self.collaborators_counter}/{len_sel_collabs}" + + " collaborators to send results..." + ) + await asyncio.sleep(Aggregator._get_sleep_time()) + async def run_flow(self) -> FLSpec: """ Start the execution and run flow until completion. @@ -211,61 +261,36 @@ async def run_flow(self) -> FLSpec: Returns: flow (FLSpec): Updated instance. 
""" - # Start function will be the first step if any flow + self._initialize_flow() + # Start function will be the first step of any flow f_name = "start" - # Creating a clones from the flow object - FLSpec._reset_clones() - FLSpec._create_clones(self.flow, self.flow.runtime.collaborators) - logger.info(f"Starting round {self.current_round}...") + while True: + # Execute Aggregator steps next_step = self.do_task(f_name) - if self.time_to_quit: logger.info("Experiment Completed.") break - # Prepare queue for collaborator task, with clones - for k, v in self.__collaborator_tasks_queue.items(): - if k in self.selected_collaborators: - v.put((next_step, self.clones_dict[k])) - else: - logger.info(f"Tasks will not be sent to {k}") - - while not self.collaborator_task_results.is_set(): - len_sel_collabs = len(self.selected_collaborators) - len_connected_collabs = len(self.connected_collaborators) - if len_connected_collabs < len_sel_collabs: - # Waiting for collaborators to connect. - logger.info( - "Waiting for " - + f"{len_sel_collabs - len_connected_collabs}/{len_sel_collabs}" - + " collaborators to connect..." - ) - elif self.tasks_sent_to_collaborators != len_sel_collabs: - logger.info( - "Waiting for " - + f"{len_sel_collabs - self.tasks_sent_to_collaborators}/{len_sel_collabs}" - + " to make requests for tasks..." - ) - else: - # Waiting for selected collaborators to send the results. - logger.info( - "Waiting for " - + f"{len_sel_collabs - self.collaborators_counter}/{len_sel_collabs}" - + " collaborators to send results..." 
- ) - await asyncio.sleep(Aggregator._get_sleep_time()) - + self._prepare_collaborator_queues(next_step) + await self._track_collaborator_status() self.collaborator_task_results.clear() f_name = self.next_step - if hasattr(self, "instance_snapshot"): - self.flow.restore_instance_snapshot(self.flow, list(self.instance_snapshot)) - delattr(self, "instance_snapshot") + self._restore_instance_snapshot() self._update_final_flow() return self.final_flow_state + def extract_flow(self) -> FLSpec: + """Extract the flow object from the aggregator. + + Returns: + FLSpec: The flow object. + """ + self.__delete_private_attrs_from_clone(self.flow) + return self.flow + def call_checkpoint( self, name: str, ctx: Any, f: Callable, stream_buffer: bytes = None ) -> None: diff --git a/openfl/experimental/workflow/component/director/director.py b/openfl/experimental/workflow/component/director/director.py index 6c0735ad69..d2b2edb1d5 100644 --- a/openfl/experimental/workflow/component/director/director.py +++ b/openfl/experimental/workflow/component/director/director.py @@ -9,9 +9,7 @@ import time from collections import defaultdict from pathlib import Path -from typing import Any, AsyncGenerator, Dict, Iterable, Optional, Tuple, Union - -import dill +from typing import Any, AsyncGenerator, Dict, Iterable, Optional, Union from openfl.experimental.workflow.component.director.experiment import ( Experiment, @@ -96,7 +94,7 @@ async def start_experiment_execution_loop(self) -> None: loop = asyncio.get_event_loop() while True: try: - logger.info("Waiting for experiment...") + logger.info("Waiting for an experiment to run...") async with self.experiments_registry.get_next_experiment() as experiment: await self._wait_for_authorized_envoys() run_aggregator_future = loop.create_task( @@ -118,7 +116,7 @@ async def start_experiment_execution_loop(self) -> None: await self._flow_status.put(flow_status) except Exception as e: logger.error(f"Error while executing experiment: {e}") - continue + raise 
async def _wait_for_authorized_envoys(self) -> None: """Wait until all authorized envoys are connected""" @@ -132,16 +130,15 @@ async def _wait_for_authorized_envoys(self) -> None: ) await asyncio.sleep(10) - async def get_flow_state(self) -> Tuple[bool, bytes]: + async def get_flow_state(self) -> dict: """Wait until the experiment flow status indicates completion - and return the status along with a serialized FLSpec object. + and return the flow status. Returns: - status (bool): The flow status. - flspec_obj (bytes): A serialized FLSpec object (in bytes) using dill. + dict: A dictionary containing the flow status. """ - status, flspec_obj, exception = await self._flow_status.get() - return status, dill.dumps(flspec_obj), exception + status = await self._flow_status.get() + return status async def wait_experiment(self, envoy_name: str) -> str: """Waits for an experiment to be ready for a given envoy. diff --git a/openfl/experimental/workflow/component/director/experiment.py b/openfl/experimental/workflow/component/director/experiment.py index b8dadfbb82..a769c871bc 100644 --- a/openfl/experimental/workflow/component/director/experiment.py +++ b/openfl/experimental/workflow/component/director/experiment.py @@ -8,9 +8,10 @@ import logging import traceback from contextlib import asynccontextmanager +from dataclasses import dataclass from enum import Enum, auto from pathlib import Path -from typing import Any, Iterable, List, Optional, Tuple, Union +from typing import Any, Iterable, List, Optional, Union from openfl.experimental.workflow.federated import Plan from openfl.experimental.workflow.transport import AggregatorGRPCServer @@ -29,6 +30,51 @@ class Status(Enum): REJECTED = auto() +@dataclass +class ExperimentStatus: + """ + A class to track the status and exceptions of an experiment run. + + Attributes: + status (Status): The current running status of the experiment. + updated_flow (FLSpec): The updated flow object associated with the experiment. 
+ exception (str, optional): Any exception that occurred during the experiment. + + """ + + status: Status = Status.PENDING + updated_flow: Optional[Any] = None + exception: Optional[str] = None + + def update_experiment_status( + self, + status: Status, + updated_flow: Optional[Any] = None, + exception: Optional[str] = None, + ) -> None: + """ + A method to update the experiment status and associated details. + """ + self.status = status + if updated_flow: + self.updated_flow = updated_flow + if exception: + self.exception = exception + + def get_status(self) -> dict: + """ + Get the status of the experiment. + + Returns: + dict: The status of the experiment. + """ + return { + "status": self.status == Status.FINISHED, + "updated_flow": self.updated_flow, + "exception": self.exception, + } + + class Experiment: """Experiment class. @@ -77,7 +123,7 @@ def __init__( # experiment workspace provided by the director self.plan_path = Path(plan_path) self.users = set() if users is None else set(users) - self.status = Status.PENDING + self.experiment_status = ExperimentStatus() self.aggregator = None self.updated_flow = None self.experiment_exception_trace = None @@ -91,7 +137,7 @@ async def start( certificate: Optional[Union[Path, str]] = None, director_config: Path = None, install_requirements: bool = False, - ) -> Tuple[bool, Any]: + ) -> dict: """Run experiment. Args: @@ -108,11 +154,12 @@ async def start( requirements should be installed. Defaults to False. Returns: - List[Union[bool, Any]]: - - status: status of the experiment. - - updated_flow: The updated flow object. + dict: A dictionary containing: + - status (Status): Final status of the experiment. + - updated_flow (Any): The updated flow object. + - exception (str or None): Formatted traceback if any exception occurred. 
""" - self.status = Status.IN_PROGRESS + self.experiment_status.update_experiment_status(Status.IN_PROGRESS) try: logger.info(f"New experiment {self.name} for collaborators {self.collaborators}") @@ -135,17 +182,23 @@ async def start( ), self.aggregator.run_flow(), ) - self.status = Status.FINISHED + self.experiment_status.update_experiment_status( + Status.FINISHED, + updated_flow=self.updated_flow, + ) logger.info("Experiment %s was finished successfully.", self.name) except Exception: - self.experiment_exception_trace = traceback.format_exc() - self.status = Status.FAILED + self.experiment_status.update_experiment_status( + Status.FAILED, + updated_flow=self.aggregator.extract_flow(), + exception=traceback.format_exc(), + ) self.aggregator.quit_job_sent_to = self.collaborators logger.error( - f"Experiment {self.name} failed with error: {self.experiment_exception_trace}" + f"Experiment {self.name} failed with error: {self.experiment_status.exception}" ) - return self.status == Status.FINISHED, self.updated_flow, self.experiment_exception_trace + return self.experiment_status.get_status() def _create_aggregator_grpc_server( self, diff --git a/openfl/experimental/workflow/interface/fl_spec.py b/openfl/experimental/workflow/interface/fl_spec.py index f026a929c7..d7f697f652 100644 --- a/openfl/experimental/workflow/interface/fl_spec.py +++ b/openfl/experimental/workflow/interface/fl_spec.py @@ -224,15 +224,15 @@ def _get_flow_state(self) -> Union[FLSpec, None]: runs successfully. None if the experiment could not run. 
""" status, flspec_obj, exception = self.runtime.get_flow_state() - if status and flspec_obj: - print("\033[92mExperiment ran successfully\033[0m") - return flspec_obj - else: + if exception and not status: print( "\033[91m Experiment could not run due to error:\033[0m", f"\033[91m{exception}\033[0m", ) return None + else: + print("\033[92mExperiment ran successfully\033[0m") + return flspec_obj def _capture_instance_snapshot(self, kwargs) -> List: """Takes backup of self before exclude or include filtering. diff --git a/openfl/experimental/workflow/transport/grpc/director_server.py b/openfl/experimental/workflow/transport/grpc/director_server.py index 9d415095d7..8d5d626fbe 100644 --- a/openfl/experimental/workflow/transport/grpc/director_server.py +++ b/openfl/experimental/workflow/transport/grpc/director_server.py @@ -9,6 +9,7 @@ from pathlib import Path from typing import AsyncIterator, Optional, Union +import dill import grpc from grpc import aio, ssl_server_credentials @@ -250,14 +251,20 @@ async def GetExperimentData( Yields: director_pb2.ExperimentData: The experiment data. """ - data_file_path = self.director.get_experiment_data(request.experiment_name) - max_buffer_size = 2 * 1024 * 1024 - with open(data_file_path, "rb") as df: - while True: - data = df.read(max_buffer_size) - if len(data) == 0: - break - yield director_pb2.ExperimentData(size=len(data), exp_data=data) + try: + data_file_path = self.director.get_experiment_data(request.experiment_name) + max_buffer_size = 2 * 1024 * 1024 + with open(data_file_path, "rb") as df: + while True: + data = df.read(max_buffer_size) + if not data: + break + yield director_pb2.ExperimentData(size=len(data), exp_data=data) + except Exception as e: + await context.abort( + grpc.StatusCode.INTERNAL, + f"Failed to stream experiment data: {type(e).__name__}: {e}", + ) async def WaitExperiment(self, request, context) -> director_pb2.WaitExperimentResponse: """Handles a request to wait for an experiment to be ready. 
@@ -325,9 +332,11 @@ async def GetFlowState(self, request, context) -> director_pb2.GetFlowStateRespo Returns: director_pb2.GetFlowStateResponse: The response to the request. """ - status, flspec_obj, exception = await self.director.get_flow_state() + status = await self.director.get_flow_state() return director_pb2.GetFlowStateResponse( - completed=status, flspec_obj=flspec_obj, exception=exception + completed=status["status"], + flspec_obj=dill.dumps(status["updated_flow"]), + exception=status["exception"], ) async def GetExperimentStdout( From ecec85263a7277de9006b88ee1076a79590746b8 Mon Sep 17 00:00:00 2001 From: Ishant Thakare Date: Fri, 9 May 2025 13:34:53 +0530 Subject: [PATCH 3/8] incorporated internal review comments Signed-off-by: Ishant Thakare --- .../workflow/component/director/director.py | 2 +- .../workflow/component/director/experiment.py | 4 ++-- openfl/experimental/workflow/interface/fl_spec.py | 11 +++++------ .../workflow/runtime/federated_runtime.py | 3 ++- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/openfl/experimental/workflow/component/director/director.py b/openfl/experimental/workflow/component/director/director.py index d2b2edb1d5..fa7f330c35 100644 --- a/openfl/experimental/workflow/component/director/director.py +++ b/openfl/experimental/workflow/component/director/director.py @@ -130,7 +130,7 @@ async def _wait_for_authorized_envoys(self) -> None: ) await asyncio.sleep(10) - async def get_flow_state(self) -> dict: + async def get_flow_state(self) -> dict[bool, Any, str]: """Wait until the experiment flow status indicates completion and return the flow status. 
diff --git a/openfl/experimental/workflow/component/director/experiment.py b/openfl/experimental/workflow/component/director/experiment.py index a769c871bc..0bf19f9d85 100644 --- a/openfl/experimental/workflow/component/director/experiment.py +++ b/openfl/experimental/workflow/component/director/experiment.py @@ -61,7 +61,7 @@ def update_experiment_status( if exception: self.exception = exception - def get_status(self) -> dict: + def get_status(self) -> dict[bool, Any, str]: """ Get the status of the experiment. @@ -137,7 +137,7 @@ async def start( certificate: Optional[Union[Path, str]] = None, director_config: Path = None, install_requirements: bool = False, - ) -> dict: + ) -> dict[bool, Any, str]: """Run experiment. Args: diff --git a/openfl/experimental/workflow/interface/fl_spec.py b/openfl/experimental/workflow/interface/fl_spec.py index d7f697f652..24ad9914ed 100644 --- a/openfl/experimental/workflow/interface/fl_spec.py +++ b/openfl/experimental/workflow/interface/fl_spec.py @@ -193,8 +193,7 @@ def _run_federated(self) -> None: # Retrieve the flspec object to update the experiment state flspec_obj = self._get_flow_state() # Update state of self - if flspec_obj: - self._update_from_flspec_obj(flspec_obj) + self._update_from_flspec_obj(flspec_obj) except Exception as e: error_msg = ( "FederatedRuntime: Failed to prepare workspace archive" @@ -224,15 +223,15 @@ def _get_flow_state(self) -> Union[FLSpec, None]: runs successfully. None if the experiment could not run. """ status, flspec_obj, exception = self.runtime.get_flow_state() - if exception and not status: + if status: + print("\033[92mExperiment ran successfully\033[0m") + return flspec_obj + else: print( "\033[91m Experiment could not run due to error:\033[0m", f"\033[91m{exception}\033[0m", ) return None - else: - print("\033[92mExperiment ran successfully\033[0m") - return flspec_obj def _capture_instance_snapshot(self, kwargs) -> List: """Takes backup of self before exclude or include filtering. 
diff --git a/openfl/experimental/workflow/runtime/federated_runtime.py b/openfl/experimental/workflow/runtime/federated_runtime.py index 40bd5032c5..100d149487 100644 --- a/openfl/experimental/workflow/runtime/federated_runtime.py +++ b/openfl/experimental/workflow/runtime/federated_runtime.py @@ -179,13 +179,14 @@ def submit_experiment(self, archive_path, exp_name) -> None: finally: self.remove_workspace_archive(archive_path) - def get_flow_state(self) -> Tuple[bool, Any]: + def get_flow_state(self) -> Tuple[bool, Any, str]: """ Retrieve the updated flow status and deserialized flow object. Returns: status (bool): The flow status. flow_object: The deserialized flow object. + exception (str): Exception message if any. """ status, flspec_obj, exception = self._runtime_dir_client.get_flow_state() From 140d172396dfbb2da401839968746ab7a461db7e Mon Sep 17 00:00:00 2001 From: Ishant Thakare Date: Fri, 9 May 2025 15:28:57 +0530 Subject: [PATCH 4/8] Incorporated review comments Signed-off-by: Ishant Thakare --- .../experimental/workflow/component/director/director.py | 2 +- .../workflow/component/director/experiment.py | 8 ++++---- openfl/experimental/workflow/component/envoy/envoy.py | 1 - openfl/experimental/workflow/interface/fl_spec.py | 7 +++---- .../workflow/transport/grpc/director_client.py | 1 + 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/openfl/experimental/workflow/component/director/director.py b/openfl/experimental/workflow/component/director/director.py index fa7f330c35..d25ec3ec33 100644 --- a/openfl/experimental/workflow/component/director/director.py +++ b/openfl/experimental/workflow/component/director/director.py @@ -130,7 +130,7 @@ async def _wait_for_authorized_envoys(self) -> None: ) await asyncio.sleep(10) - async def get_flow_state(self) -> dict[bool, Any, str]: + async def get_flow_state(self) -> Dict[str, Union[bool, Optional[Any], Optional[str]]]: """Wait until the experiment flow status indicates completion and return the flow 
status. diff --git a/openfl/experimental/workflow/component/director/experiment.py b/openfl/experimental/workflow/component/director/experiment.py index 0bf19f9d85..d1f086b2a1 100644 --- a/openfl/experimental/workflow/component/director/experiment.py +++ b/openfl/experimental/workflow/component/director/experiment.py @@ -11,7 +11,7 @@ from dataclasses import dataclass from enum import Enum, auto from pathlib import Path -from typing import Any, Iterable, List, Optional, Union +from typing import Any, Dict, Iterable, List, Optional, Union from openfl.experimental.workflow.federated import Plan from openfl.experimental.workflow.transport import AggregatorGRPCServer @@ -61,7 +61,7 @@ def update_experiment_status( if exception: self.exception = exception - def get_status(self) -> dict[bool, Any, str]: + def get_status(self) -> Dict[str, Union[bool, Optional[Any], Optional[str]]]: """ Get the status of the experiment. @@ -137,7 +137,7 @@ async def start( certificate: Optional[Union[Path, str]] = None, director_config: Path = None, install_requirements: bool = False, - ) -> dict[bool, Any, str]: + ) -> Dict[str, Union[bool, Optional[Any], Optional[str]]]: """Run experiment. Args: @@ -156,7 +156,7 @@ async def start( Returns: dict: A dictionary containing: - status (Status): Final status of the experiment. - - updated_flow (Any): The updated flow object. + - updated_flow (FLSpec): The updated flow object. - exception (str or None): Formatted traceback if any exception occurred. 
""" self.experiment_status.update_experiment_status(Status.IN_PROGRESS) diff --git a/openfl/experimental/workflow/component/envoy/envoy.py b/openfl/experimental/workflow/component/envoy/envoy.py index f589e1b826..d1a1ea7e39 100644 --- a/openfl/experimental/workflow/component/envoy/envoy.py +++ b/openfl/experimental/workflow/component/envoy/envoy.py @@ -136,7 +136,6 @@ def _run(self) -> None: while True: try: # Wait for experiment from Director server - logger.info("Waiting for an experiment to run...") experiment_name = self._envoy_dir_client.wait_experiment() data_stream = self._envoy_dir_client.get_experiment_data(experiment_name) data_file_path = self._save_data_stream_to_file(data_stream) diff --git a/openfl/experimental/workflow/interface/fl_spec.py b/openfl/experimental/workflow/interface/fl_spec.py index 24ad9914ed..540195c215 100644 --- a/openfl/experimental/workflow/interface/fl_spec.py +++ b/openfl/experimental/workflow/interface/fl_spec.py @@ -182,8 +182,8 @@ def _setup_initial_state(self) -> None: def _run_federated(self) -> None: """Executes the flow using FederatedRuntime.""" - exp_name = None try: + exp_name = None # Prepare workspace and submit it for the FederatedRuntime archive_path, exp_name = self.runtime.prepare_workspace_archive() self.runtime.submit_experiment(archive_path, exp_name) @@ -200,7 +200,7 @@ def _run_federated(self) -> None: if exp_name is None else f"FederatedRuntime: Experiment {exp_name} failed" ) - raise Exception(f"{error_msg} due to error: {e}") + raise Exception(f"{error_msg} due to error: {e}") from e def _update_from_flspec_obj(self, flspec_obj: FLSpec) -> None: """Update self with attributes from the updated flspec instance. 
@@ -225,13 +225,12 @@ def _get_flow_state(self) -> Union[FLSpec, None]: status, flspec_obj, exception = self.runtime.get_flow_state() if status: print("\033[92mExperiment ran successfully\033[0m") - return flspec_obj else: print( "\033[91m Experiment could not run due to error:\033[0m", f"\033[91m{exception}\033[0m", ) - return None + return flspec_obj def _capture_instance_snapshot(self, kwargs) -> List: """Takes backup of self before exclude or include filtering. diff --git a/openfl/experimental/workflow/transport/grpc/director_client.py b/openfl/experimental/workflow/transport/grpc/director_client.py index 375ad873d1..1ba1e7df1b 100644 --- a/openfl/experimental/workflow/transport/grpc/director_client.py +++ b/openfl/experimental/workflow/transport/grpc/director_client.py @@ -105,6 +105,7 @@ def wait_experiment(self) -> str: Returns: experiment_name (str): The name of the experiment. """ + logger.info("Waiting for an experiment to run...") response = self.stub.WaitExperiment(self._get_experiment_data()) logger.info("New experiment received: %s", response) if not response.experiment_name: From 88a0c3e86f204792b14696a23aa572785cdb97c6 Mon Sep 17 00:00:00 2001 From: Ishant Thakare Date: Mon, 12 May 2025 16:22:58 +0530 Subject: [PATCH 5/8] optimize code and handle corner cases Signed-off-by: Ishant Thakare --- .../workflow/component/director/director.py | 9 +- .../workflow/component/director/experiment.py | 108 +++++++++++++----- .../workflow/interface/fl_spec.py | 3 +- 3 files changed, 88 insertions(+), 32 deletions(-) diff --git a/openfl/experimental/workflow/component/director/director.py b/openfl/experimental/workflow/component/director/director.py index d25ec3ec33..afbae89afb 100644 --- a/openfl/experimental/workflow/component/director/director.py +++ b/openfl/experimental/workflow/component/director/director.py @@ -114,6 +114,9 @@ async def start_experiment_execution_loop(self) -> None: # Wait for the experiment to complete and save the result flow_status = 
await run_aggregator_future await self._flow_status.put(flow_status) + # Mark all envoys' experiment states as None, + # indicating no active experiment + self.col_exp = dict.fromkeys(self.col_exp, None) except Exception as e: logger.error(f"Error while executing experiment: {e}") raise @@ -218,7 +221,11 @@ async def stream_experiment_stdout( f'No experiment name "{experiment_name}" in experiments list, or caller "{caller}"' f" does not have access to this experiment" ) - while not self.experiments_registry[experiment_name].aggregator: + experiment = self.experiments_registry[experiment_name] + while not experiment.aggregator: + if experiment.experiment_status.status.value == 4: + # Exit early if the experiment failed to start (status value 4) + return await asyncio.sleep(5) aggregator = self.experiments_registry[experiment_name].aggregator while True: diff --git a/openfl/experimental/workflow/component/director/experiment.py b/openfl/experimental/workflow/component/director/experiment.py index d1f086b2a1..b82ce428b7 100644 --- a/openfl/experimental/workflow/component/director/experiment.py +++ b/openfl/experimental/workflow/component/director/experiment.py @@ -88,6 +88,8 @@ class Experiment: plan_path (Union[Path, str]): The path to the plan. users (Iterable[str]): The list of users. status (str): The status of the experiment. + aggregator_grpc_server (AggregatorGRPCServer): The gRPC server + for the aggregator. aggregator (Aggregator): The aggregator instance. updated_flow (FLSpec): Updated flow instance. 
""" @@ -124,10 +126,75 @@ def __init__( self.plan_path = Path(plan_path) self.users = set() if users is None else set(users) self.experiment_status = ExperimentStatus() + self._aggregator_grpc_server = None self.aggregator = None self.updated_flow = None self.experiment_exception_trace = None + def _initialize_aggregator_server( + self, + tls: bool, + root_certificate: Optional[Union[Path, str]], + private_key: Optional[Union[Path, str]], + certificate: Optional[Union[Path, str]], + director_config: Path, + ) -> bool: + """Initialize the aggregator server. + + Args: + tls (bool, optional): A flag indicating if TLS should be used for + connections. Defaults to True. + root_certificate (Optional[Union[Path, str]]): The path to the + root certificate for TLS. Defaults to None. + private_key (Optional[Union[Path, str]]): The path to the private + key for TLS. Defaults to None. + certificate (Optional[Union[Path, str]]): The path to the + certificate for TLS. Defaults to None. + director_config (Path): Path to director's config file. + Defaults to None. + + Returns: + bool: True if the server was created successfully, False otherwise. 
+ """ + try: + self._aggregator_grpc_server = self._create_aggregator_grpc_server( + tls=tls, + root_certificate=root_certificate, + private_key=private_key, + certificate=certificate, + director_config=director_config, + ) + self.aggregator = self._aggregator_grpc_server.aggregator + return True + except Exception: + exception_trace = traceback.format_exc() + logger.error(f"Failed to create aggregator server: {exception_trace}") + self.experiment_status.update_experiment_status( + Status.FAILED, + exception=exception_trace, + ) + return False + + async def _run_experiment_flow(self) -> None: + """Run the experiment flow and the aggregator gRPC server.""" + _, self.updated_flow = await asyncio.gather( + self._run_aggregator_grpc_server(self._aggregator_grpc_server), + self.aggregator.run_flow(), + ) + + def _handle_experiment_failure(self) -> None: + """Handle experiment failure and update status.""" + exception_trace = traceback.format_exc() + self.experiment_status.update_experiment_status( + Status.FAILED, + updated_flow=self.aggregator.extract_flow(), + exception=exception_trace, + ) + # Mark quit jobs as sent to all collaborators to allow the + # aggregator gRPC server to shut down + self.aggregator.quit_job_sent_to = self.collaborators + logger.error(f"Experiment {self.name} failed with error: {exception_trace}") + async def start( self, *, @@ -160,43 +227,24 @@ async def start( - exception (str or None): Formatted traceback if any exception occurred. 
""" self.experiment_status.update_experiment_status(Status.IN_PROGRESS) + logger.info(f"New experiment {self.name} for collaborators {self.collaborators}") try: - logger.info(f"New experiment {self.name} for collaborators {self.collaborators}") - with ExperimentWorkspace( experiment_name=self.name, data_file_path=self.archive_path, install_requirements=install_requirements, ): - aggregator_grpc_server = self._create_aggregator_grpc_server( - tls=tls, - root_certificate=root_certificate, - private_key=private_key, - certificate=certificate, - director_config=director_config, - ) - self.aggregator = aggregator_grpc_server.aggregator - _, self.updated_flow = await asyncio.gather( - self._run_aggregator_grpc_server( - aggregator_grpc_server, - ), - self.aggregator.run_flow(), - ) - self.experiment_status.update_experiment_status( - Status.FINISHED, - updated_flow=self.updated_flow, - ) - logger.info("Experiment %s was finished successfully.", self.name) + if self._initialize_aggregator_server( + tls, root_certificate, private_key, certificate, director_config + ): + await self._run_experiment_flow() + self.experiment_status.update_experiment_status( + Status.FINISHED, + updated_flow=self.updated_flow, + ) + logger.info("Experiment %s was finished successfully.", self.name) except Exception: - self.experiment_status.update_experiment_status( - Status.FAILED, - updated_flow=self.aggregator.extract_flow(), - exception=traceback.format_exc(), - ) - self.aggregator.quit_job_sent_to = self.collaborators - logger.error( - f"Experiment {self.name} failed with error: {self.experiment_status.exception}" - ) + self._handle_experiment_failure() return self.experiment_status.get_status() diff --git a/openfl/experimental/workflow/interface/fl_spec.py b/openfl/experimental/workflow/interface/fl_spec.py index 540195c215..35146939d1 100644 --- a/openfl/experimental/workflow/interface/fl_spec.py +++ b/openfl/experimental/workflow/interface/fl_spec.py @@ -193,7 +193,8 @@ def 
_run_federated(self) -> None: # Retrieve the flspec object to update the experiment state flspec_obj = self._get_flow_state() # Update state of self - self._update_from_flspec_obj(flspec_obj) + if flspec_obj: + self._update_from_flspec_obj(flspec_obj) except Exception as e: error_msg = ( "FederatedRuntime: Failed to prepare workspace archive" From 40e8f3a7e1a615740260513abd8971d58c0c29f4 Mon Sep 17 00:00:00 2001 From: Ishant Thakare Date: Mon, 12 May 2025 16:35:48 +0530 Subject: [PATCH 6/8] optimized code Signed-off-by: Ishant Thakare --- openfl/experimental/workflow/component/director/director.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/openfl/experimental/workflow/component/director/director.py b/openfl/experimental/workflow/component/director/director.py index afbae89afb..f60afd82de 100644 --- a/openfl/experimental/workflow/component/director/director.py +++ b/openfl/experimental/workflow/component/director/director.py @@ -14,6 +14,7 @@ from openfl.experimental.workflow.component.director.experiment import ( Experiment, ExperimentsRegistry, + Status, ) from openfl.experimental.workflow.transport.grpc.exceptions import EnvoyNotFoundError @@ -223,8 +224,8 @@ async def stream_experiment_stdout( ) experiment = self.experiments_registry[experiment_name] while not experiment.aggregator: - if experiment.experiment_status.status.value == 4: - # Exit early if the experiment failed to start (status value 4) + if experiment.experiment_status.status == Status.FAILED: + # Exit early if the experiment failed to start return await asyncio.sleep(5) aggregator = self.experiments_registry[experiment_name].aggregator From 3e153758e888cd3259fee22796a03e2fc2ae165d Mon Sep 17 00:00:00 2001 From: Ishant Thakare Date: Tue, 3 Jun 2025 12:50:55 +0530 Subject: [PATCH 7/8] Incorporated internal review comments Signed-off-by: Ishant Thakare --- .../component/aggregator/aggregator.py | 98 +++---------------- .../workflow/component/director/director.py | 16 +-- 2 
files changed, 23 insertions(+), 91 deletions(-) diff --git a/openfl/experimental/workflow/component/aggregator/aggregator.py b/openfl/experimental/workflow/component/aggregator/aggregator.py index ec42f1b4a0..e81e47e8dc 100644 --- a/openfl/experimental/workflow/component/aggregator/aggregator.py +++ b/openfl/experimental/workflow/component/aggregator/aggregator.py @@ -182,10 +182,10 @@ def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) def _log_big_warning(self) -> None: """Warn user about single collaborator cert mode.""" logger.warning( - f"\n{the_dragon}\nYOU ARE RUNNING IN SINGLE COLLABORATOR CERT MODE! THIS IS" - f" NOT PROPER PKI AND " - f"SHOULD ONLY BE USED IN DEVELOPMENT SETTINGS!!!! YE HAVE BEEN" - f" WARNED!!!" + "YOU ARE RUNNING IN SINGLE COLLABORATOR CERT MODE! THIS IS" + " NOT PROPER PKI AND " + "SHOULD ONLY BE USED IN DEVELOPMENT SETTINGS!!!! YE HAVE BEEN" + " WARNED!!!" ) def _initialize_flow(self) -> None: @@ -193,20 +193,23 @@ def _initialize_flow(self) -> None: FLSpec._reset_clones() FLSpec._create_clones(self.flow, self.flow.runtime.collaborators) - def _prepare_collaborator_queues(self, next_step) -> None: - """Prepare task queues for collaborators with clones. + def _enqueue_next_step_for_collaborators(self, next_step) -> None: + """Enqueue the next step and associated clone for each selected collaborator. Args: - next_step (str): Next step in the flow + next_step (str): Next step to be executed by collaborators """ for k, v in self.__collaborator_tasks_queue.items(): if k in self.selected_collaborators: v.put((next_step, self.clones_dict[k])) else: - logger.info(f"Tasks will not be sent to {k}") + logger.info( + f"Skipping task dispatch for collaborator '{k}' " + f"as it is not part of selected_collaborators." 
+ ) def _restore_instance_snapshot(self) -> None: - """Restore instance snapshot if it exists.""" + """Restore the FLSpec state at the aggregator from a saved instance snapshot.""" if hasattr(self, "instance_snapshot"): self.flow.restore_instance_snapshot(self.flow, list(self.instance_snapshot)) delattr(self, "instance_snapshot") @@ -267,13 +270,13 @@ async def run_flow(self) -> FLSpec: logger.info(f"Starting round {self.current_round}...") while True: - # Execute Aggregator steps + # Perform Aggregator steps if any next_step = self.do_task(f_name) if self.time_to_quit: logger.info("Experiment Completed.") break - self._prepare_collaborator_queues(next_step) + self._enqueue_next_step_for_collaborators(next_step) await self._track_collaborator_status() self.collaborator_task_results.clear() f_name = self.next_step @@ -554,76 +557,3 @@ def valid_collaborator_cn_and_id( def all_quit_jobs_sent(self) -> bool: """Assert all quit jobs are sent to collaborators.""" return set(self.quit_job_sent_to) == set(self.authorized_cols) - - -the_dragon = """ - - ,@@.@@+@@##@,@@@@.`@@#@+ *@@@@ #@##@ `@@#@# @@@@@ @@ @@@@` #@@@ :@@ `@#`@@@#.@ - @@ #@ ,@ +. @@.@* #@ :` @+*@ .@`+. @@ *@::@`@@ @@# @@ #`;@`.@@ @@@`@`#@* +:@` - @@@@@ ,@@@ @@@@ +@@+ @@@@ .@@@ @@ .@+:@@@: .;+@` @@ ,;,#@` @@ @@@@@ ,@@@* @ - @@ #@ ,@`*. @@.@@ #@ ,; `@+,@#.@.*` @@ ,@::@`@@` @@@@# @@`:@;*@+ @@ @`:@@`@ *@@ ` - .@@`@@,+@+;@.@@ @@`@@;*@ ;@@#@:*@+;@ `@@;@@ #@**@+;@ `@@:`@@@@ @@@@.`@+ .@ +@+@*,@ - `` `` ` `` . ` ` ` ` ` .` ` `` `` `` ` . ` - - - - .** - ;` `****: - @**`******* - *** +***********; - ,@***;` .*:,;************ - ;***********@@*********** - ;************************, - `************************* - ************************* - ,************************ - **#********************* - *@****` :**********; - +**; .********. - ;*; `*******#: `,: - ****@@@++:: ,,;***. - *@@@**;#;: +: **++*, - @***#@@@: +*; ,**** - @*@+**** ***` ****, - ,@#******. , **** **;,**. 
- * ******** :, ;*:*+ ** :,** - # ********:: *,.*:**` * ,*; - . *********: .+,*:;*: : `:** - ; :********: ***::** ` ` ** - + :****::*** , *;;::**` :* - `` .****::;**::: *;::::*; ;* - * *****::***:. **::::** ;: - # *****;:**** ;*::;*** ,*` - ; ************` ,**:****; ::* - : *************;:;*;*++: *. - : *****************;* `* - `. `*****************; : *. - .` .*+************+****;: :* - `. :;+***********+******;` : .,* - ; ::*+*******************. `:: .`:. - + :::**********************;;:` * - + ,::;*************;:::*******. * - # `:::+*************:::;******** :, * - @ :::***************;:;*********;:, * - @ ::::******:*********************: ,:* - @ .:::******:;*********************, :* - # :::******::******###@*******;;**** *, - # .::;*****::*****#****@*****;:::***; `` ** - * ::;***********+*****+#******::*****,,,,** - : :;***********#******#****************** - .` `;***********#******+****+************ - `, ***#**@**+***+*****+**************;` - ; *++**#******#+****+` `.,.. - + `@***#*******#****# - + +***@********+**+: - * .+**+;**;;;**;#**# - ,` ****@ +*+: - # +**+ :+** - @ ;**+, ,***+ - # #@+**** *#****+ - `; @+***+@ `#**+#++ - # #*#@##, .++:.,# - `* @# +. - @@@ - # `@ - , """ diff --git a/openfl/experimental/workflow/component/director/director.py b/openfl/experimental/workflow/component/director/director.py index 281e2ac2db..775e330683 100644 --- a/openfl/experimental/workflow/component/director/director.py +++ b/openfl/experimental/workflow/component/director/director.py @@ -36,7 +36,7 @@ class Director: _flow_status (Queue): Stores the flow status experiments_registry (ExperimentsRegistry): An object of ExperimentsRegistry to store the experiments. - col_exp (dict): A dictionary to store the experiments for + collaborator_experiments (dict): A dictionary to store the experiments for collaborators. col_exp_queues (defaultdict): A defaultdict to store the experiment queues for collaborators. 
@@ -83,7 +83,7 @@ def __init__( self._flow_status = asyncio.Queue() self.experiments_registry = ExperimentsRegistry() - self.col_exp = {} + self.collaborator_experiments = {} self.col_exp_queues = defaultdict(asyncio.Queue) self._envoy_registry = {} self.envoy_health_check_period = envoy_health_check_period @@ -117,7 +117,9 @@ async def start_experiment_execution_loop(self) -> None: await self._flow_status.put(flow_status) # Mark all envoys' experiment states as None, # indicating no active experiment - self.col_exp = dict.fromkeys(self.col_exp, None) + self.collaborator_experiments = dict.fromkeys( + self.collaborator_experiments, None + ) except Exception as e: logger.error(f"Error while executing experiment: {e}") raise @@ -153,17 +155,17 @@ async def wait_experiment(self, envoy_name: str) -> str: Returns: str: The name of the experiment on the queue. """ - experiment_name = self.col_exp.get(envoy_name) + experiment_name = self.collaborator_experiments.get(envoy_name) # If any envoy gets disconnected if experiment_name and experiment_name in self.experiments_registry: experiment = self.experiments_registry[experiment_name] if experiment.aggregator.current_round < experiment.aggregator.rounds_to_train: return experiment_name - self.col_exp[envoy_name] = None + self.collaborator_experiments[envoy_name] = None queue = self.col_exp_queues[envoy_name] experiment_name = await queue.get() - self.col_exp[envoy_name] = experiment_name + self.collaborator_experiments[envoy_name] = experiment_name return experiment_name @@ -283,7 +285,7 @@ def get_envoys(self) -> Dict[str, Any]: envoy["is_online"] = time.time() < envoy.get("last_updated", 0) + envoy.get( "valid_duration", 0 ) - envoy["experiment_name"] = self.col_exp.get(envoy["name"], "None") + envoy["experiment_name"] = self.collaborator_experiments.get(envoy["name"], "None") return self._envoy_registry From 46c65b188e63a2319f36b80b54f5a0e1dfffb847 Mon Sep 17 00:00:00 2001 From: Ishant Thakare Date: Wed, 18 Jun 2025 
12:49:38 +0530 Subject: [PATCH 8/8] Incorporate internal review comments Signed-off-by: Ishant Thakare --- .../workflow/component/aggregator/aggregator.py | 16 +++++++++++----- .../workflow/component/director/experiment.py | 1 - 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/openfl/experimental/workflow/component/aggregator/aggregator.py b/openfl/experimental/workflow/component/aggregator/aggregator.py index e81e47e8dc..053819295d 100644 --- a/openfl/experimental/workflow/component/aggregator/aggregator.py +++ b/openfl/experimental/workflow/component/aggregator/aggregator.py @@ -199,12 +199,12 @@ def _enqueue_next_step_for_collaborators(self, next_step) -> None: Args: next_step (str): Next step to be executed by collaborators """ - for k, v in self.__collaborator_tasks_queue.items(): - if k in self.selected_collaborators: - v.put((next_step, self.clones_dict[k])) + for collaborator, task_queue in self.__collaborator_tasks_queue.items(): + if collaborator in self.selected_collaborators: + task_queue.put((next_step, self.clones_dict[collaborator])) else: logger.info( - f"Skipping task dispatch for collaborator '{k}' " + f"Skipping task dispatch for collaborator '{collaborator}' " f"as it is not part of selected_collaborators." ) @@ -555,5 +555,11 @@ def valid_collaborator_cn_and_id( ) def all_quit_jobs_sent(self) -> bool: - """Assert all quit jobs are sent to collaborators.""" + """ + Check whether a quit job has been sent to all authorized collaborators. + + Returns: + bool: True if quit jobs have been sent to all authorized collaborators, + False otherwise.
+ """ return set(self.quit_job_sent_to) == set(self.authorized_cols) diff --git a/openfl/experimental/workflow/component/director/experiment.py b/openfl/experimental/workflow/component/director/experiment.py index 5586eeaaef..b0fb889550 100644 --- a/openfl/experimental/workflow/component/director/experiment.py +++ b/openfl/experimental/workflow/component/director/experiment.py @@ -129,7 +129,6 @@ def __init__( self._aggregator_grpc_server = None self.aggregator = None self.updated_flow = None - self.experiment_exception_trace = None def _initialize_aggregator_server( self,