Skip to content

Commit 025b1d2

Browse files
Merge pull request #136 from powersync-ja/watches
Improve Watch Queries
1 parent 54628cf commit 025b1d2

File tree

15 files changed

+196
-181
lines changed

15 files changed

+196
-181
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ captures
2222
Pods/
2323
dialect/bin
2424
.build
25+
.vscode

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## 1.0.0-BETA27
4+
5+
* Improved watch query internals. Added the ability to throttle watched queries.
6+
37
## 1.0.0-BETA26
48

59
* Support bucket priorities and partial syncs.

core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt

+5-6
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
package com.powersync
22

3-
import androidx.test.platform.app.InstrumentationRegistry
43
import androidx.test.ext.junit.runners.AndroidJUnit4
4+
import androidx.test.platform.app.InstrumentationRegistry
55
import app.cash.turbine.turbineScope
66
import com.powersync.db.schema.Schema
77
import com.powersync.testutils.UserRow
8+
import kotlinx.coroutines.*
89
import kotlinx.coroutines.runBlocking
910
import kotlinx.coroutines.test.runTest
1011
import org.junit.After
11-
12-
import org.junit.Test
13-
import org.junit.runner.RunWith
14-
1512
import org.junit.Assert.*
1613
import org.junit.Before
14+
import org.junit.Test
15+
import org.junit.runner.RunWith
1716

1817
@RunWith(AndroidJUnit4::class)
1918
class AndroidDatabaseTest {
@@ -91,4 +90,4 @@ class AndroidDatabaseTest {
9190
query.cancel()
9291
}
9392
}
94-
}
93+
}
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
11
package com.powersync
22

33
import app.cash.sqldelight.db.SqlDriver
4+
import com.powersync.utils.AtomicMutableSet
45
import kotlinx.coroutines.CoroutineScope
5-
import kotlinx.coroutines.flow.Flow
66
import kotlinx.coroutines.flow.MutableSharedFlow
7+
import kotlinx.coroutines.flow.SharedFlow
78
import kotlinx.coroutines.flow.asSharedFlow
8-
import kotlinx.coroutines.flow.filter
9-
import kotlinx.coroutines.flow.map
10-
import kotlinx.coroutines.launch
119

1210
internal class PsSqlDriver(
1311
private val driver: SqlDriver,
1412
private val scope: CoroutineScope,
1513
) : SqlDriver by driver {
1614
// MutableSharedFlow to emit batched table updates
17-
private val tableUpdatesFlow = MutableSharedFlow<List<String>>(replay = 0)
15+
private val tableUpdatesFlow = MutableSharedFlow<Set<String>>(replay = 0)
1816

1917
// In-memory buffer to store table names before flushing
20-
private val pendingUpdates = mutableSetOf<String>()
18+
private val pendingUpdates = AtomicMutableSet<String>()
2119

2220
fun updateTable(tableName: String) {
2321
pendingUpdates.add(tableName)
@@ -27,20 +25,13 @@ internal class PsSqlDriver(
2725
pendingUpdates.clear()
2826
}
2927

30-
// Flows on table updates
31-
fun tableUpdates(): Flow<List<String>> = tableUpdatesFlow.asSharedFlow()
28+
// Flows on any table change
29+
// This specifically returns a SharedFlow for downstream timing considerations
30+
fun updatesOnTables(): SharedFlow<Set<String>> =
31+
tableUpdatesFlow
32+
.asSharedFlow()
3233

33-
// Flows on table updates containing a specific table
34-
fun updatesOnTable(tableName: String): Flow<Unit> = tableUpdates().filter { it.contains(tableName) }.map { }
35-
36-
fun fireTableUpdates() {
37-
val updates = pendingUpdates.toList()
38-
if (updates.isEmpty()) {
39-
return
40-
}
41-
scope.launch {
42-
tableUpdatesFlow.emit(updates)
43-
}
44-
pendingUpdates.clear()
34+
suspend fun fireTableUpdates() {
35+
tableUpdatesFlow.emit(pendingUpdates.toSetAndClear())
4536
}
4637
}

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt

-13
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ internal class BucketStorageImpl(
1818
private val db: InternalDatabase,
1919
private val logger: Logger,
2020
) : BucketStorage {
21-
private val tableNames: MutableSet<String> = mutableSetOf()
2221
private var hasCompletedSync = AtomicBoolean(false)
2322
private var pendingBucketDeletes = AtomicBoolean(false)
2423

@@ -32,18 +31,6 @@ internal class BucketStorageImpl(
3231
const val COMPACT_OPERATION_INTERVAL = 1_000
3332
}
3433

35-
init {
36-
readTableNames()
37-
}
38-
39-
private fun readTableNames() {
40-
tableNames.clear()
41-
// Query to get existing table names
42-
val names = db.getExistingTableNames("ps_data_*")
43-
44-
tableNames.addAll(names)
45-
}
46-
4734
override fun getMaxOpId(): String = MAX_OP_ID
4835

4936
override suspend fun getClientId(): String {

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

+16-6
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ import com.powersync.sync.SyncStatusData
2222
import com.powersync.sync.SyncStream
2323
import com.powersync.utils.JsonParam
2424
import com.powersync.utils.JsonUtil
25+
import com.powersync.utils.throttle
2526
import com.powersync.utils.toJsonObject
2627
import kotlinx.coroutines.CoroutineScope
2728
import kotlinx.coroutines.FlowPreview
2829
import kotlinx.coroutines.Job
2930
import kotlinx.coroutines.cancelAndJoin
3031
import kotlinx.coroutines.flow.Flow
31-
import kotlinx.coroutines.flow.debounce
32+
import kotlinx.coroutines.flow.filter
3233
import kotlinx.coroutines.flow.first
3334
import kotlinx.coroutines.launch
3435
import kotlinx.coroutines.runBlocking
@@ -141,9 +142,13 @@ internal class PowerSyncDatabaseImpl(
141142

142143
uploadJob =
143144
scope.launch {
144-
internalDb.updatesOnTable(InternalTable.CRUD.toString()).debounce(crudThrottleMs).collect {
145-
syncStream!!.triggerCrudUpload()
146-
}
145+
internalDb
146+
.updatesOnTables()
147+
.filter { it.contains(InternalTable.CRUD.toString()) }
148+
.throttle(crudThrottleMs)
149+
.collect {
150+
syncStream!!.triggerCrudUpload()
151+
}
147152
}
148153
}
149154

@@ -233,8 +238,9 @@ internal class PowerSyncDatabaseImpl(
233238
override fun <RowType : Any> watch(
234239
sql: String,
235240
parameters: List<Any?>?,
241+
throttleMs: Long?,
236242
mapper: (SqlCursor) -> RowType,
237-
): Flow<List<RowType>> = internalDb.watch(sql, parameters, mapper)
243+
): Flow<List<RowType>> = internalDb.watch(sql, parameters, throttleMs, mapper)
238244

239245
override suspend fun <R> readTransaction(callback: ThrowableTransactionCallback<R>): R = internalDb.writeTransaction(callback)
240246

@@ -280,7 +286,11 @@ internal class PowerSyncDatabaseImpl(
280286
syncStream = null
281287
}
282288

283-
currentStatus.update(connected = false, connecting = false, lastSyncedAt = currentStatus.lastSyncedAt)
289+
currentStatus.update(
290+
connected = false,
291+
connecting = false,
292+
lastSyncedAt = currentStatus.lastSyncedAt,
293+
)
284294
}
285295

286296
override suspend fun disconnectAndClear(clearLocal: Boolean) {

core/src/commonMain/kotlin/com/powersync/db/Queries.kt

+4
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ public interface Queries {
5454
public fun <RowType : Any> watch(
5555
sql: String,
5656
parameters: List<Any?>? = listOf(),
57+
/**
58+
* Specify the minimum interval, in milliseconds, between queries.
59+
*/
60+
throttleMs: Long? = null,
5761
mapper: (SqlCursor) -> RowType,
5862
): Flow<List<RowType>>
5963

core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt

+2-4
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,13 @@ import app.cash.sqldelight.db.Closeable
44
import com.persistence.PowersyncQueries
55
import com.powersync.db.Queries
66
import com.powersync.persistence.PsDatabase
7-
import kotlinx.coroutines.flow.Flow
7+
import kotlinx.coroutines.flow.SharedFlow
88

99
internal interface InternalDatabase :
1010
Queries,
1111
Closeable {
1212
val transactor: PsDatabase
1313
val queries: PowersyncQueries
1414

15-
fun getExistingTableNames(tableGlob: String): List<String>
16-
17-
fun updatesOnTable(tableName: String): Flow<Unit>
15+
fun updatesOnTables(): SharedFlow<Set<String>>
1816
}

0 commit comments

Comments
 (0)