Skip to content

Commit 4737d5c

Browse files
authored
Merge pull request #53 from MatrixAI/feature-dangling-transactions
Fix committing or rollbacking dangling transactions
2 parents 6b825ab + 634935f commit 4737d5c

File tree

4 files changed

+160
-53
lines changed

4 files changed

+160
-53
lines changed

src/DB.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,15 @@ class DB {
168168
await iterator.destroy();
169169
}
170170
for (const transaction of this._transactionRefs) {
171-
await transaction.rollback();
171+
if (!transaction.committing && !transaction.rollbacking) {
172+
// If any transactions is still pending at this point
173+
// then if they try to commit, that will be an error because
174+
// the transaction is already rollbacked
175+
await transaction.rollback();
176+
} else {
177+
// This will wait for committing or rollbacking to complete
178+
await transaction.destroy();
179+
}
172180
}
173181
await rocksdbP.dbClose(this._db);
174182
this.logger.info(`Stopped ${this.constructor.name}`);
@@ -212,6 +220,9 @@ class DB {
212220
await tran.rollback(e);
213221
}
214222
} finally {
223+
// If already destroyed, this is a noop
224+
// this will only have affect if there was an
225+
// exception during commit or rollback
215226
await tran.destroy();
216227
}
217228
},

src/DBTransaction.ts

Lines changed: 81 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import type {
2020
} from './native/types';
2121
import Logger from '@matrixai/logger';
2222
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
23-
import { RWLockWriter } from '@matrixai/async-locks';
23+
import { Lock, RWLockWriter } from '@matrixai/async-locks';
2424
import DBIterator from './DBIterator';
2525
import { rocksdbP } from './native';
2626
import * as utils from './utils';
@@ -49,8 +49,11 @@ class DBTransaction {
4949
protected _callbacksSuccess: Array<() => any> = [];
5050
protected _callbacksFailure: Array<(e?: Error) => any> = [];
5151
protected _callbacksFinally: Array<(e?: Error) => any> = [];
52+
protected _committing: boolean = false;
5253
protected _committed: boolean = false;
54+
protected _rollbacking: boolean = false;
5355
protected _rollbacked: boolean = false;
56+
protected commitOrRollbackLock: Lock = new Lock();
5457

5558
public constructor({
5659
db,
@@ -86,9 +89,12 @@ class DBTransaction {
8689
*/
8790
public async destroy() {
8891
this.logger.debug(`Destroying ${this.constructor.name} ${this.id}`);
89-
if (!this._committed && !this._rollbacked) {
92+
if (!this._committing && !this._rollbacking) {
9093
throw new errors.ErrorDBTransactionNotCommittedNorRollbacked();
9194
}
95+
// Wait for commit or rollback to finish
96+
// this then allows the destruction to proceed
97+
await this.commitOrRollbackLock.waitForUnlock();
9298
this._db.transactionRefs.delete(this);
9399
// Unlock all locked keys in reverse
94100
const lockedKeys = [...this._locks.keys()].reverse();
@@ -116,10 +122,30 @@ class DBTransaction {
116122
return this._callbacksFinally;
117123
}
118124

125+
/**
126+
* Indicates when `this.commit` is first called
127+
*/
128+
get committing(): boolean {
129+
return this._committing;
130+
}
131+
132+
/**
133+
* Indicates when the transaction is committed
134+
*/
119135
get committed(): boolean {
120136
return this._committed;
121137
}
122138

139+
/**
140+
* Indicates when `this.rollback` is first called
141+
*/
142+
get rollbacking(): boolean {
143+
return this._rollbacking;
144+
}
145+
146+
/**
147+
* Indicates when the transaction is rollbacked
148+
*/
123149
get rollbacked(): boolean {
124150
return this._rollbacked;
125151
}
@@ -437,75 +463,79 @@ class DBTransaction {
437463

438464
@ready(new errors.ErrorDBTransactionDestroyed())
439465
public async commit(): Promise<void> {
440-
if (this._rollbacked) {
466+
if (this._rollbacking) {
441467
throw new errors.ErrorDBTransactionRollbacked();
442468
}
443-
if (this._committed) {
469+
if (this._committing) {
444470
return;
445471
}
472+
this._committing = true;
446473
this.logger.debug(`Committing ${this.constructor.name} ${this.id}`);
447-
for (const iterator of this._iteratorRefs) {
448-
await iterator.destroy();
449-
}
450-
this._committed = true;
451-
try {
474+
await this.commitOrRollbackLock.withF(async () => {
475+
for (const iterator of this._iteratorRefs) {
476+
await iterator.destroy();
477+
}
452478
try {
453-
// If this fails, the `DBTransaction` is still considered committed
454-
// it must be destroyed, it cannot be reused
455-
await rocksdbP.transactionCommit(this._transaction);
456-
} catch (e) {
457-
if (e.code === 'TRANSACTION_CONFLICT') {
458-
this.logger.debug(
459-
`Failed Committing ${this.constructor.name} ${this.id} due to ${errors.ErrorDBTransactionConflict.name}`,
460-
);
461-
throw new errors.ErrorDBTransactionConflict(undefined, {
462-
cause: e,
463-
});
464-
} else {
465-
this.logger.debug(
466-
`Failed Committing ${this.constructor.name} ${this.id} due to ${e.message}`,
467-
);
468-
throw e;
479+
try {
480+
// If this fails, the `DBTransaction` is still considered committed
481+
// it must be destroyed, it cannot be reused
482+
await rocksdbP.transactionCommit(this._transaction);
483+
} catch (e) {
484+
if (e.code === 'TRANSACTION_CONFLICT') {
485+
this.logger.debug(
486+
`Failed Committing ${this.constructor.name} ${this.id} due to ${errors.ErrorDBTransactionConflict.name}`,
487+
);
488+
throw new errors.ErrorDBTransactionConflict(undefined, {
489+
cause: e,
490+
});
491+
} else {
492+
this.logger.debug(
493+
`Failed Committing ${this.constructor.name} ${this.id} due to ${e.message}`,
494+
);
495+
throw e;
496+
}
497+
}
498+
for (const f of this._callbacksSuccess) {
499+
await f();
500+
}
501+
} finally {
502+
for (const f of this._callbacksFinally) {
503+
await f();
469504
}
470505
}
471-
for (const f of this._callbacksSuccess) {
472-
await f();
473-
}
474-
} finally {
475-
for (const f of this._callbacksFinally) {
476-
await f();
477-
}
478-
}
479-
await this.destroy();
506+
this._committed = true;
507+
});
480508
this.logger.debug(`Committed ${this.constructor.name} ${this.id}`);
481509
}
482510

483511
@ready(new errors.ErrorDBTransactionDestroyed())
484512
public async rollback(e?: Error): Promise<void> {
485-
if (this._committed) {
513+
if (this._committing) {
486514
throw new errors.ErrorDBTransactionCommitted();
487515
}
488-
if (this._rollbacked) {
516+
if (this._rollbacking) {
489517
return;
490518
}
519+
this._rollbacking = true;
491520
this.logger.debug(`Rollbacking ${this.constructor.name} ${this.id}`);
492-
for (const iterator of this._iteratorRefs) {
493-
await iterator.destroy();
494-
}
495-
this._rollbacked = true;
496-
try {
497-
// If this fails, the `DBTransaction` is still considered rollbacked
498-
// it must be destroyed, it cannot be reused
499-
await rocksdbP.transactionRollback(this._transaction);
500-
for (const f of this._callbacksFailure) {
501-
await f(e);
521+
await this.commitOrRollbackLock.withF(async () => {
522+
for (const iterator of this._iteratorRefs) {
523+
await iterator.destroy();
502524
}
503-
} finally {
504-
for (const f of this._callbacksFinally) {
505-
await f(e);
525+
try {
526+
// If this fails, the `DBTransaction` is still considered rollbacked
527+
// it must be destroyed, it cannot be reused
528+
await rocksdbP.transactionRollback(this._transaction);
529+
for (const f of this._callbacksFailure) {
530+
await f(e);
531+
}
532+
} finally {
533+
for (const f of this._callbacksFinally) {
534+
await f(e);
535+
}
506536
}
507-
}
508-
await this.destroy();
537+
this._rollbacked = true;
538+
});
509539
this.logger.debug(`Rollbacked ${this.constructor.name} ${this.id}`);
510540
}
511541

tests/DB.test.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import type DBTransaction from '@/DBTransaction';
12
import type { KeyPath } from '@/types';
3+
import type { ResourceRelease } from '@matrixai/resources';
24
import type { DBWorkerModule } from './workers/dbWorkerModule';
35
import os from 'os';
46
import path from 'path';
@@ -569,4 +571,68 @@ describe(DB.name, () => {
569571
]);
570572
await db.stop();
571573
});
574+
test('dangling transactions are automatically rollbacked', async () => {
575+
const dbPath = `${dataDir}/db`;
576+
let db = await DB.createDB({ dbPath, crypto, logger });
577+
// This is a pending transaction to be committed
578+
// however it hasn't started committing until it's already rollbacked
579+
const p = expect(
580+
db.withTransactionF(async (tran) => {
581+
await tran.put('foo', 'bar');
582+
expect(tran.committing).toBe(false);
583+
}),
584+
).rejects.toThrow(errors.ErrorDBTransactionRollbacked);
585+
await db.stop();
586+
await p;
587+
db = await DB.createDB({ dbPath, crypto, logger });
588+
const acquireTran = db.transaction();
589+
const [releaseTran, tran] = (await acquireTran()) as [
590+
ResourceRelease,
591+
DBTransaction,
592+
];
593+
await tran.put('foo', 'bar');
594+
expect(tran.rollbacking).toBe(false);
595+
expect(tran.rollbacked).toBe(false);
596+
await db.stop();
597+
expect(tran.rollbacking).toBe(true);
598+
expect(tran.rollbacked).toBe(true);
599+
await expect(releaseTran()).rejects.toThrow(
600+
errors.ErrorDBTransactionRollbacked,
601+
);
602+
});
603+
test('dangling committing transactions are waited for', async () => {
604+
const dbPath = `${dataDir}/db`;
605+
const db = await DB.createDB({ dbPath, crypto, logger });
606+
const acquireTran = db.transaction();
607+
const [releaseTran, tran] = (await acquireTran()) as [
608+
ResourceRelease,
609+
DBTransaction,
610+
];
611+
await tran.put('foo', 'bar');
612+
const p = releaseTran();
613+
expect(tran.committing).toBe(true);
614+
expect(tran.committed).toBe(false);
615+
// This will wait for the transaction to be committed
616+
await db.stop();
617+
await p;
618+
expect(tran.committing).toBe(true);
619+
expect(tran.committed).toBe(true);
620+
});
621+
test('dangling rollbacking transactions are waited for', async () => {
622+
const dbPath = `${dataDir}/db`;
623+
const db = await DB.createDB({ dbPath, crypto, logger });
624+
const acquireTran = db.transaction();
625+
const [releaseTran, tran] = (await acquireTran()) as [
626+
ResourceRelease,
627+
DBTransaction,
628+
];
629+
await tran.put('foo', 'bar');
630+
const p = releaseTran(new Error('Trigger Rollback'));
631+
expect(tran.rollbacking).toBe(true);
632+
expect(tran.rollbacked).toBe(false);
633+
await db.stop();
634+
await p;
635+
expect(tran.rollbacking).toBe(true);
636+
expect(tran.rollbacked).toBe(true);
637+
});
572638
});

tests/DBIterator.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { KeyPath } from '@';
1+
import type { KeyPath } from '@/types';
22
import os from 'os';
33
import path from 'path';
44
import fs from 'fs';

0 commit comments

Comments
 (0)