Skip to content

Commit 08495cf

Browse files
authored
Merge pull request #14 from journeyapps/fix-sharedmutex-close
Fix closing of SharedMutex
2 parents d937c5d + d1cdcf4 commit 08495cf

File tree

5 files changed

+93
-2
lines changed

5 files changed

+93
-2
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 0.5.2
2+
3+
- Fix releasing of locks when closing `SharedMutex``.
4+
15
## 0.5.1
26

37
- Fix `watch` when called with query parameters.

lib/src/mutex.dart

+32
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ class SerializedMutex {
126126
/// Uses a [SendPort] to communicate with the source mutex.
127127
class SharedMutex implements Mutex {
128128
final ChildPortClient client;
129+
bool closed = false;
129130

130131
SharedMutex._(this.client);
131132

@@ -135,6 +136,9 @@ class SharedMutex implements Mutex {
135136
throw LockError('Recursive lock is not allowed');
136137
}
137138
return runZoned(() async {
139+
if (closed) {
140+
throw const ClosedException();
141+
}
138142
await _acquire(timeout: timeout);
139143
try {
140144
final T result = await callback();
@@ -174,7 +178,20 @@ class SharedMutex implements Mutex {
174178
}
175179

176180
@override
181+
182+
/// Wait for existing locks to be released, then close this SharedMutex
183+
/// and prevent further locks from being taken out.
177184
Future<void> close() async {
185+
if (closed) {
186+
return;
187+
}
188+
closed = true;
189+
// Wait for any existing locks to complete, then prevent any further locks from being taken out.
190+
await _acquire();
191+
client.fire(const _CloseMessage());
192+
// Close client immediately after _unlock(),
193+
// so that we're sure no further locks are acquired.
194+
// This also cancels any lock request in process.
178195
client.close();
179196
}
180197
}
@@ -184,6 +201,7 @@ class _SharedMutexServer {
184201
Completer? unlock;
185202
late final SerializedMutex serialized;
186203
final Mutex mutex;
204+
bool closed = false;
187205

188206
late final PortServer server;
189207

@@ -198,6 +216,11 @@ class _SharedMutexServer {
198216
if (arg is _AcquireMessage) {
199217
var lock = Completer.sync();
200218
mutex.lock(() async {
219+
if (closed) {
220+
// The client will error already - we just need to ensure
221+
// we don't take out another lock.
222+
return;
223+
}
201224
assert(unlock == null);
202225
unlock = Completer.sync();
203226
lock.complete();
@@ -208,6 +231,10 @@ class _SharedMutexServer {
208231
} else if (arg is _UnlockMessage) {
209232
assert(unlock != null);
210233
unlock!.complete();
234+
} else if (arg is _CloseMessage) {
235+
// Unlock and close (from client side)
236+
closed = true;
237+
unlock?.complete();
211238
}
212239
}
213240

@@ -224,6 +251,11 @@ class _UnlockMessage {
224251
const _UnlockMessage();
225252
}
226253

254+
/// Unlock and close
255+
class _CloseMessage {
256+
const _CloseMessage();
257+
}
258+
227259
class LockError extends Error {
228260
final String message;
229261

lib/src/port_channel.dart

+9-1
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ class ChildPortClient implements PortClient {
125125
final SendPort sendPort;
126126
final ReceivePort receivePort = ReceivePort();
127127
int _nextId = 1;
128+
bool closed = false;
128129

129130
final Map<int, Completer<Object?>> handlers = HashMap();
130131

@@ -144,6 +145,9 @@ class ChildPortClient implements PortClient {
144145

145146
@override
146147
Future<T> post<T>(Object message) async {
148+
if (closed) {
149+
throw const ClosedException();
150+
}
147151
var completer = Completer<T>.sync();
148152
var id = _nextId++;
149153
handlers[id] = completer;
@@ -153,18 +157,22 @@ class ChildPortClient implements PortClient {
153157

154158
@override
155159
void fire(Object message) {
160+
if (closed) {
161+
throw ClosedException();
162+
}
156163
sendPort.send(_FireMessage(message));
157164
}
158165

159166
void _cancelAll(Object error) {
160-
var handlers = this.handlers;
167+
var handlers = HashMap<int, Completer<Object?>>.from(this.handlers);
161168
this.handlers.clear();
162169
for (var message in handlers.values) {
163170
message.completeError(error);
164171
}
165172
}
166173

167174
void close() {
175+
closed = true;
168176
_cancelAll(const ClosedException());
169177
receivePort.close();
170178
}

pubspec.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: sqlite_async
22
description: High-performance asynchronous interface for SQLite on Dart and Flutter.
3-
version: 0.5.1
3+
version: 0.5.2
44
repository: https://github.com/journeyapps/sqlite_async.dart
55
environment:
66
sdk: '>=2.19.1 <4.0.0'

test/mutex_test.dart

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import 'dart:isolate';
2+
3+
import 'package:sqlite_async/mutex.dart';
4+
import 'package:test/test.dart';
5+
6+
void main() {
7+
group('Mutex Tests', () {
8+
test('Closing', () async {
9+
// Test that locks are properly released when calling SharedMutex.close()
10+
// in in Isolate.
11+
// A timeout in this test indicates a likely error.
12+
for (var i = 0; i < 50; i++) {
13+
final mutex = SimpleMutex();
14+
final serialized = mutex.shared;
15+
16+
final result = await Isolate.run(() async {
17+
return _lockInIsolate(serialized);
18+
});
19+
20+
await mutex.lock(() async {});
21+
22+
expect(result, equals(5));
23+
}
24+
});
25+
}, timeout: const Timeout(Duration(milliseconds: 5000)));
26+
}
27+
28+
Future<Object> _lockInIsolate(
29+
SerializedMutex smutex,
30+
) async {
31+
final mutex = smutex.open();
32+
// Start a "thread" that repeatedly takes a lock
33+
_infiniteLock(mutex).ignore();
34+
await Future.delayed(const Duration(milliseconds: 10));
35+
// Then close the mutex while the above loop is running.
36+
await mutex.close();
37+
38+
return 5;
39+
}
40+
41+
Future<void> _infiniteLock(SharedMutex mutex) async {
42+
while (true) {
43+
await mutex.lock(() async {
44+
await Future.delayed(const Duration(milliseconds: 1));
45+
});
46+
}
47+
}

0 commit comments

Comments
 (0)