Skip to content

WIP: Add sync implementation from core extension #192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ kotlin {
implementation(libs.ktor.client.contentnegotiation)
implementation(libs.ktor.serialization.json)
implementation(libs.kotlinx.io)
api(libs.rsocket.core)
implementation(libs.rsocket.transport.websocket)
implementation(libs.kotlinx.coroutines.core)
implementation(libs.kotlinx.datetime)
implementation(libs.stately.concurrency)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.powersync.sync

import com.powersync.ExperimentalPowerSyncAPI

/**
* Small utility to run tests both with the legacy Kotlin sync implementation and the new
* implementation from the core extension.
*/
abstract class AbstractSyncTest(
private val useNewSyncImplementation: Boolean,
) {
@OptIn(ExperimentalPowerSyncAPI::class)
val options: SyncOptions get() {
return SyncOptions(useNewSyncImplementation)
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.powersync
package com.powersync.sync

import app.cash.turbine.turbineScope
import co.touchlab.kermit.ExperimentalKermitApi
import com.powersync.PowerSyncDatabase
import com.powersync.PowerSyncException
import com.powersync.TestConnector
import com.powersync.bucket.BucketChecksum
import com.powersync.bucket.BucketPriority
import com.powersync.bucket.Checkpoint
Expand All @@ -11,36 +14,43 @@ import com.powersync.bucket.WriteCheckpointData
import com.powersync.bucket.WriteCheckpointResponse
import com.powersync.db.PowerSyncDatabaseImpl
import com.powersync.db.schema.Schema
import com.powersync.sync.SyncLine
import com.powersync.testutils.UserRow
import com.powersync.testutils.databaseTest
import com.powersync.testutils.waitFor
import com.powersync.utils.JsonUtil
import dev.mokkery.answering.returns
import dev.mokkery.every
import dev.mokkery.verify
import dev.mokkery.verifyNoMoreCalls
import dev.mokkery.verifySuspend
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.serialization.encodeToString
import kotlinx.coroutines.launch
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
import kotlin.time.Duration.Companion.seconds

class SyncIntegrationTest {
@OptIn(LegacySyncImplementation::class)
abstract class BaseSyncIntegrationTest(
useNewSyncImplementation: Boolean,
) : AbstractSyncTest(
useNewSyncImplementation,
) {
private suspend fun PowerSyncDatabase.expectUserCount(amount: Int) {
val users = getAll("SELECT * FROM users;") { UserRow.from(it) }
users shouldHaveSize amount
}

@Test
@OptIn(DelicateCoroutinesApi::class)
fun connectImmediately() =
databaseTest(createInitialDatabase = false) {
// Regression test for https://github.com/powersync-ja/powersync-kotlin/issues/169
val database = openDatabase()
database.connect(connector)
database.connect(connector, options = options)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand All @@ -53,7 +63,7 @@ class SyncIntegrationTest {
@OptIn(DelicateCoroutinesApi::class)
fun closesResponseStreamOnDatabaseClose() =
databaseTest {
database.connect(connector)
database.connect(connector, options = options)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand All @@ -72,7 +82,7 @@ class SyncIntegrationTest {
@OptIn(DelicateCoroutinesApi::class)
fun cleansResourcesOnDisconnect() =
databaseTest {
database.connect(connector)
database.connect(connector, options = options)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand All @@ -93,7 +103,7 @@ class SyncIntegrationTest {
@Test
fun cannotUpdateSchemaWhileConnected() =
databaseTest {
database.connect(connector)
database.connect(connector, options = options)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand All @@ -111,7 +121,7 @@ class SyncIntegrationTest {
@Test
fun testPartialSync() =
databaseTest {
database.connect(connector)
database.connect(connector, options = options)

val checksums =
buildList {
Expand Down Expand Up @@ -202,7 +212,7 @@ class SyncIntegrationTest {
@Test
fun testRemembersLastPartialSync() =
databaseTest {
database.connect(connector)
database.connect(connector, options = options)

syncLines.send(
SyncLine.FullCheckpoint(
Expand Down Expand Up @@ -238,7 +248,7 @@ class SyncIntegrationTest {
@Test
fun setsDownloadingState() =
databaseTest {
database.connect(connector)
database.connect(connector, options = options)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand Down Expand Up @@ -272,7 +282,7 @@ class SyncIntegrationTest {
turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)

database.connect(connector)
database.connect(connector, options = options)
turbine.waitFor { it.connecting }

database.disconnect()
Expand All @@ -285,7 +295,7 @@ class SyncIntegrationTest {
@Test
fun testMultipleSyncsDoNotCreateMultipleStatusEntries() =
databaseTest {
database.connect(connector)
database.connect(connector, options = options)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand Down Expand Up @@ -331,8 +341,8 @@ class SyncIntegrationTest {

turbineScope(timeout = 10.0.seconds) {
// Connect the first database
database.connect(connector)
db2.connect(connector)
database.connect(connector, options = options)
db2.connect(connector, options = options)

waitFor {
assertNotNull(
Expand All @@ -357,10 +367,10 @@ class SyncIntegrationTest {
val turbine2 = db2.currentStatus.asFlow().testIn(this)

// Connect the first database
database.connect(connector)
database.connect(connector, options = options)

turbine1.waitFor { it.connecting }
db2.connect(connector)
db2.connect(connector, options = options)

// Should not be connecting yet
db2.currentStatus.connecting shouldBe false
Expand All @@ -384,13 +394,13 @@ class SyncIntegrationTest {
turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)

database.connect(connector, 1000L)
database.connect(connector, 1000L, options = options)
turbine.waitFor { it.connecting }

database.disconnect()
turbine.waitFor { !it.connecting }

database.connect(connector, 1000L)
database.connect(connector, 1000L, options = options)
turbine.waitFor { it.connecting }
database.disconnect()
turbine.waitFor { !it.connecting }
Expand All @@ -405,10 +415,10 @@ class SyncIntegrationTest {
turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)

database.connect(connector, 1000L, retryDelayMs = 5000)
database.connect(connector, 1000L, retryDelayMs = 5000, options = options)
turbine.waitFor { it.connecting }

database.connect(connector, 1000L, retryDelayMs = 5000)
database.connect(connector, 1000L, retryDelayMs = 5000, options = options)
turbine.waitFor { it.connecting }

turbine.cancel()
Expand All @@ -421,7 +431,7 @@ class SyncIntegrationTest {
databaseTest {
val testConnector = TestConnector()
connector = testConnector
database.connect(testConnector)
database.connect(testConnector, options = options)

suspend fun expectUserRows(amount: Int) {
val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! }
Expand All @@ -440,8 +450,15 @@ class SyncIntegrationTest {

// Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully
// connected).
turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
syncLines.send(SyncLine.KeepAlive(1234))
turbine.waitFor { it.connected }
turbine.cancelAndIgnoreRemainingEvents()
}

database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("local", "[email protected]"))
syncLines.send(SyncLine.KeepAlive(1234))

expectUserRows(1)
uploadStarted.await()

Expand Down Expand Up @@ -508,6 +525,7 @@ class SyncIntegrationTest {
}
completeUpload.complete(Unit)
requestedCheckpoint.await()
logger.d { "Did request checkpoint" }

// This should apply the checkpoint
turbineScope {
Expand All @@ -519,4 +537,70 @@ class SyncIntegrationTest {
// Meaning that the two rows are now visible
database.expectUserCount(2)
}

@Test
fun testTokenExpired() =
databaseTest {
turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)

database.connect(connector, 1000L, retryDelayMs = 5000, options = options)
turbine.waitFor { it.connecting }

syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 4000))
turbine.waitFor { it.connected }
verifySuspend { connector.getCredentialsCached() }
verifyNoMoreCalls(connector)

// Should invalidate credentials when token expires
syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 0))
turbine.waitFor { !it.connected }
verify { connector.invalidateCredentials() }

turbine.cancel()
}
}
}

class LegacySyncIntegrationTest : BaseSyncIntegrationTest(false)

class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
// The legacy sync implementation doesn't prefetch credentials.

@OptIn(LegacySyncImplementation::class)
@Test
fun testTokenPrefetch() =
databaseTest {
val prefetchCalled = CompletableDeferred<Unit>()
val completePrefetch = CompletableDeferred<Unit>()
every { connector.prefetchCredentials() } returns
scope.launch {
prefetchCalled.complete(Unit)
completePrefetch.await()
}

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)

database.connect(connector, 1000L, retryDelayMs = 5000, options = options)
turbine.waitFor { it.connecting }

syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 4000))
turbine.waitFor { it.connected }
verifySuspend { connector.getCredentialsCached() }
verifyNoMoreCalls(connector)

syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 10))
prefetchCalled.complete(Unit)
// Should still be connected before prefetch completes
database.currentStatus.connected shouldBe true

// After the prefetch completes, we should reconnect
completePrefetch.complete(Unit)
turbine.waitFor { !it.connected }

turbine.waitFor { it.connected }
turbine.cancel()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ import kotlin.test.assertFalse
import kotlin.test.assertNull
import kotlin.test.assertTrue

class SyncProgressTest {
@OptIn(LegacySyncImplementation::class)
abstract class BaseSyncProgressTest(
useNewSyncImplementation: Boolean,
) : AbstractSyncTest(
useNewSyncImplementation,
) {
private var lastOpId = 0

@BeforeTest
Expand Down Expand Up @@ -104,7 +109,7 @@ class SyncProgressTest {
@Test
fun withoutPriorities() =
databaseTest {
database.connect(connector)
database.connect(connector, options = options)

turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand Down Expand Up @@ -153,7 +158,7 @@ class SyncProgressTest {
@Test
fun interruptedSync() =
databaseTest {
database.connect(connector)
database.connect(connector, options = options)

turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand Down Expand Up @@ -183,7 +188,7 @@ class SyncProgressTest {
// And reconnecting
database = openDatabase()
syncLines = Channel()
database.connect(connector)
database.connect(connector, options = options)

turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand Down Expand Up @@ -217,7 +222,7 @@ class SyncProgressTest {
@Test
fun interruptedSyncWithNewCheckpoint() =
databaseTest {
database.connect(connector)
database.connect(connector, options = options)

turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand All @@ -243,7 +248,7 @@ class SyncProgressTest {
syncLines.close()
database = openDatabase()
syncLines = Channel()
database.connect(connector)
database.connect(connector, options = options)

turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand Down Expand Up @@ -276,7 +281,7 @@ class SyncProgressTest {
@Test
fun differentPriorities() =
databaseTest {
database.connect(connector)
database.connect(connector, options = options)

turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand Down Expand Up @@ -341,3 +346,7 @@ class SyncProgressTest {
syncLines.close()
}
}

class LegacySyncProgressTest : BaseSyncProgressTest(false)

class NewSyncProgressTest : BaseSyncProgressTest(true)
Loading
Loading