Skip to content

Commit 8a6d767

Browse files
Accumulate updated during throttle. Cleanup default throttle duration. Cleanup comments.
1 parent cb81f6b commit 8a6d767

File tree

4 files changed

+33
-16
lines changed

4 files changed

+33
-16
lines changed

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ internal class PowerSyncDatabaseImpl(
350350

351351
override fun onChange(
352352
tables: Set<String>,
353-
throttleMs: Long?,
353+
throttleMs: Long,
354354
): Flow<Set<String>> =
355355
flow {
356356
waitReady()
@@ -362,7 +362,7 @@ internal class PowerSyncDatabaseImpl(
362362
override fun <RowType : Any> watch(
363363
sql: String,
364364
parameters: List<Any?>?,
365-
throttleMs: Long?,
365+
throttleMs: Long,
366366
mapper: (SqlCursor) -> RowType,
367367
): Flow<List<RowType>> =
368368
flow {

core/src/commonMain/kotlin/com/powersync/db/Queries.kt

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import com.powersync.db.internal.ConnectionContext
55
import com.powersync.db.internal.PowerSyncTransaction
66
import kotlinx.coroutines.flow.Flow
77
import kotlin.coroutines.cancellation.CancellationException
8+
import kotlin.time.Duration
9+
import kotlin.time.Duration.Companion.milliseconds
810

911
public fun interface ThrowableTransactionCallback<R> {
1012
@Throws(PowerSyncException::class, kotlinx.coroutines.CancellationException::class)
@@ -17,6 +19,13 @@ public fun interface ThrowableLockCallback<R> {
1719
}
1820

1921
public interface Queries {
22+
public companion object {
23+
/**
24+
* The default throttle duration for [onChange] and [watch] operations.
25+
*/
26+
public val DEFAULT_THROTTLE: Duration = 30.milliseconds
27+
}
28+
2029
/**
2130
* Executes a write query (INSERT, UPDATE, DELETE).
2231
*
@@ -87,23 +96,23 @@ public interface Queries {
8796
* Returns a [Flow] that emits whenever the source tables are modified.
8897
*
8998
* @param tables The set of tables to monitor for changes.
90-
* @param throttleMs The minimum interval, in milliseconds, between queries. Defaults to null.
99+
* @param throttleMs The minimum interval, in milliseconds, between emissions. Defaults to [DEFAULT_THROTTLE]. Table changes are accumulated while throttling is active. The accumulated set of tables will be emitted on the trailing edge of the throttle.
91100
* @return A [Flow] emitting the set of modified tables.
92101
* @throws PowerSyncException If a database error occurs.
93102
* @throws CancellationException If the operation is cancelled.
94103
*/
95104
@Throws(PowerSyncException::class, CancellationException::class)
96105
public fun onChange(
97106
tables: Set<String>,
98-
throttleMs: Long? = null,
107+
throttleMs: Long = DEFAULT_THROTTLE.inWholeMilliseconds,
99108
): Flow<Set<String>>
100109

101110
/**
102111
* Executes a read-only (SELECT) query every time the source tables are modified and returns the results as a [Flow] of lists.
103112
*
104113
* @param sql The SQL query to execute.
105114
* @param parameters The parameters for the query, or an empty list if none.
106-
* @param throttleMs The minimum interval, in milliseconds, between queries. Defaults to null.
115+
* @param throttleMs The minimum interval, in milliseconds, between queries. Defaults to [DEFAULT_THROTTLE].
107116
* @param mapper A function to map the result set to the desired type.
108117
* @return A [Flow] emitting lists of results.
109118
* @throws PowerSyncException If a database error occurs.
@@ -113,7 +122,7 @@ public interface Queries {
113122
public fun <RowType : Any> watch(
114123
sql: String,
115124
parameters: List<Any?>? = listOf(),
116-
throttleMs: Long? = null,
125+
throttleMs: Long = DEFAULT_THROTTLE.inWholeMilliseconds,
117126
mapper: (SqlCursor) -> RowType,
118127
): Flow<List<RowType>>
119128

core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import com.powersync.db.ThrowableLockCallback
88
import com.powersync.db.ThrowableTransactionCallback
99
import com.powersync.db.runWrapped
1010
import com.powersync.db.runWrappedSuspending
11+
import com.powersync.utils.AtomicMutableSet
1112
import com.powersync.utils.JsonUtil
1213
import com.powersync.utils.throttle
1314
import kotlinx.coroutines.CoroutineScope
@@ -56,10 +57,6 @@ internal class InternalDatabaseImpl(
5657
// Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss.
5758
private val dbContext = Dispatchers.IO
5859

59-
companion object {
60-
val DEFAULT_WATCH_THROTTLE = 30.milliseconds
61-
}
62-
6360
override suspend fun execute(
6461
sql: String,
6562
parameters: List<Any?>?,
@@ -108,36 +105,42 @@ internal class InternalDatabaseImpl(
108105

109106
override fun onChange(
110107
tables: Set<String>,
111-
throttleMs: Long?,
108+
throttleMs: Long,
112109
): Flow<Set<String>> =
113110
channelFlow {
114111
// Match all possible internal table combinations
115112
val watchedTables =
116113
tables.flatMap { listOf(it, "ps_data__$it", "ps_data_local__$it") }.toSet()
117114

115+
// Accumulate updates between throttles
116+
val batchedUpdates = AtomicMutableSet<String>()
117+
118118
updatesOnTables()
119119
.transform { updates ->
120120
val intersection = updates.intersect(watchedTables)
121121
if (intersection.isNotEmpty()) {
122122
// Transform table names using friendlyTableName
123-
emit(intersection.map { friendlyTableName(it) }.toSet())
123+
val friendlyTableNames = intersection.map { friendlyTableName(it) }.toSet()
124+
batchedUpdates.addAll(friendlyTableNames)
125+
emit(Unit)
124126
}
125127
}
126128
// Throttling here is a feature which prevents watch queries from spamming updates.
127129
// Throttling by design discards and delays events within the throttle window. Discarded events
128130
// still trigger a trailing edge update.
129131
// Backpressure is avoided on the throttling and consumer level by buffering the last upstream value.
130-
.throttle(throttleMs?.milliseconds ?: DEFAULT_WATCH_THROTTLE)
132+
.throttle(throttleMs.milliseconds)
131133
.collect {
132134
// Emit the transformed tables which have changed
133-
send(it)
135+
val copy = batchedUpdates.toSetAndClear()
136+
send(copy)
134137
}
135138
}
136139

137140
override fun <RowType : Any> watch(
138141
sql: String,
139142
parameters: List<Any?>?,
140-
throttleMs: Long?,
143+
throttleMs: Long,
141144
mapper: (SqlCursor) -> RowType,
142145
): Flow<List<RowType>> =
143146
// Use a channel flow here since we throttle (buffer used under the hood)
@@ -163,7 +166,7 @@ internal class InternalDatabaseImpl(
163166
// still trigger a trailing edge update.
164167
// Backpressure is avoided on the throttling and consumer level by buffering the last upstream value.
165168
// Note that the buffered upstream "value" only serves to trigger the getAll query. We don't buffer watch results.
166-
.throttle(throttleMs?.milliseconds ?: DEFAULT_WATCH_THROTTLE)
169+
.throttle(throttleMs.milliseconds)
167170
.collect {
168171
send(getAll(sql, parameters = parameters, mapper = mapper))
169172
}

core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ public class AtomicMutableSet<T> : SynchronizedObject() {
1111
return set.add(element)
1212
}
1313

14+
public fun addAll(elements: Set<T>): Boolean =
15+
synchronized(this) {
16+
return set.addAll(elements)
17+
}
18+
1419
// Synchronized clear method
1520
public fun clear(): Unit =
1621
synchronized(this) {

0 commit comments

Comments
 (0)