Skip to content

Commit 3dd7d5d

Browse files
authored
feat: add means of awaiting event emission, fix flaky build (#1463)
## This PR - adds the ability to await event emissions by returning a new construct - uses this construct in tests :warning: in rare cases, this could be a breaking change, but the events API is currently experimental, so we will not do a major version ubmp ### Related Issues Fixes #1449
1 parent 6cca721 commit 3dd7d5d

File tree

8 files changed

+189
-44
lines changed

8 files changed

+189
-44
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package dev.openfeature.sdk;
2+
3+
/**
4+
* A class to help with synchronization by allowing the optional awaiting of the associated action.
5+
*/
6+
public class Awaitable {
7+
8+
/**
9+
* An already-completed Awaitable. Awaiting this will return immediately.
10+
*/
11+
public static final Awaitable FINISHED = new Awaitable(true);
12+
13+
private boolean isDone = false;
14+
15+
public Awaitable() {}
16+
17+
private Awaitable(boolean isDone) {
18+
this.isDone = isDone;
19+
}
20+
21+
/**
22+
* Lets the calling thread wait until some other thread calls {@link Awaitable#wakeup()}. If
23+
* {@link Awaitable#wakeup()} has been called before the current thread invokes this method, it will return
24+
* immediately.
25+
*/
26+
@SuppressWarnings("java:S2142")
27+
public synchronized void await() {
28+
while (!isDone) {
29+
try {
30+
this.wait();
31+
} catch (InterruptedException ignored) {
32+
// ignored, do not propagate the interrupted state
33+
}
34+
}
35+
}
36+
37+
/**
38+
* Wakes up all threads that have called {@link Awaitable#await()} and lets them proceed.
39+
*/
40+
public synchronized void wakeup() {
41+
isDone = true;
42+
this.notifyAll();
43+
}
44+
}

src/main/java/dev/openfeature/sdk/EventProvider.java

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,32 @@ public void shutdown() {
7676
* @param event The event type
7777
* @param details The details of the event
7878
*/
79-
public void emit(ProviderEvent event, ProviderEventDetails details) {
80-
if (eventProviderListener != null) {
81-
eventProviderListener.onEmit(event, details);
82-
}
79+
public Awaitable emit(final ProviderEvent event, final ProviderEventDetails details) {
80+
final var localEventProviderListener = this.eventProviderListener;
81+
final var localOnEmit = this.onEmit;
8382

84-
final TriConsumer<EventProvider, ProviderEvent, ProviderEventDetails> localOnEmit = this.onEmit;
85-
if (localOnEmit != null) {
86-
emitterExecutor.submit(() -> localOnEmit.accept(this, event, details));
83+
if (localEventProviderListener == null && localOnEmit == null) {
84+
return Awaitable.FINISHED;
8785
}
86+
87+
final var awaitable = new Awaitable();
88+
89+
// These calls need to be executed on a different thread to prevent deadlocks when the provider initialization
90+
// relies on a ready event to be emitted
91+
emitterExecutor.submit(() -> {
92+
try (var ignored = OpenFeatureAPI.lock.readLockAutoCloseable()) {
93+
if (localEventProviderListener != null) {
94+
localEventProviderListener.onEmit(event, details);
95+
}
96+
if (localOnEmit != null) {
97+
localOnEmit.accept(this, event, details);
98+
}
99+
} finally {
100+
awaitable.wakeup();
101+
}
102+
});
103+
104+
return awaitable;
88105
}
89106

90107
/**
@@ -93,8 +110,8 @@ public void emit(ProviderEvent event, ProviderEventDetails details) {
93110
*
94111
* @param details The details of the event
95112
*/
96-
public void emitProviderReady(ProviderEventDetails details) {
97-
emit(ProviderEvent.PROVIDER_READY, details);
113+
public Awaitable emitProviderReady(ProviderEventDetails details) {
114+
return emit(ProviderEvent.PROVIDER_READY, details);
98115
}
99116

100117
/**
@@ -104,8 +121,8 @@ public void emitProviderReady(ProviderEventDetails details) {
104121
*
105122
* @param details The details of the event
106123
*/
107-
public void emitProviderConfigurationChanged(ProviderEventDetails details) {
108-
emit(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details);
124+
public Awaitable emitProviderConfigurationChanged(ProviderEventDetails details) {
125+
return emit(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details);
109126
}
110127

111128
/**
@@ -114,8 +131,8 @@ public void emitProviderConfigurationChanged(ProviderEventDetails details) {
114131
*
115132
* @param details The details of the event
116133
*/
117-
public void emitProviderStale(ProviderEventDetails details) {
118-
emit(ProviderEvent.PROVIDER_STALE, details);
134+
public Awaitable emitProviderStale(ProviderEventDetails details) {
135+
return emit(ProviderEvent.PROVIDER_STALE, details);
119136
}
120137

121138
/**
@@ -124,7 +141,7 @@ public void emitProviderStale(ProviderEventDetails details) {
124141
*
125142
* @param details The details of the event
126143
*/
127-
public void emitProviderError(ProviderEventDetails details) {
128-
emit(ProviderEvent.PROVIDER_ERROR, details);
144+
public Awaitable emitProviderError(ProviderEventDetails details) {
145+
return emit(ProviderEvent.PROVIDER_ERROR, details);
129146
}
130147
}

src/main/java/dev/openfeature/sdk/EventSupport.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package dev.openfeature.sdk;
22

3-
import java.util.ArrayList;
4-
import java.util.List;
3+
import java.util.Collection;
54
import java.util.Map;
65
import java.util.Optional;
76
import java.util.Set;
87
import java.util.UUID;
98
import java.util.concurrent.ConcurrentHashMap;
9+
import java.util.concurrent.ConcurrentLinkedQueue;
1010
import java.util.concurrent.ExecutorService;
1111
import java.util.concurrent.Executors;
1212
import java.util.concurrent.TimeUnit;
@@ -23,13 +23,10 @@ class EventSupport {
2323

2424
// we use a v4 uuid as a "placeholder" for anonymous clients, since
2525
// ConcurrentHashMap doesn't support nulls
26-
private static final String defaultClientUuid = UUID.randomUUID().toString();
26+
private static final String DEFAULT_CLIENT_UUID = UUID.randomUUID().toString();
2727
private final Map<String, HandlerStore> handlerStores = new ConcurrentHashMap<>();
2828
private final HandlerStore globalHandlerStore = new HandlerStore();
29-
private final ExecutorService taskExecutor = Executors.newCachedThreadPool(runnable -> {
30-
final Thread thread = new Thread(runnable);
31-
return thread;
32-
});
29+
private final ExecutorService taskExecutor = Executors.newCachedThreadPool();
3330

3431
/**
3532
* Run all the event handlers associated with this domain.
@@ -40,11 +37,10 @@ class EventSupport {
4037
* @param eventDetails the event details
4138
*/
4239
public void runClientHandlers(String domain, ProviderEvent event, EventDetails eventDetails) {
43-
domain = Optional.ofNullable(domain).orElse(defaultClientUuid);
40+
domain = Optional.ofNullable(domain).orElse(DEFAULT_CLIENT_UUID);
4441

4542
// run handlers if they exist
4643
Optional.ofNullable(handlerStores.get(domain))
47-
.filter(store -> Optional.of(store).isPresent())
4844
.map(store -> store.handlerMap.get(event))
4945
.ifPresent(handlers -> handlers.forEach(handler -> runHandler(handler, eventDetails)));
5046
}
@@ -69,7 +65,7 @@ public void runGlobalHandlers(ProviderEvent event, EventDetails eventDetails) {
6965
* @param handler the handler function to run
7066
*/
7167
public void addClientHandler(String domain, ProviderEvent event, Consumer<EventDetails> handler) {
72-
final String name = Optional.ofNullable(domain).orElse(defaultClientUuid);
68+
final String name = Optional.ofNullable(domain).orElse(DEFAULT_CLIENT_UUID);
7369

7470
// lazily create and cache a HandlerStore if it doesn't exist
7571
HandlerStore store = Optional.ofNullable(this.handlerStores.get(name)).orElseGet(() -> {
@@ -89,7 +85,7 @@ public void addClientHandler(String domain, ProviderEvent event, Consumer<EventD
8985
* @param handler the handler ref to be removed
9086
*/
9187
public void removeClientHandler(String domain, ProviderEvent event, Consumer<EventDetails> handler) {
92-
domain = Optional.ofNullable(domain).orElse(defaultClientUuid);
88+
domain = Optional.ofNullable(domain).orElse(DEFAULT_CLIENT_UUID);
9389
this.handlerStores.get(domain).removeHandler(event, handler);
9490
}
9591

@@ -160,14 +156,14 @@ public void shutdown() {
160156
// instantiated when a handler is added to that client.
161157
static class HandlerStore {
162158

163-
private final Map<ProviderEvent, List<Consumer<EventDetails>>> handlerMap;
159+
private final Map<ProviderEvent, Collection<Consumer<EventDetails>>> handlerMap;
164160

165161
HandlerStore() {
166162
handlerMap = new ConcurrentHashMap<>();
167-
handlerMap.put(ProviderEvent.PROVIDER_READY, new ArrayList<>());
168-
handlerMap.put(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, new ArrayList<>());
169-
handlerMap.put(ProviderEvent.PROVIDER_ERROR, new ArrayList<>());
170-
handlerMap.put(ProviderEvent.PROVIDER_STALE, new ArrayList<>());
163+
handlerMap.put(ProviderEvent.PROVIDER_READY, new ConcurrentLinkedQueue<>());
164+
handlerMap.put(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, new ConcurrentLinkedQueue<>());
165+
handlerMap.put(ProviderEvent.PROVIDER_ERROR, new ConcurrentLinkedQueue<>());
166+
handlerMap.put(ProviderEvent.PROVIDER_STALE, new ConcurrentLinkedQueue<>());
171167
}
172168

173169
void addHandler(ProviderEvent event, Consumer<EventDetails> handler) {
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package dev.openfeature.sdk;
2+
3+
import static org.awaitility.Awaitility.await;
4+
import static org.junit.jupiter.api.Assertions.assertTrue;
5+
import static org.junit.jupiter.api.Assertions.fail;
6+
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
import java.util.concurrent.atomic.AtomicInteger;
10+
import org.junit.jupiter.api.Test;
11+
import org.junit.jupiter.api.Timeout;
12+
13+
@Timeout(value = 5, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
14+
class AwaitableTest {
15+
@Test
16+
void waitingForFinishedIsANoOp() {
17+
var startTime = System.currentTimeMillis();
18+
Awaitable.FINISHED.await();
19+
var endTime = System.currentTimeMillis();
20+
assertTrue(endTime - startTime < 10);
21+
}
22+
23+
@Test
24+
void waitingForNotFinishedWaitsEvenWhenInterrupted() throws InterruptedException {
25+
var awaitable = new Awaitable();
26+
var mayProceed = new AtomicBoolean(false);
27+
28+
var thread = new Thread(() -> {
29+
awaitable.await();
30+
if (!mayProceed.get()) {
31+
fail();
32+
}
33+
});
34+
thread.start();
35+
36+
var startTime = System.currentTimeMillis();
37+
do {
38+
thread.interrupt();
39+
} while (startTime + 1000 > System.currentTimeMillis());
40+
mayProceed.set(true);
41+
awaitable.wakeup();
42+
thread.join();
43+
}
44+
45+
@Test
46+
void callingWakeUpWakesUpAllWaitingThreads() throws InterruptedException {
47+
var awaitable = new Awaitable();
48+
var isRunning = new AtomicInteger();
49+
50+
Runnable runnable = () -> {
51+
isRunning.incrementAndGet();
52+
var start = System.currentTimeMillis();
53+
awaitable.await();
54+
var end = System.currentTimeMillis();
55+
if (end - start > 10) {
56+
fail();
57+
}
58+
};
59+
60+
var numThreads = 2;
61+
var threads = new Thread[numThreads];
62+
for (int i = 0; i < numThreads; i++) {
63+
threads[i] = new Thread(runnable);
64+
threads[i].start();
65+
}
66+
67+
await().atMost(1, TimeUnit.SECONDS).until(() -> isRunning.get() == numThreads);
68+
69+
awaitable.wakeup();
70+
71+
for (int i = 0; i < numThreads; i++) {
72+
threads[i].join();
73+
}
74+
}
75+
}

src/test/java/dev/openfeature/sdk/DeveloperExperienceTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ void shouldPutTheProviderInStateErrorAfterEmittingErrorEvent() {
150150
api.setProviderAndWait(domain, provider);
151151
Client client = api.getClient(domain);
152152
assertThat(client.getProviderState()).isEqualTo(ProviderState.READY);
153-
provider.emitProviderError(ProviderEventDetails.builder().build());
153+
provider.emitProviderError(ProviderEventDetails.builder().build()).await();
154154
assertThat(client.getProviderState()).isEqualTo(ProviderState.ERROR);
155155
}
156156

@@ -165,7 +165,7 @@ void shouldPutTheProviderInStateStaleAfterEmittingStaleEvent() {
165165
api.setProviderAndWait(domain, provider);
166166
Client client = api.getClient(domain);
167167
assertThat(client.getProviderState()).isEqualTo(ProviderState.READY);
168-
provider.emitProviderStale(ProviderEventDetails.builder().build());
168+
provider.emitProviderStale(ProviderEventDetails.builder().build()).await();
169169
assertThat(client.getProviderState()).isEqualTo(ProviderState.STALE);
170170
}
171171

@@ -180,9 +180,9 @@ void shouldPutTheProviderInStateReadyAfterEmittingReadyEvent() {
180180
api.setProviderAndWait(domain, provider);
181181
Client client = api.getClient(domain);
182182
assertThat(client.getProviderState()).isEqualTo(ProviderState.READY);
183-
provider.emitProviderStale(ProviderEventDetails.builder().build());
183+
provider.emitProviderStale(ProviderEventDetails.builder().build()).await();
184184
assertThat(client.getProviderState()).isEqualTo(ProviderState.STALE);
185-
provider.emitProviderReady(ProviderEventDetails.builder().build());
185+
provider.emitProviderReady(ProviderEventDetails.builder().build()).await();
186186
assertThat(client.getProviderState()).isEqualTo(ProviderState.READY);
187187
}
188188
}

src/test/java/dev/openfeature/sdk/EventProviderTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public static void resetDefaultProvider() {
3232
}
3333

3434
@Test
35+
@Timeout(value = 2, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
3536
@DisplayName("should run attached onEmit with emitters")
3637
void emitsEventsWhenAttached() {
3738
TriConsumer<EventProvider, ProviderEvent, ProviderEventDetails> onEmit = mockOnEmit();

src/test/java/dev/openfeature/sdk/EventsTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class EventsTest {
2424
private OpenFeatureAPI api;
2525

2626
@BeforeEach
27-
public void setUp() throws Exception {
27+
void setUp() {
2828
api = new OpenFeatureAPI();
2929
}
3030

@@ -578,7 +578,7 @@ void shouldHaveAllProperties() {
578578
number = "5.3.3",
579579
text = "Handlers attached after the provider is already in the associated state, MUST run immediately.")
580580
void matchingReadyEventsMustRunImmediately() {
581-
final String name = "matchingEventsMustRunImmediately";
581+
final String name = "matchingReadyEventsMustRunImmediately";
582582
final Consumer<EventDetails> handler = mockHandler();
583583

584584
// provider which is already ready
@@ -597,14 +597,14 @@ void matchingReadyEventsMustRunImmediately() {
597597
number = "5.3.3",
598598
text = "Handlers attached after the provider is already in the associated state, MUST run immediately.")
599599
void matchingStaleEventsMustRunImmediately() {
600-
final String name = "matchingEventsMustRunImmediately";
600+
final String name = "matchingStaleEventsMustRunImmediately";
601601
final Consumer<EventDetails> handler = mockHandler();
602602

603603
// provider which is already stale
604604
TestEventsProvider provider = new TestEventsProvider(INIT_DELAY);
605605
Client client = api.getClient(name);
606606
api.setProviderAndWait(name, provider);
607-
provider.emitProviderStale(ProviderEventDetails.builder().build());
607+
provider.emitProviderStale(ProviderEventDetails.builder().build()).await();
608608
assertThat(client.getProviderState()).isEqualTo(ProviderState.STALE);
609609

610610
// should run even though handler was added after stale
@@ -618,14 +618,14 @@ void matchingStaleEventsMustRunImmediately() {
618618
number = "5.3.3",
619619
text = "Handlers attached after the provider is already in the associated state, MUST run immediately.")
620620
void matchingErrorEventsMustRunImmediately() {
621-
final String name = "matchingEventsMustRunImmediately";
621+
final String name = "matchingErrorEventsMustRunImmediately";
622622
final Consumer<EventDetails> handler = mockHandler();
623623

624624
// provider which is already in error
625625
TestEventsProvider provider = new TestEventsProvider(INIT_DELAY);
626626
Client client = api.getClient(name);
627627
api.setProviderAndWait(name, provider);
628-
provider.emitProviderError(ProviderEventDetails.builder().build());
628+
provider.emitProviderError(ProviderEventDetails.builder().build()).await();
629629
assertThat(client.getProviderState()).isEqualTo(ProviderState.ERROR);
630630

631631
verify(handler, never()).accept(any());

0 commit comments

Comments
 (0)