diff --git a/project.clj b/project.clj index 9f86f626..63100086 100644 --- a/project.clj +++ b/project.clj @@ -21,7 +21,8 @@ "ring-jetty-adapter/src" "ring-servlet/src" "ring-jakarta-servlet/src" - "ring-websocket-protocols/src"]} + "ring-websocket-protocols/src" + "ring-sse-protocols/src"]} :aliases {"test" ["sub" "test"] "test-all" ["sub" "test-all"] "bench" ["sub" "-s" "ring-bench" "run"] diff --git a/ring-core/project.clj b/ring-core/project.clj index d5a67095..0cbd0ac8 100644 --- a/ring-core/project.clj +++ b/ring-core/project.clj @@ -7,6 +7,7 @@ :dependencies [[org.clojure/clojure "1.9.0"] [org.ring-clojure/ring-core-protocols "1.15.2"] [org.ring-clojure/ring-websocket-protocols "1.15.2"] + [org.ring-clojure/ring-sse-protocols "1.15.2"] [ring/ring-codec "1.3.0"] [commons-io "2.20.0"] [org.apache.commons/commons-fileupload2-core "2.0.0-M4"] diff --git a/ring-core/src/ring/sse.clj b/ring-core/src/ring/sse.clj new file mode 100644 index 00000000..b367d94f --- /dev/null +++ b/ring-core/src/ring/sse.clj @@ -0,0 +1,23 @@ +(ns ring.sse + "Protocols and utility functions for SSE support." + (:refer-clojure :exclude [send]) + (:require [ring.sse.protocols :as p])) + + +(extend-type clojure.lang.IPersistentMap + p/Listener + (on-open [m sender] + (when-let [kv (find m :on-open)] ((val kv) sender)))) + + +(defn send + "Sends a SSE message" + [sender sso-message] + (p/-send sender sso-message)) + + +(defn sse-response? + "Returns true if the response contains a SSE emitter." + [response] + (contains? response ::listener)) + diff --git a/ring-jakarta-servlet/src/ring/util/jakarta/servlet.clj b/ring-jakarta-servlet/src/ring/util/jakarta/servlet.clj index d17e4bab..659f34c3 100644 --- a/ring-jakarta-servlet/src/ring/util/jakarta/servlet.clj +++ b/ring-jakarta-servlet/src/ring/util/jakarta/servlet.clj @@ -59,7 +59,7 @@ :servlet-context (.getServletContext servlet) :servlet-context-path (.getContextPath request)})) -(defn- set-headers [^HttpServletResponse response, headers] +(defn set-headers [^HttpServletResponse response, headers] (doseq [[key val-or-vals] headers] (if (string? val-or-vals) (.setHeader response key val-or-vals) @@ -69,7 +69,7 @@ (when-let [content-type (get headers "Content-Type")] (.setContentType response content-type))) -(defn- make-output-stream +(defn make-output-stream [^HttpServletResponse response ^AsyncContext context] (let [os (.getOutputStream response)] (if (nil? context) diff --git a/ring-jetty-adapter/src/ring/adapter/jetty.clj b/ring-jetty-adapter/src/ring/adapter/jetty.clj index e725f229..48f922f5 100644 --- a/ring-jetty-adapter/src/ring/adapter/jetty.clj +++ b/ring-jetty-adapter/src/ring/adapter/jetty.clj @@ -5,7 +5,9 @@ (:require [clojure.java.io :as io] [ring.util.jakarta.servlet :as servlet] [ring.websocket :as ws] - [ring.websocket.protocols :as wsp]) + [ring.websocket.protocols :as wsp] + [ring.sse :as sse] + [ring.sse.protocols :as ssep]) (:import [java.nio ByteBuffer] [java.time Duration] [org.eclipse.jetty.server @@ -108,14 +110,61 @@ (.setMaxBinaryMessageSize (:ws-max-binary-size options 65536))) (.upgrade container creator request response))) +(defn- sse-write [^java.io.Writer out k v] + (when v + (doto out + (.write (name k)) + (.write ": ") + (.write (str v)) + (.write "\r\n")))) + +(defn- make-sse-sender [^java.io.OutputStream resp-stream ^java.util.concurrent.CountDownLatch close-latch] + (let [out (io/writer resp-stream)] + (reify ssep/Sender + (-send [_ {:keys [id event data]}] + (try + (doto out + (sse-write :id id) + (sse-write :event event) + (sse-write :data data) + (.write "\r\n") + (.flush)) + (catch java.io.IOException _ + (.countDown close-latch))))))) + +(defn- upgrade-to-sse + [^Request request ^HttpServletResponse response response-map _options] + (let [context (.startAsync request) + output (servlet/make-output-stream response context)] + (try + (let [close-latch (java.util.concurrent.CountDownLatch. 1) + on-open (-> response-map :ring.sse/listener :on-open) + sse-sender (make-sse-sender output close-latch)] + (doto response + (.setStatus (:status response-map 200)) + (servlet/set-headers (assoc (:headers response-map) "Content-Type" "text/event-stream"))) + (.start context (fn [] (on-open sse-sender))) + (.await close-latch)) + ;; Client terminates the connection: + (catch java.io.IOException _) + (catch java.lang.InterruptedException _) + (finally + (.close output))))) + (defn- proxy-handler ^ServletHandler [handler options] (proxy [ServletHandler] [] - (doHandle [_ ^Request base-request request response] + (doHandle [_ ^Request base-request ^Request request response] (let [request-map (servlet/build-request-map request) response-map (handler request-map)] (try - (if (ws/websocket-response? response-map) + (cond + (ws/websocket-response? response-map) (upgrade-to-websocket request response response-map options) + + (sse/sse-response? response-map) + (upgrade-to-sse request response response-map options) + + :else (servlet/update-servlet-response response response-map)) (finally (.setHandled base-request true) diff --git a/ring-jetty-adapter/test/ring/adapter/test/jetty.clj b/ring-jetty-adapter/test/ring/adapter/test/jetty.clj index 8b94e8ae..52b10459 100644 --- a/ring-jetty-adapter/test/ring/adapter/test/jetty.clj +++ b/ring-jetty-adapter/test/ring/adapter/test/jetty.clj @@ -8,7 +8,9 @@ [less.awful.ssl :as less-ssl] [ring.core.protocols :as p] [ring.websocket :as ws] - [ring.websocket.protocols :as wsp]) + [ring.websocket.protocols :as wsp] + [ring.sse :as sse] + [ring.sse.protocols :as ssep]) (:import [java.io File] [java.nio ByteBuffer] [java.nio.file Paths] @@ -940,6 +942,56 @@ (is (realized? closer)) (is (= @closer [1009 "Binary message too large: 6 > 5"]))))))) +;; All browsers have SSE support but I could not find any simple SSE client libs +;; that I could use for testing. This is rather crude way to test SSE, but... + +(defn sse-get [test-port] + (with-open [socket (java.net.Socket. "localhost" test-port) + out (-> (.getOutputStream socket) (io/writer)) + in (-> (.getInputStream socket) (io/reader))] + (let [read-kv (fn [key-fn] + (loop [data {}] + (let [line (.readLine in)] + (if (str/blank? line) + data + (let [[_ k v] (re-matches #"([^:]+):\s*(.*)" line)] + (recur (assoc data (key-fn k) v)))))))] + (doto out + (.write "GET / HTTP/1.1\r\n") + (.write "host: localhost\r\n") + (.write "accept: text/event-stream\r\n") + (.write "connection: close\r\n") + (.write "\r\n") + (.flush)) + (let [status-line (.readLine in) + [_ status _] (str/split status-line #"\s+") + headers (read-kv str/lower-case)] + {:status (parse-long status) + :headers headers + :body (->> (repeatedly (fn [] (read-kv keyword))) + (take-while (fn [message] (-> message :event (not= "close")))) + (into []))})))) + +(deftest run-jetty-sse-test + (let [messages [{:id "1" + :event "test" + :data "message 1"} + {:id "2" + :data "message 2"}] + handler (constantly + {::sse/listener {:on-open (fn [sse-sender] + ;; Send the test messages: + (doseq [message messages] + (sse/send sse-sender message)) + ;; Send close message so that the `sse-get` knows when to stop + ;; reading response: + (sse/send sse-sender {:event "close"}))}})] + (with-server handler {:port test-port} + (let [resp (sse-get test-port)] + (is (= 200 (:status resp))) + (is (= "text/event-stream" (get-in resp [:headers "content-type"]))) + (is (= messages (:body resp))))))) + (deftest run-jetty-async-websocket-test (testing "ping/pong" (let [log (atom []) diff --git a/ring-sse-protocols/project.clj b/ring-sse-protocols/project.clj new file mode 100644 index 00000000..739c6163 --- /dev/null +++ b/ring-sse-protocols/project.clj @@ -0,0 +1,9 @@ +(defproject org.ring-clojure/ring-sse-protocols "1.15.2" + :description "Ring protocols for SSE." + :url "https://github.com/ring-clojure/ring" + :scm {:dir ".."} + :license {:name "The MIT License" + :url "http://opensource.org/licenses/MIT"} + :dependencies [] + :profiles + {:dev {:dependencies [[org.clojure/clojure "1.9.0"]]}}) diff --git a/ring-sse-protocols/src/ring/sse/protocols.clj b/ring-sse-protocols/src/ring/sse/protocols.clj new file mode 100644 index 00000000..9a051fc6 --- /dev/null +++ b/ring-sse-protocols/src/ring/sse/protocols.clj @@ -0,0 +1,12 @@ +(ns ring.sse.protocols) + + +(defprotocol Listener + "A protocol for handling SSE responses. The second argument is an object that + satisfies the SSESender protocol." + (on-open [listener sse-sender] "Called when the SSE response is opened and ready.")) + + +(defprotocol Sender + "A protocol for sending SSE responses." + (-send [sender message] "Sends a SSE message"))