-
Notifications
You must be signed in to change notification settings - Fork 17
Upgrade Cassandra Driver to 4.3.1 #26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
| Version | Change | | ||
|---------|--------------------------------------------------| | ||
| 0.1.6 | Dependent on Cassandra Driver 3.2.0 and Guava 19 | | ||
| 0.2.0 | Dependent on Cassandra Driver 4.3.1 | |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
distributionBase=GRADLE_USER_HOME | ||
distributionPath=wrapper/dists | ||
distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-bin.zip | ||
distributionUrl=https\://services.gradle.org/distributions/gradle-6.0.1-bin.zip | ||
zipStoreBase=GRADLE_USER_HOME | ||
zipStorePath=wrapper/dists |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,14 @@ | ||
package smartthings.cassandra; | ||
|
||
import com.datastax.driver.core.*; | ||
import com.google.common.base.Charsets; | ||
import com.google.common.io.Files; | ||
import com.datastax.oss.driver.api.core.*; | ||
import com.datastax.oss.driver.api.core.config.DefaultDriverOption; | ||
import com.datastax.oss.driver.api.core.config.DriverConfigLoader; | ||
import com.datastax.oss.driver.api.core.cql.ResultSet; | ||
import com.datastax.oss.driver.api.core.cql.Row; | ||
import com.datastax.oss.driver.api.core.cql.SimpleStatement; | ||
import com.datastax.oss.driver.api.core.ssl.ProgrammaticSslEngineFactory; | ||
import com.datastax.oss.driver.shaded.guava.common.base.Charsets; | ||
import com.datastax.oss.driver.shaded.guava.common.io.Files; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import smartthings.migration.CassandraMigrationException; | ||
|
@@ -14,8 +20,10 @@ | |
import java.io.File; | ||
import java.io.FileInputStream; | ||
import java.io.IOException; | ||
import java.net.InetSocketAddress; | ||
import java.security.KeyStore; | ||
import java.security.SecureRandom; | ||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
|
@@ -30,11 +38,12 @@ public class CassandraConnection implements AutoCloseable { | |
private String truststorePassword; | ||
private String keystorePath; | ||
private String keystorePassword; | ||
private Cluster cluster; | ||
private Session session; | ||
private CqlSession session; | ||
private boolean mySession = false; | ||
private String keyspace; | ||
private String host; | ||
private int port; | ||
private String localDatacenter; | ||
private String username; | ||
private String password; | ||
|
||
|
@@ -53,38 +62,44 @@ public CassandraConnection(MigrationParameters parameters, String ownerName) { | |
if (session == null) { | ||
this.host = parameters.getHost(); | ||
this.port = parameters.getPort(); | ||
this.localDatacenter = parameters.getLocalDatacenter(); | ||
this.username = parameters.getUsername(); | ||
this.password = parameters.getPassword(); | ||
this.truststorePassword = parameters.getTruststorePassword(); | ||
this.truststorePath = parameters.getTruststorePath(); | ||
this.keystorePassword = parameters.getKeystorePassword(); | ||
this.keystorePath = parameters.getKeystorePath(); | ||
this.mySession = true; | ||
} | ||
this.keyspace = parameters.getKeyspace(); | ||
|
||
} | ||
|
||
public void connect() throws Exception { | ||
if (session == null) { | ||
logger.debug("Connecting to Cassandra at " + host + ":" + port); | ||
|
||
QueryOptions queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM); | ||
|
||
Cluster.Builder builder = Cluster.builder().addContactPoint(host).withPort(port).withMaxSchemaAgreementWaitSeconds(20).withQueryOptions(queryOptions); | ||
CqlSessionBuilder builder = CqlSession.builder() | ||
.addContactPoint(new InetSocketAddress(host, port)) | ||
.withLocalDatacenter(localDatacenter) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cassandra Driver 4.3 will fail to startup when a host / port is specified without also setting the local datacenter field. Added it as a settable parameter w/ a default of "datacenter1" which matches the drivers default value. This has more details: Reading of the docs seems that as we upgrade to the 4.x drivers we should be setting this field in higher environments.. Though I'm a little unsure to what. Could use some feedback here from those closer to Cassandra. |
||
.withConfigLoader( | ||
DriverConfigLoader.programmaticBuilder() | ||
.withDuration(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, Duration.ofSeconds(20)) | ||
.withString(DefaultDriverOption.REQUEST_CONSISTENCY, ConsistencyLevel.QUORUM.toString()) | ||
.build() | ||
); | ||
|
||
if (all(truststorePath, truststorePassword, keystorePath, keystorePassword)) { | ||
logger.debug("Using SSL for the connection"); | ||
SSLContext sslContext = getSSLContext(truststorePath, truststorePassword, keystorePath, keystorePassword); | ||
builder.withSSL(JdkSSLOptions.builder().withSSLContext(sslContext).withCipherSuites(cipherSuites).build()); | ||
builder.withSslEngineFactory(new ProgrammaticSslEngineFactory(sslContext, cipherSuites)); | ||
} | ||
|
||
if (username != null && password != null) { | ||
logger.debug("Using withCredentials for the connection"); | ||
builder.withCredentials(username, password); | ||
builder.withAuthCredentials(username, password); | ||
} | ||
|
||
cluster = builder.build(); | ||
session = cluster.connect(); | ||
session = builder.build(); | ||
} | ||
|
||
if (keyspace != null) { | ||
|
@@ -103,9 +118,9 @@ public void close() { | |
lock.unlock(); | ||
} | ||
|
||
if (cluster != null) { | ||
if (session != null && isMySession()) { | ||
//We don't close the connection if we were given a session | ||
cluster.close(); | ||
session.close(); | ||
} | ||
} | ||
|
||
|
@@ -135,7 +150,7 @@ public void setKeyspace(String keyspace) { | |
} | ||
|
||
public ResultSet execute(String query, Object... params) { | ||
return session.execute(query, params); | ||
return session.execute(SimpleStatement.newInstance(query, params)); | ||
} | ||
|
||
|
||
|
@@ -174,7 +189,7 @@ public void backfillMigrations() { | |
} | ||
|
||
public void setupMigration() { | ||
if (!session.getCluster().getMetadata().checkSchemaAgreement()) { | ||
if (!session.checkSchemaAgreement()) { | ||
throw new CassandraMigrationException("Migration table setup precheck: schema not in agreement"); | ||
} | ||
if (!tableExists("migrations")) { | ||
|
@@ -291,18 +306,18 @@ public void keepLockAlive() { | |
public String getMigrationMd5(String fileName) { | ||
File file = new File(fileName); | ||
ResultSet result = executeWithLock("SELECT sha FROM migrations WHERE name=?", file.getName()); | ||
if (result.isExhausted()) { | ||
if (result.isFullyFetched()) { | ||
return null; | ||
} | ||
|
||
return result.one().getString("sha"); | ||
} | ||
|
||
public Session getSession() { | ||
public CqlSession getSession() { | ||
return session; | ||
} | ||
|
||
public void setSession(Session session) { | ||
public void setSession(CqlSession session) { | ||
this.session = session; | ||
} | ||
|
||
|
@@ -377,4 +392,12 @@ public void setKeystorePassword(String keystorePassword) { | |
public String getOwnerName() { | ||
return ownerName; | ||
} | ||
|
||
/** | ||
* Returns `true` if CassandraConnection created it's own CqlSession. `false` if connection was parameterized by caller. | ||
* @return | ||
*/ | ||
public boolean isMySession() { | ||
return mySession; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
package smartthings.cassandra; | ||
|
||
import com.datastax.driver.core.*; | ||
import com.datastax.oss.driver.api.core.CqlSession; | ||
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; | ||
import com.datastax.oss.driver.api.core.cql.*; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import smartthings.migration.CassandraMigrationException; | ||
|
@@ -13,7 +15,7 @@ public class CassandraLock implements AutoCloseable { | |
private final int ttl; | ||
private final CassandraConnection cassandraConnection; | ||
private final String owner; | ||
private final Session session; | ||
private final CqlSession session; | ||
private final PreparedStatement insertLock; | ||
private final PreparedStatement deleteLock; | ||
private final PreparedStatement selectLock; | ||
|
@@ -31,22 +33,35 @@ public CassandraLock(CassandraConnection cassandraConnection, int ttl) { | |
|
||
setupTables(); | ||
|
||
insertLock = session.prepare("INSERT INTO databasechangelock(id, lockedby) VALUES (:lockId, :owner) IF NOT EXISTS USING TTL :ttl"); | ||
insertLock.setConsistencyLevel(ConsistencyLevel.QUORUM); | ||
|
||
deleteLock = session.prepare("DELETE FROM databasechangelock WHERE id = :lockId IF lockedby = :owner"); | ||
deleteLock.setConsistencyLevel(ConsistencyLevel.QUORUM); | ||
|
||
selectLock = session.prepare("SELECT lockedby, TTL(lockedby) AS ttl FROM databasechangelock WHERE id = :lockId"); | ||
selectLock.setConsistencyLevel(ConsistencyLevel.SERIAL); | ||
|
||
updateLock = session.prepare("UPDATE databasechangelock USING TTL :ttl SET lockedby = :owner WHERE id = :lockId IF lockedby = :owner"); | ||
deleteLock.setConsistencyLevel(ConsistencyLevel.QUORUM); | ||
insertLock = session.prepare( | ||
SimpleStatement.newInstance("INSERT INTO databasechangelock(id, lockedby) VALUES (:lockId, :owner) IF NOT EXISTS USING TTL :ttl") | ||
.setConsistencyLevel(DefaultConsistencyLevel.QUORUM) | ||
); | ||
|
||
deleteLock = session.prepare( | ||
SimpleStatement.newInstance("DELETE FROM databasechangelock WHERE id = :lockId IF lockedby = :owner") | ||
.setConsistencyLevel(DefaultConsistencyLevel.QUORUM) | ||
); | ||
|
||
selectLock = session.prepare( | ||
SimpleStatement.newInstance("SELECT lockedby, TTL(lockedby) AS ttl FROM databasechangelock WHERE id = :lockId") | ||
.setConsistencyLevel(DefaultConsistencyLevel.SERIAL) | ||
); | ||
|
||
updateLock = session.prepare( | ||
SimpleStatement.newInstance("UPDATE databasechangelock USING TTL :ttl SET lockedby = :owner WHERE id = :lockId IF lockedby = :ifowner") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a bit obnoxious. All the bind variable - set by name methods appear only to support setting the first occurrence. Best solution I've determined so far is just make each bind name unique.. Couldn't find a method/impl that automagically set them all. If you leave it the same it results in a RuntimeException essentially saying you have unset binds. |
||
.setConsistencyLevel(DefaultConsistencyLevel.QUORUM) | ||
); | ||
} | ||
|
||
public boolean tryLock() { | ||
ResultSet rs = session.execute(insertLock.bind().setInt("lockId", lockId) | ||
.setInt("ttl", ttl).setString("owner", owner)); | ||
ResultSet rs = session.execute( | ||
insertLock.boundStatementBuilder() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Switched |
||
.setInt("lockId", lockId) | ||
.setInt("ttl", ttl) | ||
.setString("owner", owner) | ||
.build() | ||
); | ||
if (rs.wasApplied()) { | ||
return true; | ||
} else { | ||
|
@@ -58,7 +73,12 @@ public boolean tryLock() { | |
public void unlock() { | ||
// Only try to release lock if its mine | ||
if (isMine()) { | ||
ResultSet rs = session.execute(deleteLock.bind().setInt("lockId", lockId).setString("owner", owner)); | ||
ResultSet rs = session.execute( | ||
deleteLock.boundStatementBuilder() | ||
.setInt("lockId", lockId) | ||
.setString("owner", owner) | ||
.build() | ||
); | ||
if (!rs.wasApplied()) { | ||
// if ownership was lost, should be fine, since we are relinquishing ownership | ||
if (isMine()) { | ||
|
@@ -69,15 +89,25 @@ public void unlock() { | |
} | ||
|
||
public void keepAlive() { | ||
ResultSet rs = session.execute(updateLock.bind().setInt("lockId", lockId) | ||
.setInt("ttl", ttl).setString("owner", owner)); | ||
ResultSet rs = session.execute( | ||
updateLock.boundStatementBuilder() | ||
.setInt("lockId", lockId) | ||
.setInt("ttl", ttl) | ||
.setString("owner", owner) | ||
.setString("ifowner", owner) | ||
.build() | ||
); | ||
if (!rs.wasApplied()) { | ||
throw new CassandraLockException("unable to keep alive lock"); | ||
} | ||
} | ||
|
||
public String getOwner() { | ||
Row row = session.execute(selectLock.bind().setInt("lockId", lockId)).one(); | ||
Row row = session.execute( | ||
selectLock.boundStatementBuilder() | ||
.setInt("lockId", lockId) | ||
.build() | ||
).one(); | ||
if (row != null) { | ||
return row.getString("lockedby"); | ||
} | ||
|
@@ -86,7 +116,11 @@ public String getOwner() { | |
} | ||
|
||
public int getTtl() { | ||
Row row = session.execute(selectLock.bind().setInt("lockId", lockId)).one(); | ||
Row row = session.execute( | ||
selectLock.boundStatementBuilder() | ||
.setInt("lockId", lockId) | ||
.build() | ||
).one(); | ||
if (row != null) { | ||
return row.getInt("ttl"); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jsevellec/cassandra-unit#300