diff --git a/CHANGELOG.md b/CHANGELOG.md index 546ac324..37c2c096 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.0.0-BETA32 + +* Added `onChange` method to the PowerSync client. This allows for observing table changes. + ## 1.0.0-BETA31 * Added helpers for Attachment syncing. diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index 250d72c3..48283139 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -183,6 +183,25 @@ class DatabaseTest { } } + @Test + fun testTableChangesUpdates() = + databaseTest { + turbineScope { + val query = database.onChange(tables = setOf("users")).testIn(this) + + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("Test", "test@example.org"), + ) + + val changeSet = query.awaitItem() + changeSet.count() shouldBe 1 + changeSet.contains("users") shouldBe true + + query.cancel() + } + } + @Test fun testClosingReadPool() = databaseTest { @@ -373,11 +392,20 @@ class DatabaseTest { @Test fun testCrudTransaction() = databaseTest { - database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("a", "a@example.org")) + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("a", "a@example.org"), + ) database.writeTransaction { - it.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("b", "b@example.org")) - it.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("c", "c@example.org")) + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("b", "b@example.org"), + ) + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("c", "c@example.org"), + ) } var transaction = database.getNextCrudTransaction() diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index cb56d606..5a67b551 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -348,10 +348,21 @@ internal class PowerSyncDatabaseImpl( return internalDb.getOptional(sql, parameters, mapper) } + override fun onChange( + tables: Set, + throttleMs: Long, + ): Flow> = + flow { + waitReady() + emitAll( + internalDb.onChange(tables, throttleMs), + ) + } + override fun watch( sql: String, parameters: List?, - throttleMs: Long?, + throttleMs: Long, mapper: (SqlCursor) -> RowType, ): Flow> = flow { diff --git a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt index 3ab9a7bc..c8f3d870 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt @@ -5,6 +5,8 @@ import com.powersync.db.internal.ConnectionContext import com.powersync.db.internal.PowerSyncTransaction import kotlinx.coroutines.flow.Flow import kotlin.coroutines.cancellation.CancellationException +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds public fun interface ThrowableTransactionCallback { @Throws(PowerSyncException::class, kotlinx.coroutines.CancellationException::class) @@ -17,8 +19,21 @@ public fun interface ThrowableLockCallback { } public interface Queries { + public companion object { + /** + * The default throttle duration for [onChange] and [watch] operations. + */ + public val DEFAULT_THROTTLE: Duration = 30.milliseconds + } + /** - * Execute a write query (INSERT, UPDATE, DELETE) + * Executes a write query (INSERT, UPDATE, DELETE). + * + * @param sql The SQL query to execute. + * @param parameters The parameters for the query, or an empty list if none. + * @return The number of rows affected by the query. + * @throws PowerSyncException If a database error occurs. + * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) public suspend fun execute( @@ -27,9 +42,14 @@ public interface Queries { ): Long /** - * Execute a read-only (SELECT) query and return a single result. - * If there is no result, throws an [IllegalArgumentException]. - * See [getOptional] for queries where the result might be empty. + * Executes a read-only (SELECT) query and returns a single result. + * + * @param sql The SQL query to execute. + * @param parameters The parameters for the query, or an empty list if none. + * @param mapper A function to map the result set to the desired type. + * @return The single result of the query. + * @throws PowerSyncException If a database error occurs or no result is found. + * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) public suspend fun get( @@ -39,7 +59,14 @@ public interface Queries { ): RowType /** - * Execute a read-only (SELECT) query and return the results. + * Executes a read-only (SELECT) query and returns all results. + * + * @param sql The SQL query to execute. + * @param parameters The parameters for the query, or an empty list if none. + * @param mapper A function to map the result set to the desired type. + * @return A list of results. + * @throws PowerSyncException If a database error occurs. + * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) public suspend fun getAll( @@ -49,7 +76,14 @@ public interface Queries { ): List /** - * Execute a read-only (SELECT) query and return a single optional result. + * Executes a read-only (SELECT) query and returns a single optional result. + * + * @param sql The SQL query to execute. + * @param parameters The parameters for the query, or an empty list if none. + * @param mapper A function to map the result set to the desired type. + * @return The single result of the query, or null if no result is found. + * @throws PowerSyncException If a database error occurs. + * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) public suspend fun getOptional( @@ -59,65 +93,91 @@ public interface Queries { ): RowType? /** - * Execute a read-only (SELECT) query every time the source tables are modified and return the results as a List in [Flow]. + * Returns a [Flow] that emits whenever the source tables are modified. + * + * @param tables The set of tables to monitor for changes. + * @param throttleMs The minimum interval, in milliseconds, between emissions. Defaults to [DEFAULT_THROTTLE]. Table changes are accumulated while throttling is active. The accumulated set of tables will be emitted on the trailing edge of the throttle. + * @return A [Flow] emitting the set of modified tables. + * @throws PowerSyncException If a database error occurs. + * @throws CancellationException If the operation is cancelled. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public fun onChange( + tables: Set, + throttleMs: Long = DEFAULT_THROTTLE.inWholeMilliseconds, + ): Flow> + + /** + * Executes a read-only (SELECT) query every time the source tables are modified and returns the results as a [Flow] of lists. + * + * @param sql The SQL query to execute. + * @param parameters The parameters for the query, or an empty list if none. + * @param throttleMs The minimum interval, in milliseconds, between queries. Defaults to [DEFAULT_THROTTLE]. + * @param mapper A function to map the result set to the desired type. + * @return A [Flow] emitting lists of results. + * @throws PowerSyncException If a database error occurs. + * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) public fun watch( sql: String, parameters: List? = listOf(), - /** - * Specify the minimum interval, in milliseconds, between queries. - */ - throttleMs: Long? = null, + throttleMs: Long = DEFAULT_THROTTLE.inWholeMilliseconds, mapper: (SqlCursor) -> RowType, ): Flow> /** - * Takes a global lock, without starting a transaction. + * Takes a global lock without starting a transaction. * - * This takes a global lock - only one write transaction can execute against - * the database at a time. This applies even when constructing separate - * database instances for the same database file. - * - * Locks for separate database instances on the same database file - * may be held concurrently. + * This lock ensures that only one write transaction can execute against the database at a time, even across separate database instances for the same file. * * In most cases, [writeTransaction] should be used instead. + * + * @param callback The callback to execute while holding the lock. + * @return The result of the callback. + * @throws PowerSyncException If a database error occurs. + * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) public suspend fun writeLock(callback: ThrowableLockCallback): R /** - * Open a read-write transaction. + * Opens a read-write transaction. * - * This takes a global lock - only one write transaction can execute against - * the database at a time. This applies even when constructing separate - * database instances for the same database file. + * This takes a global lock, ensuring that only one write transaction can execute against the database at a time, even across separate database instances for the same file. * - * Statements within the transaction must be done on the provided - * [PowerSyncTransaction] - attempting statements on the database - * instance will error cause a dead-lock. + * Statements within the transaction must be done on the provided [PowerSyncTransaction] - attempting statements on the database instance will error cause a dead-lock. + * + * @param callback The callback to execute within the transaction. + * @return The result of the callback. + * @throws PowerSyncException If a database error occurs. + * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) public suspend fun writeTransaction(callback: ThrowableTransactionCallback): R /** - * Takes a read lock, without starting a transaction. + * Takes a read lock without starting a transaction. * - * The lock only applies to a single SQLite connection, and multiple - * connections may hold read locks at the same time. + * The lock applies only to a single SQLite connection, allowing multiple connections to hold read locks simultaneously. * - * In most cases, [readTransaction] should be used instead. + * @param callback The callback to execute while holding the lock. + * @return The result of the callback. + * @throws PowerSyncException If a database error occurs. + * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) public suspend fun readLock(callback: ThrowableLockCallback): R /** - * Open a read-only transaction. + * Opens a read-only transaction. + * + * Statements within the transaction must be done on the provided [PowerSyncTransaction] - executing statements on the database level will be executed on separate connections. * - * Statements within the transaction must be done on the provided - * [PowerSyncTransaction] - executing statements on the database level - * will be executed on separate connections. + * @param callback The callback to execute within the transaction. + * @return The result of the callback. + * @throws PowerSyncException If a database error occurs. + * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) public suspend fun readTransaction(callback: ThrowableTransactionCallback): R diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index edf5130d..7477f4f8 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -8,6 +8,7 @@ import com.powersync.db.ThrowableLockCallback import com.powersync.db.ThrowableTransactionCallback import com.powersync.db.runWrapped import com.powersync.db.runWrappedSuspending +import com.powersync.utils.AtomicMutableSet import com.powersync.utils.JsonUtil import com.powersync.utils.throttle import kotlinx.coroutines.CoroutineScope @@ -19,6 +20,7 @@ import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.onSubscription +import kotlinx.coroutines.flow.transform import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext @@ -55,10 +57,6 @@ internal class InternalDatabaseImpl( // Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss. private val dbContext = Dispatchers.IO - companion object { - val DEFAULT_WATCH_THROTTLE = 30.milliseconds - } - override suspend fun execute( sql: String, parameters: List?, @@ -105,10 +103,44 @@ internal class InternalDatabaseImpl( mapper: (SqlCursor) -> RowType, ): RowType? = readLock { connection -> connection.getOptional(sql, parameters, mapper) } + override fun onChange( + tables: Set, + throttleMs: Long, + ): Flow> = + channelFlow { + // Match all possible internal table combinations + val watchedTables = + tables.flatMap { listOf(it, "ps_data__$it", "ps_data_local__$it") }.toSet() + + // Accumulate updates between throttles + val batchedUpdates = AtomicMutableSet() + + updatesOnTables() + .transform { updates -> + val intersection = updates.intersect(watchedTables) + if (intersection.isNotEmpty()) { + // Transform table names using friendlyTableName + val friendlyTableNames = intersection.map { friendlyTableName(it) }.toSet() + batchedUpdates.addAll(friendlyTableNames) + emit(Unit) + } + } + // Throttling here is a feature which prevents watch queries from spamming updates. + // Throttling by design discards and delays events within the throttle window. Discarded events + // still trigger a trailing edge update. + // Backpressure is avoided on the throttling and consumer level by buffering the last upstream value. + .throttle(throttleMs.milliseconds) + .collect { + // Emit the transformed tables which have changed + val copy = batchedUpdates.toSetAndClear() + send(copy) + } + } + override fun watch( sql: String, parameters: List?, - throttleMs: Long?, + throttleMs: Long, mapper: (SqlCursor) -> RowType, ): Flow> = // Use a channel flow here since we throttle (buffer used under the hood) @@ -134,7 +166,7 @@ internal class InternalDatabaseImpl( // still trigger a trailing edge update. // Backpressure is avoided on the throttling and consumer level by buffering the last upstream value. // Note that the buffered upstream "value" only serves to trigger the getAll query. We don't buffer watch results. - .throttle(throttleMs?.milliseconds ?: DEFAULT_WATCH_THROTTLE) + .throttle(throttleMs.milliseconds) .collect { send(getAll(sql, parameters = parameters, mapper = mapper)) } @@ -272,6 +304,18 @@ internal class InternalDatabaseImpl( ) } +/** + * Converts internal table names (e.g., prefixed with "ps_data__" or "ps_data_local__") + * to their original friendly names by removing the prefixes. If no prefix matches, + * the original table name is returned. + */ +private fun friendlyTableName(table: String): String { + val re = Regex("^ps_data__(.+)$") + val re2 = Regex("^ps_data_local__(.+)$") + val match = re.matchEntire(table) ?: re2.matchEntire(table) + return match?.groupValues?.get(1) ?: table +} + internal fun getBindersFromParams(parameters: List?): (SqlPreparedStatement.() -> Unit)? { if (parameters.isNullOrEmpty()) { return null diff --git a/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt index 536ddd1a..05327ef8 100644 --- a/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt +++ b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt @@ -11,6 +11,11 @@ public class AtomicMutableSet : SynchronizedObject() { return set.add(element) } + public fun addAll(elements: Set): Boolean = + synchronized(this) { + return set.addAll(elements) + } + // Synchronized clear method public fun clear(): Unit = synchronized(this) {