Skip to content

Commit 0d67cd8

Browse files
committed
[feat][dingo-executor] Add batch resolve lock for gc
1 parent a462464 commit 0d67cd8

File tree

3 files changed

+318
-56
lines changed

3 files changed

+318
-56
lines changed

dingo-store-proxy/src/main/java/io/dingodb/store/proxy/common/Gc.java

Lines changed: 132 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,9 @@
3333
import io.dingodb.meta.MetaService;
3434
import io.dingodb.net.api.ApiRegistry;
3535
import io.dingodb.sdk.service.CoordinatorService;
36-
import io.dingodb.sdk.service.DocumentService;
37-
import io.dingodb.sdk.service.IndexService;
3836
import io.dingodb.sdk.service.Services;
39-
import io.dingodb.sdk.service.StoreService;
40-
import io.dingodb.sdk.service.entity.common.IndexParameter;
41-
import io.dingodb.sdk.service.entity.common.IndexType;
4237
import io.dingodb.sdk.service.entity.common.Location;
4338
import io.dingodb.sdk.service.entity.common.Region;
44-
import io.dingodb.sdk.service.entity.common.RegionDefinition;
4539
import io.dingodb.sdk.service.entity.coordinator.DropRegionRequest;
4640
import io.dingodb.sdk.service.entity.coordinator.GetGCSafePointRequest;
4741
import io.dingodb.sdk.service.entity.coordinator.GetGCSafePointResponse;
@@ -60,8 +54,12 @@
6054
import io.dingodb.sdk.service.entity.store.TxnScanLockResponse;
6155
import io.dingodb.store.api.transaction.data.IsolationLevel;
6256
import io.dingodb.store.api.transaction.data.checkstatus.AsyncResolveData;
57+
import io.dingodb.store.api.transaction.data.resolvelock.ResolveLockStatus;
6358
import io.dingodb.store.api.transaction.exception.NonAsyncCommitLockException;
6459
import io.dingodb.store.proxy.Configuration;
60+
import io.dingodb.store.proxy.common.transaction.LockResolveManager;
61+
import io.dingodb.store.proxy.common.transaction.ResolveLocksOptions;
62+
import io.dingodb.store.proxy.common.transaction.TxnStatus;
6563
import io.dingodb.transaction.api.GcApi;
6664
import io.dingodb.transaction.api.GcObj;
6765
import io.dingodb.tso.TsoService;
@@ -71,6 +69,7 @@
7169
import java.util.ArrayList;
7270
import java.util.Arrays;
7371
import java.util.Collections;
72+
import java.util.HashMap;
7473
import java.util.HashSet;
7574
import java.util.List;
7675
import java.util.Map;
@@ -86,8 +85,16 @@
8685
import static io.dingodb.sdk.service.entity.store.Action.TTLExpireRollback;
8786
import static io.dingodb.sdk.service.entity.store.Op.Lock;
8887
import static io.dingodb.store.proxy.Configuration.coordinatorSet;
88+
import static io.dingodb.store.utils.ResolveLockUtil.checkAllSecondaries;
8989
import static io.dingodb.store.utils.ResolveLockUtil.checkSecondaryAllLocks;
90+
import static io.dingodb.store.utils.ResolveLockUtil.commitBatchResolve;
91+
import static io.dingodb.store.utils.ResolveLockUtil.documentService;
92+
import static io.dingodb.store.utils.ResolveLockUtil.getTxnStatusFromLockForGC;
93+
import static io.dingodb.store.utils.ResolveLockUtil.indexRegionService;
94+
import static io.dingodb.store.utils.ResolveLockUtil.isDocumentRegion;
95+
import static io.dingodb.store.utils.ResolveLockUtil.isIndexRegion;
9096
import static io.dingodb.store.utils.ResolveLockUtil.resolveAsyncResolveData;
97+
import static io.dingodb.store.utils.ResolveLockUtil.storeRegionService;
9198
import static java.lang.Math.min;
9299

93100
@Slf4j
@@ -138,7 +145,7 @@ public static Pair<String, Long> safePointUpdate() {
138145
scanLockResponse = storeRegionService(regionId).txnScanLock(reqTs, req);
139146
}
140147
if (scanLockResponse.getLocks() != null && !scanLockResponse.getLocks().isEmpty()) {
141-
safeTs = resolveLock(safeTs, reqTs, scanLockResponse.getLocks(), coordinators, region);
148+
safeTs = gcBatchResolveLocks(safeTs, reqTs, scanLockResponse.getLocks(), coordinators, region);
142149
}
143150
if (scanLockResponse.isHasMore()) {
144151
startKey = scanLockResponse.getEndKey();
@@ -257,7 +264,7 @@ public static GcObj startBackUpSafeByPoint(long point, long latestTso) {
257264
scanLockResponse = storeRegionService(regionId).txnScanLock(latestTso, req);
258265
}
259266
if (scanLockResponse.getLocks() != null && !scanLockResponse.getLocks().isEmpty()) {
260-
safeTs = resolveLock(safeTs, latestTso, scanLockResponse.getLocks(), coordinators, region);
267+
safeTs = gcBatchResolveLocks(safeTs, latestTso, scanLockResponse.getLocks(), coordinators, region);
261268
}
262269
if (scanLockResponse.isHasMore()) {
263270
startKey = scanLockResponse.getEndKey();
@@ -381,7 +388,7 @@ public static List<GcObj> startTenantsBackUpSafeByPoint(long point, long latestT
381388
scanLockResponse = storeRegionService(regionId).txnScanLock(latestTso, req);
382389
}
383390
if (scanLockResponse.getLocks() != null && !scanLockResponse.getLocks().isEmpty()) {
384-
safeTs = resolveLock(safeTs, latestTso, scanLockResponse.getLocks(), coordinators, region);
391+
safeTs = gcBatchResolveLocks(safeTs, latestTso, scanLockResponse.getLocks(), coordinators, region);
385392
}
386393
if (scanLockResponse.isHasMore()) {
387394
startKey = scanLockResponse.getEndKey();
@@ -484,42 +491,12 @@ private static long tso() {
484491
return tsoService().tso();
485492
}
486493

487-
private static boolean isIndexRegion(Region region) {
488-
return Optional.ofNullable(region)
489-
.map(Region::getDefinition)
490-
.map(RegionDefinition::getIndexParameter)
491-
.map(IndexParameter::getIndexType)
492-
.filter($ -> $ == IndexType.INDEX_TYPE_VECTOR)
493-
.isPresent();
494-
}
495-
496-
private static boolean isDocumentRegion(Region region) {
497-
return Optional.ofNullable(region)
498-
.map(Region::getDefinition)
499-
.map(RegionDefinition::getIndexParameter)
500-
.map(IndexParameter::getIndexType)
501-
.filter($ -> $ == IndexType.INDEX_TYPE_DOCUMENT)
502-
.isPresent();
503-
}
504-
505494
private static boolean enable(long reqTs) {
506495
Map<String,String> globalVariables = InfoSchemaService.root().getGlobalVariables();
507496
String enableGc = globalVariables.get(GcApi.enableKeyStr);
508497
return "1".equalsIgnoreCase(enableGc);
509498
}
510499

511-
private static StoreService storeRegionService(long regionId) {
512-
return Services.storeRegionService(Configuration.coordinatorSet(), regionId, 30);
513-
}
514-
515-
private static IndexService indexRegionService(long regionId) {
516-
return Services.indexRegionService(Configuration.coordinatorSet(), regionId, 30);
517-
}
518-
519-
private static DocumentService documentService(long regionId) {
520-
return Services.documentRegionService(Configuration.coordinatorSet(), regionId, 30);
521-
}
522-
523500
private static boolean resolve(
524501
long reqTs, LockInfo lock, long commitTs, Set<Location> coordinators, Region region
525502
) {
@@ -581,6 +558,122 @@ private static boolean pessimisticRollback(
581558
return storeRegionService(region.getId()).txnPessimisticRollback(reqTs, req).getTxnResult() == null;
582559
}
583560

561+
private static long gcBatchResolveLocks(
562+
long safeTs, long reqTs, List<LockInfo> locks, Set<Location> coordinators, Region region
563+
) {
564+
long result = safeTs;
565+
if (locks.isEmpty()) {
566+
return result;
567+
}
568+
// txnId -> commitTs
569+
Map<Long, Long> txnInfos = new HashMap<>();
570+
boolean forceSyncCommit = false;
571+
ResolveLocksOptions opts = ResolveLocksOptions.builder()
572+
.callerStartTS(safeTs)
573+
.locks(locks)
574+
.forRead(false)
575+
.lite(true)
576+
.isolationLevel(IsolationLevel.SnapshotIsolation.getCode())
577+
.funName("forGc")
578+
.rollbackIfNotExist(true)
579+
.build();
580+
for (LockInfo lock : locks) {
581+
if (txnInfos.containsKey(lock.getLockTs())) {
582+
continue;
583+
}
584+
try {
585+
result = resolveLockConflict(
586+
reqTs,
587+
coordinators,
588+
region,
589+
result,
590+
txnInfos,
591+
forceSyncCommit,
592+
opts,
593+
lock
594+
);
595+
} catch (NonAsyncCommitLockException e) {
596+
opts.setForceSyncCommit(true);
597+
result = resolveLockConflict(
598+
reqTs,
599+
coordinators,
600+
region,
601+
result,
602+
txnInfos,
603+
forceSyncCommit,
604+
opts,
605+
lock
606+
);
607+
} catch (Exception e) {
608+
LogUtils.error(log, "BatchResolveLocks error for lock: {}", lock, e);
609+
throw new RuntimeException(e.getMessage());
610+
}
611+
}
612+
if (!commitBatchResolve(reqTs, txnInfos, region)) {
613+
result = txnInfos.keySet().stream()
614+
.mapToLong(Long::longValue)
615+
.reduce(result, Math::min);
616+
}
617+
return result;
618+
}
619+
620+
private static long resolveLockConflict(long reqTs,
621+
Set<Location> coordinators,
622+
Region region,
623+
long result,
624+
Map<Long, Long> txnInfos,
625+
boolean forceSyncCommit,
626+
ResolveLocksOptions opts,
627+
LockInfo lock) {
628+
// Use MaxLong as the current TS to force a transaction rollback.
629+
TxnStatus status = getTxnStatusFromLockForGC(lock, Long.MAX_VALUE, opts);
630+
631+
// Handling pessimistic locking - Direct rollback
632+
if (isPessimisticRollbackStatus(lock, status.getAction())) {
633+
if (!pessimisticRollback(reqTs, lock, coordinators, region)) {
634+
result = min(result, lock.getLockTs());
635+
} else {
636+
status.setResolveLockStatus(ResolveLockStatus.PESSIMISTIC_ROLLBACK);
637+
}
638+
} else if (status.getPrimaryLock() != null && status.getPrimaryLock().isUseAsyncCommit() && !forceSyncCommit) {
639+
// Handling asynchronous transaction commits
640+
long lockTtl = status.getPrimaryLock().getLockTtl();
641+
if (lockTtl > 0 && !tsoService().IsExpired(lockTtl)) {
642+
LogUtils.warn(log, "reqTs:{}, lockTs:{} useAsyncCommit lockTtl not IsExpired, " +
643+
"lockTtl:{}", reqTs, status.getPrimaryLock().getLockTs(), lockTtl);
644+
result = min(result, lock.getLockTs());
645+
} else {
646+
AsyncResolveData asyncResolveData = checkAllSecondaries(opts, lock, status);
647+
txnInfos.put(lock.getLockTs(), asyncResolveData.getCommitTs());
648+
LogUtils.info(log, "reqTs:{}, asyncResolveData:{}", reqTs, asyncResolveData);
649+
status.setCommitTs(asyncResolveData.getCommitTs());
650+
if (status.isStatusCacheable()) {
651+
LockResolveManager.saveResolved(lock.getLockTs(), status);
652+
}
653+
if (asyncResolveData.getCommitTs() > 0) {
654+
status.setResolveLockStatus(ResolveLockStatus.COMMIT);
655+
} else {
656+
status.setResolveLockStatus(ResolveLockStatus.ROLLBACK);
657+
}
658+
}
659+
} else if (status.getTtl() > 0) {
660+
LogUtils.error(log, "BatchResolveLocks fail to clean locks, ttl still exists for txn:{}",
661+
lock.getLockTs());
662+
result = min(result, lock.getLockTs());
663+
} else if (status.isRolledBack() || status.isCommitted()) {
664+
txnInfos.put(lock.getLockTs(), status.getCommitTs());
665+
if (status.getCommitTs() > 0) {
666+
status.setResolveLockStatus(ResolveLockStatus.COMMIT);
667+
} else {
668+
status.setResolveLockStatus(ResolveLockStatus.ROLLBACK);
669+
}
670+
} else {
671+
result = min(result, lock.getLockTs());
672+
}
673+
return result;
674+
}
675+
676+
584677
private static long resolveLock(
585678
long safeTs, long reqTs, List<LockInfo> locks, Set<Location> coordinators, Region region
586679
) {
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2021 DataCanvas
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.dingodb.store.proxy.common.transaction;
18+
19+
import lombok.Builder;
20+
import lombok.Data;
21+
22+
@Data
23+
@Builder
24+
public class TxnInfo {
25+
private long txnId;
26+
private long status; // commitTs or 0 for rollback
27+
}

0 commit comments

Comments
 (0)