Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public interface DistributedStorageAdmin extends Admin, AutoCloseable {
/**
* Returns the storage information.
*
* <p>Note: This feature is primarily for internal use. Breaking changes can and will be
* introduced to it. Users should not depend on it.
*
* @param namespace the namespace to get the storage information for
* @return the storage information
* @throws ExecutionException if the operation fails
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/com/scalar/db/api/StorageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ public interface StorageInfo {
*/
int getMaxAtomicMutationsCount();

/**
* Returns whether the storage guarantees consistent reads for virtual tables.
*
* @return true if the storage guarantees consistent reads for virtual tables, false otherwise
*/
boolean isConsistentVirtualTableReadGuaranteed();

/**
* The mutation atomicity unit of the storage.
*
Expand Down
21 changes: 18 additions & 3 deletions core/src/main/java/com/scalar/db/common/StorageInfoImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ public class StorageInfoImpl implements StorageInfo {
private final String storageName;
private final MutationAtomicityUnit mutationAtomicityUnit;
private final int maxAtomicMutationsCount;
private final boolean consistentVirtualTableReadGuaranteed;

public StorageInfoImpl(
String storageName,
MutationAtomicityUnit mutationAtomicityUnit,
int maxAtomicMutationsCount) {
int maxAtomicMutationsCount,
boolean consistentVirtualTableReadGuaranteed) {
this.storageName = storageName;
this.mutationAtomicityUnit = mutationAtomicityUnit;
this.maxAtomicMutationsCount = maxAtomicMutationsCount;
this.consistentVirtualTableReadGuaranteed = consistentVirtualTableReadGuaranteed;
}

@Override
Expand All @@ -36,6 +39,11 @@ public int getMaxAtomicMutationsCount() {
return maxAtomicMutationsCount;
}

@Override
public boolean isConsistentVirtualTableReadGuaranteed() {
return consistentVirtualTableReadGuaranteed;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -47,12 +55,18 @@ public boolean equals(Object o) {
StorageInfoImpl that = (StorageInfoImpl) o;
return getMaxAtomicMutationsCount() == that.getMaxAtomicMutationsCount()
&& Objects.equals(getStorageName(), that.getStorageName())
&& getMutationAtomicityUnit() == that.getMutationAtomicityUnit();
&& getMutationAtomicityUnit() == that.getMutationAtomicityUnit()
&& isConsistentVirtualTableReadGuaranteed()
== that.isConsistentVirtualTableReadGuaranteed();
}

@Override
public int hashCode() {
return Objects.hash(getStorageName(), getMutationAtomicityUnit(), getMaxAtomicMutationsCount());
return Objects.hash(
getStorageName(),
getMutationAtomicityUnit(),
getMaxAtomicMutationsCount(),
isConsistentVirtualTableReadGuaranteed());
}

@Override
Expand All @@ -61,6 +75,7 @@ public String toString() {
.add("storageName", storageName)
.add("mutationAtomicityUnit", mutationAtomicityUnit)
.add("maxAtomicMutationsCount", maxAtomicMutationsCount)
.add("consistentVirtualTableReadGuaranteed", consistentVirtualTableReadGuaranteed)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public class CassandraAdmin implements DistributedStorageAdmin {
"cassandra",
StorageInfo.MutationAtomicityUnit.PARTITION,
// No limit on the number of mutations
Integer.MAX_VALUE);
Integer.MAX_VALUE,
false);

private final ClusterManager clusterManager;
private final String metadataKeyspace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public class CosmosAdmin implements DistributedStorageAdmin {
"cosmos",
StorageInfo.MutationAtomicityUnit.PARTITION,
// No limit on the number of mutations
Integer.MAX_VALUE);
Integer.MAX_VALUE,
false);

private final CosmosClient client;
private final String metadataDatabase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public class DynamoAdmin implements DistributedStorageAdmin {
"dynamo",
StorageInfo.MutationAtomicityUnit.STORAGE,
// DynamoDB has a limit of 100 items per transactional batch write operation
100);
100,
false);

private final DynamoDbClient client;
private final ApplicationAutoScalingClient applicationAutoScalingClient;
Expand Down
24 changes: 16 additions & 8 deletions core/src/main/java/com/scalar/db/storage/jdbc/JdbcAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ public class JdbcAdmin implements DistributedStorageAdmin {
@VisibleForTesting static final String JDBC_COL_DECIMAL_DIGITS = "DECIMAL_DIGITS";

private static final String INDEX_NAME_PREFIX = "index";
private static final StorageInfo STORAGE_INFO =
new StorageInfoImpl(
"jdbc",
StorageInfo.MutationAtomicityUnit.STORAGE,
// No limit on the number of mutations
Integer.MAX_VALUE);

private final RdbEngineStrategy rdbEngine;
private final BasicDataSource dataSource;
Expand Down Expand Up @@ -1011,8 +1005,22 @@ public void upgrade(Map<String, String> options) throws ExecutionException {
}

@Override
public StorageInfo getStorageInfo(String namespace) {
return STORAGE_INFO;
public StorageInfo getStorageInfo(String namespace) throws ExecutionException {
boolean consistentVirtualTableReadGuaranteed;
try (Connection connection = dataSource.getConnection()) {
int isolationLevel = connection.getTransactionIsolation();
consistentVirtualTableReadGuaranteed =
isolationLevel >= rdbEngine.getMinimumIsolationLevelForConsistentVirtualTableRead();
} catch (SQLException e) {
throw new ExecutionException("Getting the transaction isolation level failed", e);
}

return new StorageInfoImpl(
"jdbc",
StorageInfo.MutationAtomicityUnit.STORAGE,
// No limit on the number of mutations
Integer.MAX_VALUE,
consistentVirtualTableReadGuaranteed);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.scalar.db.storage.jdbc.query.SelectQuery;
import com.scalar.db.storage.jdbc.query.SelectWithLimitQuery;
import com.scalar.db.storage.jdbc.query.UpsertQuery;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.JDBCType;
import java.sql.ResultSet;
Expand Down Expand Up @@ -588,4 +589,10 @@ public void throwIfCrossPartitionScanOrderingOnBlobColumnNotSupported(
public String getTableNamesInNamespaceSql() {
return "SELECT TABNAME FROM SYSCAT.TABLES WHERE TABSCHEMA = ? AND TYPE = 'T'";
}

@Override
public int getMinimumIsolationLevelForConsistentVirtualTableRead() {
// In Db2, REPEATABLE READ and SERIALIZABLE isolation levels guarantee consistent reads
return Connection.TRANSACTION_REPEATABLE_READ;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -509,4 +509,10 @@ public void setConnectionToReadOnly(Connection connection, boolean readOnly) thr
public String getTableNamesInNamespaceSql() {
return "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ?";
}

@Override
public int getMinimumIsolationLevelForConsistentVirtualTableRead() {
// In MySQL, REPEATABLE READ and SERIALIZABLE isolation levels guarantee consistent reads
return Connection.TRANSACTION_REPEATABLE_READ;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.scalar.db.storage.jdbc.query.UpsertQuery;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -540,4 +541,10 @@ public void bindBlobColumnToPreparedStatement(
public String getTableNamesInNamespaceSql() {
return "SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = ?";
}

@Override
public int getMinimumIsolationLevelForConsistentVirtualTableRead() {
// In Oracle, only the SERIALIZABLE isolation level guarantees consistent reads
return Connection.TRANSACTION_SERIALIZABLE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.scalar.db.storage.jdbc.query.SelectQuery;
import com.scalar.db.storage.jdbc.query.SelectWithLimitQuery;
import com.scalar.db.storage.jdbc.query.UpsertQuery;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.JDBCType;
import java.sql.SQLException;
Expand Down Expand Up @@ -400,4 +401,10 @@ public String tryAddIfNotExistsToCreateIndexSql(String createIndexSql) {
public String getTableNamesInNamespaceSql() {
return "SELECT table_name FROM information_schema.tables WHERE table_schema = ?";
}

@Override
public int getMinimumIsolationLevelForConsistentVirtualTableRead() {
// In PostgreSQL, REPEATABLE READ and SERIALIZABLE isolation levels guarantee consistent reads
return Connection.TRANSACTION_REPEATABLE_READ;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.scalar.db.storage.jdbc.query.SelectQuery;
import com.scalar.db.storage.jdbc.query.SelectWithTop;
import com.scalar.db.storage.jdbc.query.UpsertQuery;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.JDBCType;
import java.sql.SQLException;
Expand Down Expand Up @@ -436,4 +437,10 @@ public Map<String, String> getConnectionProperties(JdbcConfig config) {
public String getTableNamesInNamespaceSql() {
return "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ?";
}

@Override
public int getMinimumIsolationLevelForConsistentVirtualTableRead() {
// In SQL Server, REPEATABLE READ or higher isolation level guarantees consistent reads
return Connection.TRANSACTION_REPEATABLE_READ;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -389,4 +389,10 @@ public String getTableNamesInNamespaceSql() {
// Do nothing. Namespace is just a table prefix in the SQLite implementation.
return null;
}

@Override
public int getMinimumIsolationLevelForConsistentVirtualTableRead() {
// In SQLite, READ COMMITTED and higher isolation levels guarantee consistent reads
return Connection.TRANSACTION_READ_COMMITTED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,17 @@ default void throwIfConjunctionsColumnNotSupported(
Set<Conjunction> conjunctions, TableMetadata metadata) {}

String getTableNamesInNamespaceSql();

/**
* Returns the minimum isolation level required to ensure consistent reads across virtual tables.
*
* <p>A virtual table read involves querying multiple underlying source tables. When using a lower
* isolation level, there is a risk of observing an inconsistent snapshot where data from
* different source tables reflects different points in time. This method returns the minimum
* isolation level that guarantees a consistent snapshot across all source tables involved in a
* virtual table read.
*
* @return the minimum isolation level required for consistent virtual table reads
*/
int getMinimumIsolationLevelForConsistentVirtualTableRead();
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ public StorageInfo getStorageInfo(String namespace) throws ExecutionException {
return new StorageInfoImpl(
holder.storageName,
storageInfo.getMutationAtomicityUnit(),
storageInfo.getMaxAtomicMutationsCount());
storageInfo.getMaxAtomicMutationsCount(),
storageInfo.isConsistentVirtualTableReadGuaranteed());
} catch (RuntimeException e) {
if (e.getCause() instanceof ExecutionException) {
throw (ExecutionException) e.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public class ObjectStorageAdmin implements DistributedStorageAdmin {
"object_storage",
StorageInfo.MutationAtomicityUnit.PARTITION,
// No limit on the number of mutations
Integer.MAX_VALUE);
Integer.MAX_VALUE,
false);

private final ObjectStorageWrapper wrapper;
private final String metadataNamespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public interface ObjectStorageWrapper {
*/
void deleteByPrefix(String prefix) throws ObjectStorageWrapperException;

/** Close the storage wrapper. */
/**
* Close the storage wrapper.
*
* @throws ObjectStorageWrapperException if an error occurs
*/
void close() throws ObjectStorageWrapperException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class OperationCheckerTest {
private static final String COL3 = "v3";
private static final StorageInfo STORAGE_INFO =
new StorageInfoImpl(
"cassandra", StorageInfo.MutationAtomicityUnit.PARTITION, Integer.MAX_VALUE);
"cassandra", StorageInfo.MutationAtomicityUnit.PARTITION, Integer.MAX_VALUE, false);

@Mock private DatabaseConfig databaseConfig;
@Mock private TableMetadataManager metadataManager;
Expand Down Expand Up @@ -2059,9 +2059,11 @@ public void check_MutationsGiven_ForAtomicityUnit_ShouldBehaveCorrectly(
.addClusteringKey(CKEY1)
.build());

StorageInfo storageInfo1 = new StorageInfoImpl("s1", mutationAtomicityUnit, Integer.MAX_VALUE);
StorageInfo storageInfo1 =
new StorageInfoImpl("s1", mutationAtomicityUnit, Integer.MAX_VALUE, false);
StorageInfo storageInfo2 =
new StorageInfoImpl("s2", StorageInfo.MutationAtomicityUnit.STORAGE, Integer.MAX_VALUE);
new StorageInfoImpl(
"s2", StorageInfo.MutationAtomicityUnit.STORAGE, Integer.MAX_VALUE, false);
when(storageInfoProvider.getStorageInfo("ns")).thenReturn(storageInfo1);
when(storageInfoProvider.getStorageInfo("ns2")).thenReturn(storageInfo1);
when(storageInfoProvider.getStorageInfo("other_ns")).thenReturn(storageInfo2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public class CosmosOperationCheckerTest {
private static final String COL1 = "v1";
private static final String COL2 = "v2";
private static final StorageInfo STORAGE_INFO =
new StorageInfoImpl("cosmos", StorageInfo.MutationAtomicityUnit.PARTITION, Integer.MAX_VALUE);
new StorageInfoImpl(
"cosmos", StorageInfo.MutationAtomicityUnit.PARTITION, Integer.MAX_VALUE, false);

private static final TableMetadata TABLE_METADATA1 =
TableMetadata.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class DynamoOperationCheckerTest {
private static final String COL3 = "v3";
private static final String COL4 = "v4";
private static final StorageInfo STORAGE_INFO =
new StorageInfoImpl("dynamo", StorageInfo.MutationAtomicityUnit.STORAGE, 100);
new StorageInfoImpl("dynamo", StorageInfo.MutationAtomicityUnit.STORAGE, 100, false);

@Mock private DatabaseConfig databaseConfig;
@Mock private TableMetadataManager metadataManager;
Expand Down
Loading