Simon Belluzzo avatar Simon Belluzzo committed 6f1467d

Initial commit

Comments (0)

Files changed (6)

+/target
+/lib
+/classes
+/checkouts
+pom.xml
+pom.xml.asc
+*.jar
+*.class
+.lein-deps-sum
+.lein-failures
+.lein-plugins
+.lein-repl-history
+# zmq
+
+Experiments in Clojure and ZeroMQ, using examples from:
+augustl.com/blog/2013/zeromq_instead_of_http/
+
+## Usage
+
+FIXME
+
+## License
+
+Copyright © 2013 FIXME
+
+Distributed under the Eclipse Public License, the same as Clojure.
+# Introduction to zmq
+
+TODO: write [great documentation](http://jacobian.org/writing/great-documentation/what-to-write/)
+(defproject zmq "0.1.0-SNAPSHOT"
+  :description "FIXME: write description"
+  :url "http://example.com/FIXME"
+  :license {:name "Eclipse Public License"
+            :url "http://www.eclipse.org/legal/epl-v10.html"}
+  :dependencies [[org.clojure/clojure "1.5.1"]
+                 [org.zeromq/jzmq "2.1.0-SNAPSHOT"]]
+  :profiles {:dev {:jvm-opts ["-Djava.library.path=/usr/local/lib"]}}
+  :main zmq.core)
+(ns zmq.core
+  (:gen-class))
+;; All from examples on augustl.com/blog/2013/zeromq_instead_of_http/
+(import [org.zeromq ZMQ ZMQ$Context ZMQ$Socket ZMQQueue])
+
+(defn single-server
+  "No concurrency reply server"
+  []
+  (.start
+   (Thread.
+    (fn []
+      (let [sock (.socket (ZMQ/context 1) ZMQ/REP)]
+       (.bind sock "tcp://127.0.0.1:1337")
+        (while true
+          ;; Block until we receive a message
+          (let [req (.recv sock 0)]
+            ;; req is a byte[]. Do whatever you want with it!
+            ;; We echo the req back to the client.
+           (.send sock (.getBytes (str (String. req) " - echoed!")) ZMQ/NOBLOCK))))))))
+
+(defn single-client
+  "No concurrency client, single REQ at a time."
+  [x]
+  (.start
+   (Thread.
+    (fn []
+      (let [sock (.socket (ZMQ/context 1) ZMQ/REQ)]
+        (.connect sock "tcp://127.0.0.1:1337")
+        (dotimes [n x]
+          ;; Perform a request
+          (.send sock (.getBytes (str "Hello, " n)) 0)
+          ;; Block until we receive a response
+          (let [res (.recv sock 0)]
+            ;; res is a byte[] containing whatever the REP socket replied with.
+            (println (String. res)))))))))
+
+;; Iteration 2
+(defn my-response-handler
+    "Takes a req (bytes), returns the response (also bytes)."
+    [req]
+    (.getBytes (str (String. req) " - echoed!")))
+
+(defn concurrent-server
+  "Bleh"
+  []
+  (let [ctx (ZMQ/context 1)
+        worker-url "inproc://responders"
+        router-socket (.socket ctx ZMQ/ROUTER)
+        dealer-socket (.socket ctx ZMQ/DEALER)]
+    (.bind router-socket "tcp://127.0.0.1:1337")
+    (.bind dealer-socket worker-url)
+    ;; We can now respond to 10 requests in parallel
+   (dotimes [n 10]
+      (.start
+       (Thread.
+        (fn []
+          (let [sock (.socket ctx ZMQ/REP)]
+            ;; We reply to the DEALER
+            (.connect sock worker-url)
+            (while true
+              ;; Same API as before - receive message, then reply.
+              (let [req (.recv sock 0)]
+                (.send sock (my-response-handler req) ZMQ/NOBLOCK))))))))
+    (.start
+     (Thread.
+      ;; Forwards messages from router to dealer and vice versa.
+      (fn [] (.run (ZMQQueue. ctx router-socket dealer-socket)))))))
+
+;; Iteration 3: Concurrent requests
+(defn connect
+  [server-url]
+  (let [ctx (ZMQ/context 1)
+        worker-url (str "inproc://" (java.util.UUID/randomUUID))
+        queue-thread (Thread.
+                      (fn []
+                        (let [client-sock (.socket ctx ZMQ/DEALER)
+                              worker-sock (.socket ctx ZMQ/ROUTER)]
+                          (.connect client-sock server-url)
+                          (.bind worker-sock worker-url)
+                          (.run (ZMQQueue. ctx  client-sock worker-sock)))))]
+    (.start queue-thread)
+    {:ctx ctx
+     :worker-url worker-url
+     :queue-thread queue-thread}))
+
+(defn disconnect
+  "Useful for tests etc. Pass the map returned by `connect` above."
+  [connection]
+  (.interrupt (get connection :queue-thread))
+  (.term (get connection :ctx)))
+
+(defn with-req-sock
+  "Takes the connection and a higher order function that is passed a new REQ
+   socket. When this function returns, the REQ socket is destroyed."
+  [connection handler]
+  (let [socket (.socket (get connection :ctx) ZMQ/REQ)]
+    (.connect socket (get connection :worker-url))
+    (try
+      (handler socket)
+      (finally (.close socket)))))
+
+(defn concurrent-client
+  [x]
+  (def connection (connect "tcp://127.0.0.1:1337"))
+  (dotimes [n x]
+    (.start
+    (Thread.
+     (fn []
+        (with-req-sock connection
+          (fn [sock]
+            (.send sock (.getBytes (str "Hello, " n)) 0)
+            (let [res (.recv sock 0)]
+              (println (String. res))))))))))
+
+(defn -main
+  "Run main program"
+  []
+  (concurrent-client 100)
+  (concurrent-server))
+

test/zmq/core_test.clj

+(ns zmq.core-test
+  (:require [clojure.test :refer :all]
+            [zmq.core :refer :all]))
+
+(deftest a-test
+  (testing "FIXME, I fail."
+    (is (= 0 1))))
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.