Skip to content

Commit c1bb07a

Browse files
authored
Remove websocket transport, use binary HTTP stream when available (#232)
1 parent 1c833dc commit c1bb07a

File tree

13 files changed

+271
-271
lines changed

13 files changed

+271
-271
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
* Update SQLite to 3.50.3.
66
* Android: Ensure JNI libraries are 16KB-aligned.
7+
* Support receiving binary sync lines over HTTP when the Rust client is enabled.
8+
* Remove the experimental websocket transport mode.
79

810
## 1.3.0
911

PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
package com.powersync
44

5-
import com.powersync.sync.ConnectionMethod
65
import com.powersync.sync.SyncOptions
76

87
/**
@@ -25,16 +24,9 @@ public fun throwPowerSyncException(exception: PowerSyncException): Unit = throw
2524
@OptIn(ExperimentalPowerSyncAPI::class)
2625
public fun createSyncOptions(
2726
newClient: Boolean,
28-
webSocket: Boolean,
2927
userAgent: String,
3028
): SyncOptions =
3129
SyncOptions(
3230
newClientImplementation = newClient,
33-
method =
34-
if (webSocket) {
35-
ConnectionMethod.WebSocket()
36-
} else {
37-
ConnectionMethod.Http
38-
},
3931
userAgent = userAgent,
4032
)

core/build.gradle.kts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,6 @@ kotlin {
207207
implementation(libs.ktor.client.contentnegotiation)
208208
implementation(libs.ktor.serialization.json)
209209
implementation(libs.kotlinx.io)
210-
implementation(libs.rsocket.core)
211-
implementation(libs.rsocket.transport.websocket)
212210
implementation(libs.kotlinx.coroutines.core)
213211
implementation(libs.kotlinx.datetime)
214212
implementation(libs.stately.concurrency)

core/src/commonIntegrationTest/kotlin/com/powersync/sync/AbstractSyncTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import com.powersync.ExperimentalPowerSyncAPI
88
*/
99
abstract class AbstractSyncTest(
1010
private val useNewSyncImplementation: Boolean,
11+
protected val useBson: Boolean = false,
1112
) {
1213
@OptIn(ExperimentalPowerSyncAPI::class)
1314
val options: SyncOptions get() {

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import io.kotest.matchers.collections.shouldHaveSize
3030
import io.kotest.matchers.shouldBe
3131
import io.kotest.matchers.shouldNotBe
3232
import io.kotest.matchers.string.shouldContain
33+
import io.ktor.http.ContentType
3334
import kotlinx.coroutines.CompletableDeferred
3435
import kotlinx.coroutines.DelicateCoroutinesApi
3536
import kotlinx.coroutines.Dispatchers
@@ -859,4 +860,44 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
859860
query.cancelAndIgnoreRemainingEvents()
860861
}
861862
}
863+
864+
@OptIn(ExperimentalStdlibApi::class)
865+
@Test
866+
fun bson() =
867+
databaseTest {
868+
// There's no up-to-date bson library for Kotlin multiplatform, so this test verifies BSON support with byte
869+
// strings created with package:bson in Dart.
870+
syncLinesContentType = ContentType("application", "vnd.powersync.bson-stream")
871+
872+
turbineScope(timeout = 10.0.seconds) {
873+
val query =
874+
database
875+
.watch("SELECT name FROM users", throttleMs = 0L) {
876+
it.getString(0)!!
877+
}.testIn(this)
878+
query.awaitItem() shouldBe emptyList()
879+
880+
database.connect(connector, options = options)
881+
882+
// {checkpoint: {last_op_id: 1, write_checkpoint: null, buckets: [{bucket: a, checksum: 0, priority: 3, count: null}]}}
883+
syncLines.send(
884+
"8100000003636865636b706f696e740070000000026c6173745f6f705f6964000200000031000a77726974655f636865636b706f696e7400046275636b657473003e00000003300036000000026275636b65740002000000610010636865636b73756d0000000000107072696f7269747900030000000a636f756e740000000000"
885+
.hexToByteArray(),
886+
)
887+
888+
// {data: {bucket: a, data: [{checksum: 0, data: {"name":"username"}, op: PUT, op_id: 1, object_id: u, object_type: users}]}}
889+
syncLines.send(
890+
"9e00000003646174610093000000026275636b6574000200000061000464617461007a0000000330007200000010636865636b73756d0000000000026461746100140000007b226e616d65223a22757365726e616d65227d00026f70000400000050555400026f705f696400020000003100026f626a6563745f696400020000007500026f626a6563745f74797065000600000075736572730000000000"
891+
.hexToByteArray(),
892+
)
893+
894+
// {checkpoint_complete: {last_op_id: 1}}
895+
syncLines.send(
896+
"3100000003636865636b706f696e745f636f6d706c6574650017000000026c6173745f6f705f6964000200000031000000".hexToByteArray(),
897+
)
898+
899+
query.awaitItem() shouldBe listOf("username")
900+
query.cancelAndIgnoreRemainingEvents()
901+
}
902+
}
862903
}

core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ import com.powersync.createPowerSyncDatabaseImpl
1616
import com.powersync.db.PowerSyncDatabaseImpl
1717
import com.powersync.db.schema.Schema
1818
import com.powersync.sync.LegacySyncImplementation
19-
import com.powersync.sync.SyncLine
2019
import com.powersync.utils.JsonUtil
2120
import io.ktor.client.HttpClient
2221
import io.ktor.client.HttpClientConfig
2322
import io.ktor.client.engine.mock.toByteArray
23+
import io.ktor.http.ContentType
2424
import kotlinx.coroutines.channels.Channel
2525
import kotlinx.coroutines.test.TestScope
2626
import kotlinx.coroutines.test.runTest
@@ -84,8 +84,8 @@ internal class ActiveDatabaseTest(
8484
),
8585
)
8686

87-
@OptIn(LegacySyncImplementation::class)
88-
var syncLines = Channel<SyncLine>()
87+
var syncLines = Channel<Any>()
88+
var syncLinesContentType = ContentType("application", "x-ndjson")
8989
var requestedSyncStreams = mutableListOf<JsonElement>()
9090
var checkpointResponse: () -> WriteCheckpointResponse = {
9191
WriteCheckpointResponse(WriteCheckpointData("1000"))
@@ -124,6 +124,7 @@ internal class ActiveDatabaseTest(
124124
MockSyncService(
125125
lines = syncLines,
126126
generateCheckpoint = { checkpointResponse() },
127+
syncLinesContentType = { syncLinesContentType },
127128
trackSyncRequest = {
128129
val parsed = JsonUtil.json.parseToJsonElement(it.body.toByteArray().decodeToString())
129130
requestedSyncStreams.add(parsed)

core/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt

Lines changed: 0 additions & 139 deletions
This file was deleted.

core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@ package com.powersync.sync
22

33
import com.powersync.ExperimentalPowerSyncAPI
44
import com.powersync.PowerSyncDatabase
5-
import io.rsocket.kotlin.keepalive.KeepAlive
6-
import kotlin.time.Duration
7-
import kotlin.time.Duration.Companion.seconds
85

96
/**
107
* Experimental options that can be passed to [PowerSyncDatabase.connect] to specify an experimental
@@ -19,8 +16,6 @@ public class SyncOptions
1916
constructor(
2017
@property:ExperimentalPowerSyncAPI
2118
public val newClientImplementation: Boolean = false,
22-
@property:ExperimentalPowerSyncAPI
23-
public val method: ConnectionMethod = ConnectionMethod.Http,
2419
/**
2520
* The user agent to use for requests made to the PowerSync service.
2621
*/
@@ -37,53 +32,3 @@ public class SyncOptions
3732
public val defaults: SyncOptions = SyncOptions()
3833
}
3934
}
40-
41-
/**
42-
* The connection method to use when the SDK connects to the sync service.
43-
*/
44-
@ExperimentalPowerSyncAPI
45-
public sealed interface ConnectionMethod {
46-
/**
47-
* Receive sync lines via streamed HTTP response from the sync service.
48-
*
49-
* This mode is less efficient than [WebSocket] because it doesn't support backpressure
50-
* properly and uses JSON instead of the more efficient BSON representation for sync lines.
51-
*
52-
* This is currently the default, but this will be changed once [WebSocket] support is stable.
53-
*/
54-
@ExperimentalPowerSyncAPI
55-
public data object Http : ConnectionMethod
56-
57-
/**
58-
* Receive binary sync lines via RSocket over a WebSocket connection.
59-
*
60-
* This connection mode is currently experimental and requires a recent sync service to work.
61-
* WebSocket support is only available when enabling the [SyncOptions.newClientImplementation].
62-
*/
63-
@ExperimentalPowerSyncAPI
64-
public data class WebSocket(
65-
val keepAlive: RSocketKeepAlive = RSocketKeepAlive.default,
66-
) : ConnectionMethod
67-
}
68-
69-
/**
70-
* Keep-alive options for long-running RSocket streams:
71-
*
72-
* The client will ping the server every [interval], and assumes the connection to be closed if it
73-
* hasn't received an acknowledgement in [maxLifetime].
74-
*/
75-
@ExperimentalPowerSyncAPI
76-
public data class RSocketKeepAlive(
77-
val interval: Duration,
78-
val maxLifetime: Duration,
79-
) {
80-
internal fun toRSocket(): KeepAlive = KeepAlive(interval, maxLifetime)
81-
82-
internal companion object {
83-
val default =
84-
RSocketKeepAlive(
85-
interval = 20.0.seconds,
86-
maxLifetime = 30.0.seconds,
87-
)
88-
}
89-
}

0 commit comments

Comments
 (0)