diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index d8252877..eeefc0c3 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -22,6 +22,8 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import kotlinx.serialization.encodeToString @@ -69,15 +71,23 @@ internal class InternalDatabaseImpl( init { scope.launch { val accumulatedUpdates = mutableSetOf() + val accumulatedUpdatesSetMutex = Mutex() + tableUpdates() // Debounce will discard any events which occur inside the debounce window // This will accumulate those table updates - .onEach { tables -> accumulatedUpdates.addAll(tables) } + .onEach { tables -> + accumulatedUpdatesSetMutex.withLock { + accumulatedUpdates.addAll(tables) + } + } .debounce(DEFAULT_WATCH_THROTTLE_MS) .collect { - val dataTables = accumulatedUpdates.map { toFriendlyTableName(it) }.filter { it.isNotBlank() } - driver.notifyListeners(queryKeys = dataTables.toTypedArray()) - accumulatedUpdates.clear() + accumulatedUpdatesSetMutex.withLock { + val dataTables = accumulatedUpdates.map { toFriendlyTableName(it) }.filter { it.isNotBlank() } + driver.notifyListeners(queryKeys = dataTables.toTypedArray()) + accumulatedUpdates.clear() + } } } }