@ishant162 ishant162 commented May 20, 2025

Summary

The Workflow API allows private attributes to be specified for participants in a Federation. These private attributes represent confidential information that should not be shared with other participants.

Currently, in LocalRuntime, private attributes of participants can be updated within the flow. This is possible because they are updated from the FLSpec clones back into the participant object before being removed from the clone (ref).

This introduces the possibility for users to implement mechanisms that persist intermediate results or artifacts within private attributes, which can subsequently be retrieved via the participant object upon experiment completion. For example:

# Imports below assume the experimental workflow API layout touched by this PR;
# watermark_data, train_dataset, test_dataset, local_train, local_test, and
# backend are placeholders defined elsewhere.
import torch

from openfl.experimental.workflow.interface import Aggregator, Collaborator, FLSpec
from openfl.experimental.workflow.placement import aggregator, collaborator
from openfl.experimental.workflow.runtime import LocalRuntime

class FLFlow(FLSpec):
    @aggregator
    def start(self):
        self.collaborators = self.runtime.collaborators
        self.next(self.aggregator_step)

    @aggregator
    def aggregator_step(self):
        self.watermark_data_loader = torch.utils.data.DataLoader(
            watermark_data, batch_size=64, shuffle=True                 # --> Modified aggregator private attribute 
        )
        self.next(self.collaborator_step_a, foreach="collaborators")

    @collaborator
    def collaborator_step_a(self):
        self.train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=64, shuffle=True                   # --> Modified collaborator private attribute
        )
        self.test_loader = torch.utils.data.DataLoader(
            test_dataset, batch_size=64, shuffle=True                    # --> Modified collaborator private attribute
        )
        self.next(self.end)

    @aggregator
    def end(self, _):
        print("End of flow")

if __name__ == "__main__":

    aggregator = Aggregator()
    aggregator.private_attributes = {
        "watermark_data_loader": torch.utils.data.DataLoader(
            watermark_data, batch_size=128, shuffle=True
        )
    }

    collaborator_names = ["Portland", "Seattle"]
    collaborators = [Collaborator(name=name) for name in collaborator_names]
    for collab in collaborators:
        collab.private_attributes = {
            "train_loader": torch.utils.data.DataLoader(local_train, batch_size=128, shuffle=True),
            "test_loader": torch.utils.data.DataLoader(local_test, batch_size=128, shuffle=True),
        }

    local_runtime = LocalRuntime(
        aggregator=aggregator, collaborators=collaborators, backend=backend     # --> ray or single_process 
    )
    flflow = FLFlow()
    flflow.runtime = local_runtime
    flflow.run()

    print(aggregator.private_attributes)
    # Output:
    # {"watermark_data_loader": <torch.utils.data.dataloader.DataLoader at 0x7feac0bdb8e0>}

    print(collaborators[0].private_attributes)
    # Output:
    # {
    #     "train_loader": <torch.utils.data.dataloader.DataLoader at 0x7feac0bdac50>,
    #     "test_loader": <torch.utils.data.dataloader.DataLoader at 0x7feac0bdbd60>
    # }

Although this approach is effective when using the single_process backend, it does not behave as expected under the ray backend.

This PR introduces functionality to update the state for Aggregator and Collaborator using the ray backend in LocalRuntime, ensuring consistent behavior with the single_process backend.

Use cases:

  • Each collaborator initially sets a train_loader using the full dataset. During the experiment, based on performance metrics (e.g., poor recall), the collaborator updates the train_loader to use over-sampled minority cases or focus on rare classes.
    if current_round_recall < 0.6: self.train_loader = build_oversampled_loader(local_dataset)
  • A federated AutoML system analyzes which samples lead to high gradient noise and automatically filters them out in future rounds.
    self.train_loader = filter_out_noisy_samples(self.train_loader)
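The first use case above can be sketched without framework dependencies. Everything below is illustrative: the pure-Python resampling stands in for the hypothetical build_oversampled_loader(), which in practice would wrap the result in a torch DataLoader (e.g. via WeightedRandomSampler):

```python
import random
from collections import Counter

def build_oversampled_dataset(samples, labels, seed=0):
    """Resample so every class appears as often as the majority class.

    Stand-in for the hypothetical build_oversampled_loader() from the
    use case above; a real implementation would return a DataLoader.
    """
    rng = random.Random(seed)
    counts = Counter(labels)
    target = max(counts.values())

    by_class = {}
    for sample, label in zip(samples, labels):
        by_class.setdefault(label, []).append(sample)

    resampled = []
    for label, items in by_class.items():
        # Keep every original sample, then draw extras with replacement
        # until the class reaches the majority-class count.
        extras = [rng.choice(items) for _ in range(target - len(items))]
        resampled.extend((label, s) for s in items + extras)
    rng.shuffle(resampled)
    return resampled

# A collaborator could swap its loader when recall drops below a threshold:
samples = ["a", "b", "c", "d", "e"]
labels = [0, 0, 0, 0, 1]          # class 1 is the rare class
current_round_recall = 0.4
if current_round_recall < 0.6:
    balanced = build_oversampled_dataset(samples, labels)

class_counts = Counter(label for label, _ in balanced)
print(class_counts[0], class_counts[1])  # → 4 4
```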

Type of Change (Mandatory)

Specify the type of change being made.

  • Feature Enhancement

Description (Mandatory)

In LocalRuntime with the Ray backend, the Aggregator and Collaborator operate as remote actors. After the experiment concludes, while the FLSpec state is updated ([reference]), the internal states of these remote actors are not synchronized with their corresponding objects.

To update the local participant state, the following changes are implemented for the ray backend:

  • Maintain references to Aggregator and Collaborator objects in LocalRuntime.
  • Update the Participant class to implement a get_state() method.
  • After flow execution, call get_state() on remote actors and update references.
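The three steps above can be sketched ray-free. Apart from get_state(), every name below (Participant, sync_participants_state) is an assumption for illustration; the "remote" object stands in for the ray actor's copy of the participant, whose state would really be fetched via ray.get(actor.get_state.remote()):

```python
class Participant:
    """Minimal stand-in for openfl's Aggregator/Collaborator participants."""

    def __init__(self, name):
        self.name = name
        self.private_attributes = {}

    def get_state(self):
        # The PR adds a get_state() returning the participant's __dict__;
        # on the ray backend this is invoked remotely on the actor.
        return self.__dict__

def sync_participants_state(local_refs, remote_states):
    # After flow execution, copy each remote actor's state back into the
    # locally held reference so both backends expose the same attributes.
    for ref, state in zip(local_refs, remote_states):
        ref.__dict__.update(state)

# The "remote" object plays the role of the ray actor's copy.
local = Participant("Portland")
remote = Participant("Portland")
remote.private_attributes = {"train_loader": "loader_updated_in_flow"}

sync_participants_state([local], [remote.get_state()])
print(local.private_attributes)  # → {'train_loader': 'loader_updated_in_flow'}
```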

Testing

  • Added test case: validates the dynamic assignment and synchronization of private attributes.
  • LocalRuntime regression

Topics for Discussion

  • Workflow API is designed to enable users to retrieve experiment results (trained model, metrics, and other artifacts) via the FLSpec. The ability for users to modify the state of private attributes and fetch it exposes a new interface for accessing internal participant state. Should this behavior be considered acceptable and supported? It would be helpful to align on this.

  • The private attributes of Participants are properties of the Aggregator and Collaborator. Allowing the flow (developed by the user/data scientist) to modify these attributes could pose a security risk. Since LocalRuntime is a simulation with no real participants, this behavior may still be acceptable.
    However, in FederatedRuntime, these private attributes are strictly inaccessible to the user-defined flow by design, in order to preserve data privacy and maintain system integrity.
    Supporting this feature only in LocalRuntime would create a discrepancy between the simulation and production environments. If this behavior is retained, it should be clearly documented to avoid confusion and misaligned expectations.

Signed-off-by: Ishant Thakare <ishantrog752@gmail.com>
@ishant162 ishant162 requested a review from Copilot May 22, 2025 06:42

Copilot AI left a comment


Pull Request Overview

Adds state synchronization for aggregator and collaborators when using the Ray backend.

  • Stores references to aggregator and collaborator objects for later state syncing
  • Introduces get_state on participants to retrieve internal state
  • Implements _sync_participants_state and invokes it after aggregation tasks in Ray mode

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File — Description
openfl/experimental/workflow/runtime/local_runtime.py — Store object references, add a state-sync method, call it in execute_task
openfl/experimental/workflow/interface/participants.py — Add a get_state method returning the participant's __dict__
Comments suppressed due to low confidence (4)

openfl/experimental/workflow/runtime/local_runtime.py:401

  • [nitpick] Using double underscores leads to name mangling and may complicate access in subclasses. Consider using a single leading underscore (e.g., _aggregator_reference) for a private attribute.
self.__aggregator_reference = aggregator
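The name-mangling pitfall flagged in this nitpick can be shown in isolation (class names below are illustrative, not from the PR):

```python
class Runtime:
    def __init__(self, aggregator):
        # Double leading underscore: Python stores this attribute
        # under the mangled name _Runtime__aggregator_reference.
        self.__aggregator_reference = aggregator

class RayRuntime(Runtime):
    def sync(self):
        # Inside the subclass this mangles to _RayRuntime__aggregator_reference,
        # which was never set, so the lookup raises AttributeError.
        return self.__aggregator_reference

rt = RayRuntime(aggregator="agg")
try:
    rt.sync()
except AttributeError as exc:
    print("AttributeError:", exc)

# A single leading underscore (_aggregator_reference) avoids the mangling
# and stays accessible from subclasses.
```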

openfl/experimental/workflow/runtime/local_runtime.py:492

  • The ray_group_assign call is no longer guarded by a backend == "ray" check, so it will run for any backend other than single_process. Reintroduce a conditional to restrict Ray-specific code to the Ray backend.
collaborator_ray_refs = ray_group_assign(collaborators, num_actors=self.num_actors)

openfl/experimental/workflow/runtime/local_runtime.py:676

  • [nitpick] The new _sync_participants_state logic isn't covered by existing tests. Add unit tests to verify that aggregator and collaborator states are correctly synchronized when using the Ray backend.
if self.backend == "ray":

openfl/experimental/workflow/runtime/local_runtime.py:549

  • In _sync_participants_state, self._aggregator is undefined and will raise an AttributeError. It should reference self.__aggregator_reference instead of self._aggregator.
self.__aggregator_reference.__dict__.update(ray.get(self._aggregator.get_state.remote()))

(Excerpt from the get_state docstring in openfl/experimental/workflow/interface/participants.py:)

        Returns:
            Dict[str, Any]: The state of the participant.
        """
        return self.__dict__

Copilot AI May 22, 2025


[nitpick] Returning __dict__ exposes all internal attributes, including private ones. Consider returning only the necessary state fields to avoid leaking internal implementation details.

Suggested change:

    -return self.__dict__
    +return {
    +    "private_attributes": self.private_attributes,
    +    "name": self._name
    +}
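The suggestion above amounts to whitelisting state fields instead of exposing everything. A minimal sketch of the difference (the field names follow the suggestion; the class and cache attribute are illustrative):

```python
class Participant:
    def __init__(self, name):
        self._name = name
        self.private_attributes = {}
        self._internal_cache = object()   # implementation detail

    def get_full_state(self):
        # Returning __dict__ leaks every attribute, including internals.
        return self.__dict__

    def get_state(self):
        # Whitelist only the fields callers actually need.
        return {
            "private_attributes": self.private_attributes,
            "name": self._name,
        }

p = Participant("Portland")
print("_internal_cache" in p.get_full_state())  # → True
print("_internal_cache" in p.get_state())       # → False
```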


scngupta-dsp commented May 22, 2025

Summary

Workflow API allows for the specification of private attributes for participants in a Federation. These private attributes represent confidential information that should not be shared with other participants.

Currently, in LocalRuntime, private attributes of participants can be updated within the flow. This is possible because they are updated from the FLSpec clones back into the participant object before being removed from the clone (ref).

Users could potentially develop functionality that allows them to save intermediate results or artifacts into these private attributes and access them via the participant object at the end of the experiment. An example of this functionality is as follows:

Add simple example

While this approach works for backend="single_process", it does not work for backend="ray".

This PR introduces functionality to update the state for Aggregator and Collaborator using the ray backend in LocalRuntime, ensuring consistent behavior with the single_process backend.

Type of Change (Mandatory)

Specify the type of change being made.

  • Feature Enhancement

Description (Mandatory)

In LocalRuntime with the Ray backend, the Aggregator and Collaborator operate as remote actors. After the experiment concludes, while the FLSpec state is updated ([reference]), the internal states of these remote actors are not synchronized with their corresponding objects.

To update the local participant state, the following changes are implemented for the ray backend:

  • Maintain references to Aggregator and Collaborator objects in LocalRuntime.
  • Update the Participant class to implement a get_state() method.
  • After flow execution, call get_state() on remote actors and update references.

Testing

  • Added testcase: Validate the dynamic assignment and synchronization of private attributes.
  • LocalRuntime Regression

Topics for Discussion

  • Workflow API is designed to enable users to retrieve experiment results (trained model, metrics, and other artifacts) via the FLSpec. The ability for users to modify the state of private attributes and fetch it exposes a new interface for accessing the internal participant state. Is this behavior acceptable, and should it even be supported?

  • Is the ability of the flow to dynamically update the participant private attributes desirable?
    The private attributes of Participants are properties of the Aggregator and Collaborator. Allowing the flow (developed by the user/data scientist) to modify these attributes could pose a security risk. Since LocalRuntime is a simulation with no real participants, this behavior may still be acceptable.
    However, in FederatedRuntime, the participant's private attributes are not accessible to the user. Supporting this feature only in LocalRuntime would lead to a discrepancy between the simulation and production environments and should be documented.

Signed-off-by: Ishant Thakare <ishantrog752@gmail.com>