Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Bus in Native Java #324

Closed
wants to merge 16 commits into from
Closed

Conversation

taldekar
Copy link
Contributor

@taldekar taldekar commented Jan 15, 2025

Issue #322

Description of changes:
This PR introduces an Event Bus implementation utilizing the Java 9 Reactive Flow library. The event bus extends the functionality provided by the Flow API by adding custom concurrency constructs to enable asynchronous publishing and subscribing.

Brief Design Overview:
This implementation builds upon the basic publisher provided by the Flow API library, introducing a two-tier concurrency model to optimize performance and responsiveness. Here's a breakdown of the solution:

Publisher Side:

  • Publisher-side Event Queuing: When an event of a particular type is submitted, it's added to an ordered event queue specific to that event type.
  • Publisher Thread Pool: A dedicated thread pool is maintained for publishing events.
  • Publisher Callback Job: If no active job exists for an event type, a new job is submitted to the thread pool that will process the events in the events queue for the relevant type.
  • Registering Callback: A callback is registered for each event type, which determines how the publishing job handles that event. The job will call the registered callback for each event in the event queue.
  • Callback Invocation: The job systematically calls the registered callback, which is the SubmissionPublisher::submit method on the relevant publisher for each event in the order that the events were received.

Subscriber Side:

  • Subscriber-side Event Queuing: When a publishing thread submits an event, the subscriber’s Subscriber::onNext method is called. Inside the subscriber, the published event is submitted to another queue unique to that subscriber.
  • Subscriber Thread Pool: A distinct thread pool is maintained for processing subscriber callbacks.
  • Subscriber Callback Job: If no active job exists to handle a particular subscriber’s callback, a new job is submitted to the thread pool to process the events in the event queue for that subscriber.
  • Registering Callback: A callback is registered for the subscriber, which determines how the above job handles the event. For subscribers, this is the EventObserver::onEvent method provided during subscription.
  • Callback Invocation: The job sequentially invokes the registered callbacks for the events in the queue for each subscriber.

In essence, this solution creates a pipeline where a dedicated thread handles event publishing for each event type, while on the subscriber side, each subscriber's callbacks are processed by a single thread, regardless of the number of events. By maintaining an event queue, we can guarantee event ordering for each subscriber.

Key features:

  • Utilizes Java 9 Reactive Flow for event publishing and subscription.
  • Demonstrates integration with existing codebase.
  • Provides asynchronous publish and subscribe functionality using custom thread pools for each respectively.

Current limitations and areas for further investigation:

  • Thread pool size's need to be fine-tuned.
  • Load testing needs to be performed to ensure that the implementation is scalable and performant.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@taldekar taldekar requested a review from breedloj January 15, 2025 18:30
@taldekar taldekar changed the title Simple event bus in native Java Event Bus in Native Java Jan 17, 2025
@taldekar taldekar force-pushed the taldekar/NativeJavaEventBusPOC branch from e49aaa4 to 968aa47 Compare January 17, 2025 16:18
@taldekar taldekar force-pushed the taldekar/NativeJavaEventBusPOC branch 2 times, most recently from c957b00 to 082e09f Compare January 19, 2025 21:37
@taldekar taldekar force-pushed the taldekar/NativeJavaEventBusPOC branch from 082e09f to 2c3a4ca Compare January 21, 2025 15:18
@taldekar taldekar force-pushed the taldekar/NativeJavaEventBusPOC branch 2 times, most recently from 6ee52a1 to 7916673 Compare January 21, 2025 23:33
@taldekar taldekar force-pushed the taldekar/NativeJavaEventBusPOC branch from 7916673 to 1c682f3 Compare January 21, 2025 23:43
@taldekar taldekar force-pushed the taldekar/NativeJavaEventBusPOC branch from 2930b70 to a316ac0 Compare January 22, 2025 18:32
@taldekar taldekar force-pushed the taldekar/NativeJavaEventBusPOC branch from a316ac0 to 98841fd Compare January 22, 2025 19:25
@taldekar taldekar requested a review from justinmk3 January 23, 2025 17:21

@SuppressWarnings("unchecked")
default Class<T> getEventType() {
Class<?> currentClass = getClass();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any references that you took inspiration from to compose this class? If yes, those would be useful to link in the description here

Copy link
Contributor Author

@taldekar taldekar Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I used stack overflow for this, but it wasn't exactly this. I can try to find it. It think it was this one: https://stackoverflow.com/questions/3437897/how-do-i-get-a-class-instance-of-generic-type-t. I'll add it to the code.


@Override
@SuppressWarnings("unchecked")
default Class<T> getEventType() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between StreamObserver and EventObserver? I see a few new functions in this class but the getEventType looks the same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The EventObserver is the base bones version of a subscriber that only cares about events on the data channel. The StreamObserver extends this functionality with hooks for when the stream errors out or completes. The interfaces were introduced because the standard subscriber method onNext wasn't descriptive or I felt like it didn't communicate that we were listening to/handling events. Also we'd be forced to also implement the onError and onComplete methods everytime.

@@ -44,6 +50,16 @@ public Activator() {
.initializeOnStartUp()
.build();
codeReferenceLoggingService = DefaultCodeReferenceLoggingService.getInstance();

List<TestSubscribers> testSubscriberList = new ArrayList<>(3);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect Activator to be the source of truth for event broker subscriptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this was just to test that the event bus is working. Subscriptions can come from anywhere in the codebase.

public void onEvent(final TestEvent event) {
// Activator.getLogger().info(event.getMessage());

if (event.getSequenceNumber() - previousSequenceNumber != 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unclear on the significance of the sequence number here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sequence numbers are more for testing the POC on my end. They helped ensure that the events arrived in the order in which they were published to all subscribers.

AuthStatusProvider.addAuthStatusChangeListener(amazonQCommonActions.getFeedbackDialogContributionAction());
AuthStatusProvider.addAuthStatusChangeListener(amazonQCommonActions.getCustomizationDialogContributionAction());
authStateSubscription = EventBroker.getInstance().subscribe(this);
EventBroker.getInstance().subscribe(amazonQCommonActions.getSignoutAction());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cancels are missing for l88-90 right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, but they were not implemented in the original code either. To provide cancel functionality, we'd just need to keep track of the Subscription returned.

public final class TestPublisher {

public TestPublisher() {
Thread publisherThread = new Thread(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not map this to the auth events that get posted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have separate publishers for auth events. This was just to demo that the event bus was performant and could deliver events in the order they were posted.

}

public static void notifyAuthStatusChanged(final AuthState authState) {
if (prevAuthState != null && prevAuthState.equals(authState)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this check is missing from the publisher calls when an event is published to the broker

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I added de-duplication logic to the event broker itself here.

@taldekar taldekar requested a review from shruti0085 January 23, 2025 22:38
@taldekar taldekar closed this Jan 23, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants