From 0e34d1854fe8082c28e1fc27ed2d189ba1087001 Mon Sep 17 00:00:00 2001 From: Shlomi Vaknin Date: Mon, 27 Aug 2018 21:30:13 +0000 Subject: [PATCH 1/2] bringing in changes from hackathon --- src/papiea/core.clj | 1 + src/papiea/db/spec.clj | 5 ++- src/papiea/db/status.clj | 2 + src/papiea/engine.clj | 96 ++++++++++++++++++++++++++-------------- src/papiea/specs.clj | 27 +++++++---- src/papiea/tasks.clj | 7 +-- 6 files changed, 93 insertions(+), 45 deletions(-) diff --git a/src/papiea/core.clj b/src/papiea/core.clj index 8eb94fe..0f3dee4 100644 --- a/src/papiea/core.clj +++ b/src/papiea/core.clj @@ -26,6 +26,7 @@ :direct (pfn arg) :rest (json/decode (try+ (:body (rest/post pfn {:content-type :json + :insecure? true :body (json/generate-string arg)})) (catch Object o (throw+ {:status :failed-api-call diff --git a/src/papiea/db/spec.clj b/src/papiea/db/spec.clj index 272d644..093ceef 100644 --- a/src/papiea/db/spec.clj +++ b/src/papiea/db/spec.clj @@ -39,7 +39,10 @@ :_id (-> entity :metadata :uuid)}} {:return-new true :upsert true}) :_id :_prefix) - (mc/remove-by-id db entity-spec (-> entity :metadata :uuid))) + ;; throw an error of user is trying to remove an item that does not exists. This should be made atomic + (if (mc/find-by-id db entity-spec (-> entity :metadata :uuid)) + (mc/remove-by-id db entity-spec (-> entity :metadata :uuid)) + (throw+ "Trying to remove an Item that does not exist"))) (catch Object e (throw+ {:entity entity :cause e} "Cant insert or update entity. Check that you are diff --git a/src/papiea/db/status.clj b/src/papiea/db/status.clj index 7b83601..1ad40f8 100644 --- a/src/papiea/db/status.clj +++ b/src/papiea/db/status.clj @@ -39,3 +39,5 @@ (defn clear-entities [] (mc/remove db entity-status)) +(defn remove-entity [prefix uuid] + (mc/remove db entity-status {:prefix prefix :_id uuid})) diff --git a/src/papiea/engine.clj b/src/papiea/engine.clj index f4af192..9936255 100644 --- a/src/papiea/engine.clj +++ b/src/papiea/engine.clj @@ -5,6 +5,7 @@ [orchestra.spec.test :as st] [clojure.set :as set] [papiea.specs] + [papiea.core :as c] [tracks.core :as t] [papiea.tasks :as task] [papiea.core :refer [call-api fixed-rate ->timer]] @@ -12,8 +13,17 @@ [papiea.db.status :as stdb])) ;;(map #(ns-unmap *ns* %) (keys (ns-interns *ns*))) -(defn state-settled? [entity] - (= (:spec entity) (select-keys (:status entity) (keys (:spec entity))))) +(defn unidirectional-diff + "Check diff only on items defined in the reference map" + [reference target] + (cond + (map? reference) (every? (fn[key] (unidirectional-diff (get reference key) (get target key))) (keys reference)) + (sequential? reference) (every? (fn[key] (unidirectional-diff (get reference key) (get target key))) reference) + :default (= reference target))) + +(defn state-settled? + "returns true iff every key that is defined in the `spec` (recursively defined) has the same value as the one in the `status`" + [entity] (unidirectional-diff (:spec entity) (:status entity))) (defn handleable-diffs-for [state prefix] (let [prefix-diffs (select (into prefix [ALL (complement state-settled?)]) state)] @@ -93,10 +103,10 @@ (defn previous-spec-version [entity] (transform [:metadata :spec_version] dec entity)) -(defn unspec-version [entity] - (select-keys (assoc (setval [:metadata :spec_version] nil entity) - :spec (:spec entity)) - [:metadata :spec])) +(defn unspec-version + "Returns an entity composed only from the spec and the uuid part of the metadata. Should only be used internally" + [entity] {:metadata {:uuid (-> entity :metadata :uuid)} + :spec (-> entity :spec)}) (defn ensure-spec-version ([prefix entity default-value] @@ -118,23 +128,30 @@ (let [{:keys [add-fn del-fn change-fn] :as w} (get transformers prefix)] (let [[modify data op tasked] (cond added [add-fn added :added (:add-tasked? w)] removed [del-fn removed :removed (:del-tasked? w)] - changed [change-fn changed :changed (:changed-tasked? w)]) + changed [change-fn changed :changed (:change-tasked? w)]) task-id (when tasked (tasked-fn op data))] - (let [r (try+ (modify data) + (let [r (try+ (c/call-api modify data) (catch Object o {:status :failed :error o}))] (if tasked (if (= :failed (:status r)) - (task/update-task1 task-id {:status "FAILED"}) + (task/update-task (:uuid task-id) {:status "FAILED"}) (do (stdb/update-entity-status! prefix r) ;; Save the state - (task/update-task1 task-id {:status "COMPLETED"}))) + (task/update-task (:uuid task-id) {:status "COMPLETED"}) + (when-not (:status r) + (spdb/remove-entity prefix (-> r :metadata :uuid)) + (stdb/remove-entity prefix (-> r :metadata :uuid))))) (if (= :failed (:status r)) (failed-fn op data) - (do (stdb/update-entity-status! prefix r) ;; Save the state - (success-fn op data)) - )))))) + (do + (stdb/update-entity-status! prefix r) + (when (nil? (:status r)) + (spdb/remove-entity prefix (-> r :metadata :uuid)) + (stdb/remove-entity prefix (-> r :metadata :uuid))) + ;; Save the state + (success-fn op data)))))))) (defn tasked-op [change-watch] (fn[op entity] @@ -143,7 +160,7 @@ (swap! change-watch dissoc previous-entity) (let [task (task/register-new-task entity)] (deliver done (:uuid task)) - (:uuid task)))))) + task))))) (defn change-succeeded [change-watch] (fn[op entity] @@ -163,10 +180,12 @@ (defn handle-diffs "apply the diffs" - ([transformers] (handle-diffs transformers - (fn[op data] (println "Taksed: " op data)) - (fn[op data] (println "Success: " op data)) - (fn[op data] (println "Failed: " op data)))) + ([transformers] + (let [change-watch (atom {})] + (handle-diffs transformers + (tasked-op change-watch) + (change-succeeded change-watch) + (change-failed change-watch)))) ([transformers tasked-fn success-fn failed-fn] (let [diffs (-> (refresh-specs transformers) (refresh-status transformers) @@ -183,7 +202,8 @@ (start-engine [this transformers timeout]) (stop-engine [this]) (notify-change [this]) - (change-spec! [this prefix entity])) + (change-spec! [this prefix entity]) + (get-entity [this prefix entity])) (defrecord DefaultEngine [change-watch handle-diff-notify started state] PapieaEngine @@ -191,23 +211,26 @@ (add-watch handle-diff-notify :process-diffs (fn[key a o n] (println n "Looking for diffs") - (handle-diffs transformers - (tasked-op change-watch) - (change-succeeded change-watch) - (change-failed change-watch)))) + (try+ + (handle-diffs transformers + (tasked-op change-watch) + (change-succeeded change-watch) + (change-failed change-watch)) + (catch Object o + (println o))))) (when (compare-and-set! started false true) (println "Starting engine...") (swap! state assoc :tranformers transformers - :interval-notify (fixed-rate (partial notify-change this) (->timer) 5000))) + :interval-notify-cancel-fn (fixed-rate (partial notify-change this) (->timer) 5000))) this) (stop-engine [this] (when (compare-and-set! started true false) (println "Stopping engine..") - ((:interval-notify @state)) ;; stop the timer - (swap! state dissoc :interval-notify :tranformers)) + ((:interval-notify-cancel-fn @state)) ;; stop the timer + (swap! state dissoc :interval-notify-cancel-fn :tranformers)) this) (notify-change [this] @@ -224,16 +247,23 @@ (notify-change this) done (catch Object e - (swap! change-watch dissoc speced-entity) - ;;(println e) (println "Error processing spec change." e) - (throw+ (if (map? e) - (merge {:status :failure} e) - {:status :failure - :cause e})))))) - ) + (deliver done (if (map? e) + (merge {:status :failure} e) + {:status :failure + :cause e})) + (swap! change-watch dissoc speced-entity) + done)))) + + (get-entity [this prefix uuid] + (let [all-ents (get-in (-> (refresh-status transformers) + (refresh-specs transformers)) + prefix)] + (some-> (filter (fn[x] (= uuid (-> x :metadata :uuid))) all-ents) + first)))) (defn new-papiea-engine [] (println "Created new engine") (->DefaultEngine (atom {}) (atom 0) (atom false) (atom {}))) + diff --git a/src/papiea/specs.clj b/src/papiea/specs.clj index c6945e9..6ac7ede 100644 --- a/src/papiea/specs.clj +++ b/src/papiea/specs.clj @@ -14,8 +14,6 @@ (s/def :papiea.entity.list/spec (s/keys :req-un [:papiea.entity/metadata :papiea.entity/spec])) (s/def :papiea.entity.list/specs (s/coll-of :papiea.entity.list/spec)) - - (s/def :papiea.provider.ok/status #{:ok}) (s/def :papiea.provider.failed/status #{:failed}) (s/def :papiea.provider.failed/error string?) @@ -39,9 +37,12 @@ (s/def :papiea/api-fn (s/or :direct ifn? :rest string?)) - (s/def :papiea.engine/prefix (s/or :keywords (s/coll-of keyword?) - :strings (s/coll-of string?))) + :strings (s/coll-of string?) + :string string? + :keyword keyword?)) + +(s/def :papiea.engine.json/prefix keyword?) (s/def :papiea.engine/fn-or-staged (s/or :single-step :papiea/api-fn :staged (s/coll-of :papiea/api-fn :count 2))) @@ -63,7 +64,7 @@ :papiea.engine/del-tasked? :papiea.engine/change-tasked?])) -(s/def :papiea.engine/transformers (s/map-of :papiea.engine/prefix :papiea.engine/entity-crud)) +(s/def :papiea.engine/transformers (s/map-of :papiea.engine.json/prefix :papiea.engine/entity-crud)) (s/def :papiea.engine/refresh-period integer?) (s/def :papiea.engine/clean-start boolean?) (s/def :papiea.engine/start-engine (s/keys :req-un [:papiea.engine/transformers @@ -73,15 +74,25 @@ (s/def :papiea.engine/stop-engine (s/keys :req-un [:papiea.entity/uuid])) -(s/def :papiea.engine/change-spec (s/keys :req-un [:papiea.entity/uuid :papiea.engine/prefix :papiea/entity])) +(s/def :papiea.engine/change-spec (s/keys :req-un [:papiea.entity/uuid + :papiea.engine/prefix + :papiea/entity])) + (s/def :papiea.engine/tasked-response (s/keys :req-un [])) (s/def :papiea.engine/synced-response (s/keys :req-un [])) (s/def :papiea.engine/change-spec-response (s/or :task :papiea.engine/tasked-response :sync :papiea.engine/synced-response)) -(s/def :papiea.engine/list-entities-request (s/keys :req-un [:papiea.entity/uuid +(s/def :papiea.engine/list-entities-request (s/keys :req-un [:papiea.entity/uuid :papiea.engine/prefix])) -(s/def :papiea.engine/list-entities-response (s/coll-of map?)) +(s/def :papiea.engine.list/entities (s/coll-of map?)) +(s/def :papiea.engine/list-entities-response (s/keys :req-un [:papiea.engine.list/entities])) + + +(s/def :papiea.engine/get-entity-request (s/keys :req-un [:papiea.entity/uuid + :papiea.engine/prefix + :papiea/entity])) +(s/def :papiea.engine/get-entity-response :papiea/entity) (s/def :papiea.task/time string?) (s/def :papiea.task/status string?) diff --git a/src/papiea/tasks.clj b/src/papiea/tasks.clj index a2698d5..4d33f80 100644 --- a/src/papiea/tasks.clj +++ b/src/papiea/tasks.clj @@ -17,6 +17,7 @@ (let [now (java.util.Date.) uuid (uuid) d {:_id uuid + :status "PENDING" :uuid uuid :creation_time (str now) :start_time (str now) @@ -24,12 +25,12 @@ :percentage_complete 0 :entity_reference_list [{:kind (or (some-> entity :metadata :kind) "") :uuid (-> entity :metadata :uuid) - :name (some-> entity :spec :name)}]}] + :name (or (some-> entity :spec :name) "")}]}] (mc/insert db tasks d) d)) -(defn update-task1 - ([uuid] (throw+ "A") (update-task1 uuid {})) +(defn update-task + ([uuid] (update-task uuid {})) ([uuid task] (let [uuid (if (map? uuid) (-> uuid :metadata :uuid) uuid)] (if-let [p (mc/find-map-by-id db tasks uuid)] From 9e25ddccc706291d67334643fae11059dc9e585e Mon Sep 17 00:00:00 2001 From: Shlomi Vaknin Date: Wed, 29 Aug 2018 20:33:56 +0000 Subject: [PATCH 2/2] Merged hackacthon stuff. Added some performance checking code (commented) --- project.clj | 9 ++- src/papiea/core.clj | 11 +++ src/papiea/db/spec.clj | 3 + src/papiea/engine.clj | 71 ++++++++++------- src/papiea/tasks.clj | 14 ++-- test/papiea/tasks_test.clj | 158 +++++++++++++++++++++++++++++++++++-- 6 files changed, 221 insertions(+), 45 deletions(-) diff --git a/project.clj b/project.clj index 838fcef..60de567 100644 --- a/project.clj +++ b/project.clj @@ -3,7 +3,7 @@ :dependencies [[org.clojure/clojure "1.9.0"] ;; clojure itself [org.clojure/spec.alpha "0.2.168"] [org.clojure/core.specs.alpha "0.2.36"] - [mount "0.1.12"] ;; dependencies + ;;[mount "0.1.12"] ;; dependencies [metosin/compojure-api "2.0.0-alpha21"] ;; rest api routes [metosin/spec-tools "0.7.1"] ;; allowes for more complex specs @@ -14,17 +14,20 @@ [com.rpl/specter "1.1.1"] ;; deep datastructure transformation [slingshot "0.12.2"] ;; advanced error handling - [kovacnica/clojure.network.ip "0.1.2"] ;; library for handling ip spaces + ;;[kovacnica/clojure.network.ip "0.1.2"] ;; library for handling ip spaces [tracks "1.0.5"] ;; improved destructuring [orchestra "2017.11.12-1"] ;; better specs validation - [com.grammarly/perseverance "0.1.3"] ;; retry on exceptions + ;;[com.grammarly/perseverance "0.1.3"] ;; retry on exceptions [com.novemberain/monger "3.1.0"] ;; Mongo DB [clj-http "3.9.0"] ;; http client [cheshire "5.8.0"] ;; json support + [com.clojure-goes-fast/clj-async-profiler "0.1.3"] + [com.climate/claypoole "1.1.4"] ] :ring {:handler papiea.handler/app} + :jvm-opts ["-Xmx1024m" "-server" "-XX:+UseSerialGC" "-XX:+UseStringDeduplication"] :uberjar-name "server.jar" :profiles {:dev {:dependencies [[javax.servlet/javax.servlet-api "3.1.0"] [cheshire "5.8.0"] diff --git a/src/papiea/core.clj b/src/papiea/core.clj index 0f3dee4..f0e6020 100644 --- a/src/papiea/core.clj +++ b/src/papiea/core.clj @@ -36,3 +36,14 @@ :http-response o}))) keyword)))) + +(def current + "Get current process PID" + (memoize + (fn [] + (-> (java.lang.management.ManagementFactory/getRuntimeMXBean) + (.getName) + (clojure.string/split #"@") + (first))))) + +(println "Starting Papiea. PID" (current)) diff --git a/src/papiea/db/spec.clj b/src/papiea/db/spec.clj index 093ceef..715f65d 100644 --- a/src/papiea/db/spec.clj +++ b/src/papiea/db/spec.clj @@ -53,3 +53,6 @@ (defn clear-entities [] (mc/remove db entity-spec)) + +(defn remove-entity [prefix uuid] + (mc/remove db entity-spec {:prefix prefix :_id uuid})) diff --git a/src/papiea/engine.clj b/src/papiea/engine.clj index 9936255..d76e84f 100644 --- a/src/papiea/engine.clj +++ b/src/papiea/engine.clj @@ -12,6 +12,7 @@ [papiea.db.spec :as spdb] [papiea.db.status :as stdb])) ;;(map #(ns-unmap *ns* %) (keys (ns-interns *ns*))) +(set! *warn-on-reflection* true) (defn unidirectional-diff "Check diff only on items defined in the reference map" @@ -62,8 +63,8 @@ (defn-spec merge-entities-specs :papiea.entity.list/specs [old-entities :papiea.entity.list/specs new-entities :papiea.entity.list/specs] - (merge-entities-part (merge-entities-part old-entities new-entities :spec) - new-entities :metadata)) + (-> (merge-entities-part old-entities new-entities :spec) + (merge-entities-part new-entities :metadata))) (defn ensure-entity-map [m ks] (if (empty? ks) m @@ -76,19 +77,20 @@ ([state transformers] (reduce (fn[o [prefix {:keys [status-fn]}]] (if status-fn - (transform prefix (fn[x] (let [db-statuses (stdb/get-entities prefix) - statuses (call-api status-fn db-statuses) - removed (map (fn[e] (dissoc e :status :spec)) - (set/difference (set db-statuses) (set statuses)))] - (doseq [entity (concat statuses removed)] - (stdb/update-entity-status! prefix entity)) - (merge-entities-status x statuses))) + (transform prefix + (fn[x] (let [db-statuses (stdb/get-entities prefix) + statuses (call-api status-fn db-statuses) + removed (map (fn[e] {:metadata (:metadata e)} #_(dissoc e :status :spec)) + (set/difference (set db-statuses) + (set statuses)))] + (doseq [entity (concat statuses removed)] + (stdb/update-entity-status! prefix entity)) + (merge-entities-status x statuses))) (ensure-entity-map o prefix)) (do(println "Error: Cant refresh" prefix " - no `status-fn` defined. Unsafely ignoring..") - (ensure-entity-map o prefix)) - )) - state - transformers))) + (ensure-entity-map o prefix)))) + state + transformers))) (declare prefix) ;; bug in cider while debugging.. (defn refresh-specs @@ -96,7 +98,10 @@ ([transformers] (refresh-specs {} transformers)) ([state transformers] (reduce (fn[o [prefix _]] - (transform prefix (fn[x] (merge-entities-specs x (spdb/get-entities prefix))) (ensure-entity-map o prefix))) + (transform prefix (fn[prefix-entities] + (merge-entities-specs prefix-entities + (spdb/get-entities prefix))) + (ensure-entity-map o prefix))) state transformers))) @@ -130,7 +135,7 @@ removed [del-fn removed :removed (:del-tasked? w)] changed [change-fn changed :changed (:change-tasked? w)]) task-id (when tasked (tasked-fn op data))] - + ;;(println "Performing: " op data) (let [r (try+ (c/call-api modify data) (catch Object o {:status :failed @@ -159,7 +164,7 @@ (when-let [done (get @change-watch previous-entity)] (swap! change-watch dissoc previous-entity) (let [task (task/register-new-task entity)] - (deliver done (:uuid task)) + (deliver done task) task))))) (defn change-succeeded [change-watch] @@ -187,27 +192,34 @@ (change-succeeded change-watch) (change-failed change-watch)))) ([transformers tasked-fn success-fn failed-fn] + (println "Thread:" (.getName (Thread/currentThread))) (let [diffs (-> (refresh-specs transformers) (refresh-status transformers) (handleable-diffs (keys transformers)))] - (doseq [{:keys [prefix diffs]} diffs - diff diffs] - (turn-spec-to-status transformers prefix tasked-fn success-fn failed-fn diff))))) + (when-not (empty? diffs) + (println "\tFound" (count(:diffs (first diffs))) "diffs") + (time(doseq [{:keys [prefix diffs]} diffs + diff diffs] + + (turn-spec-to-status transformers prefix tasked-fn success-fn failed-fn diff))))))) ;; We model the async call as a watch on an atom. The watch is triggered every time the atom ;; value is changed, causing handle-diffs to be called with the registered transformers (defprotocol PapieaEngine - (start-engine [this transformers timeout]) + (start-engine [this transformers diff-interval]) (stop-engine [this]) (notify-change [this]) (change-spec! [this prefix entity]) (get-entity [this prefix entity])) + +;;(require '[com.climate.claypoole :as cp]) + (defrecord DefaultEngine [change-watch handle-diff-notify started state] PapieaEngine - (start-engine [this transformers timeout] + (start-engine [this transformers diff-interval] (add-watch handle-diff-notify :process-diffs (fn[key a o n] (println n "Looking for diffs") @@ -221,9 +233,12 @@ (when (compare-and-set! started false true) (println "Starting engine...") - (swap! state assoc - :tranformers transformers - :interval-notify-cancel-fn (fixed-rate (partial notify-change this) (->timer) 5000))) + (let [timer (->timer)] + (swap! state assoc + :diff-interval diff-interval + ;;:diff-cleanup-cancel-fn (fixed-rate (fn[] (cleanup-change-watch )) timer (* 2.5 diff-interval)) + :tranformers (:transformers this) + :interval-notify-cancel-fn (fixed-rate (partial notify-change this) timer diff-interval)))) this) (stop-engine [this] @@ -244,7 +259,7 @@ (try+ (swap! change-watch assoc (unspec-version speced-entity) done) (insert-spec-change! prefix speced-entity) - (notify-change this) + ;;(notify-change this) done (catch Object e (println "Error processing spec change." e) @@ -252,12 +267,12 @@ (merge {:status :failure} e) {:status :failure :cause e})) - (swap! change-watch dissoc speced-entity) + (swap! change-watch dissoc (unspec-version speced-entity)) done)))) (get-entity [this prefix uuid] - (let [all-ents (get-in (-> (refresh-status transformers) - (refresh-specs transformers)) + (let [all-ents (get-in (-> (refresh-status (:transformers this)) + (refresh-specs (:transformers this))) prefix)] (some-> (filter (fn[x] (= uuid (-> x :metadata :uuid))) all-ents) first)))) diff --git a/src/papiea/tasks.clj b/src/papiea/tasks.clj index 4d33f80..836b3ca 100644 --- a/src/papiea/tasks.clj +++ b/src/papiea/tasks.clj @@ -34,14 +34,12 @@ ([uuid task] (let [uuid (if (map? uuid) (-> uuid :metadata :uuid) uuid)] (if-let [p (mc/find-map-by-id db tasks uuid)] - (do(println "MERGING" p task) - (let [now (java.util.Date.) - d (merge p - task - {:last_update_time (str now)})] - (println "UPDATING to " d) - (mc/update db tasks {:_id uuid} d) - d)))))) + (let [now (java.util.Date.) + d (merge p + task + {:last_update_time (str now)})] + (mc/update db tasks {:_id uuid} d) + d))))) (defn get-task [uuid] (dissoc (mc/find-map-by-id db tasks uuid) :_id)) diff --git a/test/papiea/tasks_test.clj b/test/papiea/tasks_test.clj index d02d7f9..8ffc85d 100644 --- a/test/papiea/tasks_test.clj +++ b/test/papiea/tasks_test.clj @@ -2,28 +2,167 @@ (:require [clojure.test :refer :all] [slingshot.slingshot :refer [throw+ try+]] [papiea.engine :as e] + [papiea.staged :as staged] + [papiea.db.spec :as spdb] + [papiea.db.status :as stdb] + [papiea.tasks :as task] [papiea.tasks :as t])) +(require '[clj-async-profiler.core :as prof]) + +(set! *warn-on-reflection* true) + (defn add [entity] (assoc entity :status (:spec entity))) +(defn del [entity] {:metadata (:metadata entity)}) + (defn fail [& _] (throw+ "not implemnted")) + +(defn inc-version [living-objects id] + (swap! living-objects assoc id (inc(get @living-objects id 0)))) + +(defn generate-test + ([max-ops] (generate-test max-ops [:add :change :dell] #{} 0 0)) + ([max-ops ops] (generate-test max-ops ops #{} 0 0)) + ([max-ops ops living-objects max-id min-spec-version ] + (let [living-objects (atom (into {} (map (fn[o] [o min-spec-version]) living-objects))) + max-id (atom max-id) + gen-add (fn[id spec_version name ] {:metadata {:uuid (str id) + :spec_version spec_version} + :spec {:name (str name \- id)}}) + gen-del (fn[id _] {:metadata {:uuid (str id)}}) + + gen-change (fn[id spec new-name] (gen-add id spec (str new-name \- (rand-int (* 2 max-ops)))))] + (reduce (fn[o op-i] + (let [op (rand-nth ops)] + (conj + o + (condp = op + :add (let [id (swap! max-id inc)] + (inc-version living-objects id) + (gen-add id 1 "added")) + :change (if (empty? @living-objects) + (let [id (swap! max-id inc)] ;; if cant change, add + (println "Cant change!!") + (inc-version living-objects id) + (gen-add id 1 "added")) + (let [[id spec-version] (rand-nth (vec @living-objects))] + (inc-version living-objects id) + (gen-change id spec-version "changed"))) + :del (if (empty? @living-objects) + (let [id (swap! max-id inc)] ;; if cant change, add + (println "Cant del!!") + (inc-version living-objects id) + (gen-add id 1 "added")) + (let [id (rand-nth (vec @living-objects))] + (swap! living-objects dissoc id) + (gen-del id "del"))) + )) + + )) + [] + (range max-ops)) + + )) + + ) + + +(defn cleanup [] + (do(println "Clean start - removing all previously known entities") + (spdb/clear-entities) + (stdb/clear-entities) + (task/clear-tasks) + (staged/clear-stages)) + ) + +(comment + (cleanup) + + (def creates (generate-test 1000 [:add])) + + (def tests (generate-test 2000 [:change] (into #{} (map inc (range 1000))) 1000 2) + ) + + (drop 100 tests) + ;; -L1234:*:54321 + + (prof/profile-for 2 {}) ; Run profiler for 10 seconds + + + (do + (prof/start {:interval 99}) + + (cleanup) + + ( + + ( + (doseq [test creates] + (e/change-spec! e [:test :tasks] test)) + + (time (doseq [test tests] + ;;(println test) + (e/change-spec! e [:test :tasks] test))) + + + )) + + (e/change-spec! e [:test :tasks] (first (rest tests))) + + #_(def r(let [a (doall (time(map (fn[test] (e/change-spec! e [:test :tasks] test)) tests)))] + ;;(doall(map (fn[x] (deref x 5 :timeout)) a)) + )) + + (prof/stop {})) + + (prof/serve-files 54321) + + (time + (dotimes [r 1] + (doseq [i (range 1000)] + (spdb/get-entity [:test :tasks] (str i)) + (stdb/get-entity [:test :tasks] (str i))))) + + (time + (dotimes [r 1] + (spdb/get-entities [:test])))) + (comment (def transformers {[:test :tasks] {:add-fn add :add-tasked? true - :del-fn fail - :change-fn fail + :del-fn del + :del-tasked? true + :change-fn add + :change-tasked? true :status-fn identity}}) - (-> (e/refresh-status transformers) - (e/refresh-specs transformers)) + (time(count(get-in (-> (e/refresh-status transformers) + (e/refresh-specs transformers)) + [:test :tasks]))) (def e (e/new-papiea-engine)) - (e/start-engine e transformers 5000) + (e/start-engine e transformers 1000) (e/stop-engine e) - (def r (e/change-spec! e [:test :tasks] {:metadata {:uuid "11"} + (count @(:change-watch e)) + + + (papiea.core/current) + + (dotimes [i 100] + (e/change-spec! e [:test :tasks] {:metadata {:uuid (str i)} + :spec {:name (str "test " i)}}) + ) + + (dotimes [i 100] + (e/change-spec! e [:test :tasks] {:metadata {:uuid (str i)}}) + ) + + + (def r (e/change-spec! e [:test :tasks] {:metadata {:uuid "17"} :spec {:name "test"}})) (println(t/get-task @r)) @@ -36,3 +175,10 @@ (e/handleable-diffs (keys transformers))) (e/handle-diffs transformers)) + +(- 2212 2061) + +;;{ +;; "transformers": {["nusim", "images"] {"add-fn" "http:/"}}, +;; "timeout": 5000, +;;}