Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
:dependencies [[org.clojure/clojure "1.9.0"] ;; clojure itself
[org.clojure/spec.alpha "0.2.168"]
[org.clojure/core.specs.alpha "0.2.36"]
[mount "0.1.12"] ;; dependencies
;;[mount "0.1.12"] ;; dependencies

[metosin/compojure-api "2.0.0-alpha21"] ;; rest api routes
[metosin/spec-tools "0.7.1"] ;; allowes for more complex specs
Expand All @@ -14,17 +14,20 @@
[com.rpl/specter "1.1.1"] ;; deep datastructure transformation
[slingshot "0.12.2"] ;; advanced error handling

[kovacnica/clojure.network.ip "0.1.2"] ;; library for handling ip spaces
;;[kovacnica/clojure.network.ip "0.1.2"] ;; library for handling ip spaces

[tracks "1.0.5"] ;; improved destructuring
[orchestra "2017.11.12-1"] ;; better specs validation
[com.grammarly/perseverance "0.1.3"] ;; retry on exceptions
;;[com.grammarly/perseverance "0.1.3"] ;; retry on exceptions
[com.novemberain/monger "3.1.0"] ;; Mongo DB

[clj-http "3.9.0"] ;; http client
[cheshire "5.8.0"] ;; json support
[com.clojure-goes-fast/clj-async-profiler "0.1.3"]
[com.climate/claypoole "1.1.4"]
]
:ring {:handler papiea.handler/app}
:jvm-opts ["-Xmx1024m" "-server" "-XX:+UseSerialGC" "-XX:+UseStringDeduplication"]
:uberjar-name "server.jar"
:profiles {:dev {:dependencies [[javax.servlet/javax.servlet-api "3.1.0"]
[cheshire "5.8.0"]
Expand Down
12 changes: 12 additions & 0 deletions src/papiea/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
:direct (pfn arg)
:rest (json/decode
(try+ (:body (rest/post pfn {:content-type :json
:insecure? true
:body (json/generate-string arg)}))
(catch Object o
(throw+ {:status :failed-api-call
Expand All @@ -35,3 +36,14 @@
:http-response o})))
keyword))))


(def current
"Get current process PID"
(memoize
(fn []
(-> (java.lang.management.ManagementFactory/getRuntimeMXBean)
(.getName)
(clojure.string/split #"@")
(first)))))

(println "Starting Papiea. PID" (current))
8 changes: 7 additions & 1 deletion src/papiea/db/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@
:_id (-> entity :metadata :uuid)}}
{:return-new true :upsert true})
:_id :_prefix)
(mc/remove-by-id db entity-spec (-> entity :metadata :uuid)))
;; throw an error of user is trying to remove an item that does not exists. This should be made atomic
(if (mc/find-by-id db entity-spec (-> entity :metadata :uuid))
(mc/remove-by-id db entity-spec (-> entity :metadata :uuid))
(throw+ "Trying to remove an Item that does not exist")))
(catch Object e
(throw+ {:entity entity
:cause e} "Cant insert or update entity. Check that you are
Expand All @@ -50,3 +53,6 @@

(defn clear-entities []
(mc/remove db entity-spec))

(defn remove-entity [prefix uuid]
(mc/remove db entity-spec {:prefix prefix :_id uuid}))
2 changes: 2 additions & 0 deletions src/papiea/db/status.clj
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@
(defn clear-entities []
(mc/remove db entity-status))

(defn remove-entity [prefix uuid]
(mc/remove db entity-status {:prefix prefix :_id uuid}))
159 changes: 102 additions & 57 deletions src/papiea/engine.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,26 @@
[orchestra.spec.test :as st]
[clojure.set :as set]
[papiea.specs]
[papiea.core :as c]
[tracks.core :as t]
[papiea.tasks :as task]
[papiea.core :refer [call-api fixed-rate ->timer]]
[papiea.db.spec :as spdb]
[papiea.db.status :as stdb]))
;;(map #(ns-unmap *ns* %) (keys (ns-interns *ns*)))
(set! *warn-on-reflection* true)

(defn state-settled? [entity]
(= (:spec entity) (select-keys (:status entity) (keys (:spec entity)))))
(defn unidirectional-diff
"Check diff only on items defined in the reference map"
[reference target]
(cond
(map? reference) (every? (fn[key] (unidirectional-diff (get reference key) (get target key))) (keys reference))
(sequential? reference) (every? (fn[key] (unidirectional-diff (get reference key) (get target key))) reference)
:default (= reference target)))

(defn state-settled?
"returns true iff every key that is defined in the `spec` (recursively defined) has the same value as the one in the `status`"
[entity] (unidirectional-diff (:spec entity) (:status entity)))

(defn handleable-diffs-for [state prefix]
(let [prefix-diffs (select (into prefix [ALL (complement state-settled?)]) state)]
Expand Down Expand Up @@ -52,8 +63,8 @@

(defn-spec merge-entities-specs :papiea.entity.list/specs
[old-entities :papiea.entity.list/specs new-entities :papiea.entity.list/specs]
(merge-entities-part (merge-entities-part old-entities new-entities :spec)
new-entities :metadata))
(-> (merge-entities-part old-entities new-entities :spec)
(merge-entities-part new-entities :metadata)))

(defn ensure-entity-map [m ks]
(if (empty? ks) m
Expand All @@ -66,37 +77,41 @@
([state transformers]
(reduce (fn[o [prefix {:keys [status-fn]}]]
(if status-fn
(transform prefix (fn[x] (let [db-statuses (stdb/get-entities prefix)
statuses (call-api status-fn db-statuses)
removed (map (fn[e] (dissoc e :status :spec))
(set/difference (set db-statuses) (set statuses)))]
(doseq [entity (concat statuses removed)]
(stdb/update-entity-status! prefix entity))
(merge-entities-status x statuses)))
(transform prefix
(fn[x] (let [db-statuses (stdb/get-entities prefix)
statuses (call-api status-fn db-statuses)
removed (map (fn[e] {:metadata (:metadata e)} #_(dissoc e :status :spec))
(set/difference (set db-statuses)
(set statuses)))]
(doseq [entity (concat statuses removed)]
(stdb/update-entity-status! prefix entity))
(merge-entities-status x statuses)))
(ensure-entity-map o prefix))
(do(println "Error: Cant refresh" prefix " - no `status-fn` defined. Unsafely ignoring..")
(ensure-entity-map o prefix))
))
state
transformers)))
(ensure-entity-map o prefix))))
state
transformers)))

(declare prefix) ;; bug in cider while debugging..
(defn refresh-specs
"Based on the registered transformers, ask each entity type for its entities specs in our specs-db"
([transformers] (refresh-specs {} transformers))
([state transformers]
(reduce (fn[o [prefix _]]
(transform prefix (fn[x] (merge-entities-specs x (spdb/get-entities prefix))) (ensure-entity-map o prefix)))
(transform prefix (fn[prefix-entities]
(merge-entities-specs prefix-entities
(spdb/get-entities prefix)))
(ensure-entity-map o prefix)))
state
transformers)))

(defn previous-spec-version [entity]
(transform [:metadata :spec_version] dec entity))

(defn unspec-version [entity]
(select-keys (assoc (setval [:metadata :spec_version] nil entity)
:spec (:spec entity))
[:metadata :spec]))
(defn unspec-version
"Returns an entity composed only from the spec and the uuid part of the metadata. Should only be used internally"
[entity] {:metadata {:uuid (-> entity :metadata :uuid)}
:spec (-> entity :spec)})

(defn ensure-spec-version
([prefix entity default-value]
Expand All @@ -118,32 +133,39 @@
(let [{:keys [add-fn del-fn change-fn] :as w} (get transformers prefix)]
(let [[modify data op tasked] (cond added [add-fn added :added (:add-tasked? w)]
removed [del-fn removed :removed (:del-tasked? w)]
changed [change-fn changed :changed (:changed-tasked? w)])
changed [change-fn changed :changed (:change-tasked? w)])
task-id (when tasked (tasked-fn op data))]

(let [r (try+ (modify data)
;;(println "Performing: " op data)
(let [r (try+ (c/call-api modify data)
(catch Object o
{:status :failed
:error o}))]
(if tasked
(if (= :failed (:status r))
(task/update-task1 task-id {:status "FAILED"})
(task/update-task (:uuid task-id) {:status "FAILED"})
(do (stdb/update-entity-status! prefix r) ;; Save the state
(task/update-task1 task-id {:status "COMPLETED"})))
(task/update-task (:uuid task-id) {:status "COMPLETED"})
(when-not (:status r)
(spdb/remove-entity prefix (-> r :metadata :uuid))
(stdb/remove-entity prefix (-> r :metadata :uuid)))))
(if (= :failed (:status r))
(failed-fn op data)
(do (stdb/update-entity-status! prefix r) ;; Save the state
(success-fn op data))
))))))
(do
(stdb/update-entity-status! prefix r)
(when (nil? (:status r))
(spdb/remove-entity prefix (-> r :metadata :uuid))
(stdb/remove-entity prefix (-> r :metadata :uuid)))
;; Save the state
(success-fn op data))))))))

(defn tasked-op [change-watch]
(fn[op entity]
(let [previous-entity (unspec-version (dissoc entity :status))]
(when-let [done (get @change-watch previous-entity)]
(swap! change-watch dissoc previous-entity)
(let [task (task/register-new-task entity)]
(deliver done (:uuid task))
(:uuid task))))))
(deliver done task)
task)))))

(defn change-succeeded [change-watch]
(fn[op entity]
Expand All @@ -163,51 +185,67 @@

(defn handle-diffs
"apply the diffs"
([transformers] (handle-diffs transformers
(fn[op data] (println "Taksed: " op data))
(fn[op data] (println "Success: " op data))
(fn[op data] (println "Failed: " op data))))
([transformers]
(let [change-watch (atom {})]
(handle-diffs transformers
(tasked-op change-watch)
(change-succeeded change-watch)
(change-failed change-watch))))
([transformers tasked-fn success-fn failed-fn]
(println "Thread:" (.getName (Thread/currentThread)))
(let [diffs (-> (refresh-specs transformers)
(refresh-status transformers)
(handleable-diffs (keys transformers)))]
(doseq [{:keys [prefix diffs]} diffs
diff diffs]
(turn-spec-to-status transformers prefix tasked-fn success-fn failed-fn diff)))))
(when-not (empty? diffs)
(println "\tFound" (count(:diffs (first diffs))) "diffs")
(time(doseq [{:keys [prefix diffs]} diffs
diff diffs]

(turn-spec-to-status transformers prefix tasked-fn success-fn failed-fn diff)))))))

;; We model the async call as a watch on an atom. The watch is triggered every time the atom
;; value is changed, causing handle-diffs to be called with the registered transformers


(defprotocol PapieaEngine
(start-engine [this transformers timeout])
(start-engine [this transformers diff-interval])
(stop-engine [this])
(notify-change [this])
(change-spec! [this prefix entity]))
(change-spec! [this prefix entity])
(get-entity [this prefix entity]))


;;(require '[com.climate.claypoole :as cp])

(defrecord DefaultEngine [change-watch handle-diff-notify started state]
PapieaEngine
(start-engine [this transformers timeout]
(start-engine [this transformers diff-interval]
(add-watch handle-diff-notify :process-diffs
(fn[key a o n]
(println n "Looking for diffs")
(handle-diffs transformers
(tasked-op change-watch)
(change-succeeded change-watch)
(change-failed change-watch))))
(try+
(handle-diffs transformers
(tasked-op change-watch)
(change-succeeded change-watch)
(change-failed change-watch))
(catch Object o
(println o)))))

(when (compare-and-set! started false true)
(println "Starting engine...")
(swap! state assoc
:tranformers transformers
:interval-notify (fixed-rate (partial notify-change this) (->timer) 5000)))
(let [timer (->timer)]
(swap! state assoc
:diff-interval diff-interval
;;:diff-cleanup-cancel-fn (fixed-rate (fn[] (cleanup-change-watch )) timer (* 2.5 diff-interval))
:tranformers (:transformers this)
:interval-notify-cancel-fn (fixed-rate (partial notify-change this) timer diff-interval))))
this)

(stop-engine [this]
(when (compare-and-set! started true false)
(println "Stopping engine..")
((:interval-notify @state)) ;; stop the timer
(swap! state dissoc :interval-notify :tranformers))
((:interval-notify-cancel-fn @state)) ;; stop the timer
(swap! state dissoc :interval-notify-cancel-fn :tranformers))
this)

(notify-change [this]
Expand All @@ -221,19 +259,26 @@
(try+
(swap! change-watch assoc (unspec-version speced-entity) done)
(insert-spec-change! prefix speced-entity)
(notify-change this)
;;(notify-change this)
done
(catch Object e
(swap! change-watch dissoc speced-entity)
;;(println e)
(println "Error processing spec change." e)
(throw+ (if (map? e)
(merge {:status :failure} e)
{:status :failure
:cause e}))))))
)
(deliver done (if (map? e)
(merge {:status :failure} e)
{:status :failure
:cause e}))
(swap! change-watch dissoc (unspec-version speced-entity))
done))))

(get-entity [this prefix uuid]
(let [all-ents (get-in (-> (refresh-status (:transformers this))
(refresh-specs (:transformers this)))
prefix)]
(some-> (filter (fn[x] (= uuid (-> x :metadata :uuid))) all-ents)
first))))

(defn new-papiea-engine []
(println "Created new engine")
(->DefaultEngine (atom {}) (atom 0) (atom false) (atom {})))


Loading