From 4320b74de6d012f7a00f395e7821321602f5eb3a Mon Sep 17 00:00:00 2001 From: Nikita Prokopov Date: Thu, 20 Feb 2025 23:01:54 +0000 Subject: [PATCH] =?UTF-8?q?Revert=20"Simplify=20grouped=20queue=202=20(#91?= =?UTF-8?q?2)"=20and=20"Fix=20flaky=20grouped-queue=E2=80=A6=20(#916)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/instant/flags_impl.clj | 6 +- server/src/instant/gauges.clj | 2 +- server/src/instant/grouped_queue.clj | 495 ++++++++++++------ server/src/instant/machine_summaries.clj | 12 +- server/src/instant/reactive/ephemeral.clj | 26 +- server/src/instant/reactive/invalidator.clj | 104 ++-- server/src/instant/reactive/receive_queue.clj | 45 +- server/src/instant/reactive/session.clj | 267 ++++++---- server/src/tool.clj | 2 +- server/test/instant/grouped_queue_test.clj | 95 ---- .../instant/reactive/invalidator_test.clj | 4 +- server/test/instant/reactive/session_test.clj | 11 +- 12 files changed, 609 insertions(+), 460 deletions(-) delete mode 100644 server/test/instant/grouped_queue_test.clj diff --git a/server/src/instant/flags_impl.clj b/server/src/instant/flags_impl.clj index 0f3207fdf..8d089ed3b 100644 --- a/server/src/instant/flags_impl.clj +++ b/server/src/instant/flags_impl.clj @@ -8,7 +8,7 @@ [instant.db.transaction :as tx] [instant.jdbc.aurora :as aurora] [instant.model.app :as app-model] - [instant.reactive.receive-queue :as receive-queue] + [instant.reactive.receive-queue :refer [receive-q]] [instant.reactive.session :as session] [instant.reactive.store :as store] [instant.util.instaql :refer [instaql-nodes->object-tree]] @@ -65,7 +65,7 @@ socket {:id socket-id :http-req nil :ws-conn ws-conn - :receive-q receive-queue/receive-q + :receive-q receive-q :pending-handlers (atom #{})}] ;; Get results in foreground so that flags are initialized before we return @@ -81,7 +81,7 @@ :admin? true}) (doseq [{:keys [query]} queries] (session/on-message {:id socket-id - :receive-q receive-queue/receive-q + :receive-q receive-q :data (->json {:op :add-query :q query :return-type "tree"})})) diff --git a/server/src/instant/gauges.clj b/server/src/instant/gauges.clj index 7896c1426..6eed6626d 100644 --- a/server/src/instant/gauges.clj +++ b/server/src/instant/gauges.clj @@ -106,7 +106,7 @@ (catch Throwable t [{:path "instant.gauges.metric-fn-error" :value (.getMessage t)}])))])] - (into {} (keep (juxt :path :value) metrics)))) + (into {} (map (juxt :path :value) metrics)))) (comment (gauges)) diff --git a/server/src/instant/grouped_queue.clj b/server/src/instant/grouped_queue.clj index 00c98a349..9bc37ffb3 100644 --- a/server/src/instant/grouped_queue.clj +++ b/server/src/instant/grouped_queue.clj @@ -1,169 +1,334 @@ (ns instant.grouped-queue + (:refer-clojure :exclude [peek]) (:require - [clojure+.core :as clojure+] - [instant.config :as config] - [instant.gauges :as gauges] + [instant.util.async :as ua] [instant.util.tracer :as tracer]) (:import - (java.util Map Queue) - (java.util.concurrent ConcurrentHashMap ConcurrentLinkedQueue Executors ExecutorService LinkedBlockingQueue ThreadPoolExecutor TimeUnit) - (java.util.concurrent.atomic AtomicInteger))) - -(defn- poll - "Gets 0..∞ items from group, fetching as many combinable items as possible in a row. - Returns 1 (possibly combined) item or nil" - [group combine-fn] - (loop [item1 (Queue/.poll group)] - (clojure+/cond+ - (nil? item1) nil - :let [item2 (Queue/.peek group)] - (nil? item2) item1 - :let [item12 (combine-fn item1 item2)] - (nil? item12) item1 - :else (do - (Queue/.remove group) ;; remove item2 - (recur (assoc item12 ::combined (inc (::combined item1 1)))))))) - -(defn- process - "Main worker process function" - [{:keys [groups process-fn combine-fn num-workers num-items processing?] :as q} key group] - (AtomicInteger/.incrementAndGet num-workers) - (loop [] - (when @processing? - (if-some [item (poll group combine-fn)] - (do - (try - (process-fn key item) - (catch Throwable t - (tracer/record-exception-span! t {:name "grouped-queue/process-error"}))) - (AtomicInteger/.addAndGet num-items (- (::combined item 1))) - (recur)) - (when (= ::loop (locking q - (if (some? (Queue/.peek group)) - ::loop - (Map/.remove groups key)))) - (recur))))) - (AtomicInteger/.decrementAndGet num-workers)) - -(defn put! - "Schedule item for execution on q" - [{:keys [executor groups group-key-fn num-items accepting?] :as q} item] - (when @accepting? - (let [item (assoc item ::put-at (System/currentTimeMillis)) - key (or (group-key-fn item) ::default)] - (locking q - (if-some [group (Map/.get groups key)] - (Queue/.offer group item) - (let [group (ConcurrentLinkedQueue. [item])] - (Map/.put groups key group) - (ExecutorService/.submit executor ^Runnable #(process q key group)))) - (AtomicInteger/.incrementAndGet num-items))))) - -(defn- longest-wait-time [groups] - (when-some [items (->> groups - (Map/.values) - (keep Queue/.peek) - not-empty)] - (- (System/currentTimeMillis) (transduce (map ::put-at) min items)))) - -(defn start - "Options: - - :group-key-fn :: (fn [item]) -> Any - - A function to determine to which “track” to send item for processing. - All tracks are processed in parallel, items inside one track are processed sequentially. - - :combine-fn :: (fn [item1 item2]) -> item | nil - - A function that can optionally combine two items into one before processing. - Return nil if items shouldn’t be combined. - - :process-fn :: (fn [group-key item]) - - Main processing function. Item passed to it might have additional ::combined and ::put-at keys. - - :executor :: ExecutorService | nil - - An exectutor to use to run worker threads. Should support unbounded task queue. - - :max-workers :: long | nil - - If exectutor is not provided, ~ cached thread pool will be created with at most this many threads. - - :metrics-path :: String | nil - - A string to report gauge metrics to. If skipped, no reporting" - [{:keys [group-key-fn combine-fn process-fn executor max-workers metrics-path] - :or {max-workers 2}}] - (let [groups (ConcurrentHashMap.) - accepting? (atom true) - processing? (atom true) - num-items (AtomicInteger. 0) - num-workers (AtomicInteger. 0) - executor (cond - (some? executor) - executor - - config/fewer-vfutures? - (doto (ThreadPoolExecutor. max-workers max-workers 1 TimeUnit/SECONDS (LinkedBlockingQueue.)) - (.allowCoreThreadTimeOut true)) - - :else - (Executors/newVirtualThreadPerTaskExecutor)) - cleanup-fn (when metrics-path - (gauges/add-gauge-metrics-fn - (fn [_] - [{:path (str metrics-path ".size") - :value (AtomicInteger/.get num-items)} - (when-some [t (longest-wait-time groups)] - {:path (str metrics-path ".longest-waiting-ms") - :value t}) - {:path (str metrics-path ".worker-count") - :value (AtomicInteger/.get num-workers)}]))) - shutdown-fn (fn [{:keys [timeout-ms] - :or {timeout-ms 1000}}] - (when cleanup-fn - (cleanup-fn)) - (reset! accepting? false) - (ExecutorService/.shutdown executor) - (if (ExecutorService/.awaitTermination executor timeout-ms TimeUnit/MILLISECONDS) - :shutdown - (do - (reset! processing? false) - (if (ExecutorService/.awaitTermination executor timeout-ms TimeUnit/MILLISECONDS) - :shutdown - (do - (ExecutorService/.shutdownNow executor) - :terminated)))))] - {:group-key-fn (or group-key-fn identity) - :combine-fn (or combine-fn (fn [_ _] nil)) - :process-fn process-fn - :groups groups - :accepting? accepting? - :processing? processing? - :num-items num-items - :num-workers num-workers - :executor executor - :shutdown-fn shutdown-fn})) - -(defn stop - "Stops grouped queue. Shuts executor down. Possible options: - - :timeout-ms :: long - - How long to wait for existing tasks to finish processing before interrupting." - ([q] - ((:shutdown-fn q) {})) - ([q opts] - ((:shutdown-fn q) opts))) - -(defn num-items - "~ Amount of items currently in all queues" - [q] - (AtomicInteger/.get (:num-items q))) - -(defn num-workers - "~ Amount of workers currently in all queues" - [q] - (AtomicInteger/.get (:num-workers q))) + (java.util.concurrent ConcurrentLinkedQueue Semaphore) + (java.util.concurrent.atomic AtomicInteger) + (clojure.lang PersistentQueue))) + +;; -------------- +;; inflight-queue + +(def persisted-q-empty PersistentQueue/EMPTY) + +(defn pop-times [n coll] + (reduce (fn [coll _] (pop coll)) coll (range n))) + +(comment + (def x (into persisted-q-empty [1 2 3])) + (pop-times 2 x) + (take 2 x)) + +(def inflight-queue-empty + {:pending persisted-q-empty + :working []}) + +(defn inflight-queue-put [inflight-queue item] + (update inflight-queue :pending conj item)) + +(defn inflight-queue-empty? [{:keys [pending working] :as _inflight-queue}] + (and (empty? pending) (empty? working))) + +(defn inflight-queue-workset [{:keys [working]}] + working) + +(defn inflight-queue-workset-clear [inflight-queue] + (assoc inflight-queue :working [])) + +(defn inflight-queue-peek-pending [{:keys [pending] :as _inflight-queue}] + (first pending)) + +(defn inflight-queue-reserve [max-items {:keys [pending working]}] + {:pending (pop-times max-items pending) + :working (into working (take max-items pending))}) + +(defn inflight-queue-reserve-all [{:keys [pending working]}] + {:pending persisted-q-empty + :working (into working pending)}) + +;; ------------- +;; grouped-queue + +(defn create [{:keys [group-fn + on-add]}] + {:size (AtomicInteger. 0) + :group-fn group-fn + :group-key->subqueue (atom {}) + :dispatch-queue (ConcurrentLinkedQueue.) + :on-add on-add}) + +(defn size [{:keys [^AtomicInteger size] :as _grouped-q}] + (.get size)) + +(defn put! [{:keys [group-fn + ^ConcurrentLinkedQueue dispatch-queue + group-key->subqueue + ^AtomicInteger size + on-add] + :as _grouped-q} item] + (let [group-key (group-fn item) + added (if (nil? group-key) + ;; This item is not to be grouped. + (do (.incrementAndGet size) + (.add dispatch-queue [:item item])) + + ;; This item will be grouped on `group-key` + (let [_ (.incrementAndGet size) + [prev] (locking group-key->subqueue + (swap-vals! group-key->subqueue + update + group-key + (fnil inflight-queue-put + inflight-queue-empty) + item)) + prev-subqueue (get prev group-key) + first-enqueue? (inflight-queue-empty? prev-subqueue)] + (when first-enqueue? + (.add dispatch-queue [:group-key group-key]))))] + (when (and added on-add) + (on-add)) + added)) + +(defn peek [{:keys [^ConcurrentLinkedQueue dispatch-queue + group-key->subqueue] :as _grouped-q}] + (let [[t arg :as entry] (.peek dispatch-queue)] + (cond + (nil? entry) nil + (= t :item) arg + (= t :group-key) (inflight-queue-peek-pending (get @group-key->subqueue arg))))) + +(defn default-reserve-fn [_ inflight-q] (inflight-queue-reserve 1 inflight-q)) + +(defn clear-subqueue [state group-key] + (let [subqueue (get state group-key) + cleared-subqueue (inflight-queue-workset-clear subqueue)] + (if (inflight-queue-empty? cleared-subqueue) + (dissoc state group-key) + (assoc state group-key cleared-subqueue)))) + +(defn process! + [{:keys [^ConcurrentLinkedQueue dispatch-queue + group-key->subqueue + ^AtomicInteger size] :as _grouped-q} + {:keys [reserve-fn + process-fn] + :or {reserve-fn default-reserve-fn}}] + (let [[t arg :as entry] (.poll dispatch-queue)] + (cond + (nil? entry) nil + + (= t :item) + (do + (process-fn nil [arg]) + (.decrementAndGet size) + true) + + (= t :group-key) + (let [group-key arg + + reserved (locking group-key->subqueue + (swap! group-key->subqueue update group-key (partial reserve-fn group-key))) + + reserved-subqueue (get reserved group-key) + + workset (inflight-queue-workset reserved-subqueue)] + + (try + (process-fn group-key workset) + true + (finally + (let [cleared (locking group-key->subqueue + (swap! group-key->subqueue clear-subqueue group-key)) + cleared-subqueue (get cleared group-key)] + (.addAndGet size (- (count workset))) + (when (inflight-queue-peek-pending cleared-subqueue) + (.add dispatch-queue [:group-key group-key]))))))))) + +(defn start-grouped-queue-with-workers [{:keys [group-fn + reserve-fn + process-fn + max-workers] + :or {max-workers 2}}] + (let [executor (ua/make-virtual-thread-executor) + workers (atom #{}) + ;; Use a promise so we can access it in the `on-add` function + grouped-queue (promise) + on-add (fn [] + (when (< (count @workers) max-workers) + (ua/worker-vfuture + executor + (loop [worker-id (Object.)] + (when (contains? (swap! workers + (fn [workers] + (if (= (count workers) max-workers) + workers + (conj workers worker-id)))) + worker-id) + (try + (loop [] + (when (process! @grouped-queue {:reserve-fn reserve-fn + :process-fn process-fn}) + ;; Continue processing items until the queue is empty + (recur))) + (catch Throwable t + (tracer/record-exception-span! t {:name "grouped-queue/process-error"})) + (finally + (swap! workers disj worker-id))) + ;; One last check to prevent a race where something is added to the queue + ;; while we're removing ourselves from the workers + (when (and (peek @grouped-queue) + (< (count @workers) max-workers)) + (recur worker-id)))))))] + (deliver grouped-queue (create {:group-fn group-fn + :on-add on-add})) + {:grouped-queue @grouped-queue + :get-worker-count (fn [] (count @workers)) + :virtual-thread-executor executor})) + +(defn start-grouped-queue-with-cpu-workers [{:keys [group-fn + reserve-fn + process-fn + worker-count] + :or {worker-count 2}}] + (let [semaphore (Semaphore. 0) + grouped-queue (create {:group-fn group-fn + :on-add (fn [] + (.release semaphore))}) + shutdown? (atom false) + workers (mapv (fn [_i] + (future + (loop [] + (.acquire semaphore) + (when-not @shutdown? + (try + (loop [] + (when (process! grouped-queue {:reserve-fn reserve-fn + :process-fn process-fn}) + (recur))) + (catch Throwable t + (tracer/record-exception-span! t {:name "grouped-queue-with-cpu-workers/process-error"}))) + (recur))))) + (range worker-count))] + {:grouped-queue grouped-queue + :get-worker-count (fn [] worker-count) + :workers workers + :shutdown (fn [] + (reset! shutdown? true) + (.release semaphore (* 2 worker-count)) + (doseq [w workers] + (when (= :timeout (deref w 1000 :timeout)) + (future-cancel w))))})) + +(comment + (def gq (create {:group-fn :k})) + (put! gq {:k :refresh}) + (put! gq {:k :refresh}) + (put! gq {:k :add-query}) + (put! gq {:k :refresh}) + (put! gq {:k :remove-query}) + (peek gq) + gq + (future + (process! gq + {:reserve-fn (fn [group-key inflight-queue] + (if (= group-key :refresh) + (inflight-queue-reserve-all inflight-queue) + (inflight-queue-reserve 1 inflight-queue))) + + :process-fn (fn [k workset] + (println "processing..." k workset) + #_(Thread/sleep 10000) + (println "done"))})) + + (require 'clojure.tools.logging) + + (defn test-grouped-queue [] + (let [finished (promise) + started (promise) + total-items (AtomicInteger. 0) + process-total (AtomicInteger. 0) + gq (promise) + q (start-grouped-queue-with-workers + {:group-fn :k + :reserve-fn (fn [_ iq] + (inflight-queue-reserve (max 1 (rand-int 25)) iq)) + :process-fn (fn [_ workset] + @started + (clojure.tools.logging/info {:name "workset" + :attributes {:workset-count (count workset) + :total total-items + :worker-count ((:get-worker-count @gq))}}) + (.addAndGet process-total (count workset)) + (when (zero? (.addAndGet total-items (- (count workset)))) + (deliver finished true)) + nil) + :max-workers 1000}) + _ (deliver gq q) + + wait (future + @started + (let [start (. System (nanoTime)) + + _ @finished + end (. System (nanoTime)) + ms (/ (double (- end start)) 1000000.0)] + (tool/def-locals) + (println (format "Elapsed %.2fms, total %d, %.2f / ms" + ms + (.get process-total) + (/ (.get process-total) ms)))))] + (dotimes [x 100] + (dotimes [y 100000] + (.incrementAndGet total-items) + (put! (:grouped-queue q) {:k y :i x}))) + (deliver started true) + (tool/def-locals) + @wait)) + + (test-grouped-queue) + + (defn test-cpu-grouped-queue [] + (let [finished (promise) + started (promise) + total-items (AtomicInteger. 0) + process-total (AtomicInteger. 0) + gq (promise) + q (start-grouped-queue-with-cpu-workers + {:group-fn :k + :reserve-fn (fn [_ iq] + (inflight-queue-reserve (max 1 (rand-int 25)) iq)) + :process-fn (fn [_ workset] + @started + (clojure.tools.logging/info {:name "workset" + :attributes {:workset-count (count workset) + :total total-items + :worker-count ((:get-worker-count @gq))}}) + (.addAndGet process-total (count workset)) + (when (zero? (.addAndGet total-items (- (count workset)))) + (deliver finished true)) + nil) + :worker-count 8}) + _ (deliver gq q) + + wait (future + @started + (let [start (. System (nanoTime)) + + _ @finished + end (. System (nanoTime)) + ms (/ (double (- end start)) 1000000.0)] + (tool/def-locals) + (println (format "Elapsed %.2fms, total %d, %.2f / ms" + ms + (.get process-total) + (/ (.get process-total) ms)))))] + (dotimes [x 100] + (dotimes [y 10000] + (.incrementAndGet total-items) + (put! (:grouped-queue q) {:k y :i x}))) + (deliver started true) + (tool/def-locals) + @wait + ((:shutdown q))))) diff --git a/server/src/instant/machine_summaries.clj b/server/src/instant/machine_summaries.clj index 7ee4d90a4..0ad680acc 100644 --- a/server/src/instant/machine_summaries.clj +++ b/server/src/instant/machine_summaries.clj @@ -2,9 +2,7 @@ (:require [instant.util.hazelcast :as hz] [instant.reactive.ephemeral :as eph] - [instant.reactive.store :as rs]) - (:import - (com.hazelcast.core HazelcastInstance IExecutorService))) + [instant.reactive.store :as rs])) (defn app-sessions->report [app-sessions] (let [[{:keys [app-id app-title creator-email]}] app-sessions @@ -29,8 +27,8 @@ (store->session-report rs/store)) (defn get-all-session-reports [hz] - (let [executor (HazelcastInstance/.getExecutorService hz "session-report-executor") - futures (IExecutorService/.submitToAllMembers executor (hz/->Task #'session-report-task))] + (let [executor (.getExecutorService hz "session-report-executor") + futures (.submitToAllMembers executor (hz/->Task #'session-report-task))] (into {} (for [[member fut] futures] [(str member) @fut])))) @@ -42,8 +40,8 @@ (rs/num-sessions rs/store)) (defn get-all-num-sessions [hz] - (let [executor (HazelcastInstance/.getExecutorService hz "session-nums-executor") - futures (IExecutorService/.submitToAllMembers executor (hz/->Task #'num-sessions-task))] + (let [executor (.getExecutorService hz "session-nums-executor") + futures (.submitToAllMembers executor (hz/->Task #'num-sessions-task))] (into {} (for [[member fut] futures] [(str member) @fut])))) diff --git a/server/src/instant/reactive/ephemeral.clj b/server/src/instant/reactive/ephemeral.clj index de7f9fc4c..b776d8504 100644 --- a/server/src/instant/reactive/ephemeral.clj +++ b/server/src/instant/reactive/ephemeral.clj @@ -159,14 +159,14 @@ :when q :let [just-joined? (and (contains? room-data sess-id) (not (contains? last-data sess-id)))]] - (receive-queue/put! q - {:op :refresh-presence - :app-id app-id - :room-id room-id - :data room-data - :edits (when-not just-joined? - edits) - :session-id sess-id})))))) + (receive-queue/enqueue->receive-q q + {:op :refresh-presence + :app-id app-id + :room-id room-id + :data room-data + :edits (when-not just-joined? + edits) + :session-id sess-id})))))) (defn handle-broadcast-message "Handles the message on the topic we use to broadcast a client-broadcast @@ -177,11 +177,11 @@ (doseq [sess-id session-ids :let [q (-> (rs/session store sess-id) :session/socket :receive-q)] :when q] - (receive-queue/put! q - (assoc base-msg - :op :server-broadcast - :session-id sess-id - :app-id app-id))))) + (receive-queue/enqueue->receive-q q + (assoc base-msg + :op :server-broadcast + :session-id sess-id + :app-id app-id))))) (defn broadcast [app-id session-ids base-msg] (.publish (get-hz-broadcast-topic) diff --git a/server/src/instant/reactive/invalidator.clj b/server/src/instant/reactive/invalidator.clj index 10bba2e3f..bfe5137d2 100644 --- a/server/src/instant/reactive/invalidator.clj +++ b/server/src/instant/reactive/invalidator.clj @@ -6,6 +6,7 @@ [instant.config :as config] [instant.db.model.attr :as attr-model] [instant.db.pg-introspect :as pg-introspect] + [instant.gauges :as gauges] [instant.grouped-queue :as grouped-queue] [instant.jdbc.aurora :as aurora] [instant.jdbc.wal :as wal] @@ -21,7 +22,7 @@ [instant.db.model.triple :as triple-model]) (:import (java.sql Timestamp) - (java.time Instant) + (java.time Duration Instant) (java.time.temporal ChronoUnit) (java.util Map UUID) (java.util.concurrent ConcurrentHashMap) @@ -29,8 +30,6 @@ (declare wal-opts) -(declare invalidator-q) - (defn columns->map ([columns] (columns->map columns false)) @@ -303,21 +302,27 @@ "Combines a list of wal-records into a single wal-record. We combine all of the change lists and advance the tx-id to the latest tx-id in the list." - [r1 r2] - (when (< (::grouped-queue/combined r1 1) 100) - ;; Complain loudly if we accidently mix wal-records from multiple apps - (when (not= (:app-id r1) (:app-id r2)) - (throw (ex-info "app-id mismatch in combine-wal-records" {:r1 r1 :r2 r2}))) - (e2e-tracer/invalidator-tracking-step! {:tx-id (:tx-id r1) - :name "skipped-in-combined-wal-record"}) - - ;; Keep the old tx-created-at so that we see the worst case wal-latency-ms - (-> r1 - (update :attr-changes (fnil into []) (:attr-changes r2)) - (update :ident-changes (fnil into []) (:ident-changes r2)) - (update :triple-changes (fnil into []) (:triple-changes r2)) - (update :tx-bytes (fnil + 0) (:tx-bytes r2)) - (assoc :tx-id (:tx-id r2))))) + [wal-records] + (reduce (fn [acc {:keys [attr-changes + ident-changes + triple-changes + app-id + tx-id + tx-bytes]}] + ;; Complain loudly if we accidently mix wal-records from multiple apps + (assert (= (:app-id acc) app-id) "app-id mismatch in combine-wal-records") + (e2e-tracer/invalidator-tracking-step! {:tx-id (:tx-id acc) + :name "skipped-in-combined-wal-record"}) + + ;; Keep the old tx-created-at so that we see the + ;; worst case wal-latency-ms + (-> acc + (update :attr-changes (fnil into []) attr-changes) + (update :ident-changes (fnil into []) ident-changes) + (update :triple-changes (fnil into []) triple-changes) + (update :tx-bytes (fnil + 0) tx-bytes) + (assoc :tx-id tx-id))) + wal-records)) (defn transform-byop-wal-record [{:keys [changes nextlsn]}] ;; TODO(byop): if change is empty, then there might be changes to the schema @@ -369,36 +374,50 @@ :attributes {:num-sockets (count sockets)}}) (tracer/with-span! {:name "invalidator/send-refreshes"} (doseq [{:keys [id]} sockets] - (receive-queue/put! {:op :refresh - :session-id id - :tx-id tx-id - :tx-created-at tx-created-at})))) + (receive-queue/enqueue->receive-q {:op :refresh + :session-id id + :tx-id tx-id + :tx-created-at tx-created-at})))) (catch Throwable t (def -wal-record wal-record) (def -store-value (store-snapshot store app-id)) (tracer/add-exception! t {:escaping? false})))))) +(defn invalidator-q-metrics [{:keys [grouped-queue get-worker-count]}] + [{:path "instant.reactive.invalidator.q.size" + :value (grouped-queue/size grouped-queue)} + {:path "instant.reactive.invalidator.q.longest-waiting-ms" + :value (if-let [{:keys [put-at]} (grouped-queue/peek grouped-queue)] + (.toMillis (Duration/between put-at (Instant/now))) + 0)} + {:path "instant.reactive.invalidator.q.worker-count" + :value (get-worker-count)}]) + (defn start-worker [process-id store wal-chan] (tracer/record-info! {:name "invalidation-worker/start"}) - (let [queue - (grouped-queue/start - {:group-key-fn :app-id - :combine-fn combine-wal-records - :process-fn (fn [_key wal-record] - (process-wal-record process-id - store - (::grouped-queue/combined wal-record 1) - wal-record)) - :metrics-path "instant.reactive.invalidator.q" - :max-workers 8})] + (let [queue-with-workers + (grouped-queue/start-grouped-queue-with-cpu-workers + {:group-fn :app-id + :reserve-fn (fn [_ q] (grouped-queue/inflight-queue-reserve 100 q)) + :process-fn (fn [_key wal-records] + (process-wal-record process-id + store + (count wal-records) + (combine-wal-records wal-records))) + :worker-count 8}) + grouped-queue (:grouped-queue queue-with-workers) + cleanup-gauges (gauges/add-gauge-metrics-fn + (fn [_] (invalidator-q-metrics queue-with-workers)))] (a/go (loop [] - (when-some [wal-record (a/receive-q {:op :refresh + :session-id id})))) (catch Throwable t (def -wal-record wal-record) (def -store-value (store-snapshot store app-id)) @@ -488,8 +507,7 @@ @(:started-promise wal-opts) - (def invalidator-q - (start-worker process-id rs/store worker-chan)) + (start-worker process-id rs/store worker-chan) (when byop-chan (ua/fut-bg diff --git a/server/src/instant/reactive/receive_queue.clj b/server/src/instant/reactive/receive_queue.clj index 5cb4295ec..d891b97d4 100644 --- a/server/src/instant/reactive/receive_queue.clj +++ b/server/src/instant/reactive/receive_queue.clj @@ -1,19 +1,46 @@ (ns instant.reactive.receive-queue (:require - [instant.grouped-queue :as grouped-queue])) + [instant.config :as config] + [instant.gauges :as gauges] + [instant.grouped-queue :as grouped-queue]) + (:import + (java.time Duration Instant))) (declare receive-q) -(defn put! +(defn enqueue->receive-q ([item] - (grouped-queue/put! receive-q item)) + (enqueue->receive-q receive-q item)) ([q item] - (grouped-queue/put! q item))) + (grouped-queue/put! q + {:item item :put-at (Instant/now)}))) -(defn start [q] - (.bindRoot #'receive-q q)) + +(defn receive-q-metrics [{:keys [grouped-queue get-worker-count]}] + [{:path "instant.reactive.session.receive-q.size" + :value (grouped-queue/size grouped-queue)} + {:path "instant.reactive.session.receive-q.longest-waiting-ms" + :value (if-let [{:keys [put-at]} (grouped-queue/peek grouped-queue)] + (.toMillis (Duration/between put-at (Instant/now))) + 0)} + {:path "instant.reactive.session.receive-q.worker-count" + :value (get-worker-count)}]) + +(defn start [{:keys [group-fn reserve-fn process-fn max-workers]}] + (let [{:keys [grouped-queue] :as queue-with-workers} + (if config/fewer-vfutures? + (grouped-queue/start-grouped-queue-with-cpu-workers {:group-fn group-fn + :reserve-fn reserve-fn + :process-fn process-fn + :worker-count max-workers}) + (grouped-queue/start-grouped-queue-with-workers {:group-fn group-fn + :reserve-fn reserve-fn + :process-fn process-fn + :max-workers max-workers}))] + (def receive-q grouped-queue) + (def cleanup-gauge (gauges/add-gauge-metrics-fn + (fn [_] (receive-q-metrics queue-with-workers)))))) (defn stop [] - (when (bound? #'receive-q) - (grouped-queue/stop receive-q) - (.unbindRoot #'receive-q))) + (when (bound? #'cleanup-gauge) + (cleanup-gauge))) diff --git a/server/src/instant/reactive/session.clj b/server/src/instant/reactive/session.clj index 635513322..a1aa28d67 100644 --- a/server/src/instant/reactive/session.clj +++ b/server/src/instant/reactive/session.clj @@ -37,12 +37,14 @@ [instant.util.uuid :as uuid-util] [lambdaisland.uri :as uri]) (:import + (java.time Duration Instant) (java.util.concurrent CancellationException) (java.util.concurrent.atomic AtomicLong))) ;; ------ ;; Setup +(declare receive-q-stop-signal) (def handle-receive-timeout-ms 5000) (def num-receive-workers (* (if config/fewer-vfutures? @@ -374,11 +376,11 @@ (doseq [notify-sess-id local-ids :let [q (-> (rs/session store notify-sess-id) :session/socket :receive-q)] :when (and q (not= sess-id notify-sess-id))] - (receive-queue/put! q - (assoc base-msg - :op :server-broadcast - :session-id notify-sess-id - :app-id app-id))) + (receive-queue/enqueue->receive-q q + (assoc base-msg + :op :server-broadcast + :session-id notify-sess-id + :app-id app-id))) (when (seq remote-ids) (eph/broadcast app-id remote-ids base-msg)) @@ -438,17 +440,17 @@ ::ex/param-malformed ::ex/validation-failed) - (receive-queue/put! q - {:op :error - :app-id app-id - :status 400 - :client-event-id client-event-id - :original-event (merge original-event - debug-info) - :type (keyword (name type)) - :message message - :hint hint - :session-id sess-id}) + (receive-queue/enqueue->receive-q q + {:op :error + :app-id app-id + :status 400 + :client-event-id client-event-id + :original-event (merge original-event + debug-info) + :type (keyword (name type)) + :message message + :hint hint + :session-id sess-id}) (::ex/session-missing ::ex/socket-missing @@ -458,17 +460,17 @@ (do (tracer/add-exception! instant-ex {:escaping? false}) - (receive-queue/put! q - {:op :error - :app-id app-id - :status 500 - :client-event-id client-event-id - :original-event (merge original-event - debug-info) - :type (keyword (name type)) - :message message - :hint hint - :session-id sess-id}))))) + (receive-queue/enqueue->receive-q q + {:op :error + :app-id app-id + :status 500 + :client-event-id client-event-id + :original-event (merge original-event + debug-info) + :type (keyword (name type)) + :message message + :hint hint + :session-id sess-id}))))) (defn- handle-uncaught-err [session app-id original-event root-err debug-info] (let [sess-id (:session/id session) @@ -476,16 +478,16 @@ {:keys [client-event-id]} original-event] (tracer/add-exception! root-err {:escaping? false}) - (receive-queue/put! q - {:op :error - :app-id app-id - :client-event-id client-event-id - :status 500 - :original-event (merge original-event - debug-info) - :message (str "Yikes, something broke on our end! Sorry about that." - " Please ping us (Joe and Stopa) on Discord and let us know!") - :session-id sess-id}))) + (receive-queue/enqueue->receive-q q + {:op :error + :app-id app-id + :client-event-id client-event-id + :status 500 + :original-event (merge original-event + debug-info) + :message (str "Yikes, something broke on our end! Sorry about that." + " Please ping us (Joe and Stopa) on Discord and let us know!") + :session-id sess-id}))) (defn handle-receive-attrs [store session event metadata] (let [{:keys [session/socket]} session @@ -560,33 +562,91 @@ (finally (swap! pending-handlers disj pending-handler))))))) -(defn process-receive-q-event [store event metadata] - (let [{:keys [session-id] - ::grouped-queue/keys [put-at]} event] - (if-some [session (rs/session store session-id)] - (let [session (into {} session) - total-delay-ms (- (System/currentTimeMillis) put-at) - ws-ping-latency-ms (some-> session - :session/socket - :get-ping-latency-ms - (#(%))) - event (assoc event - :total-delay-ms total-delay-ms - :ws-ping-latency-ms ws-ping-latency-ms) - metadata (assoc metadata :skipped-size (dec (::grouped-queue/combined event 1)))] - (handle-receive store session event metadata)) +(defn process-receive-q-entry [store entry metadata] + (let [{:keys [put-at item skipped-size]} entry + {:keys [session-id] :as event} item + now (Instant/now) + session (rs/session store session-id)] + (cond + (not session) (tracer/record-info! {:name "receive-worker/session-not-found" - :attributes (assoc metadata :session-id session-id)})))) - -(defn straight-jacket-process-receive-q-event [store group-key event] - (let [metadata {:group-key group-key}] - (try - (process-receive-q-event store event metadata) - (catch Throwable e - (tracer/record-exception-span! e {:name "receive-worker/straight-jacket-process-receive-q-event" - :attributes (assoc metadata - :session-id (:session-id event) - :event event)}))))) + :attributes (assoc metadata + :session-id session-id)}) + + :else + (handle-receive + store + (into {} session) + (assoc event + :total-delay-ms + (.toMillis (Duration/between put-at now)) + :ws-ping-latency-ms + (some-> session + :session/socket + :get-ping-latency-ms + (#(%)))) + (assoc metadata :skipped-size skipped-size))))) + +(defn resolve-consolidate [type batch] + (if (= 1 (count batch)) + :default + type)) + +(defmulti consolidate #'resolve-consolidate) + +(defmethod consolidate :default [_ batch] + batch) + +(defmethod consolidate :refresh [_ batch] + (doseq [{:keys [item]} (drop-last batch)] + (e2e-tracer/invalidator-tracking-step! {:tx-id (:tx-id item) + :tx-created-at (:tx-created-at item) + :name "skipped-refresh"})) + [(-> (last batch) + (assoc :skipped-size (dec (count batch))))]) + +(defmethod consolidate :refresh-presence [_ batch] + [(-> (last batch) + (assoc :skipped-size (dec (count batch))) + (assoc-in [:item :edits] + (into [] (mapcat #(get-in % [:item :edits])) batch)))]) + +(defmethod consolidate :room [_ batch] + (loop [last-entry (first batch) + batch (next batch) + acc (transient [])] + (if (empty? batch) + (persistent! (conj! acc last-entry)) + (let [entry (first batch) + last-entry' (if (and (= :set-presence (-> last-entry :item :op)) + (= :set-presence (-> entry :item :op))) + (assoc entry :skipped-size (inc (:skipped-size last-entry 0))) + entry)] + (recur last-entry' (next batch) acc))))) + +(defn straight-jacket-process-receive-q-batch [store batch metadata] + (try + (let [type (-> metadata :group-key first)] + (doseq [entry (consolidate type batch)] + (process-receive-q-entry store entry metadata))) + (catch Throwable e + (tracer/record-exception-span! e {:name "receive-worker/handle-receive-batch-straight-jacket" + :attributes (assoc metadata + :session-id (:session-id (:item (first batch))) + :items batch)})))) + +(defn receive-worker-reserve-fn [[t] inflight-q] + (case t + (:refresh :room :refresh-presence) + (grouped-queue/inflight-queue-reserve-all inflight-q) + + (grouped-queue/inflight-queue-reserve 1 inflight-q))) + +(defn process-fn [store group-key batch] + (straight-jacket-process-receive-q-batch store + batch + {:batch-size (count batch) + :group-key group-key})) ;; ----------------- ;; Websocket Interop @@ -597,9 +657,9 @@ (rs/assoc-session! store sess-id :session/socket socket))) (defn on-message [{:keys [id receive-q data]}] - (receive-queue/put! receive-q (-> (<-json data true) - (update :op keyword) - (assoc :session-id id)))) + (receive-queue/enqueue->receive-q receive-q (-> (<-json data true) + (update :op keyword) + (assoc :session-id id)))) (defn on-error [{:keys [id error]}] (condp instance? error @@ -662,59 +722,40 @@ ;; ------ ;; System -(defn group-key [{:keys [op session-id room-id q]}] - (case op - :transact - [:transact session-id] - - (:join-room :leave-room :set-presence :client-broadcast :server-broadcast) - [:room session-id room-id] - - (:add-query :remove-query) - [:query session-id q] - - :refresh - [:refresh session-id] - - :refresh-presence - [:refresh-presence session-id room-id] - - :error - [:error session-id] +(defn group-fn [{:keys [item] :as _input}] + (let [{:keys [op session-id room-id]} item] + (case op + :transact + [:transact session-id] - nil)) + (:join-room :leave-room :set-presence :client-broadcast :server-broadcast) + [:room session-id room-id] -(defmulti combine - (fn [event1 event2] - [(:op event1) (:op event2)])) + (:add-query :remove-query) + (let [{:keys [q]} item] + [:query session-id q]) -(defmethod combine :default [_ _] - nil) + :refresh + [:refresh session-id] -(defmethod combine [:refresh :refresh] [event1 event2] - (e2e-tracer/invalidator-tracking-step! - {:name "skipped-refresh" - :tx-id (:tx-id event1) - :tx-created-at (:tx-created-at event1)}) - event2) + :refresh-presence + [:refresh-presence session-id room-id] -(defmethod combine [:refresh-presence :refresh-presence] [event1 event2] - (update event2 :edits #(into (vec (:edits event1)) %))) + :error + [:error session-id] -(defmethod combine [:set-presence :set-presence] [_event1 event2] - event2) + nil))) -(defn process [group-key event] - (straight-jacket-process-receive-q-event rs/store group-key event)) +(comment + (group-fn {:item {:session-id 1 :op :transact}}) + (group-fn {:item {:session-id 1 :op :leave-room}}) + (group-fn {:item {:session-id 1 :op :add-query :q {:users {}}}})) (defn start [] - (receive-queue/start - (grouped-queue/start - {:group-key-fn #'group-key - :combine-fn #'combine - :process-fn #'process - :max-workers num-receive-workers - :metrics-path "instant.reactive.session.receive-q"}))) + (receive-queue/start {:max-workers num-receive-workers + :group-fn #'group-fn + :reserve-fn #'receive-worker-reserve-fn + :process-fn (partial process-fn rs/store)})) (defn stop [] (receive-queue/stop)) @@ -722,9 +763,3 @@ (defn restart [] (stop) (start)) - -(defn before-ns-unload [] - (stop)) - -(defn after-ns-reload [] - (start)) diff --git a/server/src/tool.clj b/server/src/tool.clj index 5a41ad39b..e99dc9893 100644 --- a/server/src/tool.clj +++ b/server/src/tool.clj @@ -183,7 +183,7 @@ (start-portal!) ;; all tap> calls will be sent to portal - (tap> @instant.reactive.store/store) + (tap> @instant.reactive.store/store-conn) For a guide, see: https://www.youtube.com/watch?v=Tj-iyDo3bq0" diff --git a/server/test/instant/grouped_queue_test.clj b/server/test/instant/grouped_queue_test.clj deleted file mode 100644 index 797b0efaf..000000000 --- a/server/test/instant/grouped_queue_test.clj +++ /dev/null @@ -1,95 +0,0 @@ -(ns instant.grouped-queue-test - (:require - [clojure.test :refer [deftest testing is]] - [instant.grouped-queue :as grouped-queue]) - (:import - (java.util.concurrent CountDownLatch Executors))) - -(def groups - "abcdefghijklmnopqrstuvwxyz") - -(def ids - (range 1000)) - -(defn make-opts [] - [{} - {:max-workers 1} - {:max-workers 10} - {:max-workers 100} - {:executor (Executors/newSingleThreadExecutor)} - {:executor (Executors/newFixedThreadPool 10)} - {:executor (Executors/newCachedThreadPool)} - {:executor (Executors/newVirtualThreadPerTaskExecutor)}]) - -(deftest basic-test - (doseq [opts (make-opts)] - (testing (pr-str opts) - (let [input (for [group groups - id ids] - {:group group :id id}) - output (atom []) - q (grouped-queue/start - (merge - {:group-key-fn :group - :process-fn (fn [_group item] - (swap! output conj item))} - opts))] - (doseq [item input] - (grouped-queue/put! q item)) - (is (= :shutdown (grouped-queue/stop q {:timeout-ms 1000}))) - (is (= (count @output) (* (count groups) (count ids)))) - (doseq [group groups - :let [filtered (filterv #(= group (:group %)) @output)]] - (testing group - (is (= ids (mapv :id filtered))))))))) - -(deftest combine-test - (doseq [opts (make-opts)] - (testing (pr-str opts) - (let [input (for [group groups - id ids] - {:group group :id id}) - output (atom []) - q (grouped-queue/start - (merge - {:group-key-fn :group - :combine-fn (fn [item1 item2] - (when (= (:id item2) (inc (:id item1))) - item2)) - :process-fn (fn [_group item] - (swap! output conj item))} - opts))] - (doseq [item input] - (grouped-queue/put! q item)) - (is (= :shutdown (grouped-queue/stop q {:timeout-ms 1000}))) - (doseq [group groups - :let [filtered (filterv #(= group (:group %)) @output) - processed (transduce (map #(::grouped-queue/combined % 1)) + 0 filtered)]] - (when-not (= (count ids) processed) - (println group (count ids) processed filtered)) - (testing group - (is (= (count ids) processed)))))))) - -(deftest thread-pool-size-test - (let [input (for [group [1 2 3 4 5] - id (range 100)] - {:group group :id id}) - latch (CountDownLatch. 500) - q (grouped-queue/start - {:group-key-fn :group - :max-workers 10 - :process-fn (fn [_group _item] - (Thread/sleep 10) - (.countDown latch))}) - t0 (System/currentTimeMillis)] - (doseq [item input] - (grouped-queue/put! q item)) - (testing "put! is not blocked by execution" - (is (< (- (System/currentTimeMillis) t0) 250))) - ;; give threads a chance to start - (Thread/sleep (- 500 (- (System/currentTimeMillis) t0))) - (testing "More than 1 thread spawned, but no more than 1 per group" - (is (<= 2 (grouped-queue/num-workers q) 5))) - (.await latch) - (testing "Total exec time more than 10 threads but less than 1 threads" - (is (< 1000 (- (System/currentTimeMillis) t0) 5000))))) diff --git a/server/test/instant/reactive/invalidator_test.clj b/server/test/instant/reactive/invalidator_test.clj index 940d83070..1f568c8ea 100644 --- a/server/test/instant/reactive/invalidator_test.clj +++ b/server/test/instant/reactive/invalidator_test.clj @@ -400,10 +400,10 @@ #"-" "_")] (with-redefs [inv/invalidate! - (fn [process-id store {:keys [app-id] :as wal-record}] + (fn [process-id store-conn {:keys [app-id] :as wal-record}] (if (and (= machine-id process-id) (= (:id app) app-id)) (swap! records conj wal-record) - (invalidate! store wal-record)))] + (invalidate! store-conn wal-record)))] (let [process (inv/start machine-id) uid (random-uuid)] (try diff --git a/server/test/instant/reactive/session_test.clj b/server/test/instant/reactive/session_test.clj index 46726fe65..b378a3570 100644 --- a/server/test/instant/reactive/session_test.clj +++ b/server/test/instant/reactive/session_test.clj @@ -46,11 +46,12 @@ (defn- with-session [f] (let [store (rs/init) - receive-q - (grouped-queue/start {:group-key-fn session/group-key - :combine-fn session/combine - :process-fn #(session/straight-jacket-process-receive-q-event store %1 %2) - :max-workers 1}) + {receive-q :grouped-queue} + (grouped-queue/start-grouped-queue-with-workers + {:max-workers 1 + :group-fn session/group-fn + :reserve-fn session/receive-worker-reserve-fn + :process-fn (partial session/process-fn store)}) realized-eph? (atom false) eph-hz (delay