This repository was archived by the owner on Nov 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 137
/
Copy pathReactorEventBus.java
52 lines (39 loc) · 1.57 KB
/
ReactorEventBus.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package tv.codely.shared.infrastructure.bus;
import reactor.bus.Event;
import reactor.bus.EventBus;
import reactor.bus.selector.Selector;
import reactor.fn.Consumer;
import tv.codely.shared.domain.DomainEvent;
import tv.codely.shared.application.DomainEventSubscriber;
import java.util.List;
import java.util.Set;
import static reactor.bus.selector.Selectors.$;
/**
 * {@link tv.codely.shared.domain.EventBus} implementation that delegates to a Reactor
 * {@link EventBus}.
 *
 * <p>Every subscriber is registered under a selector built from the value returned by its
 * {@code subscribedTo()} method, and every published event is notified on the bus using the
 * event's runtime class as the key.
 */
public class ReactorEventBus implements tv.codely.shared.domain.EventBus {
    private final EventBus bus;

    /**
     * Creates the underlying Reactor bus and registers each of the given subscribers on it.
     *
     * @param subscribers subscribers to register when the bus is created
     */
    public ReactorEventBus(final Set<DomainEventSubscriber> subscribers) {
        bus = EventBus.create();
        subscribers.forEach(this::registerOnEventBus);
    }

    /**
     * Registers one more subscriber on the already created bus.
     *
     * @param subscriber subscriber to register
     */
    public void addSubscriber(DomainEventSubscriber subscriber) {
        registerOnEventBus(subscriber);
    }

    /**
     * Publishes every event of the list, in list iteration order.
     *
     * @param events events to publish
     */
    @Override
    public void publish(final List<DomainEvent> events) {
        events.forEach(this::publish);
    }

    /**
     * Wraps a single domain event in a Reactor {@link Event} and notifies the bus with the
     * event's runtime class as the notification key.
     *
     * @param event event to publish
     */
    private void publish(final DomainEvent event) {
        final Class<? extends DomainEvent> eventIdentifier = event.getClass();
        final Event<DomainEvent> wrappedEvent = Event.wrap(event);

        bus.notify(eventIdentifier, wrappedEvent);
    }

    /**
     * Registers the subscriber on the bus under a selector built from
     * {@code subscriber.subscribedTo()}.
     *
     * @param subscriber subscriber to register
     */
    private void registerOnEventBus(final DomainEventSubscriber subscriber) {
        final Selector eventIdentifier = $(subscriber.subscribedTo());

        bus.on(eventIdentifier, eventConsumer(subscriber));
    }

    /**
     * Builds the Reactor consumer that unwraps an incoming Reactor event and hands its payload
     * to the subscriber.
     *
     * <p>The consumer is typed against {@code Event<?>} rather than the raw {@code Event} type
     * so the compiler keeps checking generic usage; the payload is narrowed with one explicit
     * cast.
     *
     * @param subscriber subscriber that receives the unwrapped domain event
     * @return consumer to be registered on the Reactor bus
     */
    private Consumer<Event<?>> eventConsumer(final DomainEventSubscriber subscriber) {
        return (Event<?> reactorEvent) -> {
            // This class only ever notifies the bus with wrapped DomainEvent instances
            // (see publish(DomainEvent)), which is what the cast relies on.
            final DomainEvent unwrappedEvent = (DomainEvent) reactorEvent.getData();

            subscriber.consume(unwrappedEvent);
        };
    }
}