Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions core/src/main/java/com/scalar/db/api/CrudOperable.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ public interface CrudOperable<E extends TransactionException> {
*/
void mutate(List<? extends Mutation> mutations) throws E;

/**
* Executes multiple operations in a batch through a transaction with the specified list of {@link
* Operation} commands and returns a list of {@link BatchResult} that contains results of the
* operations. Note that the order of the results corresponds to the order of the operations.
*
* @param operations a list of {@code Operation} commands
* @return a list of {@code BatchResult} that contains results of the operations
* @throws E if any of the transaction CRUD operations fails
*/
List<BatchResult> batch(List<? extends Operation> operations) throws E;

/** A scanner abstraction for iterating results. */
interface Scanner<E extends TransactionException> extends AutoCloseable, Iterable<Result> {
/**
Expand Down Expand Up @@ -183,4 +194,38 @@ interface Scanner<E extends TransactionException> extends AutoCloseable, Iterabl
@Override
void close() throws E;
}

/** A batch operation result returned by {@link CrudOperable#batch(List)}. */
interface BatchResult {
/**
* Returns the type of the operation.
*
* @return the operation type
*/
Type getType();

/**
* Returns a result of a get operation.
*
* @return an {@code Optional} with the returned result
*/
Optional<Result> getGetResult();

/**
* Returns a list of results of a scan operation.
*
* @return a list of {@link Result}
*/
List<Result> getScanResult();

enum Type {
GET,
SCAN,
PUT,
INSERT,
UPSERT,
UPDATE,
DELETE
}
}
}
16 changes: 16 additions & 0 deletions core/src/main/java/com/scalar/db/api/TransactionCrudOperable.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,22 @@ void delete(List<Delete> deletes)
void mutate(List<? extends Mutation> mutations)
throws CrudConflictException, CrudException, UnsatisfiedConditionException;

/**
* {@inheritDoc}
*
* @throws CrudConflictException if the transaction CRUD operation fails due to transient faults
* (e.g., a conflict error). You can retry the transaction from the beginning
* @throws CrudException if the transaction CRUD operation fails due to transient or nontransient
* faults. You can try retrying the transaction from the beginning, but the transaction may
* still fail if the cause is nontransient
* @throws UnsatisfiedConditionException if a condition is specified in a {@link Put}, {@link
* Delete}, or {@link Update} command, and if the condition is not satisfied or the entry does
* not exist
*/
@Override
List<BatchResult> batch(List<? extends Operation> operations)
throws CrudConflictException, CrudException, UnsatisfiedConditionException;

interface Scanner extends CrudOperable.Scanner<CrudException> {
/**
* {@inheritDoc}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,24 @@ void mutate(List<? extends Mutation> mutations)
throws CrudConflictException, CrudException, UnsatisfiedConditionException,
UnknownTransactionStatusException;

/**
* {@inheritDoc}
*
* @throws CrudConflictException if the transaction CRUD operation fails due to transient faults
* (e.g., a conflict error). You can retry the transaction from the beginning
* @throws CrudException if the transaction CRUD operation fails due to transient or nontransient
* faults. You can try retrying the transaction from the beginning, but the transaction may
* still fail if the cause is nontransient
* @throws UnsatisfiedConditionException if a condition is specified in a {@link Put}, {@link
* Delete}, or {@link Update} command, and if the condition is not satisfied or the entry does
* not exist
* @throws UnknownTransactionStatusException if the status of the commit is unknown
*/
@Override
List<BatchResult> batch(List<? extends Operation> operations)
throws CrudConflictException, CrudException, UnsatisfiedConditionException,
UnknownTransactionStatusException;

interface Scanner extends CrudOperable.Scanner<TransactionException> {
/**
* {@inheritDoc}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package com.scalar.db.common;

import static com.google.common.base.Preconditions.checkArgument;

import com.scalar.db.api.Delete;
import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.Get;
import com.scalar.db.api.Insert;
import com.scalar.db.api.Mutation;
import com.scalar.db.api.Operation;
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.Update;
import com.scalar.db.api.Upsert;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.util.ScalarDbUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -59,8 +65,60 @@ public Optional<String> getTable() {
return tableName;
}

protected <T extends Mutation> List<T> copyAndSetTargetToIfNot(List<T> mutations) {
return ScalarDbUtils.copyAndSetTargetToIfNot(mutations, namespace, tableName);
@Override
public void mutate(List<? extends Mutation> mutations) throws CrudException {
checkArgument(!mutations.isEmpty(), CoreError.EMPTY_MUTATIONS_SPECIFIED.buildMessage());
for (Mutation mutation : mutations) {
if (mutation instanceof Put) {
put((Put) mutation);
} else if (mutation instanceof Delete) {
delete((Delete) mutation);
} else if (mutation instanceof Insert) {
insert((Insert) mutation);
} else if (mutation instanceof Upsert) {
upsert((Upsert) mutation);
} else {
assert mutation instanceof Update;
update((Update) mutation);
}
}
}

@Override
public List<BatchResult> batch(List<? extends Operation> operations) throws CrudException {
checkArgument(!operations.isEmpty(), CoreError.EMPTY_OPERATIONS_SPECIFIED.buildMessage());
List<BatchResult> ret = new ArrayList<>();
for (Operation operation : operations) {
if (operation instanceof Get) {
Optional<Result> result = get((Get) operation);
ret.add(new BatchResultImpl(result));
} else if (operation instanceof Scan) {
List<Result> results = scan((Scan) operation);
ret.add(new BatchResultImpl(results));
} else if (operation instanceof Put) {
put((Put) operation);
ret.add(BatchResultImpl.PUT_BATCH_RESULT);
} else if (operation instanceof Insert) {
insert((Insert) operation);
ret.add(BatchResultImpl.INSERT_BATCH_RESULT);
} else if (operation instanceof Upsert) {
upsert((Upsert) operation);
ret.add(BatchResultImpl.UPSERT_BATCH_RESULT);
} else if (operation instanceof Update) {
update((Update) operation);
ret.add(BatchResultImpl.UPDATE_BATCH_RESULT);
} else if (operation instanceof Delete) {
delete((Delete) operation);
ret.add(BatchResultImpl.DELETE_BATCH_RESULT);
} else {
throw new AssertionError("Unknown operation: " + operation);
}
}
return ret;
}

protected <T extends Operation> List<T> copyAndSetTargetToIfNot(List<T> operations) {
return ScalarDbUtils.copyAndSetTargetToIfNot(operations, namespace, tableName);
}

protected Get copyAndSetTargetToIfNot(Get get) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Get;
import com.scalar.db.api.Insert;
import com.scalar.db.api.Mutation;
import com.scalar.db.api.Operation;
import com.scalar.db.api.Put;
import com.scalar.db.api.Scan;
import com.scalar.db.api.Update;
Expand Down Expand Up @@ -73,8 +73,8 @@ public DistributedTransaction resume(String txId) throws TransactionNotFoundExce
throw new UnsupportedOperationException("resume is not supported in this implementation");
}

protected <T extends Mutation> List<T> copyAndSetTargetToIfNot(List<T> mutations) {
return ScalarDbUtils.copyAndSetTargetToIfNot(mutations, namespace, tableName);
protected <T extends Operation> List<T> copyAndSetTargetToIfNot(List<T> operations) {
return ScalarDbUtils.copyAndSetTargetToIfNot(operations, namespace, tableName);
}

protected Get copyAndSetTargetToIfNot(Get get) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package com.scalar.db.common;

import static com.google.common.base.Preconditions.checkArgument;

import com.scalar.db.api.Delete;
import com.scalar.db.api.Get;
import com.scalar.db.api.Insert;
import com.scalar.db.api.Mutation;
import com.scalar.db.api.Operation;
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.TwoPhaseCommitTransaction;
import com.scalar.db.api.Update;
import com.scalar.db.api.Upsert;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.util.ScalarDbUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -59,8 +65,60 @@ public Optional<String> getTable() {
return tableName;
}

protected <T extends Mutation> List<T> copyAndSetTargetToIfNot(List<T> mutations) {
return ScalarDbUtils.copyAndSetTargetToIfNot(mutations, namespace, tableName);
@Override
public void mutate(List<? extends Mutation> mutations) throws CrudException {
checkArgument(!mutations.isEmpty(), CoreError.EMPTY_MUTATIONS_SPECIFIED.buildMessage());
for (Mutation mutation : mutations) {
if (mutation instanceof Put) {
put((Put) mutation);
} else if (mutation instanceof Delete) {
delete((Delete) mutation);
} else if (mutation instanceof Insert) {
insert((Insert) mutation);
} else if (mutation instanceof Upsert) {
upsert((Upsert) mutation);
} else {
assert mutation instanceof Update;
update((Update) mutation);
}
}
}

@Override
public List<BatchResult> batch(List<? extends Operation> operations) throws CrudException {
checkArgument(!operations.isEmpty(), CoreError.EMPTY_OPERATIONS_SPECIFIED.buildMessage());
List<BatchResult> ret = new ArrayList<>();
for (Operation operation : operations) {
if (operation instanceof Get) {
Optional<Result> result = get((Get) operation);
ret.add(new BatchResultImpl(result));
} else if (operation instanceof Scan) {
List<Result> results = scan((Scan) operation);
ret.add(new BatchResultImpl(results));
} else if (operation instanceof Put) {
put((Put) operation);
ret.add(BatchResultImpl.PUT_BATCH_RESULT);
} else if (operation instanceof Insert) {
insert((Insert) operation);
ret.add(BatchResultImpl.INSERT_BATCH_RESULT);
} else if (operation instanceof Upsert) {
upsert((Upsert) operation);
ret.add(BatchResultImpl.UPSERT_BATCH_RESULT);
} else if (operation instanceof Update) {
update((Update) operation);
ret.add(BatchResultImpl.UPDATE_BATCH_RESULT);
} else if (operation instanceof Delete) {
delete((Delete) operation);
ret.add(BatchResultImpl.DELETE_BATCH_RESULT);
} else {
throw new AssertionError("Unknown operation: " + operation);
}
}
return ret;
}

protected <T extends Operation> List<T> copyAndSetTargetToIfNot(List<T> operations) {
return ScalarDbUtils.copyAndSetTargetToIfNot(operations, namespace, tableName);
}

protected Get copyAndSetTargetToIfNot(Get get) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.scalar.db.api.Delete;
import com.scalar.db.api.Get;
import com.scalar.db.api.Insert;
import com.scalar.db.api.Mutation;
import com.scalar.db.api.Operation;
import com.scalar.db.api.Put;
import com.scalar.db.api.Scan;
import com.scalar.db.api.TwoPhaseCommitTransaction;
Expand Down Expand Up @@ -68,8 +68,8 @@ public TwoPhaseCommitTransaction resume(String txId) throws TransactionNotFoundE
throw new UnsupportedOperationException("resume is not supported in this implementation");
}

protected <T extends Mutation> List<T> copyAndSetTargetToIfNot(List<T> mutations) {
return ScalarDbUtils.copyAndSetTargetToIfNot(mutations, namespace, tableName);
protected <T extends Operation> List<T> copyAndSetTargetToIfNot(List<T> operations) {
return ScalarDbUtils.copyAndSetTargetToIfNot(operations, namespace, tableName);
}

protected Get copyAndSetTargetToIfNot(Get get) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.scalar.db.api.Get;
import com.scalar.db.api.Insert;
import com.scalar.db.api.Mutation;
import com.scalar.db.api.Operation;
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
Expand All @@ -21,10 +22,12 @@
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.db.util.ActiveExpiringMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -103,6 +106,11 @@ public DistributedTransaction resume(String txId) throws TransactionNotFoundExce
CoreError.TRANSACTION_NOT_FOUND.buildMessage(), txId));
}

/**
* The methods of this class are synchronized to be thread-safe because the rollback() method may
* be called from the expiration handler in a different thread while other methods are being
* executed.
*/
@VisibleForTesting
class ActiveTransaction extends DecoratedDistributedTransaction {

Expand All @@ -124,7 +132,37 @@ public synchronized List<Result> scan(Scan scan) throws CrudException {

@Override
public synchronized Scanner getScanner(Scan scan) throws CrudException {
return super.getScanner(scan);
Scanner scanner = super.getScanner(scan);
return new Scanner() {
@Override
public Optional<Result> one() throws CrudException {
synchronized (ActiveTransaction.this) {
return scanner.one();
}
}

@Override
public List<Result> all() throws CrudException {
synchronized (ActiveTransaction.this) {
return scanner.all();
}
}

@Override
public void close() throws CrudException {
synchronized (ActiveTransaction.this) {
scanner.close();
}
}

@Nonnull
@Override
public Iterator<Result> iterator() {
synchronized (ActiveTransaction.this) {
return scanner.iterator();
}
}
};
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand Down Expand Up @@ -173,6 +211,12 @@ public synchronized void mutate(List<? extends Mutation> mutations) throws CrudE
super.mutate(mutations);
}

@Override
public synchronized List<BatchResult> batch(List<? extends Operation> operations)
throws CrudException {
return super.batch(operations);
}

@Override
public synchronized void commit() throws CommitException, UnknownTransactionStatusException {
super.commit();
Expand Down
Loading