| 
 | 1 | +package com.powersync.sync  | 
 | 2 | + | 
 | 3 | +import app.cash.turbine.ReceiveTurbine  | 
 | 4 | +import app.cash.turbine.turbineScope  | 
 | 5 | +import com.powersync.bucket.BucketChecksum  | 
 | 6 | +import com.powersync.bucket.BucketPriority  | 
 | 7 | +import com.powersync.bucket.Checkpoint  | 
 | 8 | +import com.powersync.bucket.OpType  | 
 | 9 | +import com.powersync.bucket.OplogEntry  | 
 | 10 | +import com.powersync.testutils.ActiveDatabaseTest  | 
 | 11 | +import com.powersync.testutils.databaseTest  | 
 | 12 | +import com.powersync.testutils.waitFor  | 
 | 13 | +import kotlinx.coroutines.channels.Channel  | 
 | 14 | +import kotlin.test.BeforeTest  | 
 | 15 | +import kotlin.test.Test  | 
 | 16 | +import kotlin.test.assertEquals  | 
 | 17 | +import kotlin.test.assertFalse  | 
 | 18 | +import kotlin.test.assertNull  | 
 | 19 | +import kotlin.test.assertTrue  | 
 | 20 | + | 
 | 21 | +class SyncProgressTest {  | 
 | 22 | +    private var lastOpId = 0  | 
 | 23 | + | 
 | 24 | +    @BeforeTest  | 
 | 25 | +    fun resetOpId() {  | 
 | 26 | +        lastOpId = 0  | 
 | 27 | +    }  | 
 | 28 | + | 
 | 29 | +    private fun bucket(  | 
 | 30 | +        name: String,  | 
 | 31 | +        count: Int,  | 
 | 32 | +        priority: BucketPriority = BucketPriority(3),  | 
 | 33 | +    ): BucketChecksum =  | 
 | 34 | +        BucketChecksum(  | 
 | 35 | +            bucket = name,  | 
 | 36 | +            priority = priority,  | 
 | 37 | +            checksum = 0,  | 
 | 38 | +            count = count,  | 
 | 39 | +        )  | 
 | 40 | + | 
 | 41 | +    private suspend fun ActiveDatabaseTest.addDataLine(  | 
 | 42 | +        bucket: String,  | 
 | 43 | +        amount: Int,  | 
 | 44 | +    ) {  | 
 | 45 | +        syncLines.send(  | 
 | 46 | +            SyncLine.SyncDataBucket(  | 
 | 47 | +                bucket = bucket,  | 
 | 48 | +                data =  | 
 | 49 | +                    List(amount) {  | 
 | 50 | +                        OplogEntry(  | 
 | 51 | +                            checksum = 0,  | 
 | 52 | +                            opId = (++lastOpId).toString(),  | 
 | 53 | +                            op = OpType.PUT,  | 
 | 54 | +                            rowId = lastOpId.toString(),  | 
 | 55 | +                            rowType = bucket,  | 
 | 56 | +                            data = "{}",  | 
 | 57 | +                        )  | 
 | 58 | +                    },  | 
 | 59 | +                after = null,  | 
 | 60 | +                nextAfter = null,  | 
 | 61 | +            ),  | 
 | 62 | +        )  | 
 | 63 | +    }  | 
 | 64 | + | 
 | 65 | +    private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: BucketPriority? = null) {  | 
 | 66 | +        if (priority != null) {  | 
 | 67 | +            syncLines.send(  | 
 | 68 | +                SyncLine.CheckpointPartiallyComplete(  | 
 | 69 | +                    lastOpId = lastOpId.toString(),  | 
 | 70 | +                    priority = priority,  | 
 | 71 | +                ),  | 
 | 72 | +            )  | 
 | 73 | +        } else {  | 
 | 74 | +            syncLines.send(SyncLine.CheckpointComplete(lastOpId = lastOpId.toString()))  | 
 | 75 | +        }  | 
 | 76 | +    }  | 
 | 77 | + | 
 | 78 | +    private suspend fun ReceiveTurbine<SyncStatusData>.expectProgress(  | 
 | 79 | +        total: Pair<Int, Int>,  | 
 | 80 | +        priorities: Map<BucketPriority, Pair<Int, Int>> = emptyMap(),  | 
 | 81 | +    ) {  | 
 | 82 | +        val item = awaitItem()  | 
 | 83 | +        val progress = item.downloadProgress ?: error("Expected download progress on $item")  | 
 | 84 | + | 
 | 85 | +        assertTrue { item.downloading }  | 
 | 86 | +        assertEquals(total.first, progress.downloadedOperations)  | 
 | 87 | +        assertEquals(total.second, progress.totalOperations)  | 
 | 88 | + | 
 | 89 | +        priorities.forEach { (priority, expected) ->  | 
 | 90 | +            val (expectedDownloaded, expectedTotal) = expected  | 
 | 91 | +            val progress = progress.untilPriority(priority)  | 
 | 92 | +            assertEquals(expectedDownloaded, progress.downloadedOperations)  | 
 | 93 | +            assertEquals(expectedTotal, progress.totalOperations)  | 
 | 94 | +        }  | 
 | 95 | +    }  | 
 | 96 | + | 
 | 97 | +    private suspend fun ReceiveTurbine<SyncStatusData>.expectNotDownloading() {  | 
 | 98 | +        awaitItem().also {  | 
 | 99 | +            assertFalse { it.downloading }  | 
 | 100 | +            assertNull(it.downloadProgress)  | 
 | 101 | +        }  | 
 | 102 | +    }  | 
 | 103 | + | 
 | 104 | +    @Test  | 
 | 105 | +    fun withoutPriorities() =  | 
 | 106 | +        databaseTest {  | 
 | 107 | +            database.connect(connector)  | 
 | 108 | + | 
 | 109 | +            turbineScope {  | 
 | 110 | +                val turbine = database.currentStatus.asFlow().testIn(this)  | 
 | 111 | +                turbine.waitFor { it.connected && !it.downloading }  | 
 | 112 | + | 
 | 113 | +                // Send checkpoint with 10 ops, progress should be 0/10  | 
 | 114 | +                syncLines.send(  | 
 | 115 | +                    SyncLine.FullCheckpoint(  | 
 | 116 | +                        Checkpoint(  | 
 | 117 | +                            lastOpId = "10",  | 
 | 118 | +                            checksums = listOf(bucket("a", 10)),  | 
 | 119 | +                        ),  | 
 | 120 | +                    ),  | 
 | 121 | +                )  | 
 | 122 | +                turbine.expectProgress(0 to 10)  | 
 | 123 | + | 
 | 124 | +                addDataLine("a", 10)  | 
 | 125 | +                turbine.expectProgress(10 to 10)  | 
 | 126 | + | 
 | 127 | +                addCheckpointComplete()  | 
 | 128 | +                turbine.expectNotDownloading()  | 
 | 129 | + | 
 | 130 | +                // Emit new data, progress should be 0/2 instead of 10/12  | 
 | 131 | +                syncLines.send(  | 
 | 132 | +                    SyncLine.CheckpointDiff(  | 
 | 133 | +                        lastOpId = "12",  | 
 | 134 | +                        updatedBuckets = listOf(bucket("a", 12)),  | 
 | 135 | +                        removedBuckets = emptyList(),  | 
 | 136 | +                    ),  | 
 | 137 | +                )  | 
 | 138 | +                turbine.expectProgress(0 to 2)  | 
 | 139 | + | 
 | 140 | +                addDataLine("a", 2)  | 
 | 141 | +                turbine.expectProgress(2 to 2)  | 
 | 142 | + | 
 | 143 | +                addCheckpointComplete()  | 
 | 144 | +                turbine.expectNotDownloading()  | 
 | 145 | + | 
 | 146 | +                turbine.cancel()  | 
 | 147 | +            }  | 
 | 148 | + | 
 | 149 | +            database.close()  | 
 | 150 | +            syncLines.close()  | 
 | 151 | +        }  | 
 | 152 | + | 
 | 153 | +    @Test  | 
 | 154 | +    fun interruptedSync() =  | 
 | 155 | +        databaseTest {  | 
 | 156 | +            database.connect(connector)  | 
 | 157 | + | 
 | 158 | +            turbineScope {  | 
 | 159 | +                val turbine = database.currentStatus.asFlow().testIn(this)  | 
 | 160 | +                turbine.waitFor { it.connected && !it.downloading }  | 
 | 161 | + | 
 | 162 | +                // Send checkpoint with 10 ops, progress should be 0/10  | 
 | 163 | +                syncLines.send(  | 
 | 164 | +                    SyncLine.FullCheckpoint(  | 
 | 165 | +                        Checkpoint(  | 
 | 166 | +                            lastOpId = "10",  | 
 | 167 | +                            checksums = listOf(bucket("a", 10)),  | 
 | 168 | +                        ),  | 
 | 169 | +                    ),  | 
 | 170 | +                )  | 
 | 171 | +                turbine.expectProgress(0 to 10)  | 
 | 172 | + | 
 | 173 | +                addDataLine("a", 5)  | 
 | 174 | +                turbine.expectProgress(5 to 10)  | 
 | 175 | + | 
 | 176 | +                turbine.cancel()  | 
 | 177 | +            }  | 
 | 178 | + | 
 | 179 | +            // Emulate the app closing  | 
 | 180 | +            database.close()  | 
 | 181 | +            syncLines.close()  | 
 | 182 | + | 
 | 183 | +            // And reconnecting  | 
 | 184 | +            database = openDatabase()  | 
 | 185 | +            syncLines = Channel()  | 
 | 186 | +            database.connect(connector)  | 
 | 187 | + | 
 | 188 | +            turbineScope {  | 
 | 189 | +                val turbine = database.currentStatus.asFlow().testIn(this)  | 
 | 190 | +                turbine.waitFor { it.connected && !it.downloading }  | 
 | 191 | + | 
 | 192 | +                // Send the same checkpoint as before  | 
 | 193 | +                syncLines.send(  | 
 | 194 | +                    SyncLine.FullCheckpoint(  | 
 | 195 | +                        Checkpoint(  | 
 | 196 | +                            lastOpId = "10",  | 
 | 197 | +                            checksums = listOf(bucket("a", 10)),  | 
 | 198 | +                        ),  | 
 | 199 | +                    ),  | 
 | 200 | +                )  | 
 | 201 | + | 
 | 202 | +                // Progress should be restored: 5 / 10 instead of 0/5  | 
 | 203 | +                turbine.expectProgress(5 to 10)  | 
 | 204 | + | 
 | 205 | +                addDataLine("a", 5)  | 
 | 206 | +                turbine.expectProgress(10 to 10)  | 
 | 207 | +                addCheckpointComplete()  | 
 | 208 | +                turbine.expectNotDownloading()  | 
 | 209 | + | 
 | 210 | +                turbine.cancel()  | 
 | 211 | +            }  | 
 | 212 | + | 
 | 213 | +            database.close()  | 
 | 214 | +            syncLines.close()  | 
 | 215 | +        }  | 
 | 216 | + | 
 | 217 | +    @Test  | 
 | 218 | +    fun interruptedSyncWithNewCheckpoint() =  | 
 | 219 | +        databaseTest {  | 
 | 220 | +            database.connect(connector)  | 
 | 221 | + | 
 | 222 | +            turbineScope {  | 
 | 223 | +                val turbine = database.currentStatus.asFlow().testIn(this)  | 
 | 224 | +                turbine.waitFor { it.connected && !it.downloading }  | 
 | 225 | +                syncLines.send(  | 
 | 226 | +                    SyncLine.FullCheckpoint(  | 
 | 227 | +                        Checkpoint(  | 
 | 228 | +                            lastOpId = "10",  | 
 | 229 | +                            checksums = listOf(bucket("a", 10)),  | 
 | 230 | +                        ),  | 
 | 231 | +                    ),  | 
 | 232 | +                )  | 
 | 233 | +                turbine.expectProgress(0 to 10)  | 
 | 234 | + | 
 | 235 | +                addDataLine("a", 5)  | 
 | 236 | +                turbine.expectProgress(5 to 10)  | 
 | 237 | + | 
 | 238 | +                turbine.cancel()  | 
 | 239 | +            }  | 
 | 240 | + | 
 | 241 | +            // Close and re-connect  | 
 | 242 | +            database.close()  | 
 | 243 | +            syncLines.close()  | 
 | 244 | +            database = openDatabase()  | 
 | 245 | +            syncLines = Channel()  | 
 | 246 | +            database.connect(connector)  | 
 | 247 | + | 
 | 248 | +            turbineScope {  | 
 | 249 | +                val turbine = database.currentStatus.asFlow().testIn(this)  | 
 | 250 | +                turbine.waitFor { it.connected && !it.downloading }  | 
 | 251 | + | 
 | 252 | +                // Send checkpoint with two more ops  | 
 | 253 | +                syncLines.send(  | 
 | 254 | +                    SyncLine.FullCheckpoint(  | 
 | 255 | +                        Checkpoint(  | 
 | 256 | +                            lastOpId = "12",  | 
 | 257 | +                            checksums = listOf(bucket("a", 12)),  | 
 | 258 | +                        ),  | 
 | 259 | +                    ),  | 
 | 260 | +                )  | 
 | 261 | + | 
 | 262 | +                turbine.expectProgress(5 to 12)  | 
 | 263 | + | 
 | 264 | +                addDataLine("a", 7)  | 
 | 265 | +                turbine.expectProgress(12 to 12)  | 
 | 266 | +                addCheckpointComplete()  | 
 | 267 | +                turbine.expectNotDownloading()  | 
 | 268 | + | 
 | 269 | +                turbine.cancel()  | 
 | 270 | +            }  | 
 | 271 | + | 
 | 272 | +            database.close()  | 
 | 273 | +            syncLines.close()  | 
 | 274 | +        }  | 
 | 275 | + | 
 | 276 | +    @Test  | 
 | 277 | +    fun differentPriorities() =  | 
 | 278 | +        databaseTest {  | 
 | 279 | +            database.connect(connector)  | 
 | 280 | + | 
 | 281 | +            turbineScope {  | 
 | 282 | +                val turbine = database.currentStatus.asFlow().testIn(this)  | 
 | 283 | +                turbine.waitFor { it.connected && !it.downloading }  | 
 | 284 | + | 
 | 285 | +                suspend fun expectProgress(  | 
 | 286 | +                    prio0: Pair<Int, Int>,  | 
 | 287 | +                    prio2: Pair<Int, Int>,  | 
 | 288 | +                ) {  | 
 | 289 | +                    turbine.expectProgress(prio2, mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2))  | 
 | 290 | +                }  | 
 | 291 | + | 
 | 292 | +                syncLines.send(  | 
 | 293 | +                    SyncLine.FullCheckpoint(  | 
 | 294 | +                        Checkpoint(  | 
 | 295 | +                            lastOpId = "10",  | 
 | 296 | +                            checksums =  | 
 | 297 | +                                listOf(  | 
 | 298 | +                                    bucket("a", 5, BucketPriority(0)),  | 
 | 299 | +                                    bucket("b", 5, BucketPriority(2)),  | 
 | 300 | +                                ),  | 
 | 301 | +                        ),  | 
 | 302 | +                    ),  | 
 | 303 | +                )  | 
 | 304 | +                expectProgress(0 to 5, 0 to 10)  | 
 | 305 | + | 
 | 306 | +                addDataLine("a", 5)  | 
 | 307 | +                expectProgress(5 to 5, 5 to 10)  | 
 | 308 | + | 
 | 309 | +                addCheckpointComplete(BucketPriority(0))  | 
 | 310 | +                expectProgress(5 to 5, 5 to 10)  | 
 | 311 | + | 
 | 312 | +                addDataLine("b", 2)  | 
 | 313 | +                expectProgress(5 to 5, 7 to 10)  | 
 | 314 | + | 
 | 315 | +                // Before syncing b fully, send a new checkpoint  | 
 | 316 | +                syncLines.send(  | 
 | 317 | +                    SyncLine.CheckpointDiff(  | 
 | 318 | +                        lastOpId = "14",  | 
 | 319 | +                        updatedBuckets =  | 
 | 320 | +                            listOf(  | 
 | 321 | +                                bucket("a", 8, BucketPriority(0)),  | 
 | 322 | +                                bucket("b", 6, BucketPriority(2)),  | 
 | 323 | +                            ),  | 
 | 324 | +                        removedBuckets = emptyList(),  | 
 | 325 | +                    ),  | 
 | 326 | +                )  | 
 | 327 | +                expectProgress(5 to 8, 7 to 14)  | 
 | 328 | + | 
 | 329 | +                addDataLine("a", 3)  | 
 | 330 | +                expectProgress(8 to 8, 10 to 14)  | 
 | 331 | +                addDataLine("b", 4)  | 
 | 332 | +                expectProgress(8 to 8, 14 to 14)  | 
 | 333 | + | 
 | 334 | +                addCheckpointComplete()  | 
 | 335 | +                turbine.expectNotDownloading()  | 
 | 336 | + | 
 | 337 | +                turbine.cancel()  | 
 | 338 | +            }  | 
 | 339 | + | 
 | 340 | +            database.close()  | 
 | 341 | +            syncLines.close()  | 
 | 342 | +        }  | 
 | 343 | +}  | 
0 commit comments