diff --git a/rustfmt.toml b/rustfmt.toml
new file mode 100644
index 0000000..8fa5544
--- /dev/null
+++ b/rustfmt.toml
@@ -0,0 +1,15 @@
+use_field_init_shorthand = true
+use_try_shorthand = true
+match_block_trailing_comma = true
+
+# Nightly only options:
+unstable_features = true
+condense_wildcard_suffixes = true
+format_strings = true
+imports_granularity = "Crate"
+reorder_impl_items = true
+imports_layout = "Vertical"
+group_imports = "StdExternalCrate"
+wrap_comments = true
+normalize_comments = false
+error_on_line_overflow = true
\ No newline at end of file
diff --git a/src/client/subscription.rs b/src/client/subscription.rs
index b361d3f..3c6e1e7 100644
--- a/src/client/subscription.rs
+++ b/src/client/subscription.rs
@@ -44,7 +44,8 @@ pub struct QuerySubscription {
     pub(super) subscriber_id: SubscriberId,
     pub(super) request_sender: UnboundedSender<ClientRequest>,
     pub(super) watch: BroadcastStream<QueryResults>,
-    pub(super) initial: Option<FunctionResult>,
+    pub(super) sent_initial_value: bool,
+    pub(super) last_value: Option<FunctionResult>,
 }
 impl QuerySubscription {
     /// Returns an identifier for this subscription based on its query and args.
@@ -84,8 +85,11 @@ impl Stream for QuerySubscription {
         mut self: Pin<&mut Self>,
         cx: &mut task::Context<'_>,
     ) -> task::Poll<Option<Self::Item>> {
-        if let Some(initial) = self.initial.take() {
-            return task::Poll::Ready(Some(initial));
+        if !self.sent_initial_value {
+            self.sent_initial_value = true;
+            if let Some(value) = self.last_value.clone() {
+                return task::Poll::Ready(Some(value));
+            }
         }
         loop {
             return match self.watch.poll_next_unpin(cx) {
@@ -97,6 +101,11 @@ impl Stream for QuerySubscription {
                         // No result yet in the query result set. Keep polling.
                         continue;
                     };
+                    if Some(value) == self.last_value.as_ref() {
+                        // Redundant
+                        continue;
+                    }
+                    self.last_value = Some(value.clone());
                     task::Poll::Ready(Some(value.clone()))
                 },
                 task::Poll::Ready(None) => task::Poll::Ready(None),
diff --git a/src/client/worker.rs b/src/client/worker.rs
index efa1689..560f195 100644
--- a/src/client/worker.rs
+++ b/src/client/worker.rs
@@ -168,7 +168,8 @@ async fn _worker_once<T: SyncProtocol>(
                         subscriber_id,
                         request_sender,
                         watch,
-                        initial: base_client.latest_results().get(&subscriber_id).cloned(),
+                        sent_initial_value: false,
+                        last_value: base_client.latest_results().get(&subscriber_id).cloned(),
                     };
                     let _ = tx.send(subscription);
                 },