Skip to content

Add onChange API #176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.0.0-BETA32

* Added `onChange` method to the PowerSync client. This allows for observing table changes.

## 1.0.0-BETA31

* Added helpers for Attachment syncing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,25 @@ class DatabaseTest {
}
}

@Test
fun testTableChangesUpdates() =
databaseTest {
turbineScope {
val query = database.onChange(tables = setOf("users")).testIn(this)

database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("Test", "[email protected]"),
)

val changeSet = query.awaitItem()
changeSet.count() shouldBe 1
changeSet.contains("users") shouldBe true

query.cancel()
}
}

@Test
fun testClosingReadPool() =
databaseTest {
Expand Down Expand Up @@ -373,11 +392,20 @@ class DatabaseTest {
@Test
fun testCrudTransaction() =
databaseTest {
database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("a", "[email protected]"))
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("a", "[email protected]"),
)

database.writeTransaction {
it.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("b", "[email protected]"))
it.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("c", "[email protected]"))
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("b", "[email protected]"),
)
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("c", "[email protected]"),
)
}

var transaction = database.getNextCrudTransaction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,21 @@ internal class PowerSyncDatabaseImpl(
return internalDb.getOptional(sql, parameters, mapper)
}

override fun onChange(
tables: Set<String>,
throttleMs: Long,
): Flow<Set<String>> =
flow {
waitReady()
emitAll(
internalDb.onChange(tables, throttleMs),
)
}

override fun <RowType : Any> watch(
sql: String,
parameters: List<Any?>?,
throttleMs: Long?,
throttleMs: Long,
mapper: (SqlCursor) -> RowType,
): Flow<List<RowType>> =
flow {
Expand Down
126 changes: 93 additions & 33 deletions core/src/commonMain/kotlin/com/powersync/db/Queries.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import com.powersync.db.internal.ConnectionContext
import com.powersync.db.internal.PowerSyncTransaction
import kotlinx.coroutines.flow.Flow
import kotlin.coroutines.cancellation.CancellationException
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

public fun interface ThrowableTransactionCallback<R> {
@Throws(PowerSyncException::class, kotlinx.coroutines.CancellationException::class)
Expand All @@ -17,8 +19,21 @@ public fun interface ThrowableLockCallback<R> {
}

public interface Queries {
public companion object {
/**
* The default throttle duration for [onChange] and [watch] operations.
*/
public val DEFAULT_THROTTLE: Duration = 30.milliseconds
}

/**
* Execute a write query (INSERT, UPDATE, DELETE)
* Executes a write query (INSERT, UPDATE, DELETE).
*
* @param sql The SQL query to execute.
* @param parameters The parameters for the query, or an empty list if none.
* @return The number of rows affected by the query.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun execute(
Expand All @@ -27,9 +42,14 @@ public interface Queries {
): Long

/**
* Execute a read-only (SELECT) query and return a single result.
* If there is no result, throws an [IllegalArgumentException].
* See [getOptional] for queries where the result might be empty.
* Executes a read-only (SELECT) query and returns a single result.
*
* @param sql The SQL query to execute.
* @param parameters The parameters for the query, or an empty list if none.
* @param mapper A function to map the result set to the desired type.
* @return The single result of the query.
* @throws PowerSyncException If a database error occurs or no result is found.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <RowType : Any> get(
Expand All @@ -39,7 +59,14 @@ public interface Queries {
): RowType

/**
* Execute a read-only (SELECT) query and return the results.
* Executes a read-only (SELECT) query and returns all results.
*
* @param sql The SQL query to execute.
* @param parameters The parameters for the query, or an empty list if none.
* @param mapper A function to map the result set to the desired type.
* @return A list of results.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <RowType : Any> getAll(
Expand All @@ -49,7 +76,14 @@ public interface Queries {
): List<RowType>

/**
* Execute a read-only (SELECT) query and return a single optional result.
* Executes a read-only (SELECT) query and returns a single optional result.
*
* @param sql The SQL query to execute.
* @param parameters The parameters for the query, or an empty list if none.
* @param mapper A function to map the result set to the desired type.
* @return The single result of the query, or null if no result is found.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <RowType : Any> getOptional(
Expand All @@ -59,65 +93,91 @@ public interface Queries {
): RowType?

/**
* Execute a read-only (SELECT) query every time the source tables are modified and return the results as a List in [Flow].
* Returns a [Flow] that emits whenever the source tables are modified.
*
* @param tables The set of tables to monitor for changes.
* @param throttleMs The minimum interval, in milliseconds, between emissions. Defaults to [DEFAULT_THROTTLE]. Table changes are accumulated while throttling is active. The accumulated set of tables will be emitted on the trailing edge of the throttle.
* @return A [Flow] emitting the set of modified tables.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public fun onChange(
tables: Set<String>,
throttleMs: Long = DEFAULT_THROTTLE.inWholeMilliseconds,
): Flow<Set<String>>

/**
* Executes a read-only (SELECT) query every time the source tables are modified and returns the results as a [Flow] of lists.
*
* @param sql The SQL query to execute.
* @param parameters The parameters for the query, or an empty list if none.
* @param throttleMs The minimum interval, in milliseconds, between queries. Defaults to [DEFAULT_THROTTLE].
* @param mapper A function to map the result set to the desired type.
* @return A [Flow] emitting lists of results.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public fun <RowType : Any> watch(
sql: String,
parameters: List<Any?>? = listOf(),
/**
* Specify the minimum interval, in milliseconds, between queries.
*/
throttleMs: Long? = null,
throttleMs: Long = DEFAULT_THROTTLE.inWholeMilliseconds,
mapper: (SqlCursor) -> RowType,
): Flow<List<RowType>>

/**
* Takes a global lock, without starting a transaction.
* Takes a global lock without starting a transaction.
*
* This takes a global lock - only one write transaction can execute against
* the database at a time. This applies even when constructing separate
* database instances for the same database file.
*
* Locks for separate database instances on the same database file
* may be held concurrently.
* This lock ensures that only one write transaction can execute against the database at a time, even across separate database instances for the same file.
*
* In most cases, [writeTransaction] should be used instead.
*
* @param callback The callback to execute while holding the lock.
* @return The result of the callback.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <R> writeLock(callback: ThrowableLockCallback<R>): R

/**
* Open a read-write transaction.
* Opens a read-write transaction.
*
* This takes a global lock - only one write transaction can execute against
* the database at a time. This applies even when constructing separate
* database instances for the same database file.
* This takes a global lock, ensuring that only one write transaction can execute against the database at a time, even across separate database instances for the same file.
*
* Statements within the transaction must be done on the provided
* [PowerSyncTransaction] - attempting statements on the database
* instance will error cause a dead-lock.
* Statements within the transaction must be done on the provided [PowerSyncTransaction] - attempting statements on the database instance will error cause a dead-lock.
*
* @param callback The callback to execute within the transaction.
* @return The result of the callback.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <R> writeTransaction(callback: ThrowableTransactionCallback<R>): R

/**
* Takes a read lock, without starting a transaction.
* Takes a read lock without starting a transaction.
*
* The lock only applies to a single SQLite connection, and multiple
* connections may hold read locks at the same time.
* The lock applies only to a single SQLite connection, allowing multiple connections to hold read locks simultaneously.
*
* In most cases, [readTransaction] should be used instead.
* @param callback The callback to execute while holding the lock.
* @return The result of the callback.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <R> readLock(callback: ThrowableLockCallback<R>): R

/**
* Open a read-only transaction.
* Opens a read-only transaction.
*
* Statements within the transaction must be done on the provided [PowerSyncTransaction] - executing statements on the database level will be executed on separate connections.
*
* Statements within the transaction must be done on the provided
* [PowerSyncTransaction] - executing statements on the database level
* will be executed on separate connections.
* @param callback The callback to execute within the transaction.
* @return The result of the callback.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <R> readTransaction(callback: ThrowableTransactionCallback<R>): R
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.powersync.db.ThrowableLockCallback
import com.powersync.db.ThrowableTransactionCallback
import com.powersync.db.runWrapped
import com.powersync.db.runWrappedSuspending
import com.powersync.utils.AtomicMutableSet
import com.powersync.utils.JsonUtil
import com.powersync.utils.throttle
import kotlinx.coroutines.CoroutineScope
Expand All @@ -19,6 +20,7 @@ import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
Expand Down Expand Up @@ -55,10 +57,6 @@ internal class InternalDatabaseImpl(
// Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss.
private val dbContext = Dispatchers.IO

companion object {
val DEFAULT_WATCH_THROTTLE = 30.milliseconds
}

override suspend fun execute(
sql: String,
parameters: List<Any?>?,
Expand Down Expand Up @@ -105,10 +103,44 @@ internal class InternalDatabaseImpl(
mapper: (SqlCursor) -> RowType,
): RowType? = readLock { connection -> connection.getOptional(sql, parameters, mapper) }

override fun onChange(
tables: Set<String>,
throttleMs: Long,
): Flow<Set<String>> =
channelFlow {
// Match all possible internal table combinations
val watchedTables =
tables.flatMap { listOf(it, "ps_data__$it", "ps_data_local__$it") }.toSet()

// Accumulate updates between throttles
val batchedUpdates = AtomicMutableSet<String>()

updatesOnTables()
.transform { updates ->
val intersection = updates.intersect(watchedTables)
if (intersection.isNotEmpty()) {
// Transform table names using friendlyTableName
val friendlyTableNames = intersection.map { friendlyTableName(it) }.toSet()
batchedUpdates.addAll(friendlyTableNames)
emit(Unit)
}
}
// Throttling here is a feature which prevents watch queries from spamming updates.
// Throttling by design discards and delays events within the throttle window. Discarded events
// still trigger a trailing edge update.
// Backpressure is avoided on the throttling and consumer level by buffering the last upstream value.
.throttle(throttleMs.milliseconds)
.collect {
// Emit the transformed tables which have changed
val copy = batchedUpdates.toSetAndClear()
send(copy)
}
}

override fun <RowType : Any> watch(
sql: String,
parameters: List<Any?>?,
throttleMs: Long?,
throttleMs: Long,
mapper: (SqlCursor) -> RowType,
): Flow<List<RowType>> =
// Use a channel flow here since we throttle (buffer used under the hood)
Expand All @@ -134,7 +166,7 @@ internal class InternalDatabaseImpl(
// still trigger a trailing edge update.
// Backpressure is avoided on the throttling and consumer level by buffering the last upstream value.
// Note that the buffered upstream "value" only serves to trigger the getAll query. We don't buffer watch results.
.throttle(throttleMs?.milliseconds ?: DEFAULT_WATCH_THROTTLE)
.throttle(throttleMs.milliseconds)
.collect {
send(getAll(sql, parameters = parameters, mapper = mapper))
}
Expand Down Expand Up @@ -272,6 +304,18 @@ internal class InternalDatabaseImpl(
)
}

/**
* Converts internal table names (e.g., prefixed with "ps_data__" or "ps_data_local__")
* to their original friendly names by removing the prefixes. If no prefix matches,
* the original table name is returned.
*/
private fun friendlyTableName(table: String): String {
val re = Regex("^ps_data__(.+)$")
val re2 = Regex("^ps_data_local__(.+)$")
val match = re.matchEntire(table) ?: re2.matchEntire(table)
return match?.groupValues?.get(1) ?: table
}

internal fun getBindersFromParams(parameters: List<Any?>?): (SqlPreparedStatement.() -> Unit)? {
if (parameters.isNullOrEmpty()) {
return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ public class AtomicMutableSet<T> : SynchronizedObject() {
return set.add(element)
}

public fun addAll(elements: Set<T>): Boolean =
synchronized(this) {
return set.addAll(elements)
}

// Synchronized clear method
public fun clear(): Unit =
synchronized(this) {
Expand Down