diff --git a/CHANGELOG.md b/CHANGELOG.md
index 03a76802..9dfaacaa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,8 @@
 
 * Fixed `CrudBatch` `hasMore` always returning false.
 * Added `triggerImmediately` to `onChange` method.
+* Report real-time progress information about downloads through `SyncStatus.downloadProgress`.
+* Compose: Add `composeState()` extension method on `SyncStatus`.
 
 ## 1.0.0-BETA32
 
diff --git a/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt b/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt
new file mode 100644
index 00000000..74a48ef6
--- /dev/null
+++ b/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt
@@ -0,0 +1,10 @@
+package com.powersync.compose
+
+import androidx.compose.runtime.Composable
+import androidx.compose.runtime.State
+import androidx.compose.runtime.collectAsState
+import com.powersync.sync.SyncStatus
+import com.powersync.sync.SyncStatusData
+
+@Composable
+public fun SyncStatus.composeState(): State<SyncStatusData> = asFlow().collectAsState(initial = this)
diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt
similarity index 100%
rename from core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt
rename to core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt
diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt
new file mode 100644
index 00000000..e7fe1d57
--- /dev/null
+++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt
@@ -0,0 +1,343 @@
+package com.powersync.sync
+
+import app.cash.turbine.ReceiveTurbine
+import app.cash.turbine.turbineScope
+import com.powersync.bucket.BucketChecksum
+import com.powersync.bucket.BucketPriority
+import com.powersync.bucket.Checkpoint
+import com.powersync.bucket.OpType
+import com.powersync.bucket.OplogEntry
+import com.powersync.testutils.ActiveDatabaseTest
+import com.powersync.testutils.databaseTest
+import com.powersync.testutils.waitFor
+import kotlinx.coroutines.channels.Channel
+import kotlin.test.BeforeTest
+import kotlin.test.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertNull
+import kotlin.test.assertTrue
+
+class SyncProgressTest {
+    private var lastOpId = 0
+
+    @BeforeTest
+    fun resetOpId() {
+        lastOpId = 0
+    }
+
+    private fun bucket(
+        name: String,
+        count: Int,
+        priority: BucketPriority = BucketPriority(3),
+    ): BucketChecksum =
+        BucketChecksum(
+            bucket = name,
+            priority = priority,
+            checksum = 0,
+            count = count,
+        )
+
+    private suspend fun ActiveDatabaseTest.addDataLine(
+        bucket: String,
+        amount: Int,
+    ) {
+        syncLines.send(
+            SyncLine.SyncDataBucket(
+                bucket = bucket,
+                data =
+                    List(amount) {
+                        OplogEntry(
+                            checksum = 0,
+                            opId = (++lastOpId).toString(),
+                            op = OpType.PUT,
+                            rowId = lastOpId.toString(),
+                            rowType = bucket,
+                            data = "{}",
+                        )
+                    },
+                after = null,
+                nextAfter = null,
+            ),
+        )
+    }
+
+    private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: BucketPriority? = null) {
+        if (priority != null) {
+            syncLines.send(
+                SyncLine.CheckpointPartiallyComplete(
+                    lastOpId = lastOpId.toString(),
+                    priority = priority,
+                ),
+            )
+        } else {
+            syncLines.send(SyncLine.CheckpointComplete(lastOpId = lastOpId.toString()))
+        }
+    }
+
+    private suspend fun ReceiveTurbine<SyncStatusData>.expectProgress(
+        total: Pair<Int, Int>,
+        priorities: Map<BucketPriority, Pair<Int, Int>> = emptyMap(),
+    ) {
+        val item = awaitItem()
+        val progress = item.downloadProgress ?: error("Expected download progress on $item")
+
+        assertTrue { item.downloading }
+        assertEquals(total.first, progress.downloadedOperations)
+        assertEquals(total.second, progress.totalOperations)
+
+        priorities.forEach { (priority, expected) ->
+            val (expectedDownloaded, expectedTotal) = expected
+            val progress = progress.untilPriority(priority)
+            assertEquals(expectedDownloaded, progress.downloadedOperations)
+            assertEquals(expectedTotal, progress.totalOperations)
+        }
+    }
+
+    private suspend fun ReceiveTurbine<SyncStatusData>.expectNotDownloading() {
+        awaitItem().also {
+            assertFalse { it.downloading }
+            assertNull(it.downloadProgress)
+        }
+    }
+
+    @Test
+    fun withoutPriorities() =
+        databaseTest {
+            database.connect(connector)
+
+            turbineScope {
+                val turbine = database.currentStatus.asFlow().testIn(this)
+                turbine.waitFor { it.connected && !it.downloading }
+
+                // Send checkpoint with 10 ops, progress should be 0/10
+                syncLines.send(
+                    SyncLine.FullCheckpoint(
+                        Checkpoint(
+                            lastOpId = "10",
+                            checksums = listOf(bucket("a", 10)),
+                        ),
+                    ),
+                )
+                turbine.expectProgress(0 to 10)
+
+                addDataLine("a", 10)
+                turbine.expectProgress(10 to 10)
+
+                addCheckpointComplete()
+                turbine.expectNotDownloading()
+
+                // Emit new data, progress should be 0/2 instead of 10/12
+                syncLines.send(
+                    SyncLine.CheckpointDiff(
+                        lastOpId = "12",
+                        updatedBuckets = listOf(bucket("a", 12)),
+                        removedBuckets = emptyList(),
+                    ),
+                )
+                turbine.expectProgress(0 to 2)
+
+                addDataLine("a", 2)
+                turbine.expectProgress(2 to 2)
+
+                addCheckpointComplete()
+                turbine.expectNotDownloading()
+
+                turbine.cancel()
+            }
+
+            database.close()
+            syncLines.close()
+        }
+
+    @Test
+    fun interruptedSync() =
+        databaseTest {
+            database.connect(connector)
+
+            turbineScope {
+                val turbine = database.currentStatus.asFlow().testIn(this)
+                turbine.waitFor { it.connected && !it.downloading }
+
+                // Send checkpoint with 10 ops, progress should be 0/10
+                syncLines.send(
+                    SyncLine.FullCheckpoint(
+                        Checkpoint(
+                            lastOpId = "10",
+                            checksums = listOf(bucket("a", 10)),
+                        ),
+                    ),
+                )
+                turbine.expectProgress(0 to 10)
+
+                addDataLine("a", 5)
+                turbine.expectProgress(5 to 10)
+
+                turbine.cancel()
+            }
+
+            // Emulate the app closing
+            database.close()
+            syncLines.close()
+
+            // And reconnecting
+            database = openDatabase()
+            syncLines = Channel()
+            database.connect(connector)
+
+            turbineScope {
+                val turbine = database.currentStatus.asFlow().testIn(this)
+                turbine.waitFor { it.connected && !it.downloading }
+
+                // Send the same checkpoint as before
+                syncLines.send(
+                    SyncLine.FullCheckpoint(
+                        Checkpoint(
+                            lastOpId = "10",
+                            checksums = listOf(bucket("a", 10)),
+                        ),
+                    ),
+                )
+
+                // Progress should be restored: 5 / 10 instead of 0/5
+                turbine.expectProgress(5 to 10)
+
+                addDataLine("a", 5)
+                turbine.expectProgress(10 to 10)
+                addCheckpointComplete()
+                turbine.expectNotDownloading()
+
+                turbine.cancel()
+            }
+
+            database.close()
+            syncLines.close()
+        }
+
+    @Test
+    fun interruptedSyncWithNewCheckpoint() =
+        databaseTest {
+            database.connect(connector)
+
+            turbineScope {
+                val turbine = database.currentStatus.asFlow().testIn(this)
+                turbine.waitFor { it.connected && !it.downloading }
+                syncLines.send(
+                    SyncLine.FullCheckpoint(
+                        Checkpoint(
+                            lastOpId = "10",
+                            checksums = listOf(bucket("a", 10)),
+                        ),
+                    ),
+                )
+                turbine.expectProgress(0 to 10)
+
+                addDataLine("a", 5)
+                turbine.expectProgress(5 to 10)
+
+                turbine.cancel()
+            }
+
+            // Close and re-connect
+            database.close()
+            syncLines.close()
+            database = openDatabase()
+            syncLines = Channel()
+            database.connect(connector)
+
+            turbineScope {
+                val turbine = database.currentStatus.asFlow().testIn(this)
+                turbine.waitFor { it.connected && !it.downloading }
+
+                // Send checkpoint with two more ops
+                syncLines.send(
+                    SyncLine.FullCheckpoint(
+                        Checkpoint(
+                            lastOpId = "12",
+                            checksums = listOf(bucket("a", 12)),
+                        ),
+                    ),
+                )
+
+                turbine.expectProgress(5 to 12)
+
+                addDataLine("a", 7)
+                turbine.expectProgress(12 to 12)
+                addCheckpointComplete()
+                turbine.expectNotDownloading()
+
+                turbine.cancel()
+            }
+
+            database.close()
+            syncLines.close()
+        }
+
+    @Test
+    fun differentPriorities() =
+        databaseTest {
+            database.connect(connector)
+
+            turbineScope {
+                val turbine = database.currentStatus.asFlow().testIn(this)
+                turbine.waitFor { it.connected && !it.downloading }
+
+                suspend fun expectProgress(
+                    prio0: Pair<Int, Int>,
+                    prio2: Pair<Int, Int>,
+                ) {
+                    turbine.expectProgress(prio2, mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2))
+                }
+
+                syncLines.send(
+                    SyncLine.FullCheckpoint(
+                        Checkpoint(
+                            lastOpId = "10",
+                            checksums =
+                                listOf(
+                                    bucket("a", 5, BucketPriority(0)),
+                                    bucket("b", 5, BucketPriority(2)),
+                                ),
+                        ),
+                    ),
+                )
+                expectProgress(0 to 5, 0 to 10)
+
+                addDataLine("a", 5)
+                expectProgress(5 to 5, 5 to 10)
+
+                addCheckpointComplete(BucketPriority(0))
+                expectProgress(5 to 5, 5 to 10)
+
+                addDataLine("b", 2)
+                expectProgress(5 to 5, 7 to 10)
+
+                // Before syncing b fully, send a new checkpoint
+                syncLines.send(
+                    SyncLine.CheckpointDiff(
+                        lastOpId = "14",
+                        updatedBuckets =
+                            listOf(
+                                bucket("a", 8, BucketPriority(0)),
+                                bucket("b", 6, BucketPriority(2)),
+                            ),
+                        removedBuckets = emptyList(),
+                    ),
+                )
+                expectProgress(5 to 8, 7 to 14)
+
+                addDataLine("a", 3)
+                expectProgress(8 to 8, 10 to 14)
+                addDataLine("b", 4)
+                expectProgress(8 to 8, 14 to 14)
+
+                addCheckpointComplete()
+                turbine.expectNotDownloading()
+
+                turbine.cancel()
+            }
+
+            database.close()
+            syncLines.close()
+        }
+}
diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt
index 7560d8cb..f4353eac 100644
--- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt
+++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt
@@ -82,7 +82,7 @@ internal class ActiveDatabaseTest(
             ),
         )
 
-    val syncLines = Channel<SyncLine>()
+    var syncLines = Channel<SyncLine>()
     var checkpointResponse: () -> WriteCheckpointResponse = {
         WriteCheckpointResponse(WriteCheckpointData("1000"))
     }
diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt
index 654bc3c7..ab278ee0 100644
--- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt
+++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt
@@ -29,6 +29,8 @@ internal interface BucketStorage {
 
     suspend fun getBucketStates(): List<BucketState>
 
+    suspend fun getBucketOperationProgress(): Map<String, LocalOperationCounters>
+
     suspend fun removeBuckets(bucketsToDelete: List<String>)
 
     suspend fun hasCompletedSync(): Boolean
diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt
index 3f647b29..75a23dfa 100644
--- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt
+++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt
@@ -152,6 +152,22 @@ internal class BucketStorageImpl(
             },
         )
 
+    override suspend fun getBucketOperationProgress(): Map<String, LocalOperationCounters> =
+        buildMap {
+            val rows =
+                db.getAll("SELECT name, count_at_last, count_since_last FROM ps_buckets") { cursor ->
+                    cursor.getString(0)!! to
+                        LocalOperationCounters(
+                            atLast = cursor.getLong(1)!!.toInt(),
+                            sinceLast = cursor.getLong(2)!!.toInt(),
+                        )
+                }
+
+            for ((name, counters) in rows) {
+                put(name, counters)
+            }
+        }
+
     override suspend fun removeBuckets(bucketsToDelete: List<String>) {
         bucketsToDelete.forEach { bucketName ->
             deleteBucket(bucketName)
@@ -305,7 +321,6 @@ internal class BucketStorageImpl(
             }
 
         return db.writeTransaction { tx ->
-
             tx.execute(
                 "INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
                 listOf("sync_local", args),
@@ -316,7 +331,28 @@ internal class BucketStorageImpl(
                     cursor.getLong(0)!!
                 }
 
-            return@writeTransaction res == 1L
+            val didApply = res == 1L
+            if (didApply && priority == null) {
+                // Reset progress counters. We only do this for a complete sync, as we want a download progress to
+                // always cover a complete checkpoint instead of resetting for partial completions.
+                tx.execute(
+                    """
+                    UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
+                      WHERE name != '${'$'}local' AND ?1->name IS NOT NULL
+                    """.trimIndent(),
+                    listOf(
+                        JsonUtil.json.encodeToString(
+                            buildMap<String, Int> {
+                                for (bucket in checkpoint.checksums) {
+                                    bucket.count?.let { put(bucket.bucket, it) }
+                                }
+                            },
+                        ),
+                    ),
+                )
+            }
+
+            return@writeTransaction didApply
         }
     }
 
diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt b/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt
new file mode 100644
index 00000000..73f6a6fc
--- /dev/null
+++ b/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt
@@ -0,0 +1,6 @@
+package com.powersync.bucket
+
+internal data class LocalOperationCounters(
+    val atLast: Int,
+    val sinceLast: Int,
+)
diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt
index 86aa3c76..d58156e2 100644
--- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt
+++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt
@@ -14,6 +14,7 @@ import com.powersync.db.crud.CrudRow
 import com.powersync.db.crud.CrudTransaction
 import com.powersync.db.internal.InternalDatabaseImpl
 import com.powersync.db.internal.InternalTable
+import com.powersync.db.internal.PowerSyncVersion
 import com.powersync.db.schema.Schema
 import com.powersync.db.schema.toSerializable
 import com.powersync.sync.PriorityStatusEntry
@@ -217,21 +218,7 @@ internal class PowerSyncDatabaseImpl(
             }
 
             launch {
-                stream.status.asFlow().collect {
-                    currentStatus.update(
-                        connected = it.connected,
-                        connecting = it.connecting,
-                        uploading = it.uploading,
-                        downloading = it.downloading,
-                        lastSyncedAt = it.lastSyncedAt,
-                        hasSynced = it.hasSynced,
-                        uploadError = it.uploadError,
-                        downloadError = it.downloadError,
-                        clearDownloadError = it.downloadError == null,
-                        clearUploadError = it.uploadError == null,
-                        priorityStatusEntries = it.priorityStatusEntries,
-                    )
-                }
+                currentStatus.trackOther(stream.status)
             }
 
             launch {
@@ -437,11 +424,14 @@ internal class PowerSyncDatabaseImpl(
             syncSupervisorJob = null
         }
 
-        currentStatus.update(
-            connected = false,
-            connecting = false,
-            lastSyncedAt = currentStatus.lastSyncedAt,
-        )
+        currentStatus.update {
+            copy(
+                connected = false,
+                connecting = false,
+                downloading = false,
+                downloadProgress = null,
+            )
+        }
     }
 
     override suspend fun disconnectAndClear(clearLocal: Boolean) {
@@ -450,7 +440,7 @@ internal class PowerSyncDatabaseImpl(
         internalDb.writeTransaction { tx ->
             tx.getOptional("SELECT powersync_clear(?)", listOf(if (clearLocal) "1" else "0")) {}
         }
-        currentStatus.update(lastSyncedAt = null, hasSynced = false)
+        currentStatus.update { copy(lastSyncedAt = null, hasSynced = false) }
     }
 
     private suspend fun updateHasSynced() {
@@ -490,11 +480,13 @@ internal class PowerSyncDatabaseImpl(
             }
         }
 
-        currentStatus.update(
-            hasSynced = lastSyncedAt != null,
-            lastSyncedAt = lastSyncedAt,
-            priorityStatusEntries = priorityStatus,
-        )
+        currentStatus.update {
+            copy(
+                hasSynced = lastSyncedAt != null,
+                lastSyncedAt = lastSyncedAt,
+                priorityStatusEntries = priorityStatus,
+            )
+        }
     }
 
     override suspend fun waitForFirstSync() = waitForFirstSyncImpl(null)
@@ -534,20 +526,9 @@ internal class PowerSyncDatabaseImpl(
      * Check that a supported version of the powersync extension is loaded.
      */
     private fun checkVersion(powerSyncVersion: String) {
-        // Parse version
-        val versionInts: List<Int> =
-            try {
-                powerSyncVersion
-                    .split(Regex("[./]"))
-                    .take(3)
-                    .map { it.toInt() }
-            } catch (e: Exception) {
-                throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $powerSyncVersion. Details: $e")
-            }
-
-        // Validate ^0.2.0
-        if (versionInts[0] != 0 || versionInts[1] < 2 || versionInts[2] < 0) {
-            throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $powerSyncVersion")
+        val version = PowerSyncVersion.parse(powerSyncVersion)
+        if (version < PowerSyncVersion.MINIMUM) {
+            PowerSyncVersion.mismatchError(powerSyncVersion)
         }
     }
 }
diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncVersion.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncVersion.kt
new file mode 100644
index 00000000..c9781cb7
--- /dev/null
+++ b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncVersion.kt
@@ -0,0 +1,49 @@
+package com.powersync.db.internal
+
+internal data class PowerSyncVersion(
+    val major: Int,
+    val minor: Int,
+    val patch: Int,
+) : Comparable<PowerSyncVersion> {
+    override fun compareTo(other: PowerSyncVersion): Int =
+        when (val compareMajor = major.compareTo(other.major)) {
+            0 ->
+                when (val compareMinor = minor.compareTo(other.minor)) {
+                    0 -> patch.compareTo(other.patch)
+                    else -> compareMinor
+                }
+            else -> compareMajor
+        }
+
+    override fun toString(): String = "$major.$minor.$patch"
+
+    companion object {
+        val MINIMUM: PowerSyncVersion = PowerSyncVersion(0, 3, 14)
+
+        fun parse(from: String): PowerSyncVersion {
+            val versionInts: List<Int> =
+                try {
+                    from
+                        .split(Regex("[./]"))
+                        .take(3)
+                        .map { it.toInt() }
+                } catch (e: Exception) {
+                    mismatchError(from, e.toString())
+                }
+
+            return PowerSyncVersion(versionInts[0], versionInts[1], versionInts[2])
+        }
+
+        fun mismatchError(
+            actualVersion: String,
+            details: String? = null,
+        ): Nothing {
+            var message = "Unsupported PowerSync extension version (need ^$MINIMUM, got $actualVersion)."
+            if (details != null) {
+                message = " Details: $details"
+            }
+
+            throw Exception(message)
+        }
+    }
+}
diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt
new file mode 100644
index 00000000..8c6b64e0
--- /dev/null
+++ b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt
@@ -0,0 +1,146 @@
+package com.powersync.sync
+
+import com.powersync.bucket.BucketPriority
+import com.powersync.bucket.Checkpoint
+import com.powersync.bucket.LocalOperationCounters
+
+/**
+ * Information about a progressing download.
+ *
+ * This reports the [totalOperations] amount of operations to download, how many of them have already been
+ * [downloadedOperations] and finally a [fraction] indicating relative progress.
+ *
+ * To obtain a [ProgressWithOperations] instance, use a method on [SyncDownloadProgress] which in turn is available
+ * on [SyncStatusData].
+ */
+public interface ProgressWithOperations {
+    /**
+     * How many operations need to be downloaded in total for the current download to complete.
+     */
+    public val totalOperations: Int
+
+    /**
+     * How many operations, out of [totalOperations], have already been downloaded.
+     */
+    public val downloadedOperations: Int
+
+    /**
+     * The relative amount of [totalOperations] to items in [downloadedOperations], as a number between `0.0` and `1.0` (inclusive).
+     *
+     * When this number reaches `1.0`, all changes have been received from the sync service.
+     * Actually applying these changes happens before the [SyncStatusData.downloadProgress] field is
+     * cleared though, so progress can stay at `1.0` for a short while before completing.
+     */
+    public val fraction: Float get() {
+        if (totalOperations == 0) {
+            return 0.0f
+        }
+
+        return downloadedOperations.toFloat() / totalOperations.toFloat()
+    }
+}
+
+internal data class ProgressInfo(
+    override val downloadedOperations: Int,
+    override val totalOperations: Int,
+) : ProgressWithOperations
+
+/**
+ * Provides realtime progress on how PowerSync is downloading rows.
+ *
+ * This type reports progress by implementing [ProgressWithOperations], meaning that the [totalOperations],
+ * [downloadedOperations] and [fraction] getters are available on this instance.
+ * Additionally, it's possible to obtain the progress towards a specific priority only (instead of tracking progress for
+ * the entire download) by using [untilPriority].
+ *
+ * The reported progress always reflects the status towards the end of a sync iteration (after which a consistent
+ * snapshot of all buckets is available locally).
+ *
+ * In rare cases (in particular, when a [compacting](https://docs.powersync.com/usage/lifecycle-maintenance/compacting-buckets)
+ * operation takes place between syncs), it's possible for the returned numbers to be slightly inaccurate. For this
+ * reason, [SyncDownloadProgress] should be seen as an approximation of progress. The information returned is good
+ * enough to build progress bars, but not exact enough to track individual download counts.
+ *
+ * Also note that data is downloaded in bulk, which means that individual counters are unlikely to be updated
+ * one-by-one.
+ */
+@ConsistentCopyVisibility
+public data class SyncDownloadProgress private constructor(
+    private val buckets: Map<String, BucketProgress>,
+) : ProgressWithOperations {
+    override val downloadedOperations: Int
+    override val totalOperations: Int
+
+    init {
+        val (target, completed) = targetAndCompletedCounts(BucketPriority.FULL_SYNC_PRIORITY)
+        totalOperations = target
+        downloadedOperations = completed
+    }
+
+    /**
+     * Creates download progress information from the local progress counters since the last full sync and the target
+     * checkpoint.
+     */
+    internal constructor(localProgress: Map<String, LocalOperationCounters>, target: Checkpoint) : this(
+        buildMap {
+            for (entry in target.checksums) {
+                val savedProgress = localProgress[entry.bucket]
+
+                put(
+                    entry.bucket,
+                    BucketProgress(
+                        priority = entry.priority,
+                        atLast = savedProgress?.atLast ?: 0,
+                        sinceLast = savedProgress?.sinceLast ?: 0,
+                        targetCount = entry.count ?: 0,
+                    ),
+                )
+            }
+        },
+    )
+
+    /**
+     * Returns download progress towards all data up until the specified [priority] being received.
+     *
+     * The returned [ProgressWithOperations] instance tracks the target amount of operations that need to be downloaded
+     * in total and how many of them have already been received.
+     */
+    public fun untilPriority(priority: BucketPriority): ProgressWithOperations {
+        val (total, completed) = targetAndCompletedCounts(priority)
+        return ProgressInfo(totalOperations = total, downloadedOperations = completed)
+    }
+
+    internal fun incrementDownloaded(batch: SyncDataBatch): SyncDownloadProgress =
+        SyncDownloadProgress(
+            buildMap {
+                putAll(this@SyncDownloadProgress.buckets)
+
+                for (bucket in batch.buckets) {
+                    val previous = get(bucket.bucket) ?: continue
+                    put(
+                        bucket.bucket,
+                        previous.copy(
+                            sinceLast = previous.sinceLast + bucket.data.size,
+                        ),
+                    )
+                }
+            },
+        )
+
+    private fun targetAndCompletedCounts(priority: BucketPriority): Pair<Int, Int> =
+        buckets.values
+            .asSequence()
+            .filter { it.priority >= priority }
+            .fold(0 to 0) { (prevTarget, prevCompleted), entry ->
+                (prevTarget + entry.total) to (prevCompleted + entry.sinceLast)
+            }
+}
+
+private data class BucketProgress(
+    val priority: BucketPriority,
+    val atLast: Int,
+    val sinceLast: Int,
+    val targetCount: Int,
+) {
+    val total get(): Int = targetCount - atLast
+}
diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt
index ce4fde57..067080b9 100644
--- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt
+++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt
@@ -5,6 +5,7 @@ import com.powersync.connectors.PowerSyncBackendConnector
 import kotlinx.coroutines.flow.MutableStateFlow
 import kotlinx.coroutines.flow.SharedFlow
 import kotlinx.coroutines.flow.asSharedFlow
+import kotlinx.datetime.Clock
 import kotlinx.datetime.Instant
 
 @ConsistentCopyVisibility
@@ -36,6 +37,15 @@ public interface SyncStatusData {
      */
     public val downloading: Boolean
 
+    /**
+     * Realtime progress information about downloaded operations during an active sync.
+     *
+     *
+     * For more information on what progress is reported, see [SyncDownloadProgress].
+     * This value will be non-null only if [downloading] is true.
+     */
+    public val downloadProgress: SyncDownloadProgress?
+
     /**
      * true if uploading changes
      */
@@ -106,6 +116,7 @@ internal data class SyncStatusDataContainer(
     override val connected: Boolean = false,
     override val connecting: Boolean = false,
     override val downloading: Boolean = false,
+    override val downloadProgress: SyncDownloadProgress? = null,
     override val uploading: Boolean = false,
     override val lastSyncedAt: Instant? = null,
     override val hasSynced: Boolean? = null,
@@ -115,6 +126,21 @@ internal data class SyncStatusDataContainer(
 ) : SyncStatusData {
     override val anyError
         get() = downloadError ?: uploadError
+
+    internal fun abortedDownload() =
+        copy(
+            downloading = false,
+            downloadProgress = null,
+        )
+
+    internal fun copyWithCompletedDownload() =
+        copy(
+            lastSyncedAt = Clock.System.now(),
+            downloading = false,
+            downloadProgress = null,
+            hasSynced = true,
+            downloadError = null,
+        )
 }
 
 @ConsistentCopyVisibility
@@ -131,34 +157,17 @@ public data class SyncStatus internal constructor(
     /**
      * Updates the internal sync status indicators and emits Flow updates
      */
-    internal fun update(
-        connected: Boolean? = null,
-        connecting: Boolean? = null,
-        downloading: Boolean? = null,
-        uploading: Boolean? = null,
-        hasSynced: Boolean? = null,
-        lastSyncedAt: Instant? = null,
-        uploadError: Any? = null,
-        downloadError: Any? = null,
-        clearUploadError: Boolean = false,
-        clearDownloadError: Boolean = false,
-        priorityStatusEntries: List<PriorityStatusEntry>? = null,
-    ) {
-        data =
-            data.copy(
-                connected = connected ?: data.connected,
-                connecting = connecting ?: data.connecting,
-                downloading = downloading ?: data.downloading,
-                uploading = uploading ?: data.uploading,
-                lastSyncedAt = lastSyncedAt ?: data.lastSyncedAt,
-                hasSynced = hasSynced ?: data.hasSynced,
-                priorityStatusEntries = priorityStatusEntries ?: data.priorityStatusEntries,
-                uploadError = if (clearUploadError) null else uploadError,
-                downloadError = if (clearDownloadError) null else downloadError,
-            )
+    internal inline fun update(makeCopy: SyncStatusDataContainer.() -> SyncStatusDataContainer) {
+        data = data.makeCopy()
         stateFlow.value = data
     }
 
+    internal suspend fun trackOther(source: SyncStatus) {
+        source.stateFlow.collect {
+            update { it }
+        }
+    }
+
     override val anyError: Any?
         get() = data.anyError
 
@@ -171,6 +180,9 @@ public data class SyncStatus internal constructor(
     override val downloading: Boolean
         get() = data.downloading
 
+    override val downloadProgress: SyncDownloadProgress?
+        get() = data.downloadProgress
+
     override val uploading: Boolean
         get() = data.uploading
 
diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt
index 56859443..81556310 100644
--- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt
+++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt
@@ -72,7 +72,7 @@ internal class SyncStream(
         var invalidCredentials = false
         clientId = bucketStorage.getClientId()
         while (true) {
-            status.update(connecting = true)
+            status.update { copy(connecting = true) }
             try {
                 if (invalidCredentials) {
                     // This may error. In that case it will be retried again on the next
@@ -87,15 +87,9 @@ internal class SyncStream(
                 }
 
                 logger.e("Error in streamingSync: ${e.message}")
-                status.update(
-                    downloadError = e,
-                )
+                status.update { copy(downloadError = e) }
             } finally {
-                status.update(
-                    connected = false,
-                    connecting = true,
-                    downloading = false,
-                )
+                status.update { copy(connected = false, connecting = true, downloading = false) }
                 delay(retryDelayMs)
             }
         }
@@ -142,7 +136,7 @@ internal class SyncStream(
                     }
 
                     checkedCrudItem = nextCrudItem
-                    status.update(uploading = true)
+                    status.update { copy(uploading = true) }
                     uploadCrud()
                 } else {
                     // Uploading is completed
@@ -150,7 +144,7 @@ internal class SyncStream(
                     break
                 }
             } catch (e: Exception) {
-                status.update(uploading = false, uploadError = e)
+                status.update { copy(uploading = false, uploadError = e) }
 
                 if (e is CancellationException) {
                     throw e
@@ -161,7 +155,7 @@ internal class SyncStream(
                 break
             }
         }
-        status.update(uploading = false)
+        status.update { copy(uploading = false) }
     }
 
     private suspend fun getWriteCheckpoint(): String {
@@ -215,7 +209,7 @@ internal class SyncStream(
                     throw RuntimeException("Received error when connecting to sync stream: ${httpResponse.bodyAsText()}")
                 }
 
-                status.update(connected = true, connecting = false)
+                status.update { copy(connected = true, connecting = false) }
                 val channel: ByteReadChannel = httpResponse.body()
 
                 while (!channel.isClosedForRead) {
@@ -260,7 +254,7 @@ internal class SyncStream(
             }
         }
 
-        status.update(downloading = false)
+        status.update { abortedDownload() }
 
         return state
     }
@@ -294,7 +288,6 @@ internal class SyncStream(
     ): SyncStreamState {
         val (checkpoint) = line
         state.targetCheckpoint = checkpoint
-        status.update(downloading = true)
 
         val bucketsToDelete = state.bucketSet!!.toMutableList()
         val newBuckets = mutableSetOf<String>()
@@ -306,15 +299,30 @@ internal class SyncStream(
             }
         }
 
-        if (bucketsToDelete.size > 0) {
+        state.bucketSet = newBuckets
+        startTrackingCheckpoint(checkpoint, bucketsToDelete)
+
+        return state
+    }
+
+    private suspend fun startTrackingCheckpoint(
+        checkpoint: Checkpoint,
+        bucketsToDelete: List<String>,
+    ) {
+        val progress = bucketStorage.getBucketOperationProgress()
+        status.update {
+            copy(
+                downloading = true,
+                downloadProgress = SyncDownloadProgress(progress, checkpoint),
+            )
+        }
+
+        if (bucketsToDelete.isNotEmpty()) {
             logger.i { "Removing buckets [${bucketsToDelete.joinToString(separator = ", ")}]" }
         }
 
-        state.bucketSet = newBuckets
         bucketStorage.removeBuckets(bucketsToDelete)
         bucketStorage.setTargetCheckpoint(checkpoint)
-
-        return state
     }
 
     private suspend fun handleStreamingSyncCheckpointComplete(state: SyncStreamState): SyncStreamState {
@@ -343,12 +351,7 @@ internal class SyncStream(
             logger.i { "validated checkpoint ${state.appliedCheckpoint}" }
 
             state.validatedCheckpoint = state.targetCheckpoint
-            status.update(
-                lastSyncedAt = Clock.System.now(),
-                downloading = false,
-                hasSynced = true,
-                clearDownloadError = true,
-            )
+            status.update { copyWithCompletedDownload() }
         } else {
             logger.d { "Could not apply checkpoint. Waiting for next sync complete line" }
         }
@@ -376,20 +379,22 @@ internal class SyncStream(
             logger.i { "validated partial checkpoint ${state.appliedCheckpoint} up to priority of $priority" }
         }
 
-        status.update(
-            priorityStatusEntries =
-                buildList {
-                    // All states with a higher priority can be deleted since this partial sync includes them.
-                    addAll(status.priorityStatusEntries.filter { it.priority >= line.priority })
-                    add(
-                        PriorityStatusEntry(
-                            priority = priority,
-                            lastSyncedAt = Clock.System.now(),
-                            hasSynced = true,
-                        ),
-                    )
-                },
-        )
+        status.update {
+            copy(
+                priorityStatusEntries =
+                    buildList {
+                        // All states with a higher priority can be deleted since this partial sync includes them.
+                        addAll(status.priorityStatusEntries.filter { it.priority >= line.priority })
+                        add(
+                            PriorityStatusEntry(
+                                priority = priority,
+                                lastSyncedAt = Clock.System.now(),
+                                hasSynced = true,
+                            ),
+                        )
+                    },
+            )
+        }
         return state
     }
 
@@ -402,8 +407,6 @@ internal class SyncStream(
             throw Exception("Checkpoint diff without previous checkpoint")
         }
 
-        status.update(downloading = true)
-
         val newBuckets = mutableMapOf<String, BucketChecksum>()
 
         state.targetCheckpoint!!.checksums.forEach { checksum ->
@@ -423,15 +426,7 @@ internal class SyncStream(
             )
 
         state.targetCheckpoint = newCheckpoint
-
-        state.bucketSet = newBuckets.keys.toMutableSet()
-
-        val bucketsToDelete = checkpointDiff.removedBuckets
-        if (bucketsToDelete.isNotEmpty()) {
-            logger.d { "Remove buckets $bucketsToDelete" }
-        }
-        bucketStorage.removeBuckets(bucketsToDelete)
-        bucketStorage.setTargetCheckpoint(state.targetCheckpoint!!)
+        startTrackingCheckpoint(newCheckpoint, checkpointDiff.removedBuckets)
 
         return state
     }
@@ -440,8 +435,9 @@ internal class SyncStream(
         data: SyncLine.SyncDataBucket,
         state: SyncStreamState,
     ): SyncStreamState {
-        status.update(downloading = true)
-        bucketStorage.saveSyncData(SyncDataBatch(listOf(data)))
+        val batch = SyncDataBatch(listOf(data))
+        status.update { copy(downloading = true, downloadProgress = downloadProgress?.incrementDownloaded(batch)) }
+        bucketStorage.saveSyncData(batch)
         return state
     }
 
diff --git a/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt
new file mode 100644
index 00000000..c43e7c51
--- /dev/null
+++ b/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt
@@ -0,0 +1,15 @@
+package com.powersync.sync
+
+import kotlin.test.Test
+import kotlin.test.assertEquals
+
+class ProgressTest {
+    @Test
+    fun reportsFraction() {
+        assertEquals(0.0f, ProgressInfo(0, 10).fraction)
+        assertEquals(0.5f, ProgressInfo(5, 10).fraction)
+        assertEquals(1.0f, ProgressInfo(10, 10).fraction)
+
+        assertEquals(0.0f, ProgressInfo(0, 0).fraction)
+    }
+}
diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt
index b9011a46..558ada3c 100644
--- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt
+++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt
@@ -33,6 +33,7 @@ import io.ktor.client.HttpClient
 import io.ktor.client.engine.mock.MockEngine
 import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.delay
+import kotlinx.coroutines.job
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.test.runTest
 import kotlinx.coroutines.withTimeout
@@ -80,6 +81,7 @@ class SyncStreamTest {
                         checkpointValid = true,
                         checkpointFailures = emptyList(),
                     )
+                everySuspend { getBucketOperationProgress() } returns mapOf()
             }
         connector =
             mock<PowerSyncBackendConnector> {
@@ -147,7 +149,7 @@ class SyncStreamTest {
                     scope = this,
                 )
 
-            syncStream.status.update(connected = true)
+            syncStream.status.update { copy(connected = true) }
             syncStream.triggerCrudUploadAsync().join()
 
             testLogWriter.assertCount(2)
@@ -295,6 +297,7 @@ class SyncStreamTest {
 
                     verifySuspend(order) {
                         if (priorityNo == 0) {
+                            bucketStorage.getBucketOperationProgress()
                             bucketStorage.removeBuckets(any())
                             bucketStorage.setTargetCheckpoint(any())
                         }
diff --git a/demos/supabase-todolist/settings.gradle.kts b/demos/supabase-todolist/settings.gradle.kts
index 73016629..3e4cd6c1 100644
--- a/demos/supabase-todolist/settings.gradle.kts
+++ b/demos/supabase-todolist/settings.gradle.kts
@@ -44,15 +44,18 @@ val useReleasedVersions = localProperties.getProperty("USE_RELEASED_POWERSYNC_VE
 if (!useReleasedVersions) {
     includeBuild("../..") {
         dependencySubstitution {
-            substitute(module("com.powersync:core"))
-                .using(project(":core"))
-                .because("we want to auto-wire up sample dependency")
-            substitute(module("com.powersync:persistence"))
-                .using(project(":persistence"))
-                .because("we want to auto-wire up sample dependency")
-            substitute(module("com.powersync:connector-supabase"))
-                .using(project(":connectors:supabase"))
-                .because("we want to auto-wire up sample dependency")
+            val replacements = mapOf(
+                "core" to "core",
+                "persistence" to "persistence",
+                "connector-supabase" to "connectors:supabase",
+                "compose" to "compose"
+            )
+
+            replacements.forEach { (moduleName, projectName) ->
+                substitute(module("com.powersync:$moduleName"))
+                    .using(project(":$projectName"))
+                    .because("we want to auto-wire up sample dependency")
+            }
         }
     }
 }
diff --git a/demos/supabase-todolist/shared/build.gradle.kts b/demos/supabase-todolist/shared/build.gradle.kts
index f2a59ab1..708ed52c 100644
--- a/demos/supabase-todolist/shared/build.gradle.kts
+++ b/demos/supabase-todolist/shared/build.gradle.kts
@@ -44,6 +44,7 @@ kotlin {
             // at: https://central.sonatype.com/artifact/com.powersync/core
             api("com.powersync:core:latest.release")
             implementation("com.powersync:connector-supabase:latest.release")
+            implementation("com.powersync:compose:latest.release")
             implementation(libs.uuid)
             implementation(compose.runtime)
             implementation(compose.foundation)
diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt
index c1a1cb95..cb7141c1 100644
--- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt
+++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt
@@ -5,7 +5,6 @@ import androidx.compose.material.MaterialTheme
 import androidx.compose.runtime.Composable
 import androidx.compose.runtime.LaunchedEffect
 import androidx.compose.runtime.collectAsState
-import androidx.compose.runtime.derivedStateOf
 import androidx.compose.runtime.getValue
 import androidx.compose.runtime.mutableStateOf
 import androidx.compose.runtime.remember
@@ -13,7 +12,7 @@ import androidx.compose.runtime.rememberUpdatedState
 import androidx.compose.ui.Modifier
 import com.powersync.DatabaseDriverFactory
 import com.powersync.PowerSyncDatabase
-import com.powersync.bucket.BucketPriority
+import com.powersync.compose.composeState
 import com.powersync.connector.supabase.SupabaseConnector
 import com.powersync.connectors.PowerSyncBackendConnector
 import com.powersync.demos.components.EditDialog
@@ -25,7 +24,6 @@ import com.powersync.demos.screens.HomeScreen
 import com.powersync.demos.screens.SignInScreen
 import com.powersync.demos.screens.SignUpScreen
 import com.powersync.demos.screens.TodosScreen
-import kotlinx.coroutines.flow.debounce
 import kotlinx.coroutines.runBlocking
 import org.koin.compose.KoinApplication
 import org.koin.compose.koinInject
@@ -71,19 +69,8 @@ fun AppContent(
     db: PowerSyncDatabase = koinInject(),
     modifier: Modifier = Modifier,
 ) {
-    // Debouncing the status flow prevents flicker
-    val status by db.currentStatus
-        .asFlow()
-        .debounce(200)
-        .collectAsState(initial = db.currentStatus)
-
-    // This assumes that the buckets for lists has a priority of 1 (but it will work fine with sync
-    // rules not defining any priorities at all too). When giving lists a higher priority than
-    // items, we can have a consistent snapshot of lists without items. In the case where many items
-    // exist (that might take longer to sync initially), this allows us to display lists earlier.
-    val hasSyncedLists by remember {
-        derivedStateOf { status.statusForPriority(BucketPriority(1)).hasSynced }
-    }
+
+    val status by db.currentStatus.composeState()
 
     val authViewModel = koinViewModel<AuthViewModel>()
     val navController = koinInject<NavController>()
diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt
new file mode 100644
index 00000000..52e5b521
--- /dev/null
+++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt
@@ -0,0 +1,81 @@
+package com.powersync.demos.components
+
+import androidx.compose.foundation.background
+import androidx.compose.foundation.layout.Arrangement
+import androidx.compose.foundation.layout.Column
+import androidx.compose.foundation.layout.fillMaxSize
+import androidx.compose.foundation.layout.fillMaxWidth
+import androidx.compose.foundation.layout.padding
+import androidx.compose.material.LinearProgressIndicator
+import androidx.compose.material.MaterialTheme
+import androidx.compose.material.Text
+import androidx.compose.runtime.Composable
+import androidx.compose.runtime.getValue
+import androidx.compose.ui.Alignment
+import androidx.compose.ui.Modifier
+import androidx.compose.ui.unit.dp
+import com.powersync.PowerSyncDatabase
+import com.powersync.bucket.BucketPriority
+import com.powersync.compose.composeState
+import com.powersync.sync.SyncStatusData
+import org.koin.compose.koinInject
+
+/**
+ * A component that renders its [content] only after a first complete sync was completed on [db].
+ *
+ * Before that, a progress indicator is shown instead.
+ */
+@Composable
+fun GuardBySync(
+    db: PowerSyncDatabase = koinInject(),
+    priority: BucketPriority? = null,
+    content: @Composable () -> Unit
+) {
+    val state: SyncStatusData by db.currentStatus.composeState()
+
+    if (state.hasSynced == true) {
+        content()
+        return
+    }
+
+    Column(
+        modifier = Modifier.fillMaxSize().background(MaterialTheme.colors.background),
+        horizontalAlignment = Alignment.CenterHorizontally,
+        verticalArrangement = Arrangement.Center,
+    ) {
+        // When we have no hasSynced information, the database is still being opened. We just show a
+        // generic progress bar in that case.
+        val databaseOpening = state.hasSynced == null
+
+        if (!databaseOpening) {
+            Text(
+                text = "Busy with initial sync...",
+                style = MaterialTheme.typography.h6,
+            )
+        }
+
+        val progress = state.downloadProgress?.let {
+            if (priority == null) {
+                it
+            } else {
+                it.untilPriority(priority)
+            }
+        }
+        if (progress != null) {
+            LinearProgressIndicator(
+                modifier = Modifier.fillMaxWidth().padding(8.dp),
+                progress = progress.fraction,
+            )
+
+            if (progress.downloadedOperations == progress.totalOperations) {
+                Text("Applying server-side changes...")
+            } else {
+                Text("Downloaded ${progress.downloadedOperations} out of ${progress.totalOperations}.")
+            }
+        } else {
+            LinearProgressIndicator(
+                modifier = Modifier.fillMaxWidth().padding(8.dp),
+            )
+        }
+    }
+}
diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt
index c543be58..4e53852e 100644
--- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt
+++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt
@@ -16,7 +16,9 @@ import androidx.compose.ui.Alignment
 import androidx.compose.ui.Modifier
 import androidx.compose.ui.text.style.TextAlign
 import androidx.compose.ui.unit.dp
+import com.powersync.bucket.BucketPriority
 import com.powersync.demos.Screen
+import com.powersync.demos.components.GuardBySync
 import com.powersync.demos.components.Input
 import com.powersync.demos.components.ListContent
 import com.powersync.demos.components.Menu
@@ -57,34 +59,25 @@ internal fun HomeScreen(
             },
         )
 
-        when {
-            syncStatus.hasSynced == null || syncStatus.hasSynced == false -> {
-                Box(
-                    modifier = Modifier.fillMaxSize().background(MaterialTheme.colors.background),
-                    contentAlignment = Alignment.Center,
-                ) {
-                    Text(
-                        text = "Busy with initial sync...",
-                        style = MaterialTheme.typography.h6,
-                    )
-                }
-            }
+        // This assumes that the buckets for lists has a priority of 1 (but it will work fine with
+        // sync rules not defining any priorities at all too). When giving lists a higher priority
+        // than items, we can have a consistent snapshot of lists without items. In the case where
+        // many items exist (that might take longer to sync initially), this allows us to display
+        // lists earlier.
+        GuardBySync(priority = BucketPriority(1)) {
+            Input(
+                text = inputText,
+                onAddClicked = onAddItemClicked,
+                onTextChanged = onInputTextChanged,
+                screen = Screen.Home,
+            )
 
-            else -> {
-                Input(
-                    text = inputText,
-                    onAddClicked = onAddItemClicked,
-                    onTextChanged = onInputTextChanged,
-                    screen = Screen.Home,
+            Box(Modifier.weight(1F)) {
+                ListContent(
+                    items = items,
+                    onItemClicked = onItemClicked,
+                    onItemDeleteClicked = onItemDeleteClicked,
                 )
-
-                Box(Modifier.weight(1F)) {
-                    ListContent(
-                        items = items,
-                        onItemClicked = onItemClicked,
-                        onItemDeleteClicked = onItemDeleteClicked,
-                    )
-                }
             }
         }
     }
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index bd5f8496..a617db08 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -17,7 +17,7 @@ kotlinx-datetime = "0.6.2"
 kotlinx-io = "0.5.4"
 ktor = "3.0.1"
 uuid = "0.8.2"
-powersync-core = "0.3.12"
+powersync-core = "0.3.14"
 sqlite-jdbc = "3.49.1.0"
 sqliter = "1.3.1"
 turbine = "1.2.0"