diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 87a44af8..3b41fe64 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -175,6 +175,8 @@ kotlin { implementation(libs.ktor.client.contentnegotiation) implementation(libs.ktor.serialization.json) implementation(libs.kotlinx.io) + api(libs.rsocket.core) + implementation(libs.rsocket.transport.websocket) implementation(libs.kotlinx.coroutines.core) implementation(libs.kotlinx.datetime) implementation(libs.stately.concurrency) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/AbstractSyncTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/AbstractSyncTest.kt new file mode 100644 index 00000000..6e54618a --- /dev/null +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/AbstractSyncTest.kt @@ -0,0 +1,16 @@ +package com.powersync.sync + +import com.powersync.ExperimentalPowerSyncAPI + +/** + * Small utility to run tests both with the legacy Kotlin sync implementation and the new + * implementation from the core extension. + */ +abstract class AbstractSyncTest( + private val useNewSyncImplementation: Boolean, +) { + @OptIn(ExperimentalPowerSyncAPI::class) + val options: SyncOptions get() { + return SyncOptions(useNewSyncImplementation) + } +} diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index fd91ff77..de823ca8 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -1,7 +1,10 @@ -package com.powersync +package com.powersync.sync import app.cash.turbine.turbineScope import co.touchlab.kermit.ExperimentalKermitApi +import com.powersync.PowerSyncDatabase +import com.powersync.PowerSyncException +import com.powersync.TestConnector import com.powersync.bucket.BucketChecksum import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint @@ -11,36 +14,43 @@ import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse import com.powersync.db.PowerSyncDatabaseImpl import com.powersync.db.schema.Schema -import com.powersync.sync.SyncLine import com.powersync.testutils.UserRow import com.powersync.testutils.databaseTest import com.powersync.testutils.waitFor import com.powersync.utils.JsonUtil +import dev.mokkery.answering.returns +import dev.mokkery.every import dev.mokkery.verify +import dev.mokkery.verifyNoMoreCalls +import dev.mokkery.verifySuspend import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.shouldBe import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.serialization.encodeToString +import kotlinx.coroutines.launch import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertNotNull import kotlin.time.Duration.Companion.seconds -class SyncIntegrationTest { +@OptIn(LegacySyncImplementation::class) +abstract class BaseSyncIntegrationTest( + useNewSyncImplementation: Boolean, +) : AbstractSyncTest( + useNewSyncImplementation, + ) { private suspend fun PowerSyncDatabase.expectUserCount(amount: Int) { val users = getAll("SELECT * FROM users;") { UserRow.from(it) } users shouldHaveSize amount } @Test - @OptIn(DelicateCoroutinesApi::class) fun connectImmediately() = databaseTest(createInitialDatabase = false) { // Regression test for https://github.com/powersync-ja/powersync-kotlin/issues/169 val database = openDatabase() - database.connect(connector) + database.connect(connector, options = options) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -53,7 +63,7 @@ class SyncIntegrationTest { @OptIn(DelicateCoroutinesApi::class) fun closesResponseStreamOnDatabaseClose() = databaseTest { - database.connect(connector) + database.connect(connector, options = options) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -72,7 +82,7 @@ class SyncIntegrationTest { @OptIn(DelicateCoroutinesApi::class) fun cleansResourcesOnDisconnect() = databaseTest { - database.connect(connector) + database.connect(connector, options = options) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -93,7 +103,7 @@ class SyncIntegrationTest { @Test fun cannotUpdateSchemaWhileConnected() = databaseTest { - database.connect(connector) + database.connect(connector, options = options) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -111,7 +121,7 @@ class SyncIntegrationTest { @Test fun testPartialSync() = databaseTest { - database.connect(connector) + database.connect(connector, options = options) val checksums = buildList { @@ -202,7 +212,7 @@ class SyncIntegrationTest { @Test fun testRemembersLastPartialSync() = databaseTest { - database.connect(connector) + database.connect(connector, options = options) syncLines.send( SyncLine.FullCheckpoint( @@ -238,7 +248,7 @@ class SyncIntegrationTest { @Test fun setsDownloadingState() = databaseTest { - database.connect(connector) + database.connect(connector, options = options) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -272,7 +282,7 @@ class SyncIntegrationTest { turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) - database.connect(connector) + database.connect(connector, options = options) turbine.waitFor { it.connecting } database.disconnect() @@ -285,7 +295,7 @@ class SyncIntegrationTest { @Test fun testMultipleSyncsDoNotCreateMultipleStatusEntries() = databaseTest { - database.connect(connector) + database.connect(connector, options = options) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -331,8 +341,8 @@ class SyncIntegrationTest { turbineScope(timeout = 10.0.seconds) { // Connect the first database - database.connect(connector) - db2.connect(connector) + database.connect(connector, options = options) + db2.connect(connector, options = options) waitFor { assertNotNull( @@ -357,10 +367,10 @@ class SyncIntegrationTest { val turbine2 = db2.currentStatus.asFlow().testIn(this) // Connect the first database - database.connect(connector) + database.connect(connector, options = options) turbine1.waitFor { it.connecting } - db2.connect(connector) + db2.connect(connector, options = options) // Should not be connecting yet db2.currentStatus.connecting shouldBe false @@ -384,13 +394,13 @@ class SyncIntegrationTest { turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) - database.connect(connector, 1000L) + database.connect(connector, 1000L, options = options) turbine.waitFor { it.connecting } database.disconnect() turbine.waitFor { !it.connecting } - database.connect(connector, 1000L) + database.connect(connector, 1000L, options = options) turbine.waitFor { it.connecting } database.disconnect() turbine.waitFor { !it.connecting } @@ -405,10 +415,10 @@ class SyncIntegrationTest { turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) - database.connect(connector, 1000L, retryDelayMs = 5000) + database.connect(connector, 1000L, retryDelayMs = 5000, options = options) turbine.waitFor { it.connecting } - database.connect(connector, 1000L, retryDelayMs = 5000) + database.connect(connector, 1000L, retryDelayMs = 5000, options = options) turbine.waitFor { it.connecting } turbine.cancel() @@ -421,7 +431,7 @@ class SyncIntegrationTest { databaseTest { val testConnector = TestConnector() connector = testConnector - database.connect(testConnector) + database.connect(testConnector, options = options) suspend fun expectUserRows(amount: Int) { val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! } @@ -440,8 +450,15 @@ class SyncIntegrationTest { // Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully // connected). + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + syncLines.send(SyncLine.KeepAlive(1234)) + turbine.waitFor { it.connected } + turbine.cancelAndIgnoreRemainingEvents() + } + database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("local", "local@example.org")) - syncLines.send(SyncLine.KeepAlive(1234)) + expectUserRows(1) uploadStarted.await() @@ -508,6 +525,7 @@ class SyncIntegrationTest { } completeUpload.complete(Unit) requestedCheckpoint.await() + logger.d { "Did request checkpoint" } // This should apply the checkpoint turbineScope { @@ -519,4 +537,70 @@ class SyncIntegrationTest { // Meaning that the two rows are now visible database.expectUserCount(2) } + + @Test + fun testTokenExpired() = + databaseTest { + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + + database.connect(connector, 1000L, retryDelayMs = 5000, options = options) + turbine.waitFor { it.connecting } + + syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 4000)) + turbine.waitFor { it.connected } + verifySuspend { connector.getCredentialsCached() } + verifyNoMoreCalls(connector) + + // Should invalidate credentials when token expires + syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 0)) + turbine.waitFor { !it.connected } + verify { connector.invalidateCredentials() } + + turbine.cancel() + } + } +} + +class LegacySyncIntegrationTest : BaseSyncIntegrationTest(false) + +class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { + // The legacy sync implementation doesn't prefetch credentials. + + @OptIn(LegacySyncImplementation::class) + @Test + fun testTokenPrefetch() = + databaseTest { + val prefetchCalled = CompletableDeferred() + val completePrefetch = CompletableDeferred() + every { connector.prefetchCredentials() } returns + scope.launch { + prefetchCalled.complete(Unit) + completePrefetch.await() + } + + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + + database.connect(connector, 1000L, retryDelayMs = 5000, options = options) + turbine.waitFor { it.connecting } + + syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 4000)) + turbine.waitFor { it.connected } + verifySuspend { connector.getCredentialsCached() } + verifyNoMoreCalls(connector) + + syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 10)) + prefetchCalled.complete(Unit) + // Should still be connected before prefetch completes + database.currentStatus.connected shouldBe true + + // After the prefetch completes, we should reconnect + completePrefetch.complete(Unit) + turbine.waitFor { !it.connected } + + turbine.waitFor { it.connected } + turbine.cancel() + } + } } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt index e7fe1d57..93739e3e 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -18,7 +18,12 @@ import kotlin.test.assertFalse import kotlin.test.assertNull import kotlin.test.assertTrue -class SyncProgressTest { +@OptIn(LegacySyncImplementation::class) +abstract class BaseSyncProgressTest( + useNewSyncImplementation: Boolean, +) : AbstractSyncTest( + useNewSyncImplementation, + ) { private var lastOpId = 0 @BeforeTest @@ -104,7 +109,7 @@ class SyncProgressTest { @Test fun withoutPriorities() = databaseTest { - database.connect(connector) + database.connect(connector, options = options) turbineScope { val turbine = database.currentStatus.asFlow().testIn(this) @@ -153,7 +158,7 @@ class SyncProgressTest { @Test fun interruptedSync() = databaseTest { - database.connect(connector) + database.connect(connector, options = options) turbineScope { val turbine = database.currentStatus.asFlow().testIn(this) @@ -183,7 +188,7 @@ class SyncProgressTest { // And reconnecting database = openDatabase() syncLines = Channel() - database.connect(connector) + database.connect(connector, options = options) turbineScope { val turbine = database.currentStatus.asFlow().testIn(this) @@ -217,7 +222,7 @@ class SyncProgressTest { @Test fun interruptedSyncWithNewCheckpoint() = databaseTest { - database.connect(connector) + database.connect(connector, options = options) turbineScope { val turbine = database.currentStatus.asFlow().testIn(this) @@ -243,7 +248,7 @@ class SyncProgressTest { syncLines.close() database = openDatabase() syncLines = Channel() - database.connect(connector) + database.connect(connector, options = options) turbineScope { val turbine = database.currentStatus.asFlow().testIn(this) @@ -276,7 +281,7 @@ class SyncProgressTest { @Test fun differentPriorities() = databaseTest { - database.connect(connector) + database.connect(connector, options = options) turbineScope { val turbine = database.currentStatus.asFlow().testIn(this) @@ -341,3 +346,7 @@ class SyncProgressTest { syncLines.close() } } + +class LegacySyncProgressTest : BaseSyncProgressTest(false) + +class NewSyncProgressTest : BaseSyncProgressTest(true) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index f4353eac..f55f2440 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -1,3 +1,5 @@ +@file:OptIn(LegacySyncImplementation::class) + package com.powersync.testutils import co.touchlab.kermit.ExperimentalKermitApi @@ -14,6 +16,7 @@ import com.powersync.connectors.PowerSyncCredentials import com.powersync.createPowerSyncDatabaseImpl import com.powersync.db.PowerSyncDatabaseImpl import com.powersync.db.schema.Schema +import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncLine import dev.mokkery.answering.returns import dev.mokkery.everySuspend @@ -77,11 +80,12 @@ internal class ActiveDatabaseTest( val logger = Logger( TestConfig( - minSeverity = Severity.Debug, + minSeverity = Severity.Verbose, logWriterList = listOf(logWriter, generatePrintLogWriter()), ), ) + @OptIn(LegacySyncImplementation::class) var syncLines = Channel() var checkpointResponse: () -> WriteCheckpointResponse = { WriteCheckpointResponse(WriteCheckpointData("1000")) @@ -143,10 +147,7 @@ internal class ActiveDatabaseTest( item() } - var path = databaseName - testDirectory?.let { - path = Path(it, path).name - } + val path = Path(testDirectory, databaseName).name cleanup(path) } } diff --git a/core/src/commonMain/kotlin/com/powersync/ExperimentalPowerSyncAPI.kt b/core/src/commonMain/kotlin/com/powersync/ExperimentalPowerSyncAPI.kt new file mode 100644 index 00000000..8ec3c06b --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/ExperimentalPowerSyncAPI.kt @@ -0,0 +1,12 @@ +package com.powersync + +@RequiresOptIn(message = "This API is experimental and not covered by PowerSync semver releases. It can be changed at any time") +@Retention(AnnotationRetention.BINARY) +@Target( + AnnotationTarget.CLASS, + AnnotationTarget.FUNCTION, + AnnotationTarget.CONSTRUCTOR, + AnnotationTarget.PROPERTY, + AnnotationTarget.VALUE_PARAMETER, +) +public annotation class ExperimentalPowerSyncAPI diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt index 43f278a3..d587932d 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt @@ -6,6 +6,7 @@ import com.powersync.db.Queries import com.powersync.db.crud.CrudBatch import com.powersync.db.crud.CrudTransaction import com.powersync.db.schema.Schema +import com.powersync.sync.SyncOptions import com.powersync.sync.SyncStatus import com.powersync.utils.JsonParam import kotlin.coroutines.cancellation.CancellationException @@ -94,6 +95,7 @@ public interface PowerSyncDatabase : Queries { crudThrottleMs: Long = 1000L, retryDelayMs: Long = 5000L, params: Map = emptyMap(), + options: SyncOptions = SyncOptions.defaults, ) /** diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncException.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncException.kt index 0ac7a40d..6e31d720 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncException.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncException.kt @@ -2,5 +2,5 @@ package com.powersync public class PowerSyncException( message: String, - cause: Throwable, + cause: Throwable?, ) : Exception(message, cause) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt index 335b4429..2fe4c042 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt @@ -1,8 +1,10 @@ package com.powersync.bucket +import com.powersync.sync.LegacySyncImplementation import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable +@LegacySyncImplementation @Serializable internal data class BucketChecksum( val bucket: String, diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketRequest.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketRequest.kt index b797842c..34a5db60 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketRequest.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketRequest.kt @@ -1,7 +1,9 @@ package com.powersync.bucket +import com.powersync.sync.LegacySyncImplementation import kotlinx.serialization.Serializable +@LegacySyncImplementation @Serializable internal data class BucketRequest( val name: String, diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index ab278ee0..6741777c 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -2,6 +2,8 @@ package com.powersync.bucket import com.powersync.db.crud.CrudEntry import com.powersync.db.internal.PowerSyncTransaction +import com.powersync.sync.Instruction +import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult @@ -25,20 +27,36 @@ internal interface BucketStorage { suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean - suspend fun saveSyncData(syncDataBatch: SyncDataBatch) + suspend fun hasCompletedSync(): Boolean + @LegacySyncImplementation suspend fun getBucketStates(): List + @LegacySyncImplementation suspend fun getBucketOperationProgress(): Map + @LegacySyncImplementation suspend fun removeBuckets(bucketsToDelete: List) - suspend fun hasCompletedSync(): Boolean + @LegacySyncImplementation + fun setTargetCheckpoint(checkpoint: Checkpoint) + @LegacySyncImplementation + suspend fun saveSyncData(syncDataBatch: SyncDataBatch) + + @LegacySyncImplementation suspend fun syncLocalDatabase( targetCheckpoint: Checkpoint, partialPriority: BucketPriority? = null, ): SyncLocalDatabaseResult - fun setTargetCheckpoint(checkpoint: Checkpoint) + suspend fun control( + op: String, + payload: String?, + ): List + + suspend fun control( + op: String, + payload: ByteArray, + ): List } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index 75a23dfa..1bbc3a9e 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -8,27 +8,21 @@ import com.powersync.db.crud.CrudRow import com.powersync.db.internal.InternalDatabase import com.powersync.db.internal.InternalTable import com.powersync.db.internal.PowerSyncTransaction +import com.powersync.sync.Instruction +import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult import com.powersync.utils.JsonUtil import kotlinx.serialization.Serializable -import kotlinx.serialization.encodeToString internal class BucketStorageImpl( private val db: InternalDatabase, private val logger: Logger, ) : BucketStorage { private var hasCompletedSync = AtomicBoolean(false) - private var pendingBucketDeletes = AtomicBoolean(false) - - /** - * Count up, and do a compact on startup. - */ - private var compactCounter = COMPACT_OPERATION_INTERVAL companion object { const val MAX_OP_ID = "9223372036854775807" - const val COMPACT_OPERATION_INTERVAL = 1_000 } override fun getMaxOpId(): String = MAX_OP_ID @@ -130,6 +124,7 @@ internal class BucketStorageImpl( } } + @LegacySyncImplementation override suspend fun saveSyncData(syncDataBatch: SyncDataBatch) { db.writeTransaction { tx -> val jsonString = JsonUtil.json.encodeToString(syncDataBatch) @@ -138,9 +133,9 @@ internal class BucketStorageImpl( listOf("save", jsonString), ) } - this.compactCounter += syncDataBatch.buckets.sumOf { it.data.size } } + @LegacySyncImplementation override suspend fun getBucketStates(): List = db.getAll( "SELECT name AS bucket, CAST(last_op AS TEXT) AS op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0 AND name != '\$local'", @@ -152,6 +147,7 @@ internal class BucketStorageImpl( }, ) + @LegacySyncImplementation override suspend fun getBucketOperationProgress(): Map = buildMap { val rows = @@ -168,12 +164,7 @@ internal class BucketStorageImpl( } } - override suspend fun removeBuckets(bucketsToDelete: List) { - bucketsToDelete.forEach { bucketName -> - deleteBucket(bucketName) - } - } - + @LegacySyncImplementation private suspend fun deleteBucket(bucketName: String) { db.writeTransaction { tx -> tx.execute( @@ -183,8 +174,13 @@ internal class BucketStorageImpl( } Logger.d("[deleteBucket] Done deleting") + } - this.pendingBucketDeletes.value = true + @LegacySyncImplementation + override suspend fun removeBuckets(bucketsToDelete: List) { + bucketsToDelete.forEach { bucketName -> + deleteBucket(bucketName) + } } override suspend fun hasCompletedSync(): Boolean { @@ -208,6 +204,7 @@ internal class BucketStorageImpl( } } + @LegacySyncImplementation override suspend fun syncLocalDatabase( targetCheckpoint: Checkpoint, partialPriority: BucketPriority?, @@ -256,13 +253,12 @@ internal class BucketStorageImpl( ) } - this.forceCompact() - return SyncLocalDatabaseResult( ready = true, ) } + @LegacySyncImplementation private suspend fun validateChecksums( checkpoint: Checkpoint, priority: BucketPriority? = null, @@ -298,6 +294,7 @@ internal class BucketStorageImpl( * * This includes creating new tables, dropping old tables, and copying data over from the oplog. */ + @LegacySyncImplementation private suspend fun updateObjectsFromBuckets( checkpoint: Checkpoint, priority: BucketPriority? = null, @@ -356,54 +353,34 @@ internal class BucketStorageImpl( } } - private suspend fun forceCompact() { - // Reset counter - this.compactCounter = COMPACT_OPERATION_INTERVAL - this.pendingBucketDeletes.value = true - - this.autoCompact() + @LegacySyncImplementation + override fun setTargetCheckpoint(checkpoint: Checkpoint) { + // No-op } - private suspend fun autoCompact() { - // 1. Delete buckets - deletePendingBuckets() + private fun handleControlResult(cursor: SqlCursor): List { + val result = cursor.getString(0)!! + logger.v { "control result: $result" } - // 2. Clear REMOVE operations, only keeping PUT ones - clearRemoveOps() + return JsonUtil.json.decodeFromString>(result) } - private suspend fun deletePendingBuckets() { - if (!this.pendingBucketDeletes.value) { - return - } - + override suspend fun control( + op: String, + payload: String?, + ): List = db.writeTransaction { tx -> - tx.execute( - "INSERT INTO powersync_operations(op, data) VALUES (?, ?)", - listOf("delete_pending_buckets", ""), - ) + logger.v { "powersync_control($op, $payload)" } - // Executed once after start-up, and again when there are pending deletes. - pendingBucketDeletes.value = false - } - } - - private suspend fun clearRemoveOps() { - if (this.compactCounter < COMPACT_OPERATION_INTERVAL) { - return + tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, payload), ::handleControlResult) } + override suspend fun control( + op: String, + payload: ByteArray, + ): List = db.writeTransaction { tx -> - tx.execute( - "INSERT INTO powersync_operations(op, data) VALUES (?, ?)", - listOf("clear_remove_ops", ""), - ) + logger.v { "powersync_control($op, binary payload)" } + tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, payload), ::handleControlResult) } - this.compactCounter = 0 - } - - @Suppress("UNUSED_PARAMETER") - override fun setTargetCheckpoint(checkpoint: Checkpoint) { - // No-op for now - } } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt b/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt index f0088dbf..5dc4823b 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt @@ -1,8 +1,10 @@ package com.powersync.bucket +import com.powersync.sync.LegacySyncImplementation import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable +@LegacySyncImplementation @Serializable internal data class Checkpoint( @SerialName("last_op_id") val lastOpId: String, diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/ChecksumCache.kt b/core/src/commonMain/kotlin/com/powersync/bucket/ChecksumCache.kt index bd1ce5c6..44faa005 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/ChecksumCache.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/ChecksumCache.kt @@ -1,8 +1,10 @@ package com.powersync.bucket +import com.powersync.sync.LegacySyncImplementation import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable +@LegacySyncImplementation @Serializable internal data class ChecksumCache( @SerialName("last_op_id") val lostOpId: String, diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt b/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt index 73f6a6fc..b18ae227 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt @@ -1,5 +1,8 @@ package com.powersync.bucket +import com.powersync.sync.LegacySyncImplementation + +@LegacySyncImplementation internal data class LocalOperationCounters( val atLast: Int, val sinceLast: Int, diff --git a/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt b/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt index e1ce40af..9c8edbc3 100644 --- a/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt +++ b/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt @@ -33,7 +33,7 @@ public abstract class PowerSyncBackendConnector { public open suspend fun getCredentialsCached(): PowerSyncCredentials? { return runWrappedSuspending { cachedCredentials?.let { return@runWrappedSuspending it } - prefetchCredentials()?.join() + prefetchCredentials().join() cachedCredentials } } @@ -56,10 +56,10 @@ public abstract class PowerSyncBackendConnector { * This may be called before the current credentials have expired. */ @Throws(PowerSyncException::class, CancellationException::class) - public open suspend fun prefetchCredentials(): Job? { + public open fun prefetchCredentials(): Job { fetchRequest?.takeIf { it.isActive }?.let { return it } - fetchRequest = + val request = scope.launch { fetchCredentials().also { value -> cachedCredentials = value @@ -67,7 +67,8 @@ public abstract class PowerSyncBackendConnector { } } - return fetchRequest + fetchRequest = request + return request } /** diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index d58156e2..f2d9cf3a 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -18,6 +18,7 @@ import com.powersync.db.internal.PowerSyncVersion import com.powersync.db.schema.Schema import com.powersync.db.schema.toSerializable import com.powersync.sync.PriorityStatusEntry +import com.powersync.sync.SyncOptions import com.powersync.sync.SyncStatus import com.powersync.sync.SyncStatusData import com.powersync.sync.SyncStream @@ -29,7 +30,6 @@ import io.ktor.client.HttpClient import io.ktor.client.HttpClientConfig import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancelAndJoin @@ -46,7 +46,6 @@ import kotlinx.datetime.Instant import kotlinx.datetime.LocalDateTime import kotlinx.datetime.TimeZone import kotlinx.datetime.toInstant -import kotlinx.serialization.encodeToString import kotlin.time.Duration.Companion.milliseconds /** @@ -153,12 +152,13 @@ internal class PowerSyncDatabaseImpl( crudThrottleMs: Long, retryDelayMs: Long, params: Map, + options: SyncOptions, ) { waitReady() mutex.withLock { disconnectInternal() - connectInternal( + connectInternal(crudThrottleMs) { scope -> SyncStream( bucketStorage = bucketStorage, connector = connector, @@ -166,23 +166,29 @@ internal class PowerSyncDatabaseImpl( retryDelayMs = retryDelayMs, logger = logger, params = params.toJsonObject(), - scope = scope, + uploadScope = scope, createClient = createClient, - ), - crudThrottleMs, - ) + options = options, + ) + } } } - @OptIn(FlowPreview::class) - internal fun connectInternal( - stream: SyncStream, + private fun connectInternal( crudThrottleMs: Long, + createStream: (CoroutineScope) -> SyncStream, ) { val db = this val job = SupervisorJob(scope.coroutineContext[Job]) syncSupervisorJob = job + var activeStream: SyncStream? = null + scope.launch(job) { + // Create the stream in this scope so that everything launched by the stream is bound to + // this coroutine scope that can be cancelled independently. + val stream = createStream(this) + activeStream = stream + launch { // Get a global lock for checking mutex maps val streamMutex = resource.group.syncMutex @@ -234,7 +240,7 @@ internal class PowerSyncDatabaseImpl( job.invokeOnCompletion { if (it is DisconnectRequestedException) { - stream.invalidateCredentials() + activeStream?.invalidateCredentials() } } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt b/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt new file mode 100644 index 00000000..1fe66704 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt @@ -0,0 +1,170 @@ +package com.powersync.sync + +import com.powersync.bucket.BucketPriority +import kotlinx.datetime.Instant +import kotlinx.serialization.KSerializer +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.descriptors.PrimitiveKind +import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.buildClassSerialDescriptor +import kotlinx.serialization.encoding.CompositeDecoder +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder +import kotlinx.serialization.encoding.decodeStructure +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.serializer + +/** + * An instruction sent to this SDK by the core extension to implement sync behavior. + */ +@Serializable(with = Instruction.Serializer::class) +internal sealed interface Instruction { + @Serializable + data class LogLine( + val severity: String, + val line: String, + ) : Instruction + + @Serializable + data class UpdateSyncStatus( + val status: CoreSyncStatus, + ) : Instruction + + @Serializable + data class EstablishSyncStream( + val request: JsonObject, + ) : Instruction + + @Serializable + data class FetchCredentials( + @SerialName("did_expire") + val didExpire: Boolean, + ) : Instruction + + data object FlushSileSystem : Instruction + + data object CloseSyncStream : Instruction + + data object DidCompleteSync : Instruction + + data class UnknownInstruction( + val raw: JsonElement?, + ) : Instruction + + class Serializer : KSerializer { + private val logLine = serializer() + private val updateSyncStatus = serializer() + private val establishSyncStream = serializer() + private val fetchCredentials = serializer() + private val flushFileSystem = serializer() + private val closeSyncStream = serializer() + private val didCompleteSync = serializer() + + override val descriptor = + buildClassSerialDescriptor(Instruction::class.qualifiedName!!) { + element("LogLine", logLine.descriptor, isOptional = true) + element("UpdateSyncStatus", updateSyncStatus.descriptor, isOptional = true) + element("EstablishSyncStream", establishSyncStream.descriptor, isOptional = true) + element("FetchCredentials", fetchCredentials.descriptor, isOptional = true) + element("FlushFileSystem", flushFileSystem.descriptor, isOptional = true) + element("CloseSyncStream", closeSyncStream.descriptor, isOptional = true) + element("DidCompleteSync", didCompleteSync.descriptor, isOptional = true) + } + + override fun deserialize(decoder: Decoder): Instruction = + decoder.decodeStructure(descriptor) { + val value = + when (val index = decodeElementIndex(descriptor)) { + 0 -> decodeSerializableElement(descriptor, 0, logLine) + 1 -> decodeSerializableElement(descriptor, 1, updateSyncStatus) + 2 -> decodeSerializableElement(descriptor, 2, establishSyncStream) + 3 -> decodeSerializableElement(descriptor, 3, fetchCredentials) + 4 -> { + decodeSerializableElement(descriptor, 4, flushFileSystem) + FlushSileSystem + } + 5 -> { + decodeSerializableElement(descriptor, 5, closeSyncStream) + CloseSyncStream + } + 6 -> { + decodeSerializableElement(descriptor, 6, didCompleteSync) + DidCompleteSync + } + CompositeDecoder.UNKNOWN_NAME -> + UnknownInstruction( + decodeSerializableElement(descriptor, index, serializer()), + ) + CompositeDecoder.DECODE_DONE -> UnknownInstruction(null) + else -> error("Unexpected index: $index") + } + + if (decodeElementIndex(descriptor) != CompositeDecoder.DECODE_DONE) { + // Sync lines are single-key objects, make sure there isn't another one. + UnknownInstruction(null) + } else { + value + } + } + + override fun serialize( + encoder: Encoder, + value: Instruction, + ) { + // We don't need this functionality, so... + throw UnsupportedOperationException("Serializing instructions") + } + } +} + +@Serializable +internal data class CoreSyncStatus( + val connected: Boolean, + val connecting: Boolean, + val downloading: CoreDownloadProgress?, + @SerialName("priority_status") + val priorityStatus: List, +) + +@Serializable +internal data class CoreDownloadProgress( + val buckets: Map, +) + +@Serializable +internal data class CoreBucketProgress( + val priority: BucketPriority, + @SerialName("at_last") + val atLast: Long, + @SerialName("since_last") + val sinceLast: Long, + @SerialName("target_count") + val targetCount: Long, +) + +@Serializable +internal data class CorePriorityStatus( + val priority: BucketPriority, + @SerialName("last_synced_at") + @Serializable(with = InstantTimestampSerializer::class) + val lastSyncedAt: Instant?, + @SerialName("has_synced") + val hasSynced: Boolean?, +) + +private object InstantTimestampSerializer : KSerializer { + override val descriptor: SerialDescriptor + get() = PrimitiveSerialDescriptor("kotlinx.datetime.Instant", PrimitiveKind.LONG) + + override fun deserialize(decoder: Decoder): Instant = Instant.fromEpochSeconds(decoder.decodeLong()) + + override fun serialize( + encoder: Encoder, + value: Instant, + ) { + encoder.encodeLong(value.epochSeconds) + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/sync/LegacySyncImplementation.kt b/core/src/commonMain/kotlin/com/powersync/sync/LegacySyncImplementation.kt new file mode 100644 index 00000000..f14077ff --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/sync/LegacySyncImplementation.kt @@ -0,0 +1,10 @@ +package com.powersync.sync + +@RequiresOptIn( + message = + "Marker class for the old Kotlin-based sync implementation, making it easier to " + + "recognize classes we can remove after switching to the Rust sync implementation.", +) +@Retention(AnnotationRetention.BINARY) +@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.CONSTRUCTOR) +internal annotation class LegacySyncImplementation diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt index 8c6b64e0..831f5e3c 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt @@ -65,8 +65,8 @@ internal data class ProgressInfo( * one-by-one. */ @ConsistentCopyVisibility -public data class SyncDownloadProgress private constructor( - private val buckets: Map, +public data class SyncDownloadProgress internal constructor( + private val buckets: Map, ) : ProgressWithOperations { override val downloadedOperations: Int override val totalOperations: Int @@ -77,10 +77,7 @@ public data class SyncDownloadProgress private constructor( downloadedOperations = completed } - /** - * Creates download progress information from the local progress counters since the last full sync and the target - * checkpoint. - */ + @LegacySyncImplementation internal constructor(localProgress: Map, target: Checkpoint) : this( buildMap { for (entry in target.checksums) { @@ -88,11 +85,11 @@ public data class SyncDownloadProgress private constructor( put( entry.bucket, - BucketProgress( + CoreBucketProgress( priority = entry.priority, - atLast = savedProgress?.atLast ?: 0, - sinceLast = savedProgress?.sinceLast ?: 0, - targetCount = entry.count ?: 0, + atLast = (savedProgress?.atLast ?: 0).toLong(), + sinceLast = (savedProgress?.sinceLast ?: 0).toLong(), + targetCount = (entry.count ?: 0).toLong(), ), ) } @@ -110,6 +107,7 @@ public data class SyncDownloadProgress private constructor( return ProgressInfo(totalOperations = total, downloadedOperations = completed) } + @LegacySyncImplementation internal fun incrementDownloaded(batch: SyncDataBatch): SyncDownloadProgress = SyncDownloadProgress( buildMap { @@ -131,16 +129,7 @@ public data class SyncDownloadProgress private constructor( buckets.values .asSequence() .filter { it.priority >= priority } - .fold(0 to 0) { (prevTarget, prevCompleted), entry -> - (prevTarget + entry.total) to (prevCompleted + entry.sinceLast) - } -} - -private data class BucketProgress( - val priority: BucketPriority, - val atLast: Int, - val sinceLast: Int, - val targetCount: Int, -) { - val total get(): Int = targetCount - atLast + .fold(0L to 0L) { (prevTarget, prevCompleted), entry -> + (prevTarget + entry.targetCount - entry.atLast) to (prevCompleted + entry.sinceLast) + }.let { it.first.toInt() to it.second.toInt() } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt b/core/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt new file mode 100644 index 00000000..12f3f43b --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt @@ -0,0 +1,131 @@ +package com.powersync.sync + +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.connectors.PowerSyncCredentials +import com.powersync.utils.JsonUtil +import io.ktor.client.HttpClient +import io.ktor.client.plugins.websocket.webSocketSession +import io.ktor.http.URLBuilder +import io.ktor.http.URLProtocol +import io.ktor.http.takeFrom +import io.rsocket.kotlin.core.RSocketConnector +import io.rsocket.kotlin.payload.PayloadMimeType +import io.rsocket.kotlin.payload.buildPayload +import io.rsocket.kotlin.payload.data +import io.rsocket.kotlin.payload.metadata +import io.rsocket.kotlin.transport.RSocketClientTarget +import io.rsocket.kotlin.transport.RSocketConnection +import io.rsocket.kotlin.transport.RSocketTransportApi +import io.rsocket.kotlin.transport.ktor.websocket.internal.KtorWebSocketConnection +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.map +import kotlinx.io.readByteArray +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonObject +import kotlin.coroutines.CoroutineContext + +/** + * Connects to the RSocket endpoint for receiving sync lines. + * + * Note that we reconstruct the transport layer for RSocket by opening a WebSocket connection + * manually instead of using the high-level RSocket Ktor integration. + * The reason is that every request to the sync service needs its own metadata and data payload + * (e.g. to transmit the token), but the Ktor integration only supports setting a single payload for + * the entire client. + */ +@OptIn(RSocketTransportApi::class, ExperimentalPowerSyncAPI::class) +internal fun HttpClient.rSocketSyncStream( + options: ConnectionMethod.WebSocket, + req: JsonObject, + credentials: PowerSyncCredentials, +): Flow = + flow { + val flowContext = currentCoroutineContext() + + val websocketUri = + URLBuilder(credentials.endpointUri("sync/stream")).apply { + protocol = + when (protocolOrNull) { + URLProtocol.HTTP -> URLProtocol.WS + else -> URLProtocol.WSS + } + } + + // Note: We're using a custom connector here because we need to set options for each request + // without creating a new HTTP client each time. The recommended approach would be to add an + // RSocket extension to the HTTP client, but that only allows us to set the SETUP metadata for + // all connections (bad because we need a short-lived token in there). + // https://github.com/rsocket/rsocket-kotlin/issues/311 + val target = + object : RSocketClientTarget { + @RSocketTransportApi + override suspend fun connectClient(): RSocketConnection { + val ws = + webSocketSession { + url.takeFrom(websocketUri) + } + return KtorWebSocketConnection(ws) + } + + override val coroutineContext: CoroutineContext + get() = flowContext + } + + val connector = + RSocketConnector { + connectionConfig { + payloadMimeType = + PayloadMimeType( + metadata = "application/json", + data = "application/json", + ) + + setupPayload { + buildPayload { + data("{}") + metadata(JsonUtil.json.encodeToString(ConnectionSetupMetadata(token = "Bearer ${credentials.token}"))) + } + } + + keepAlive = options.keepAlive + } + } + + val rSocket = connector.connect(target) + val syncStream = + rSocket.requestStream( + buildPayload { + data(JsonUtil.json.encodeToString(req)) + metadata(JsonUtil.json.encodeToString(RequestStreamMetadata("/sync/stream"))) + }, + ) + + emitAll(syncStream.map { it.data.readByteArray() }.flowOn(Dispatchers.IO)) + } + +/** + * The metadata payload we need to use when connecting with RSocket. + * + * This corresponds to `RSocketContextMeta` on the sync service. + */ +@Serializable +private class ConnectionSetupMetadata( + val token: String, + @SerialName("user_agent") + val userAgent: String = userAgent(), +) + +/** + * The metadata payload we send for the `REQUEST_STREAM` frame. + */ +@Serializable +private class RequestStreamMetadata( + val path: String, +) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/StreamingSyncRequest.kt b/core/src/commonMain/kotlin/com/powersync/sync/StreamingSyncRequest.kt index 9c77ef8d..a09c15f4 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/StreamingSyncRequest.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/StreamingSyncRequest.kt @@ -5,6 +5,7 @@ import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonObject +@LegacySyncImplementation @Serializable internal data class StreamingSyncRequest( val buckets: List, diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncDataBatch.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncDataBatch.kt index 65efdb43..133aee21 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncDataBatch.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncDataBatch.kt @@ -2,6 +2,7 @@ package com.powersync.sync import kotlinx.serialization.Serializable +@LegacySyncImplementation @Serializable internal data class SyncDataBatch( val buckets: List, diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt index 242542cd..a9a9a4f3 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt @@ -16,6 +16,7 @@ import kotlinx.serialization.encoding.decodeStructure import kotlinx.serialization.encoding.encodeStructure import kotlinx.serialization.serializer +@LegacySyncImplementation @Serializable(with = SyncLineSerializer::class) internal sealed interface SyncLine { data class FullCheckpoint( @@ -57,6 +58,7 @@ internal sealed interface SyncLine { data object UnknownSyncLine : SyncLine } +@LegacySyncImplementation private class SyncLineSerializer : KSerializer { private val checkpoint = serializer() private val checkpointDiff = serializer() diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt new file mode 100644 index 00000000..d9b6dba2 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt @@ -0,0 +1,64 @@ +package com.powersync.sync + +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.PowerSyncDatabase +import io.rsocket.kotlin.keepalive.KeepAlive +import kotlin.time.Duration.Companion.seconds + +/** + * Experimental options that can be passed to [PowerSyncDatabase.connect] to specify an experimental + * connection mechanism. + * + * The new connection implementation is more efficient and we expect it to become the default in + * the future. At the moment, the implementation is not covered by the stability guarantees we offer + * for the rest of the SDK though. + */ +public class SyncOptions + @ExperimentalPowerSyncAPI + constructor( + @property:ExperimentalPowerSyncAPI + public val newClientImplementation: Boolean = false, + @property:ExperimentalPowerSyncAPI + public val method: ConnectionMethod = ConnectionMethod.Http, + ) { + public companion object { + @OptIn(ExperimentalPowerSyncAPI::class) + public val defaults: SyncOptions = SyncOptions() + } + } + +/** + * The connection method to use when the SDK connects to the sync service. + */ +@ExperimentalPowerSyncAPI +public sealed interface ConnectionMethod { + /** + * Receive sync lines via streamed HTTP response from the sync service. + * + * This mode is less efficient than [WebSocket] because it doesn't support backpressure + * properly and uses JSON instead of the more efficient BSON representation for sync lines. + * + * This is currently the default, but this will be changed once [WebSocket] support is stable. + */ + @ExperimentalPowerSyncAPI + public data object Http : ConnectionMethod + + /** + * Receive binary sync lines via RSocket over a WebSocket connection. + * + * This connection mode is currently experimental and requires a recent sync service to work. + * WebSocket support is only available when enabling the [SyncOptions.newClientImplementation]. + */ + @ExperimentalPowerSyncAPI + public data class WebSocket( + val keepAlive: KeepAlive = DefaultKeepAlive, + ) : ConnectionMethod { + private companion object { + val DefaultKeepAlive = + KeepAlive( + interval = 20.0.seconds, + maxLifetime = 30.0.seconds, + ) + } + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index 067080b9..06710b7a 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -127,12 +127,35 @@ internal data class SyncStatusDataContainer( override val anyError get() = downloadError ?: uploadError + internal fun applyCoreChanges(status: CoreSyncStatus): SyncStatusDataContainer { + val completeSync = status.priorityStatus.firstOrNull { it.priority == BucketPriority.FULL_SYNC_PRIORITY } + + return copy( + connected = status.connected, + connecting = status.connecting, + downloading = status.downloading != null, + downloadProgress = status.downloading?.let { SyncDownloadProgress(it.buckets) }, + lastSyncedAt = completeSync?.lastSyncedAt, + hasSynced = completeSync != null, + priorityStatusEntries = + status.priorityStatus.map { + PriorityStatusEntry( + priority = it.priority, + lastSyncedAt = it.lastSyncedAt, + hasSynced = it.hasSynced, + ) + }, + ) + } + + @LegacySyncImplementation internal fun abortedDownload() = copy( downloading = false, downloadProgress = null, ) + @LegacySyncImplementation internal fun copyWithCompletedDownload() = copy( lastSyncedAt = Clock.System.now(), diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 81556310..0204892b 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -1,7 +1,10 @@ package com.powersync.sync import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity import co.touchlab.stately.concurrency.AtomicReference +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.PowerSyncException import com.powersync.bucket.BucketChecksum import com.powersync.bucket.BucketRequest import com.powersync.bucket.BucketStorage @@ -13,9 +16,11 @@ import com.powersync.utils.JsonUtil import io.ktor.client.HttpClient import io.ktor.client.HttpClientConfig import io.ktor.client.call.body +import io.ktor.client.plugins.DefaultRequest import io.ktor.client.plugins.HttpTimeout import io.ktor.client.plugins.contentnegotiation.ContentNegotiation import io.ktor.client.plugins.timeout +import io.ktor.client.plugins.websocket.WebSockets import io.ktor.client.request.get import io.ktor.client.request.headers import io.ktor.client.request.preparePost @@ -31,14 +36,23 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import kotlinx.datetime.Clock -import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.encodeToJsonElement +@OptIn(ExperimentalPowerSyncAPI::class) internal class SyncStream( private val bucketStorage: BucketStorage, private val connector: PowerSyncBackendConnector, @@ -46,15 +60,17 @@ internal class SyncStream( private val retryDelayMs: Long = 5000L, private val logger: Logger, private val params: JsonObject, - private val scope: CoroutineScope, + private val uploadScope: CoroutineScope, + private val options: SyncOptions, createClient: (HttpClientConfig<*>.() -> Unit) -> HttpClient, ) { private var isUploadingCrud = AtomicReference(null) + private var completedCrudUploads = Channel(onBufferOverflow = BufferOverflow.DROP_OLDEST) /** - * The current sync status. This instance is updated as changes occur + * The current sync status. This instance is mutated as changes occur */ - var status = SyncStatus() + val status = SyncStatus() private var clientId: String? = null @@ -62,6 +78,13 @@ internal class SyncStream( createClient { install(HttpTimeout) install(ContentNegotiation) + install(WebSockets) + + install(DefaultRequest) { + headers { + append("User-Agent", userAgent()) + } + } } fun invalidateCredentials() { @@ -96,7 +119,7 @@ internal class SyncStream( } fun triggerCrudUploadAsync(): Job = - scope.launch { + uploadScope.launch { val thisIteration = PendingCrudUpload(CompletableDeferred()) var holdingUploadLock = false @@ -109,6 +132,7 @@ internal class SyncStream( uploadAllCrud() } finally { if (holdingUploadLock) { + completedCrudUploads.send(Unit) isUploadingCrud.set(null) } @@ -181,7 +205,7 @@ internal class SyncStream( return body.data.writeCheckpoint } - private fun streamingSyncRequest(req: StreamingSyncRequest): Flow = + private fun connectViaHttp(req: JsonElement): Flow = flow { val credentials = connector.getCredentialsCached() require(credentials != null) { "Not logged in" } @@ -221,242 +245,418 @@ internal class SyncStream( } } - private suspend fun streamingSyncIteration(): SyncStreamState { - val bucketEntries = bucketStorage.getBucketStates() - val initialBuckets = mutableMapOf() - - var state = - SyncStreamState( - targetCheckpoint = null, - validatedCheckpoint = null, - appliedCheckpoint = null, - bucketSet = initialBuckets.keys.toMutableSet(), + private fun connectViaWebSocket( + req: JsonObject, + options: ConnectionMethod.WebSocket, + ): Flow = + flow { + val credentials = requireNotNull(connector.getCredentialsCached()) { "Not logged in" } + + emitAll( + httpClient.rSocketSyncStream( + options = options, + req = req, + credentials = credentials, + ), ) + } - bucketEntries.forEach { entry -> - initialBuckets[entry.bucket] = entry.opId + private suspend fun streamingSyncIteration() { + coroutineScope { + if (options.newClientImplementation) { + val iteration = ActiveIteration(this) + + try { + iteration.start() + } finally { + // This can't be cancelled because we need to send a stop message, which is async, to + // clean up resources. + withContext(NonCancellable) { + iteration.stop() + } + } + } else { + legacySyncIteration() + } } + } - val req = - StreamingSyncRequest( - buckets = initialBuckets.map { (bucket, after) -> BucketRequest(bucket, after) }, - clientId = clientId!!, - parameters = params, - ) + @OptIn(LegacySyncImplementation::class) + private suspend fun CoroutineScope.legacySyncIteration() { + LegacyIteration(this).streamingSyncIteration() + } - streamingSyncRequest(req).collect { value -> - val line = JsonUtil.json.decodeFromString(value) + /** + * Implementation of a sync iteration that delegates to helper functions implemented in the + * Rust core extension. + * + * This avoids us having to decode sync lines in Kotlin, unlocking the RSocket protocol and + * improving performance. + */ + private inner class ActiveIteration( + val scope: CoroutineScope, + var fetchLinesJob: Job? = null, + var credentialsInvalidation: Job? = null, + ) { + suspend fun start() { + control("start", JsonUtil.json.encodeToString(params)) + fetchLinesJob?.join() + } - state = handleInstruction(line, value, state) + suspend fun stop() { + control("stop") + fetchLinesJob?.join() + } - if (state.abortIteration) { - return@collect - } + private suspend fun control( + op: String, + payload: String? = null, + ) { + val instructions = bucketStorage.control(op, payload) + handleInstructions(instructions) } - status.update { abortedDownload() } + private suspend fun control( + op: String, + payload: ByteArray, + ) { + val instructions = bucketStorage.control(op, payload) + handleInstructions(instructions) + } - return state + private suspend fun handleInstructions(instructions: List) { + instructions.forEach { handleInstruction(it) } + } + + private suspend fun handleInstruction(instruction: Instruction) { + when (instruction) { + is Instruction.EstablishSyncStream -> { + fetchLinesJob?.cancelAndJoin() + fetchLinesJob = + scope.launch { + launch { + for (completion in completedCrudUploads) { + control("completed_upload") + } + } + + launch { + connect(instruction) + } + } + } + Instruction.CloseSyncStream -> { + fetchLinesJob!!.cancelAndJoin() + fetchLinesJob = null + } + Instruction.FlushSileSystem -> { + // We have durable file systems, so flushing is not necessary + } + is Instruction.LogLine -> { + logger.log( + severity = + when (instruction.severity) { + "DEBUG" -> Severity.Debug + "INFO" -> Severity.Debug + else -> Severity.Warn + }, + message = instruction.line, + tag = logger.tag, + throwable = null, + ) + } + is Instruction.UpdateSyncStatus -> { + status.update { + applyCoreChanges(instruction.status) + } + } + is Instruction.FetchCredentials -> { + if (instruction.didExpire) { + connector.invalidateCredentials() + } else { + // Token expires soon - refresh it in the background + if (credentialsInvalidation == null) { + val job = + scope.launch { + connector.prefetchCredentials().join() + + // Token has been refreshed, start another iteration + stop() + } + job.invokeOnCompletion { + credentialsInvalidation = null + } + credentialsInvalidation = job + } + } + } + Instruction.DidCompleteSync -> { + status.update { copy(downloadError = null) } + } + is Instruction.UnknownInstruction -> { + throw PowerSyncException("Unknown instruction received from core extension: ${instruction.raw}", null) + } + } + } + + private suspend fun connect(start: Instruction.EstablishSyncStream) { + when (val method = options.method) { + ConnectionMethod.Http -> + connectViaHttp(start.request).collect { rawLine -> + control("line_text", rawLine) + } + is ConnectionMethod.WebSocket -> + connectViaWebSocket(start.request, method).collect { binaryLine -> + control("line_binary", binaryLine) + } + } + } } - private suspend fun handleInstruction( - line: SyncLine, - jsonString: String, - state: SyncStreamState, - ): SyncStreamState = - when (line) { - is SyncLine.FullCheckpoint -> handleStreamingSyncCheckpoint(line, state) - is SyncLine.CheckpointDiff -> handleStreamingSyncCheckpointDiff(line, state) - is SyncLine.CheckpointComplete -> handleStreamingSyncCheckpointComplete(state) - is SyncLine.CheckpointPartiallyComplete -> - handleStreamingSyncCheckpointPartiallyComplete( - line, - state, + @LegacySyncImplementation + private inner class LegacyIteration( + val scope: CoroutineScope, + ) { + suspend fun streamingSyncIteration() { + val bucketEntries = bucketStorage.getBucketStates() + val initialBuckets = mutableMapOf() + + var state = + SyncStreamState( + targetCheckpoint = null, + validatedCheckpoint = null, + appliedCheckpoint = null, + bucketSet = initialBuckets.keys.toMutableSet(), ) - is SyncLine.KeepAlive -> handleStreamingKeepAlive(line, state) - is SyncLine.SyncDataBucket -> handleStreamingSyncData(line, state) - SyncLine.UnknownSyncLine -> { - logger.w { "Unhandled instruction $jsonString" } - state + bucketEntries.forEach { entry -> + initialBuckets[entry.bucket] = entry.opId } - } - private suspend fun handleStreamingSyncCheckpoint( - line: SyncLine.FullCheckpoint, - state: SyncStreamState, - ): SyncStreamState { - val (checkpoint) = line - state.targetCheckpoint = checkpoint + val req = + StreamingSyncRequest( + buckets = initialBuckets.map { (bucket, after) -> BucketRequest(bucket, after) }, + clientId = clientId!!, + parameters = params, + ) - val bucketsToDelete = state.bucketSet!!.toMutableList() - val newBuckets = mutableSetOf() + lateinit var receiveLines: Job + receiveLines = + scope.launch { + connectViaHttp(JsonUtil.json.encodeToJsonElement(req)).collect { value -> + val line = JsonUtil.json.decodeFromString(value) - checkpoint.checksums.forEach { checksum -> - run { - newBuckets.add(checksum.bucket) - bucketsToDelete.remove(checksum.bucket) - } + state = handleInstruction(line, value, state) + + if (state.abortIteration) { + receiveLines.cancel() + } + } + } + + receiveLines.join() + status.update { abortedDownload() } } - state.bucketSet = newBuckets - startTrackingCheckpoint(checkpoint, bucketsToDelete) + private suspend fun handleInstruction( + line: SyncLine, + jsonString: String, + state: SyncStreamState, + ): SyncStreamState = + when (line) { + is SyncLine.FullCheckpoint -> handleStreamingSyncCheckpoint(line, state) + is SyncLine.CheckpointDiff -> handleStreamingSyncCheckpointDiff(line, state) + is SyncLine.CheckpointComplete -> handleStreamingSyncCheckpointComplete(state) + is SyncLine.CheckpointPartiallyComplete -> + handleStreamingSyncCheckpointPartiallyComplete( + line, + state, + ) + + is SyncLine.KeepAlive -> handleStreamingKeepAlive(line, state) + is SyncLine.SyncDataBucket -> handleStreamingSyncData(line, state) + SyncLine.UnknownSyncLine -> { + logger.w { "Unhandled instruction $jsonString" } + state + } + } - return state - } + private suspend fun handleStreamingSyncCheckpoint( + line: SyncLine.FullCheckpoint, + state: SyncStreamState, + ): SyncStreamState { + val (checkpoint) = line + state.targetCheckpoint = checkpoint - private suspend fun startTrackingCheckpoint( - checkpoint: Checkpoint, - bucketsToDelete: List, - ) { - val progress = bucketStorage.getBucketOperationProgress() - status.update { - copy( - downloading = true, - downloadProgress = SyncDownloadProgress(progress, checkpoint), - ) - } + val bucketsToDelete = state.bucketSet!!.toMutableList() + val newBuckets = mutableSetOf() - if (bucketsToDelete.isNotEmpty()) { - logger.i { "Removing buckets [${bucketsToDelete.joinToString(separator = ", ")}]" } - } + checkpoint.checksums.forEach { checksum -> + run { + newBuckets.add(checksum.bucket) + bucketsToDelete.remove(checksum.bucket) + } + } - bucketStorage.removeBuckets(bucketsToDelete) - bucketStorage.setTargetCheckpoint(checkpoint) - } + state.bucketSet = newBuckets + startTrackingCheckpoint(checkpoint, bucketsToDelete) - private suspend fun handleStreamingSyncCheckpointComplete(state: SyncStreamState): SyncStreamState { - val checkpoint = state.targetCheckpoint!! - var result = bucketStorage.syncLocalDatabase(checkpoint) - val pending = isUploadingCrud.get() - - if (!result.checkpointValid) { - // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off - delay(50) - state.abortIteration = true - // TODO handle retries return state - } else if (!result.ready && pending != null) { - // We have pending entries in the local upload queue or are waiting to confirm a write checkpoint, which - // prevented this checkpoint from applying. Wait for that to complete and try again. - logger.d { "Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying." } - pending.done.await() - - result = bucketStorage.syncLocalDatabase(checkpoint) } - if (result.checkpointValid && result.ready) { - state.appliedCheckpoint = checkpoint.clone() - logger.i { "validated checkpoint ${state.appliedCheckpoint}" } + private suspend fun startTrackingCheckpoint( + checkpoint: Checkpoint, + bucketsToDelete: List, + ) { + val progress = bucketStorage.getBucketOperationProgress() + status.update { + copy( + downloading = true, + downloadProgress = SyncDownloadProgress(progress, checkpoint), + ) + } + + if (bucketsToDelete.isNotEmpty()) { + logger.i { "Removing buckets [${bucketsToDelete.joinToString(separator = ", ")}]" } + } - state.validatedCheckpoint = state.targetCheckpoint - status.update { copyWithCompletedDownload() } - } else { - logger.d { "Could not apply checkpoint. Waiting for next sync complete line" } + bucketStorage.removeBuckets(bucketsToDelete) + bucketStorage.setTargetCheckpoint(checkpoint) } - return state - } + private suspend fun handleStreamingSyncCheckpointComplete(state: SyncStreamState): SyncStreamState { + val checkpoint = state.targetCheckpoint!! + var result = bucketStorage.syncLocalDatabase(checkpoint) + val pending = isUploadingCrud.get() + + if (!result.checkpointValid) { + // This means checksums failed. Start again with a new checkpoint. + // TODO: better back-off + delay(50) + state.abortIteration = true + // TODO handle retries + return state + } else if (!result.ready && pending != null) { + // We have pending entries in the local upload queue or are waiting to confirm a write checkpoint, which + // prevented this checkpoint from applying. Wait for that to complete and try again. + logger.d { "Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying." } + pending.done.await() + + result = bucketStorage.syncLocalDatabase(checkpoint) + } + + if (result.checkpointValid && result.ready) { + state.appliedCheckpoint = checkpoint.clone() + logger.i { "validated checkpoint ${state.appliedCheckpoint}" } + + state.validatedCheckpoint = state.targetCheckpoint + status.update { copyWithCompletedDownload() } + } else { + logger.d { "Could not apply checkpoint. Waiting for next sync complete line" } + } - private suspend fun handleStreamingSyncCheckpointPartiallyComplete( - line: SyncLine.CheckpointPartiallyComplete, - state: SyncStreamState, - ): SyncStreamState { - val priority = line.priority - val result = bucketStorage.syncLocalDatabase(state.targetCheckpoint!!, priority) - if (!result.checkpointValid) { - // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off - delay(50) - state.abortIteration = true - // TODO handle retries return state - } else if (!result.ready) { - // Checkpoint is valid, but we have local data preventing this to be published. We'll try to resolve this - // once we have a complete checkpoint if the problem persists. - } else { - logger.i { "validated partial checkpoint ${state.appliedCheckpoint} up to priority of $priority" } } - status.update { - copy( - priorityStatusEntries = - buildList { - // All states with a higher priority can be deleted since this partial sync includes them. - addAll(status.priorityStatusEntries.filter { it.priority >= line.priority }) - add( - PriorityStatusEntry( - priority = priority, - lastSyncedAt = Clock.System.now(), - hasSynced = true, - ), - ) - }, - ) - } - return state - } + private suspend fun handleStreamingSyncCheckpointPartiallyComplete( + line: SyncLine.CheckpointPartiallyComplete, + state: SyncStreamState, + ): SyncStreamState { + val priority = line.priority + val result = bucketStorage.syncLocalDatabase(state.targetCheckpoint!!, priority) + if (!result.checkpointValid) { + // This means checksums failed. Start again with a new checkpoint. + // TODO: better back-off + delay(50) + state.abortIteration = true + // TODO handle retries + return state + } else if (!result.ready) { + // Checkpoint is valid, but we have local data preventing this to be published. We'll try to resolve this + // once we have a complete checkpoint if the problem persists. + } else { + logger.i { "validated partial checkpoint ${state.appliedCheckpoint} up to priority of $priority" } + } - private suspend fun handleStreamingSyncCheckpointDiff( - checkpointDiff: SyncLine.CheckpointDiff, - state: SyncStreamState, - ): SyncStreamState { - // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint - if (state.targetCheckpoint == null) { - throw Exception("Checkpoint diff without previous checkpoint") + status.update { + copy( + priorityStatusEntries = + buildList { + // All states with a higher priority can be deleted since this partial sync includes them. + addAll(status.priorityStatusEntries.filter { it.priority >= line.priority }) + add( + PriorityStatusEntry( + priority = priority, + lastSyncedAt = Clock.System.now(), + hasSynced = true, + ), + ) + }, + ) + } + return state } - val newBuckets = mutableMapOf() + private suspend fun handleStreamingSyncCheckpointDiff( + checkpointDiff: SyncLine.CheckpointDiff, + state: SyncStreamState, + ): SyncStreamState { + // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint + if (state.targetCheckpoint == null) { + throw Exception("Checkpoint diff without previous checkpoint") + } - state.targetCheckpoint!!.checksums.forEach { checksum -> - newBuckets[checksum.bucket] = checksum - } - checkpointDiff.updatedBuckets.forEach { checksum -> - newBuckets[checksum.bucket] = checksum - } + val newBuckets = mutableMapOf() - checkpointDiff.removedBuckets.forEach { bucket -> newBuckets.remove(bucket) } + state.targetCheckpoint!!.checksums.forEach { checksum -> + newBuckets[checksum.bucket] = checksum + } + checkpointDiff.updatedBuckets.forEach { checksum -> + newBuckets[checksum.bucket] = checksum + } - val newCheckpoint = - Checkpoint( - lastOpId = checkpointDiff.lastOpId, - checksums = newBuckets.values.toList(), - writeCheckpoint = checkpointDiff.writeCheckpoint, - ) + checkpointDiff.removedBuckets.forEach { bucket -> newBuckets.remove(bucket) } - state.targetCheckpoint = newCheckpoint - startTrackingCheckpoint(newCheckpoint, checkpointDiff.removedBuckets) + val newCheckpoint = + Checkpoint( + lastOpId = checkpointDiff.lastOpId, + checksums = newBuckets.values.toList(), + writeCheckpoint = checkpointDiff.writeCheckpoint, + ) - return state - } + state.targetCheckpoint = newCheckpoint + startTrackingCheckpoint(newCheckpoint, checkpointDiff.removedBuckets) - private suspend fun handleStreamingSyncData( - data: SyncLine.SyncDataBucket, - state: SyncStreamState, - ): SyncStreamState { - val batch = SyncDataBatch(listOf(data)) - status.update { copy(downloading = true, downloadProgress = downloadProgress?.incrementDownloaded(batch)) } - bucketStorage.saveSyncData(batch) - return state - } + return state + } - private suspend fun handleStreamingKeepAlive( - keepAlive: SyncLine.KeepAlive, - state: SyncStreamState, - ): SyncStreamState { - val (tokenExpiresIn) = keepAlive + private suspend fun handleStreamingSyncData( + data: SyncLine.SyncDataBucket, + state: SyncStreamState, + ): SyncStreamState { + val batch = SyncDataBatch(listOf(data)) + status.update { copy(downloading = true, downloadProgress = downloadProgress?.incrementDownloaded(batch)) } + bucketStorage.saveSyncData(batch) + return state + } - if (tokenExpiresIn <= 0) { - // Connection would be closed automatically right after this - logger.i { "Token expiring reconnect" } - connector.invalidateCredentials() - state.abortIteration = true + private fun handleStreamingKeepAlive( + keepAlive: SyncLine.KeepAlive, + state: SyncStreamState, + ): SyncStreamState { + val (tokenExpiresIn) = keepAlive + + if (tokenExpiresIn <= 0) { + // Connection would be closed automatically right after this + logger.i { "Token expiring reconnect" } + connector.invalidateCredentials() + state.abortIteration = true + return state + } + // Don't await the upload job, we can keep receiving sync lines + triggerCrudUploadAsync() return state } - // Don't await the upload job, we can keep receiving sync lines - triggerCrudUploadAsync() - return state } internal companion object { @@ -467,6 +667,7 @@ internal class SyncStream( } } +@LegacySyncImplementation internal data class SyncStreamState( var targetCheckpoint: Checkpoint?, var validatedCheckpoint: Checkpoint?, diff --git a/core/src/commonMain/kotlin/com/powersync/sync/UserAgent.kt b/core/src/commonMain/kotlin/com/powersync/sync/UserAgent.kt new file mode 100644 index 00000000..d75f0400 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/sync/UserAgent.kt @@ -0,0 +1,3 @@ +package com.powersync.sync + +internal fun userAgent(): String = "PowerSync Kotlin SDK" diff --git a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt index 0347d967..b169f88f 100644 --- a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt @@ -1,5 +1,4 @@ import co.touchlab.kermit.Logger -import com.powersync.bucket.BucketState import com.powersync.bucket.BucketStorageImpl import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType @@ -148,27 +147,6 @@ class BucketStorageTest { assertTrue(result) } - @Test - fun testGetBucketStates() = - runTest { - val mockBucketStates = listOf(BucketState("bucket1", "op1"), BucketState("bucket2", "op2")) - mockDb = - mock { - everySuspend { - getOptional( - any(), - any(), - any(), - ) - } returns 1L - everySuspend { getAll(any(), any(), any()) } returns mockBucketStates - } - bucketStorage = BucketStorageImpl(mockDb, Logger) - - val result = bucketStorage.getBucketStates() - assertEquals(mockBucketStates, result) - } - // TODO: Add tests for removeBuckets, hasCompletedSync, syncLocalDatabase currently not covered because // currently the internal methods are private and cannot be accessed from the test class } diff --git a/core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncRequestTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncRequestTest.kt index 0d9fb85d..e31afa63 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncRequestTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncRequestTest.kt @@ -1,4 +1,5 @@ import com.powersync.bucket.BucketRequest +import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.StreamingSyncRequest import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json @@ -9,6 +10,7 @@ import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertTrue +@OptIn(LegacySyncImplementation::class) class StreamingSyncRequestTest { private val json = Json { ignoreUnknownKeys = true } diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt index 50a88cd9..5236d1e0 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt @@ -7,6 +7,7 @@ import com.powersync.utils.JsonUtil import kotlin.test.Test import kotlin.test.assertEquals +@OptIn(LegacySyncImplementation::class) class SyncLineTest { private fun checkDeserializing( expected: SyncLine, diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 558ada3c..41cee90a 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -1,51 +1,33 @@ package com.powersync.sync -import app.cash.turbine.turbineScope +import co.touchlab.kermit.ExperimentalKermitApi import co.touchlab.kermit.Logger import co.touchlab.kermit.Severity import co.touchlab.kermit.TestConfig import co.touchlab.kermit.TestLogWriter -import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority +import com.powersync.ExperimentalPowerSyncAPI import com.powersync.bucket.BucketStorage -import com.powersync.bucket.Checkpoint -import com.powersync.bucket.OpType -import com.powersync.bucket.OplogEntry -import com.powersync.bucket.WriteCheckpointData -import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType -import com.powersync.testutils.MockSyncService -import com.powersync.testutils.waitFor -import com.powersync.utils.JsonUtil import dev.mokkery.answering.returns import dev.mokkery.everySuspend -import dev.mokkery.matcher.any import dev.mokkery.mock -import dev.mokkery.resetCalls import dev.mokkery.verify -import dev.mokkery.verify.VerifyMode.Companion.order -import dev.mokkery.verifyNoMoreCalls -import dev.mokkery.verifySuspend import io.ktor.client.HttpClient import io.ktor.client.engine.mock.MockEngine -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay -import kotlinx.coroutines.job import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout -import kotlinx.serialization.encodeToString import kotlinx.serialization.json.JsonObject import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertContains import kotlin.test.assertEquals -import kotlin.time.Duration.Companion.seconds -@OptIn(co.touchlab.kermit.ExperimentalKermitApi::class) +@OptIn(ExperimentalKermitApi::class, ExperimentalPowerSyncAPI::class) class SyncStreamTest { private lateinit var bucketStorage: BucketStorage private lateinit var connector: PowerSyncBackendConnector @@ -71,17 +53,6 @@ class SyncStreamTest { bucketStorage = mock { everySuspend { getClientId() } returns "test-client-id" - everySuspend { getBucketStates() } returns emptyList() - everySuspend { removeBuckets(any()) } returns Unit - everySuspend { setTargetCheckpoint(any()) } returns Unit - everySuspend { saveSyncData(any()) } returns Unit - everySuspend { syncLocalDatabase(any(), any()) } returns - SyncLocalDatabaseResult( - ready = true, - checkpointValid = true, - checkpointFailures = emptyList(), - ) - everySuspend { getBucketOperationProgress() } returns mapOf() } connector = mock { @@ -109,7 +80,8 @@ class SyncStreamTest { uploadCrud = {}, logger = logger, params = JsonObject(emptyMap()), - scope = this, + uploadScope = this, + options = SyncOptions(), ) syncStream.invalidateCredentials() @@ -146,7 +118,8 @@ class SyncStreamTest { retryDelayMs = 10, logger = logger, params = JsonObject(emptyMap()), - scope = this, + uploadScope = this, + options = SyncOptions(), ) syncStream.status.update { copy(connected = true) } @@ -174,7 +147,6 @@ class SyncStreamTest { bucketStorage = mock { everySuspend { getClientId() } returns "test-client-id" - everySuspend { getBucketStates() } returns emptyList() } syncStream = @@ -186,7 +158,8 @@ class SyncStreamTest { retryDelayMs = 10, logger = logger, params = JsonObject(emptyMap()), - scope = this, + uploadScope = this, + options = SyncOptions(), ) // Launch streaming sync in a coroutine that we'll cancel after verification @@ -209,123 +182,4 @@ class SyncStreamTest { // Clean up job.cancel() } - - @Test - fun testPartialSync() = - runTest { - // TODO: It would be neat if we could use in-memory sqlite instances instead of mocking everything - // Revisit https://github.com/powersync-ja/powersync-kotlin/pull/117/files at some point - val syncLines = Channel() - val client = MockSyncService(syncLines, { WriteCheckpointResponse(WriteCheckpointData("1000")) }) - - syncStream = - SyncStream( - bucketStorage = bucketStorage, - connector = connector, - createClient = { config -> HttpClient(client, config) }, - uploadCrud = { }, - retryDelayMs = 10, - logger = logger, - params = JsonObject(emptyMap()), - scope = this, - ) - - val job = launch { syncStream.streamingSync() } - var operationId = 1 - - suspend fun pushData(priority: Int) { - val id = operationId++ - - syncLines.send( - SyncLine.SyncDataBucket( - bucket = "prio$priority", - data = - listOf( - OplogEntry( - checksum = (priority + 10).toLong(), - data = JsonUtil.json.encodeToString(mapOf("foo" to "bar")), - op = OpType.PUT, - opId = id.toString(), - rowId = "prio$priority", - rowType = "customers", - ), - ), - after = null, - nextAfter = null, - ), - ) - } - - turbineScope(timeout = 10.0.seconds) { - val turbine = syncStream.status.asFlow().testIn(this) - turbine.waitFor { it.connected } - resetCalls(bucketStorage) - - // Start a sync flow - syncLines.send( - SyncLine.FullCheckpoint( - Checkpoint( - lastOpId = "4", - checksums = - buildList { - for (priority in 0..3) { - add( - BucketChecksum( - bucket = "prio$priority", - priority = BucketPriority(priority), - checksum = 10 + priority, - ), - ) - } - }, - ), - ), - ) - - // Emit a partial sync complete for each priority but the last. - for (priorityNo in 0..<3) { - val priority = BucketPriority(priorityNo) - pushData(priorityNo) - syncLines.send( - SyncLine.CheckpointPartiallyComplete( - lastOpId = operationId.toString(), - priority = priority, - ), - ) - - turbine.waitFor { it.statusForPriority(priority).hasSynced == true } - - verifySuspend(order) { - if (priorityNo == 0) { - bucketStorage.getBucketOperationProgress() - bucketStorage.removeBuckets(any()) - bucketStorage.setTargetCheckpoint(any()) - } - - bucketStorage.saveSyncData(any()) - bucketStorage.syncLocalDatabase(any(), priority) - } - } - - // Then complete the sync - pushData(3) - syncLines.send( - SyncLine.CheckpointComplete( - lastOpId = operationId.toString(), - ), - ) - - turbine.waitFor { it.hasSynced == true } - verifySuspend { - bucketStorage.saveSyncData(any()) - bucketStorage.syncLocalDatabase(any(), null) - } - - turbine.cancel() - } - - verifyNoMoreCalls(bucketStorage) - job.cancel() - syncLines.close() - } } diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index 42831a11..591750dc 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -2,6 +2,7 @@ package com.powersync.testutils import app.cash.turbine.ReceiveTurbine import com.powersync.bucket.WriteCheckpointResponse +import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncLine import com.powersync.sync.SyncStatusData import com.powersync.utils.JsonUtil @@ -32,6 +33,7 @@ import kotlinx.serialization.encodeToString * function which makes it very hard to cancel the channel when the sync client closes the request stream. That is * precisely what we may want to test though. */ +@OptIn(LegacySyncImplementation::class) internal class MockSyncService( private val lines: ReceiveChannel, private val generateCheckpoint: () -> WriteCheckpointResponse, diff --git a/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt b/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt index 39864b54..aef9b6e4 100644 --- a/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt +++ b/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt @@ -32,8 +32,9 @@ public actual class DatabaseDriverFactory { migrateDriver(driver, schema) + // TODO Revert driver.loadExtensions( - powersyncExtension to "sqlite3_powersync_init", + "/Users/simon/src/powersync-sqlite-core/target/debug/libpowersync.dylib" to "sqlite3_powersync_init", ) val mappedDriver = PsSqlDriver(driver = driver) diff --git a/demos/supabase-todolist/desktopApp/src/jvmMain/kotlin/com/powersync/demos/Main.kt b/demos/supabase-todolist/desktopApp/src/jvmMain/kotlin/com/powersync/demos/Main.kt index 483df5c0..f4dca20e 100644 --- a/demos/supabase-todolist/desktopApp/src/jvmMain/kotlin/com/powersync/demos/Main.kt +++ b/demos/supabase-todolist/desktopApp/src/jvmMain/kotlin/com/powersync/demos/Main.kt @@ -7,9 +7,15 @@ import androidx.compose.ui.window.Window import androidx.compose.ui.window.WindowPosition import androidx.compose.ui.window.application import androidx.compose.ui.window.rememberWindowState +import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity +import co.touchlab.kermit.platformLogWriter fun main() { + Logger.setLogWriters(platformLogWriter()) + Logger.setMinSeverity(Severity.Verbose) + application { Window( onCloseRequest = ::exitApplication, diff --git a/demos/supabase-todolist/gradle/libs.versions.toml b/demos/supabase-todolist/gradle/libs.versions.toml index 81da7d11..59ef9387 100644 --- a/demos/supabase-todolist/gradle/libs.versions.toml +++ b/demos/supabase-todolist/gradle/libs.versions.toml @@ -11,7 +11,7 @@ kotlin = "2.1.10" coroutines = "1.8.1" kotlinx-datetime = "0.6.2" kotlinx-io = "0.5.4" -ktor = "3.0.1" +ktor = "3.1.0" sqliteJdbc = "3.45.2.0" uuid = "0.8.2" buildKonfig = "0.15.1" diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/Auth.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/Auth.kt index 846cd531..8a266636 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/Auth.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/Auth.kt @@ -5,6 +5,8 @@ import androidx.lifecycle.viewModelScope import co.touchlab.kermit.Logger import com.powersync.PowerSyncDatabase import com.powersync.connector.supabase.SupabaseConnector +import com.powersync.sync.ConnectionMethod +import com.powersync.sync.SyncOptions import io.github.jan.supabase.auth.status.RefreshFailureCause import io.github.jan.supabase.auth.status.SessionStatus import kotlinx.coroutines.flow.MutableStateFlow @@ -44,7 +46,9 @@ internal class AuthViewModel( supabase.sessionStatus.collect { when (it) { is SessionStatus.Authenticated -> { - db.connect(supabase) + // TODO REMOVE + val options = SyncOptions(method = ConnectionMethod.WebSocket()) + db.connect(supabase, options = options) } is SessionStatus.NotAuthenticated -> { db.disconnectAndClear() diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ef5207fe..87b58674 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -15,7 +15,8 @@ kotlin = "2.1.10" coroutines = "1.8.1" kotlinx-datetime = "0.6.2" kotlinx-io = "0.5.4" -ktor = "3.0.1" +ktor = "3.1.0" +rsocket = "0.20.0" uuid = "0.8.2" powersync-core = "0.3.14" sqlite-jdbc = "3.49.1.0" @@ -86,6 +87,8 @@ ktor-client-contentnegotiation = { module = "io.ktor:ktor-client-content-negotia ktor-client-mock = { module = "io.ktor:ktor-client-mock", version.ref = "ktor" } ktor-serialization-json = { module = "io.ktor:ktor-serialization-kotlinx-json", version.ref = "ktor" } kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutines" } +rsocket-core = { module = "io.rsocket.kotlin:rsocket-core", version.ref = "rsocket" } +rsocket-transport-websocket = { module = "io.rsocket.kotlin:rsocket-transport-ktor-websocket-internal", version.ref = "rsocket" } sqldelight-driver-native = { module = "app.cash.sqldelight:native-driver", version.ref = "sqlDelight" } sqliter = { module = "co.touchlab:sqliter-driver", version.ref = "sqliter" }