@@ -6,28 +6,31 @@ import kotlinx.coroutines.delay
6
6
import kotlinx.coroutines.launch
7
7
import kotlinx.coroutines.newSingleThreadContext
8
8
import mu.KotlinLogging
9
+ import uk.dsxt.rhea.ReactiveConfig.Builder
9
10
import java.sql.*
10
11
11
- class JDBCConfigSource (private val url : String ,private val scheme : String ) : ConfigSource {
12
-
13
- private lateinit var connection : Connection
12
+ /* *
13
+ * [ConfigSource] that reads configuration from JDBC.
14
+ */
15
+ class JDBCConfigSource (private val url : String , private val scheme : String ) : ConfigSource {
16
+ private lateinit var connection: Connection
14
17
private lateinit var channel: SendChannel <RawProperty >
15
18
private lateinit var configScope: CoroutineScope
16
19
private val map: HashMap <String , Node ?> = HashMap ()
17
20
private val logger = KotlinLogging .logger {}
18
21
19
- init {
20
- try {
22
+ init {
23
+ try {
21
24
connection = DriverManager .getConnection(url)
22
- } catch (e : SQLException ) {
25
+ } catch (e: SQLException ) {
23
26
logger.error(" Couldn't connect to database with this url: \" ${url} \" . Values from this place are no longer updates" )
24
27
}
25
28
}
26
29
27
- constructor (url : String , login : String , password : String , scheme: String ) : this (url, scheme){
28
- try {
30
+ constructor (url: String , login: String , password: String , scheme: String ) : this (url, scheme) {
31
+ try {
29
32
connection = DriverManager .getConnection(url, login, password)
30
- } catch (e : SQLException ) {
33
+ } catch (e: SQLException ) {
31
34
logger.error(" Couldn't connect to database with this url: \" ${url} \" . Values from this place are no longer updates" )
32
35
}
33
36
}
@@ -36,23 +39,23 @@ class JDBCConfigSource(private val url : String,private val scheme : String) : C
36
39
channel = channelOfChanges
37
40
configScope = scope
38
41
39
- if (! connection.isClosed){
42
+ if (! connection.isClosed) {
40
43
configScope.launch(newSingleThreadContext(" watching thread" )) {
41
- try {
42
- if (scheme.contains(" " )){
44
+ try {
45
+ if (scheme.contains(" " )) {
43
46
throw error(" Incorrect name of scheme: \" ${scheme} \" . Values from this place are no longer updates" )
44
47
}
45
48
var query = " SELECT * FROM $scheme "
46
49
val statement = connection.createStatement()
47
50
var result = statement.executeQuery(query)
48
51
49
52
val columnNumber = result.metaData.columnCount
50
- val updateColumnName : String
53
+ val updateColumnName: String
51
54
var latestUpdate = 3
52
55
53
- while (result.next()){
56
+ while (result.next()) {
54
57
map[result.getString(1 )] = toNode(result.getObject(2 ))
55
- if (columnNumber == 3 ){
58
+ if (columnNumber == 3 ) {
56
59
with (result.getInt(3 )) {
57
60
if (this > latestUpdate) {
58
61
latestUpdate = this
@@ -66,19 +69,19 @@ class JDBCConfigSource(private val url : String,private val scheme : String) : C
66
69
query + = " WHERE $updateColumnName > "
67
70
}
68
71
69
- while (true ){
72
+ while (true ) {
70
73
delay(1000 )
71
- if (columnNumber == 3 ){
74
+ if (columnNumber == 3 ) {
72
75
result = statement.executeQuery(query + " $latestUpdate " )
73
76
}
74
- while (result.next()){
77
+ while (result.next()) {
75
78
val first = result.getString(1 )
76
79
val second = toNode(result.getObject(2 ))
77
- if (map[first] != second){
80
+ if (map[first] != second) {
78
81
map[first] = second
79
82
channel.send(RawProperty (first, second))
80
83
}
81
- if (columnNumber == 3 ){
84
+ if (columnNumber == 3 ) {
82
85
with (result.getInt(3 )) {
83
86
if (this > latestUpdate) {
84
87
latestUpdate = this
@@ -87,11 +90,9 @@ class JDBCConfigSource(private val url : String,private val scheme : String) : C
87
90
}
88
91
}
89
92
}
90
- }
91
- catch (e : SQLException ){
93
+ } catch (e: SQLException ) {
92
94
logger.error(" Failed reading from $scheme scheme. Values from this place are no longer updates" )
93
- }
94
- catch (e : Exception ){
95
+ } catch (e: Exception ) {
95
96
logger.error(e.message)
96
97
}
97
98
}
0 commit comments