1
1
package io .neonbee .internal ;
2
2
3
- import static io .vertx .core .Future .succeededFuture ;
4
-
5
3
import java .util .function .Supplier ;
6
4
7
5
import io .neonbee .NeonBee ;
10
8
import io .vertx .core .Vertx ;
11
9
import io .vertx .core .json .JsonArray ;
12
10
import io .vertx .core .shareddata .AsyncMap ;
11
+ import io .vertx .core .shareddata .SharedData ;
13
12
14
13
/**
15
14
* A registry to manage values in the {@link SharedDataAccessor} shared map.
@@ -22,7 +21,9 @@ public class WriteSafeRegistry<T> implements Registry<T> {
22
21
23
22
private final LoggingFacade logger = LoggingFacade .create ();
24
23
25
- private final SharedDataAccessor sharedDataAccessor ;
24
+ private final SharedData sharedData ;
25
+
26
+ private final Registry <T > sharedRegistry ;
26
27
27
28
private final String registryName ;
28
29
@@ -33,8 +34,30 @@ public class WriteSafeRegistry<T> implements Registry<T> {
33
34
* @param registryName the name of the map registry
34
35
*/
35
36
public WriteSafeRegistry (Vertx vertx , String registryName ) {
37
+ this (registryName , new SharedDataAccessor (vertx , WriteSafeRegistry .class ));
38
+ }
39
+
40
+ /**
41
+ * Create a new {@link WriteSafeRegistry}.
42
+ *
43
+ * @param registryName the name of the map registry
44
+ * @param sharedData the shared data
45
+ */
46
+ public WriteSafeRegistry (String registryName , SharedData sharedData ) {
47
+ this (registryName , sharedData , new SharedRegistry <>(registryName , sharedData ));
48
+ }
49
+
50
+ /**
51
+ * Create a new {@link WriteSafeRegistry}.
52
+ *
53
+ * @param registryName the name of the map registry
54
+ * @param sharedData the shared data
55
+ * @param registry the shared registry
56
+ */
57
+ WriteSafeRegistry (String registryName , SharedData sharedData , Registry <T > registry ) {
36
58
this .registryName = registryName ;
37
- this .sharedDataAccessor = new SharedDataAccessor (vertx , this .getClass ());
59
+ this .sharedData = sharedData ;
60
+ this .sharedRegistry = registry ;
38
61
}
39
62
40
63
/**
@@ -48,12 +71,26 @@ public WriteSafeRegistry(Vertx vertx, String registryName) {
48
71
public Future <Void > register (String sharedMapKey , T value ) {
49
72
logger .info ("register value: \" {}\" in shared map: \" {}\" " , sharedMapKey , value );
50
73
51
- return lock (sharedMapKey , () -> addValue (sharedMapKey , value ));
74
+ return lock (sharedMapKey , () -> sharedRegistry . register (sharedMapKey , value ));
52
75
}
53
76
54
77
@ Override
55
78
public Future <JsonArray > get (String sharedMapKey ) {
56
- return getSharedMap ().compose (map -> map .get (sharedMapKey )).map (o -> (JsonArray ) o );
79
+ return sharedRegistry .get (sharedMapKey );
80
+ }
81
+
82
+ /**
83
+ * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey.
84
+ *
85
+ * @param sharedMapKey the shared map key
86
+ * @param value the value to unregister
87
+ * @return the future
88
+ */
89
+ @ Override
90
+ public Future <Void > unregister (String sharedMapKey , T value ) {
91
+ logger .debug ("unregister value: \" {}\" from shared map: \" {}\" " , sharedMapKey , value );
92
+
93
+ return lock (sharedMapKey , () -> sharedRegistry .unregister (sharedMapKey , value ));
57
94
}
58
95
59
96
/**
@@ -65,7 +102,7 @@ public Future<JsonArray> get(String sharedMapKey) {
65
102
*/
66
103
protected Future <Void > lock (String sharedMapKey , Supplier <Future <Void >> futureSupplier ) {
67
104
logger .debug ("Get lock for {}" , sharedMapKey );
68
- return sharedDataAccessor .getLock (sharedMapKey ).onFailure (throwable -> {
105
+ return sharedData .getLock (sharedMapKey ).onFailure (throwable -> {
69
106
logger .error ("Error acquiring lock for {}" , sharedMapKey , throwable );
70
107
}).compose (lock -> Future .<Void >future (event -> futureSupplier .get ().onComplete (event ))
71
108
.onComplete (anyResult -> {
@@ -74,62 +111,15 @@ protected Future<Void> lock(String sharedMapKey, Supplier<Future<Void>> futureSu
74
111
}));
75
112
}
76
113
77
- private Future <Void > addValue (String sharedMapKey , Object value ) {
78
- Future <AsyncMap <String , Object >> sharedMap = getSharedMap ();
79
-
80
- return sharedMap .compose (map -> map .get (sharedMapKey ))
81
- .map (valueOrNull -> valueOrNull != null ? (JsonArray ) valueOrNull : new JsonArray ())
82
- .compose (valueArray -> {
83
- if (!valueArray .contains (value )) {
84
- valueArray .add (value );
85
- }
86
-
87
- if (logger .isInfoEnabled ()) {
88
- logger .info ("Registered verticle {} in shared map." , value );
89
- }
90
-
91
- return sharedMap .compose (map -> map .put (sharedMapKey , valueArray ));
92
- });
93
- }
94
-
95
- /**
96
- * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey.
97
- *
98
- * @param sharedMapKey the shared map key
99
- * @param value the value to unregister
100
- * @return the future
101
- */
102
- @ Override
103
- public Future <Void > unregister (String sharedMapKey , T value ) {
104
- logger .debug ("unregister value: \" {}\" from shared map: \" {}\" " , sharedMapKey , value );
105
-
106
- return lock (sharedMapKey , () -> removeValue (sharedMapKey , value ));
107
- }
108
-
109
- private Future <Void > removeValue (String sharedMapKey , Object value ) {
110
- Future <AsyncMap <String , Object >> sharedMap = getSharedMap ();
111
-
112
- return sharedMap .compose (map -> map .get (sharedMapKey )).map (jsonArray -> (JsonArray ) jsonArray )
113
- .compose (values -> {
114
- if (values == null ) {
115
- return succeededFuture ();
116
- }
117
-
118
- if (logger .isInfoEnabled ()) {
119
- logger .info ("Unregistered verticle {} in shared map." , value );
120
- }
121
-
122
- values .remove (value );
123
- return sharedMap .compose (map -> map .put (sharedMapKey , values ));
124
- });
125
- }
126
-
127
114
/**
128
115
* Shared map that is used as registry.
116
+ * <p>
117
+ * It is not safe to write to the shared map directly. Use the {@link WriteSafeRegistry#register(String, Object)}
118
+ * and {@link WriteSafeRegistry#unregister(String, Object)} methods.
129
119
*
130
120
* @return Future to the shared map
131
121
*/
132
122
public Future <AsyncMap <String , Object >> getSharedMap () {
133
- return sharedDataAccessor .getAsyncMap (registryName );
123
+ return sharedData .getAsyncMap (registryName );
134
124
}
135
125
}
0 commit comments