Skip to content

Commit 2274296

Browse files
committed
Support batch operation in Transaction API (#3082)
1 parent a34b5fe commit 2274296

File tree

43 files changed

+2995
-151
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2995
-151
lines changed

core/src/main/java/com/scalar/db/api/CrudOperable.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,17 @@ public interface CrudOperable<E extends TransactionException> {
156156
*/
157157
void mutate(List<? extends Mutation> mutations) throws E;
158158

159+
/**
160+
* Executes multiple operations in a batch through a transaction with the specified list of {@link
161+
* Operation} commands and returns a list of {@link BatchResult} that contains results of the
162+
* operations. Note that the order of the results corresponds to the order of the operations.
163+
*
164+
* @param operations a list of {@code Operation} commands
165+
* @return a list of {@code BatchResult} that contains results of the operations
166+
* @throws E if any of the transaction CRUD operations fails
167+
*/
168+
List<BatchResult> batch(List<? extends Operation> operations) throws E;
169+
159170
/** A scanner abstraction for iterating results. */
160171
interface Scanner<E extends TransactionException> extends AutoCloseable, Iterable<Result> {
161172
/**
@@ -183,4 +194,38 @@ interface Scanner<E extends TransactionException> extends AutoCloseable, Iterabl
183194
@Override
184195
void close() throws E;
185196
}
197+
198+
/** A batch operation result returned by {@link CrudOperable#batch(List)}. */
199+
interface BatchResult {
200+
/**
201+
* Returns the type of the operation.
202+
*
203+
* @return the operation type
204+
*/
205+
Type getType();
206+
207+
/**
208+
* Returns a result of a get operation.
209+
*
210+
* @return an {@code Optional} with the returned result
211+
*/
212+
Optional<Result> getGetResult();
213+
214+
/**
215+
* Returns a list of results of a scan operation.
216+
*
217+
* @return a list of {@link Result}
218+
*/
219+
List<Result> getScanResult();
220+
221+
enum Type {
222+
GET,
223+
SCAN,
224+
PUT,
225+
INSERT,
226+
UPSERT,
227+
UPDATE,
228+
DELETE
229+
}
230+
}
186231
}

core/src/main/java/com/scalar/db/api/TransactionCrudOperable.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,22 @@ void delete(List<Delete> deletes)
167167
void mutate(List<? extends Mutation> mutations)
168168
throws CrudConflictException, CrudException, UnsatisfiedConditionException;
169169

170+
/**
171+
* {@inheritDoc}
172+
*
173+
* @throws CrudConflictException if the transaction CRUD operation fails due to transient faults
174+
* (e.g., a conflict error). You can retry the transaction from the beginning
175+
* @throws CrudException if the transaction CRUD operation fails due to transient or nontransient
176+
* faults. You can try retrying the transaction from the beginning, but the transaction may
177+
* still fail if the cause is nontransient
178+
* @throws UnsatisfiedConditionException if a condition is specified in a {@link Put}, {@link
179+
* Delete}, or {@link Update} command, and if the condition is not satisfied or the entry does
180+
* not exist
181+
*/
182+
@Override
183+
List<BatchResult> batch(List<? extends Operation> operations)
184+
throws CrudConflictException, CrudException, UnsatisfiedConditionException;
185+
170186
interface Scanner extends CrudOperable.Scanner<CrudException> {
171187
/**
172188
* {@inheritDoc}

core/src/main/java/com/scalar/db/api/TransactionManagerCrudOperable.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,24 @@ void mutate(List<? extends Mutation> mutations)
190190
throws CrudConflictException, CrudException, UnsatisfiedConditionException,
191191
UnknownTransactionStatusException;
192192

193+
/**
194+
* {@inheritDoc}
195+
*
196+
* @throws CrudConflictException if the transaction CRUD operation fails due to transient faults
197+
* (e.g., a conflict error). You can retry the transaction from the beginning
198+
* @throws CrudException if the transaction CRUD operation fails due to transient or nontransient
199+
* faults. You can try retrying the transaction from the beginning, but the transaction may
200+
* still fail if the cause is nontransient
201+
* @throws UnsatisfiedConditionException if a condition is specified in a {@link Put}, {@link
202+
* Delete}, or {@link Update} command, and if the condition is not satisfied or the entry does
203+
* not exist
204+
* @throws UnknownTransactionStatusException if the status of the commit is unknown
205+
*/
206+
@Override
207+
List<BatchResult> batch(List<? extends Operation> operations)
208+
throws CrudConflictException, CrudException, UnsatisfiedConditionException,
209+
UnknownTransactionStatusException;
210+
193211
interface Scanner extends CrudOperable.Scanner<TransactionException> {
194212
/**
195213
* {@inheritDoc}

core/src/main/java/com/scalar/db/common/AbstractDistributedTransaction.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
package com.scalar.db.common;
22

3+
import static com.google.common.base.Preconditions.checkArgument;
4+
35
import com.scalar.db.api.Delete;
46
import com.scalar.db.api.DistributedTransaction;
57
import com.scalar.db.api.Get;
68
import com.scalar.db.api.Insert;
79
import com.scalar.db.api.Mutation;
10+
import com.scalar.db.api.Operation;
811
import com.scalar.db.api.Put;
12+
import com.scalar.db.api.Result;
913
import com.scalar.db.api.Scan;
1014
import com.scalar.db.api.Update;
1115
import com.scalar.db.api.Upsert;
16+
import com.scalar.db.exception.transaction.CrudException;
1217
import com.scalar.db.util.ScalarDbUtils;
18+
import java.util.ArrayList;
1319
import java.util.List;
1420
import java.util.Optional;
1521

@@ -59,8 +65,60 @@ public Optional<String> getTable() {
5965
return tableName;
6066
}
6167

62-
protected <T extends Mutation> List<T> copyAndSetTargetToIfNot(List<T> mutations) {
63-
return ScalarDbUtils.copyAndSetTargetToIfNot(mutations, namespace, tableName);
68+
@Override
69+
public void mutate(List<? extends Mutation> mutations) throws CrudException {
70+
checkArgument(!mutations.isEmpty(), CoreError.EMPTY_MUTATIONS_SPECIFIED.buildMessage());
71+
for (Mutation mutation : mutations) {
72+
if (mutation instanceof Put) {
73+
put((Put) mutation);
74+
} else if (mutation instanceof Delete) {
75+
delete((Delete) mutation);
76+
} else if (mutation instanceof Insert) {
77+
insert((Insert) mutation);
78+
} else if (mutation instanceof Upsert) {
79+
upsert((Upsert) mutation);
80+
} else {
81+
assert mutation instanceof Update;
82+
update((Update) mutation);
83+
}
84+
}
85+
}
86+
87+
@Override
88+
public List<BatchResult> batch(List<? extends Operation> operations) throws CrudException {
89+
checkArgument(!operations.isEmpty(), CoreError.EMPTY_OPERATIONS_SPECIFIED.buildMessage());
90+
List<BatchResult> ret = new ArrayList<>();
91+
for (Operation operation : operations) {
92+
if (operation instanceof Get) {
93+
Optional<Result> result = get((Get) operation);
94+
ret.add(new BatchResultImpl(result));
95+
} else if (operation instanceof Scan) {
96+
List<Result> results = scan((Scan) operation);
97+
ret.add(new BatchResultImpl(results));
98+
} else if (operation instanceof Put) {
99+
put((Put) operation);
100+
ret.add(BatchResultImpl.PUT_BATCH_RESULT);
101+
} else if (operation instanceof Insert) {
102+
insert((Insert) operation);
103+
ret.add(BatchResultImpl.INSERT_BATCH_RESULT);
104+
} else if (operation instanceof Upsert) {
105+
upsert((Upsert) operation);
106+
ret.add(BatchResultImpl.UPSERT_BATCH_RESULT);
107+
} else if (operation instanceof Update) {
108+
update((Update) operation);
109+
ret.add(BatchResultImpl.UPDATE_BATCH_RESULT);
110+
} else if (operation instanceof Delete) {
111+
delete((Delete) operation);
112+
ret.add(BatchResultImpl.DELETE_BATCH_RESULT);
113+
} else {
114+
throw new AssertionError("Unknown operation: " + operation);
115+
}
116+
}
117+
return ret;
118+
}
119+
120+
protected <T extends Operation> List<T> copyAndSetTargetToIfNot(List<T> operations) {
121+
return ScalarDbUtils.copyAndSetTargetToIfNot(operations, namespace, tableName);
64122
}
65123

66124
protected Get copyAndSetTargetToIfNot(Get get) {

core/src/main/java/com/scalar/db/common/AbstractDistributedTransactionManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import com.scalar.db.api.DistributedTransactionManager;
66
import com.scalar.db.api.Get;
77
import com.scalar.db.api.Insert;
8-
import com.scalar.db.api.Mutation;
8+
import com.scalar.db.api.Operation;
99
import com.scalar.db.api.Put;
1010
import com.scalar.db.api.Scan;
1111
import com.scalar.db.api.Update;
@@ -73,8 +73,8 @@ public DistributedTransaction resume(String txId) throws TransactionNotFoundExce
7373
throw new UnsupportedOperationException("resume is not supported in this implementation");
7474
}
7575

76-
protected <T extends Mutation> List<T> copyAndSetTargetToIfNot(List<T> mutations) {
77-
return ScalarDbUtils.copyAndSetTargetToIfNot(mutations, namespace, tableName);
76+
protected <T extends Operation> List<T> copyAndSetTargetToIfNot(List<T> operations) {
77+
return ScalarDbUtils.copyAndSetTargetToIfNot(operations, namespace, tableName);
7878
}
7979

8080
protected Get copyAndSetTargetToIfNot(Get get) {

core/src/main/java/com/scalar/db/common/AbstractTwoPhaseCommitTransaction.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
package com.scalar.db.common;
22

3+
import static com.google.common.base.Preconditions.checkArgument;
4+
35
import com.scalar.db.api.Delete;
46
import com.scalar.db.api.Get;
57
import com.scalar.db.api.Insert;
68
import com.scalar.db.api.Mutation;
9+
import com.scalar.db.api.Operation;
710
import com.scalar.db.api.Put;
11+
import com.scalar.db.api.Result;
812
import com.scalar.db.api.Scan;
913
import com.scalar.db.api.TwoPhaseCommitTransaction;
1014
import com.scalar.db.api.Update;
1115
import com.scalar.db.api.Upsert;
16+
import com.scalar.db.exception.transaction.CrudException;
1217
import com.scalar.db.util.ScalarDbUtils;
18+
import java.util.ArrayList;
1319
import java.util.List;
1420
import java.util.Optional;
1521

@@ -59,8 +65,60 @@ public Optional<String> getTable() {
5965
return tableName;
6066
}
6167

62-
protected <T extends Mutation> List<T> copyAndSetTargetToIfNot(List<T> mutations) {
63-
return ScalarDbUtils.copyAndSetTargetToIfNot(mutations, namespace, tableName);
68+
@Override
69+
public void mutate(List<? extends Mutation> mutations) throws CrudException {
70+
checkArgument(!mutations.isEmpty(), CoreError.EMPTY_MUTATIONS_SPECIFIED.buildMessage());
71+
for (Mutation mutation : mutations) {
72+
if (mutation instanceof Put) {
73+
put((Put) mutation);
74+
} else if (mutation instanceof Delete) {
75+
delete((Delete) mutation);
76+
} else if (mutation instanceof Insert) {
77+
insert((Insert) mutation);
78+
} else if (mutation instanceof Upsert) {
79+
upsert((Upsert) mutation);
80+
} else {
81+
assert mutation instanceof Update;
82+
update((Update) mutation);
83+
}
84+
}
85+
}
86+
87+
@Override
88+
public List<BatchResult> batch(List<? extends Operation> operations) throws CrudException {
89+
checkArgument(!operations.isEmpty(), CoreError.EMPTY_OPERATIONS_SPECIFIED.buildMessage());
90+
List<BatchResult> ret = new ArrayList<>();
91+
for (Operation operation : operations) {
92+
if (operation instanceof Get) {
93+
Optional<Result> result = get((Get) operation);
94+
ret.add(new BatchResultImpl(result));
95+
} else if (operation instanceof Scan) {
96+
List<Result> results = scan((Scan) operation);
97+
ret.add(new BatchResultImpl(results));
98+
} else if (operation instanceof Put) {
99+
put((Put) operation);
100+
ret.add(BatchResultImpl.PUT_BATCH_RESULT);
101+
} else if (operation instanceof Insert) {
102+
insert((Insert) operation);
103+
ret.add(BatchResultImpl.INSERT_BATCH_RESULT);
104+
} else if (operation instanceof Upsert) {
105+
upsert((Upsert) operation);
106+
ret.add(BatchResultImpl.UPSERT_BATCH_RESULT);
107+
} else if (operation instanceof Update) {
108+
update((Update) operation);
109+
ret.add(BatchResultImpl.UPDATE_BATCH_RESULT);
110+
} else if (operation instanceof Delete) {
111+
delete((Delete) operation);
112+
ret.add(BatchResultImpl.DELETE_BATCH_RESULT);
113+
} else {
114+
throw new AssertionError("Unknown operation: " + operation);
115+
}
116+
}
117+
return ret;
118+
}
119+
120+
protected <T extends Operation> List<T> copyAndSetTargetToIfNot(List<T> operations) {
121+
return ScalarDbUtils.copyAndSetTargetToIfNot(operations, namespace, tableName);
64122
}
65123

66124
protected Get copyAndSetTargetToIfNot(Get get) {

core/src/main/java/com/scalar/db/common/AbstractTwoPhaseCommitTransactionManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import com.scalar.db.api.Delete;
44
import com.scalar.db.api.Get;
55
import com.scalar.db.api.Insert;
6-
import com.scalar.db.api.Mutation;
6+
import com.scalar.db.api.Operation;
77
import com.scalar.db.api.Put;
88
import com.scalar.db.api.Scan;
99
import com.scalar.db.api.TwoPhaseCommitTransaction;
@@ -68,8 +68,8 @@ public TwoPhaseCommitTransaction resume(String txId) throws TransactionNotFoundE
6868
throw new UnsupportedOperationException("resume is not supported in this implementation");
6969
}
7070

71-
protected <T extends Mutation> List<T> copyAndSetTargetToIfNot(List<T> mutations) {
72-
return ScalarDbUtils.copyAndSetTargetToIfNot(mutations, namespace, tableName);
71+
protected <T extends Operation> List<T> copyAndSetTargetToIfNot(List<T> operations) {
72+
return ScalarDbUtils.copyAndSetTargetToIfNot(operations, namespace, tableName);
7373
}
7474

7575
protected Get copyAndSetTargetToIfNot(Get get) {

core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.scalar.db.api.Get;
88
import com.scalar.db.api.Insert;
99
import com.scalar.db.api.Mutation;
10+
import com.scalar.db.api.Operation;
1011
import com.scalar.db.api.Put;
1112
import com.scalar.db.api.Result;
1213
import com.scalar.db.api.Scan;
@@ -21,10 +22,12 @@
2122
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
2223
import com.scalar.db.util.ActiveExpiringMap;
2324
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
25+
import java.util.Iterator;
2426
import java.util.List;
2527
import java.util.Optional;
2628
import java.util.concurrent.atomic.AtomicReference;
2729
import java.util.function.BiConsumer;
30+
import javax.annotation.Nonnull;
2831
import javax.annotation.concurrent.ThreadSafe;
2932
import org.slf4j.Logger;
3033
import org.slf4j.LoggerFactory;
@@ -103,6 +106,11 @@ public DistributedTransaction resume(String txId) throws TransactionNotFoundExce
103106
CoreError.TRANSACTION_NOT_FOUND.buildMessage(), txId));
104107
}
105108

109+
/**
110+
* The methods of this class are synchronized to be thread-safe because the rollback() method may
111+
* be called from the expiration handler in a different thread while other methods are being
112+
* executed.
113+
*/
106114
@VisibleForTesting
107115
class ActiveTransaction extends DecoratedDistributedTransaction {
108116

@@ -124,7 +132,37 @@ public synchronized List<Result> scan(Scan scan) throws CrudException {
124132

125133
@Override
126134
public synchronized Scanner getScanner(Scan scan) throws CrudException {
127-
return super.getScanner(scan);
135+
Scanner scanner = super.getScanner(scan);
136+
return new Scanner() {
137+
@Override
138+
public Optional<Result> one() throws CrudException {
139+
synchronized (ActiveTransaction.this) {
140+
return scanner.one();
141+
}
142+
}
143+
144+
@Override
145+
public List<Result> all() throws CrudException {
146+
synchronized (ActiveTransaction.this) {
147+
return scanner.all();
148+
}
149+
}
150+
151+
@Override
152+
public void close() throws CrudException {
153+
synchronized (ActiveTransaction.this) {
154+
scanner.close();
155+
}
156+
}
157+
158+
@Nonnull
159+
@Override
160+
public Iterator<Result> iterator() {
161+
synchronized (ActiveTransaction.this) {
162+
return scanner.iterator();
163+
}
164+
}
165+
};
128166
}
129167

130168
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -173,6 +211,12 @@ public synchronized void mutate(List<? extends Mutation> mutations) throws CrudE
173211
super.mutate(mutations);
174212
}
175213

214+
@Override
215+
public synchronized List<BatchResult> batch(List<? extends Operation> operations)
216+
throws CrudException {
217+
return super.batch(operations);
218+
}
219+
176220
@Override
177221
public synchronized void commit() throws CommitException, UnknownTransactionStatusException {
178222
super.commit();

0 commit comments

Comments
 (0)