1
1
package uk.dsxt.rhea
2
2
3
+ import kotlinx.coroutines.CoroutineScope
4
+ import kotlinx.coroutines.channels.BroadcastChannel
5
+ import kotlinx.coroutines.channels.Channel
6
+ import kotlinx.coroutines.flow.Flow
7
+ import kotlinx.coroutines.flow.asFlow
3
8
import kotlinx.coroutines.flow.filter
4
9
import kotlinx.coroutines.flow.map
10
+ import kotlinx.coroutines.launch
11
+ import mu.KotlinLogging
12
+ import java.util.concurrent.ConcurrentHashMap
13
+ import kotlin.coroutines.EmptyCoroutineContext
5
14
6
15
/* *
7
- * [ConfigSource] that reads configuration from Vault .
16
+ * [ReactiveConfig] manages configuration.
8
17
*
9
18
* **Note: use [Builder] to build instances of this class.
10
19
*/
11
- class ReactiveConfig private constructor(val manager : ConfigManager ) {
20
+ class ReactiveConfig private constructor(
21
+ private val mapOfSources : MutableMap <String , ConfigSource >,
22
+ private val configScope : CoroutineScope ,
23
+ private val channelOfChanges : BroadcastChannel <RawProperty >
24
+ ) {
25
+ private val flowOfChanges: Flow <RawProperty > = channelOfChanges.asFlow()
26
+ private val mapOfProperties: MutableMap <String , Reloadable <* >> = ConcurrentHashMap ()
27
+
28
+ companion object {
29
+ val reactiveConfigLogger = KotlinLogging .logger {}
30
+ }
12
31
13
32
/* *
14
33
* Builder for [ReactiveConfig].
15
34
*/
16
35
class Builder {
17
- private val manager: ConfigManager = ConfigManager ()
36
+ private val mapOfSources: MutableMap <String , ConfigSource > = ConcurrentHashMap ()
37
+ private val configScope = CoroutineScope (EmptyCoroutineContext )
38
+ private val channelOfChanges: BroadcastChannel <RawProperty > = BroadcastChannel (Channel .BUFFERED )
18
39
19
40
/* *
20
41
* Adds configuration [source] in [ReactiveConfig]s built.
@@ -25,8 +46,11 @@ class ReactiveConfig private constructor(val manager: ConfigManager) {
25
46
*/
26
47
fun addSource (name : String , source : ConfigSource ): Builder {
27
48
return apply {
28
- manager.mapOfSources[name] = source
29
- manager.addSource(source)
49
+ mapOfSources[name] = source
50
+ configScope.launch {
51
+ source.subscribe(channelOfChanges, configScope)
52
+ }
53
+ Thread .sleep(100 )
30
54
}
31
55
}
32
56
@@ -36,25 +60,25 @@ class ReactiveConfig private constructor(val manager: ConfigManager) {
36
60
* @return new instance of [ReactiveConfig]
37
61
*/
38
62
fun build (): ReactiveConfig {
39
- return ReactiveConfig (manager )
63
+ return ReactiveConfig (mapOfSources, configScope, channelOfChanges )
40
64
}
41
65
}
42
66
43
67
/* *
44
68
* @return [Reloadable] that holds the freshest value of property with given [key] and [type].
45
69
*/
46
70
operator fun <T > get (key : String , type : PropertyType <T >): Reloadable <T >? {
47
- if (manager. mapOfProperties.containsKey(key)) {
48
- with (manager. mapOfProperties[key]) {
71
+ if (mapOfProperties.containsKey(key)) {
72
+ with (mapOfProperties[key]) {
49
73
return this as Reloadable <T >
50
74
}
51
75
} else {
52
76
synchronized(this ) {
53
- if (! manager. mapOfProperties.containsKey(key)) {
77
+ if (! mapOfProperties.containsKey(key)) {
54
78
var isSet = false
55
79
var initialValue: T = type.initial
56
80
57
- for (source in manager. mapOfSources.values) {
81
+ for (source in mapOfSources.values) {
58
82
with (type.parse(source.getNode(key))) {
59
83
when (this ) {
60
84
is ParseResult .Success -> {
@@ -71,7 +95,7 @@ class ReactiveConfig private constructor(val manager: ConfigManager) {
71
95
if (isSet) {
72
96
return Reloadable (
73
97
initialValue,
74
- manager. flowOfChanges
98
+ flowOfChanges
75
99
.filter { rawProperty: RawProperty ->
76
100
rawProperty.key == key
77
101
}
@@ -89,16 +113,16 @@ class ReactiveConfig private constructor(val manager: ConfigManager) {
89
113
.map {
90
114
it as T
91
115
},
92
- manager. configScope
116
+ configScope
93
117
).also {
94
- manager. mapOfProperties[key] = it
118
+ mapOfProperties[key] = it
95
119
}
96
120
} else {
97
121
reactiveConfigLogger.error(" Couldn't find property with key=$key in any config sources" )
98
122
return null
99
123
}
100
124
} else {
101
- with (manager. mapOfProperties[key]) {
125
+ with (mapOfProperties[key]) {
102
126
return this as Reloadable <T >
103
127
}
104
128
}
0 commit comments