1
1
package smartthings .cassandra ;
2
2
3
- import com .datastax .driver .core .*;
4
- import com .google .common .base .Charsets ;
5
- import com .google .common .io .Files ;
3
+ import com .datastax .oss .driver .api .core .*;
4
+ import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
5
+ import com .datastax .oss .driver .api .core .config .DriverConfigLoader ;
6
+ import com .datastax .oss .driver .api .core .cql .ResultSet ;
7
+ import com .datastax .oss .driver .api .core .cql .Row ;
8
+ import com .datastax .oss .driver .api .core .cql .SimpleStatement ;
9
+ import com .datastax .oss .driver .api .core .ssl .ProgrammaticSslEngineFactory ;
10
+ import com .datastax .oss .driver .shaded .guava .common .base .Charsets ;
11
+ import com .datastax .oss .driver .shaded .guava .common .io .Files ;
6
12
import org .slf4j .Logger ;
7
13
import org .slf4j .LoggerFactory ;
8
14
import smartthings .migration .CassandraMigrationException ;
14
20
import java .io .File ;
15
21
import java .io .FileInputStream ;
16
22
import java .io .IOException ;
23
+ import java .net .InetSocketAddress ;
17
24
import java .security .KeyStore ;
18
25
import java .security .SecureRandom ;
26
+ import java .time .Duration ;
19
27
import java .util .ArrayList ;
20
28
import java .util .Arrays ;
21
29
import java .util .List ;
@@ -30,11 +38,12 @@ public class CassandraConnection implements AutoCloseable {
30
38
private String truststorePassword ;
31
39
private String keystorePath ;
32
40
private String keystorePassword ;
33
- private Cluster cluster ;
34
- private Session session ;
41
+ private CqlSession session ;
42
+ private boolean mySession = false ;
35
43
private String keyspace ;
36
44
private String host ;
37
45
private int port ;
46
+ private String localDatacenter ;
38
47
private String username ;
39
48
private String password ;
40
49
@@ -53,38 +62,44 @@ public CassandraConnection(MigrationParameters parameters, String ownerName) {
53
62
if (session == null ) {
54
63
this .host = parameters .getHost ();
55
64
this .port = parameters .getPort ();
65
+ this .localDatacenter = parameters .getLocalDatacenter ();
56
66
this .username = parameters .getUsername ();
57
67
this .password = parameters .getPassword ();
58
68
this .truststorePassword = parameters .getTruststorePassword ();
59
69
this .truststorePath = parameters .getTruststorePath ();
60
70
this .keystorePassword = parameters .getKeystorePassword ();
61
71
this .keystorePath = parameters .getKeystorePath ();
72
+ this .mySession = true ;
62
73
}
63
74
this .keyspace = parameters .getKeyspace ();
64
-
65
75
}
66
76
67
77
public void connect () throws Exception {
68
78
if (session == null ) {
69
79
logger .debug ("Connecting to Cassandra at " + host + ":" + port );
70
80
71
- QueryOptions queryOptions = new QueryOptions ().setConsistencyLevel (ConsistencyLevel .QUORUM );
72
-
73
- Cluster .Builder builder = Cluster .builder ().addContactPoint (host ).withPort (port ).withMaxSchemaAgreementWaitSeconds (20 ).withQueryOptions (queryOptions );
81
+ CqlSessionBuilder builder = CqlSession .builder ()
82
+ .addContactPoint (new InetSocketAddress (host , port ))
83
+ .withLocalDatacenter (localDatacenter )
84
+ .withConfigLoader (
85
+ DriverConfigLoader .programmaticBuilder ()
86
+ .withDuration (DefaultDriverOption .CONTROL_CONNECTION_AGREEMENT_TIMEOUT , Duration .ofSeconds (20 ))
87
+ .withString (DefaultDriverOption .REQUEST_CONSISTENCY , ConsistencyLevel .QUORUM .toString ())
88
+ .build ()
89
+ );
74
90
75
91
if (all (truststorePath , truststorePassword , keystorePath , keystorePassword )) {
76
92
logger .debug ("Using SSL for the connection" );
77
93
SSLContext sslContext = getSSLContext (truststorePath , truststorePassword , keystorePath , keystorePassword );
78
- builder .withSSL ( JdkSSLOptions . builder (). withSSLContext ( sslContext ). withCipherSuites ( cipherSuites ). build ( ));
94
+ builder .withSslEngineFactory ( new ProgrammaticSslEngineFactory ( sslContext , cipherSuites ));
79
95
}
80
96
81
97
if (username != null && password != null ) {
82
98
logger .debug ("Using withCredentials for the connection" );
83
- builder .withCredentials (username , password );
99
+ builder .withAuthCredentials (username , password );
84
100
}
85
101
86
- cluster = builder .build ();
87
- session = cluster .connect ();
102
+ session = builder .build ();
88
103
}
89
104
90
105
if (keyspace != null ) {
@@ -103,9 +118,9 @@ public void close() {
103
118
lock .unlock ();
104
119
}
105
120
106
- if (cluster != null ) {
121
+ if (session != null && isMySession () ) {
107
122
//We don't close the connection if we were given a session
108
- cluster .close ();
123
+ session .close ();
109
124
}
110
125
}
111
126
@@ -135,7 +150,7 @@ public void setKeyspace(String keyspace) {
135
150
}
136
151
137
152
public ResultSet execute (String query , Object ... params ) {
138
- return session .execute (query , params );
153
+ return session .execute (SimpleStatement . newInstance ( query , params ) );
139
154
}
140
155
141
156
@@ -174,7 +189,7 @@ public void backfillMigrations() {
174
189
}
175
190
176
191
public void setupMigration () {
177
- if (!session .getCluster (). getMetadata (). checkSchemaAgreement ()) {
192
+ if (!session .checkSchemaAgreement ()) {
178
193
throw new CassandraMigrationException ("Migration table setup precheck: schema not in agreement" );
179
194
}
180
195
if (!tableExists ("migrations" )) {
@@ -291,18 +306,18 @@ public void keepLockAlive() {
291
306
public String getMigrationMd5 (String fileName ) {
292
307
File file = new File (fileName );
293
308
ResultSet result = executeWithLock ("SELECT sha FROM migrations WHERE name=?" , file .getName ());
294
- if (result .isExhausted ()) {
309
+ if (result .isFullyFetched ()) {
295
310
return null ;
296
311
}
297
312
298
313
return result .one ().getString ("sha" );
299
314
}
300
315
301
- public Session getSession () {
316
+ public CqlSession getSession () {
302
317
return session ;
303
318
}
304
319
305
- public void setSession (Session session ) {
320
+ public void setSession (CqlSession session ) {
306
321
this .session = session ;
307
322
}
308
323
@@ -377,4 +392,12 @@ public void setKeystorePassword(String keystorePassword) {
377
392
public String getOwnerName () {
378
393
return ownerName ;
379
394
}
395
+
396
+ /**
397
+ * Returns `true` if CassandraConnection created it's own CqlSession. `false` if connection was parameterized by caller.
398
+ * @return
399
+ */
400
+ public boolean isMySession () {
401
+ return mySession ;
402
+ }
380
403
}
0 commit comments