From dc7ab1326987b9bc166326599deb31e2ddc20288 Mon Sep 17 00:00:00 2001 From: Benoit Letondor Date: Mon, 17 Feb 2025 21:14:35 +0100 Subject: [PATCH] fix: Fix concurrency crash in InternalDatabaseImpl --- .../db/internal/InternalDatabaseImpl.kt | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index d8252877..eeefc0c3 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -22,6 +22,8 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import kotlinx.serialization.encodeToString @@ -69,15 +71,23 @@ internal class InternalDatabaseImpl( init { scope.launch { val accumulatedUpdates = mutableSetOf() + val accumulatedUpdatesSetMutex = Mutex() + tableUpdates() // Debounce will discard any events which occur inside the debounce window // This will accumulate those table updates - .onEach { tables -> accumulatedUpdates.addAll(tables) } + .onEach { tables -> + accumulatedUpdatesSetMutex.withLock { + accumulatedUpdates.addAll(tables) + } + } .debounce(DEFAULT_WATCH_THROTTLE_MS) .collect { - val dataTables = accumulatedUpdates.map { toFriendlyTableName(it) }.filter { it.isNotBlank() } - driver.notifyListeners(queryKeys = dataTables.toTypedArray()) - accumulatedUpdates.clear() + accumulatedUpdatesSetMutex.withLock { + val dataTables = accumulatedUpdates.map { toFriendlyTableName(it) }.filter { it.isNotBlank() } + driver.notifyListeners(queryKeys = dataTables.toTypedArray()) + accumulatedUpdates.clear() + } } } }