Source

resizer / src / resizer / consumer.clj

Full commit
(ns resizer.consumer
  (:refer-clojure :exclude [declare get])
  (:import [com.rabbitmq.client Connection]
           java.io.IOException)
  (:use [clojure.data.json :only [read-json json-str]]
        [langohr.core  :as lhc]
        [langohr.consumers :as lhcons]
        [langohr.queue :as lhq]
        [langohr.basic :as lhb]
        [resizer.resizer :as rz]
        [resizer.fabric :as rf]))

(defonce ^Connection conn (rf/rabbitmq-connect))

;; TODO if resize fails publish a message or log that it failed to process the image.
(defn process-message [ch metadata payload]
  (let [jsondata (read-json (String. payload))
        filename (:filename jsondata)
        mime (:mime jsondata)]
    (rz/process-image filename mime)
    (lhb/publish ch "cloudstagram-new-image" "" payload :content-type "application/json" )))

(defn start-consumer []
  (let [channel     (.createChannel conn)
        exchange    "cloudstagram-upload"
        queue       "resize_queue"
        shutdown-ln (lhc/shutdown-listener (fn [cause]
                                             (println (str "Shutdown listener has fired: " cause))))]
    (try
      (.addShutdownListener channel shutdown-ln)
      (lhq/declare channel queue :durable true :auto-delete false :exclusive false)
      (lhq/bind channel queue exchange)
      (.start (Thread. #(lhcons/subscribe channel queue process-message :auto-ack true) "resize/consumer"))
      (catch IOException ioe ;; see http://www.rabbitmq.com/api-guide.html#shutdown
        nil))))