Skip to content

Commit

Permalink
Improve internal API readability
Browse files Browse the repository at this point in the history
  • Loading branch information
taldekar committed Jan 19, 2025
1 parent b915d76 commit 082e09f
Showing 1 changed file with 69 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,19 @@

import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.observers.StreamObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;

public final class EventBroker {

@FunctionalInterface
private interface TypedCallable<T> {
void call(T event);
void callWith(T event);
}

private final class BlockingCallerRunsPolicy implements RejectedExecutionHandler {
public static final class CallerRunsPolicyBlocking implements RejectedExecutionHandler {

private final BlockingQueue<Runnable> workQueue;

BlockingCallerRunsPolicy(final BlockingQueue<Runnable> workQueue) {
CallerRunsPolicyBlocking(final BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
}

Expand All @@ -56,96 +55,81 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor

}

private class OrderedThreadPoolExecutor {
public static final class OrderedThreadPoolExecutor {

private final Map<String, BlockingQueue<?>> typedEventQueue;
private final Map<String, AtomicBoolean> typedJobStatus;
private final Map<String, ReentrantLock> typedJobLock;
private final Map<String, TypedCallable<?>> typedCallback;
private final Map<String, BlockingQueue<?>> interestIdToEventQueueMap;
private final Map<String, AtomicBoolean> interestIdToJobStatusMap;
private final Map<String, ReentrantLock> interestIdToJobLockMap;
private final Map<String, TypedCallable<?>> interestIdToCallbackMap;

private final BlockingQueue<Runnable> workQueue;
private final BlockingQueue<Runnable> scheduledJobsQueue;
private final ThreadPoolExecutor executor;
private final int eventQueueCapacity;

OrderedThreadPoolExecutor(final int coreThreadCount, final int maxThreadCount, final int queueCapacity,
final int keepAliveTime, final int eventQueueCapacity) {
workQueue = new ArrayBlockingQueue<>(queueCapacity);
typedEventQueue = new ConcurrentHashMap<>();
typedJobStatus = new ConcurrentHashMap<>();
typedJobLock = new ConcurrentHashMap<>();
typedCallback = new ConcurrentHashMap<>();
OrderedThreadPoolExecutor(final int coreThreadCount, final int maxThreadCount, final int jobQueueCapacity,
final int eventQueueCapacity, final int keepAliveTime, final TimeUnit keepAliveTimeUnit) {
scheduledJobsQueue = new ArrayBlockingQueue<>(jobQueueCapacity);
interestIdToEventQueueMap = new ConcurrentHashMap<>();
interestIdToJobStatusMap = new ConcurrentHashMap<>();
interestIdToJobLockMap = new ConcurrentHashMap<>();
interestIdToCallbackMap = new ConcurrentHashMap<>();

this.eventQueueCapacity = eventQueueCapacity;

executor = new ThreadPoolExecutor(coreThreadCount, maxThreadCount, keepAliveTime, TimeUnit.MILLISECONDS,
workQueue, Executors.defaultThreadFactory(), new BlockingCallerRunsPolicy(workQueue));
executor = new ThreadPoolExecutor(coreThreadCount, maxThreadCount, keepAliveTime, keepAliveTimeUnit,
scheduledJobsQueue, Executors.defaultThreadFactory(), new CallerRunsPolicyBlocking(scheduledJobsQueue));
}

public <T, R> void registerCallback(final String interestId, final TypedCallable<R> callback) {
typedCallback.putIfAbsent(interestId, callback);
public <T, R> void registerCallbackForInterest(final String interestId, final TypedCallable<R> callback) {
interestIdToCallbackMap.putIfAbsent(interestId, callback);
}

public <T> boolean hasRegisteredCallback(final String interestType) {
return typedCallback.containsKey(interestType);
public <T> boolean isCallbackRegisteredForInterest(final String interestId) {
return interestIdToCallbackMap.containsKey(interestId);
}

@SuppressWarnings("unchecked")
public <T, R> void submit(final String interestId, final R event) {
BlockingQueue<R> eventQueue = (BlockingQueue<R>) typedEventQueue.computeIfAbsent(interestId,
public <T, R> void submitEventForInterest(final String interestId, final R event) {
BlockingQueue<R> eventQueue = (BlockingQueue<R>) interestIdToEventQueueMap.computeIfAbsent(interestId,
k -> new ArrayBlockingQueue<>(eventQueueCapacity, true));
try {
eventQueue.put(event);
} catch (InterruptedException e) {
e.printStackTrace();
}

handleScheduling(interestId, (Class<R>) event.getClass(), eventQueue);
handleJobScheduling(interestId, (Class<R>) event.getClass(), eventQueue);
}

public <T, R> void handleScheduling(final String interestId, final Class<R> eventType,
private <T, R> void handleJobScheduling(final String interestId, final Class<R> eventType,
final BlockingQueue<R> eventQueue) {
AtomicBoolean jobStatus = typedJobStatus.computeIfAbsent(interestId, k -> new AtomicBoolean(false));
ReentrantLock jobLock = typedJobLock.computeIfAbsent(interestId, k -> new ReentrantLock(true));
AtomicBoolean jobStatus = interestIdToJobStatusMap.computeIfAbsent(interestId, k -> new AtomicBoolean(false));
ReentrantLock jobLock = interestIdToJobLockMap.computeIfAbsent(interestId, k -> new ReentrantLock(true));

jobLock.lock();
try {
if (!jobStatus.get() && !eventQueue.isEmpty()) {
if (jobStatus.compareAndSet(false, true)) {
executor.submit(() -> processEventQueue(interestId, eventType,
eventQueue, jobStatus, jobLock));
}
}
} finally {
jobLock.unlock();
if (!jobStatus.get() && !eventQueue.isEmpty() && jobStatus.compareAndSet(false, true)) {
executor.submit(() -> processQueuedEvents(interestId, eventType, eventQueue, jobStatus, jobLock));
}
}

@SuppressWarnings("unchecked")
public <T, R> void processEventQueue(final String interestId, final Class<R> eventType,
private <T, R> void processQueuedEvents(final String interestId, final Class<R> eventType,
final BlockingQueue<R> eventQueue, final AtomicBoolean jobStatus, final ReentrantLock jobLock) {
if (jobStatus == null || jobLock == null || eventQueue == null) {
throw new NullPointerException("ThreadPoolExecutor in unexpected state");
}

jobLock.lock();

try {
TypedCallable<R> eventCallback = (TypedCallable<R>) typedCallback.get(interestId);
TypedCallable<R> eventCallback = (TypedCallable<R>) interestIdToCallbackMap.get(interestId);
if (eventCallback == null) {
return;
}

while (!eventQueue.isEmpty()) {
try {
R newEvent = eventQueue.take();
if (newEvent != null) {
try {
eventCallback.call(newEvent);
} catch (Exception e) {
e.printStackTrace();
}
R newEvent = eventQueue.poll();
if (newEvent != null) {
try {
eventCallback.callWith(newEvent);
} catch (Exception e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
Expand All @@ -159,19 +143,18 @@ public <T, R> void processEventQueue(final String interestId, final Class<R> eve
}

private static final EventBroker INSTANCE;
private final Map<Class<?>, SubmissionPublisher<?>> publishers;
private final OrderedThreadPoolExecutor emissionExecutor;
private final OrderedThreadPoolExecutor consumptionExecutor;
private final Map<Class<?>, SubmissionPublisher<?>> eventTypeToPublisherMap;
private final OrderedThreadPoolExecutor eventEmissionExecutor;
private final OrderedThreadPoolExecutor eventConsumptionExecutor;

static {
INSTANCE = new EventBroker();
}

private EventBroker() {
publishers = new ConcurrentHashMap<>();

emissionExecutor = new OrderedThreadPoolExecutor(5, 30, 30, 10, 100000000);
consumptionExecutor = new OrderedThreadPoolExecutor(5, 30, 30, 10, 100000000);
eventTypeToPublisherMap = new ConcurrentHashMap<>();
eventEmissionExecutor = new OrderedThreadPoolExecutor(3, 10, 10, 100, 10, TimeUnit.MILLISECONDS);
eventConsumptionExecutor = new OrderedThreadPoolExecutor(3, 10, 10, 100, 10, TimeUnit.MILLISECONDS);
}

public static EventBroker getInstance() {
Expand All @@ -184,20 +167,20 @@ public <T> void post(final T event) {
return;
}

SubmissionPublisher<T> publisher = getPublisher((Class<T>) event.getClass());
if (!emissionExecutor.hasRegisteredCallback((event.getClass().getName()))) {
registerPublisherCallback(publisher, event.getClass().getName());
SubmissionPublisher<T> publisher = getPublisherForEventType((Class<T>) event.getClass());
if (!eventEmissionExecutor.isCallbackRegisteredForInterest((event.getClass().getName()))) {
registerPublisherCallbackForInterest(event.getClass().getName(), publisher);
}

emissionExecutor.submit(event.getClass().getName(), event);
eventEmissionExecutor.submitEventForInterest(event.getClass().getName(), event);
}

public <T> Subscription subscribe(final EventObserver<T> observer) {
SubmissionPublisher<T> publisher = getPublisher(observer.getEventType());
SubmissionPublisher<T> publisher = getPublisherForEventType(observer.getEventType());
AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
String subscriberId = UUID.randomUUID().toString();

registerSubscriberCallback(observer, subscriberId);
registerSubscriberCallbackForInterest(subscriberId, observer);

Subscriber<T> subscriber = new Subscriber<>() {

Expand All @@ -207,19 +190,18 @@ public <T> Subscription subscribe(final EventObserver<T> observer) {
public void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
subscriptionReference.set(subscription);

this.subscription.request(1);
}

@Override
public void onNext(final T event) {
consumptionExecutor.submit(subscriberId, event);
this.subscription.request(1);
eventConsumptionExecutor.submitEventForInterest(subscriberId, event);
subscription.request(1);
}

@Override
public void onError(final Throwable throwable) {
return;
throwable.printStackTrace();
}

@Override
Expand All @@ -234,11 +216,11 @@ public void onComplete() {
}

public <T> Subscription subscribe(final StreamObserver<T> observer) {
SubmissionPublisher<T> publisher = getPublisher(observer.getEventType());
SubmissionPublisher<T> publisher = getPublisherForEventType(observer.getEventType());
AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
String subscriberId = UUID.randomUUID().toString();

registerSubscriberCallback(observer, subscriberId);
registerSubscriberCallbackForInterest(subscriberId, observer);

Subscriber<T> subscriber = new Subscriber<>() {

Expand All @@ -248,14 +230,13 @@ public <T> Subscription subscribe(final StreamObserver<T> observer) {
public void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
subscriptionReference.set(subscription);

this.subscription.request(1);
}

@Override
public void onNext(final T event) {
consumptionExecutor.submit(subscriberId, event);
this.subscription.request(1);
eventConsumptionExecutor.submitEventForInterest(subscriberId, event);
subscription.request(1);
}

@Override
Expand All @@ -275,30 +256,31 @@ public void onComplete() {
}

@SuppressWarnings("unchecked")
private <T> SubmissionPublisher<T> getPublisher(final Class<T> eventType) {
return (SubmissionPublisher<T>) publishers.computeIfAbsent(eventType,
private <T> SubmissionPublisher<T> getPublisherForEventType(final Class<T> eventType) {
return (SubmissionPublisher<T>) eventTypeToPublisherMap.computeIfAbsent(eventType,
key -> new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize()));
}

private <T> void registerSubscriberCallback(final EventObserver<T> subscriber, final String subscriberId) {
Activator.getLogger().info(subscriberId);
private <T> void registerSubscriberCallbackForInterest(final String interestId,
final EventObserver<T> observer) {
TypedCallable<T> eventCallback = new TypedCallable<>() {
@Override
public void call(final T event) {
subscriber.onEvent(event);
public void callWith(final T event) {
observer.onEvent(event);
}
};
consumptionExecutor.registerCallback(subscriberId, eventCallback);
eventConsumptionExecutor.registerCallbackForInterest(interestId, eventCallback);
}

private <T> void registerPublisherCallback(final SubmissionPublisher<T> publisher, final String eventId) {
private <T> void registerPublisherCallbackForInterest(final String interestId,
final SubmissionPublisher<T> publisher) {
TypedCallable<T> eventCallback = new TypedCallable<>() {
@Override
public void call(final T event) {
public void callWith(final T event) {
publisher.submit(event);
}
};
emissionExecutor.registerCallback(eventId, eventCallback);
eventEmissionExecutor.registerCallbackForInterest(interestId, eventCallback);
}

}

0 comments on commit 082e09f

Please sign in to comment.