;; clojurewise / mapreduce / mapreduce.clj

(ns ^{:doc "In memory map/reduce framework"
      :author "Miki Tebeka <>"}

(defn- flatten1
  "Flatten one level: concatenate the sequences contained in lst into a single
  lazy sequence. Nesting deeper than one level is left untouched."
  [lst]
  (mapcat identity lst))

(defn map-stage
  "Run the map stage: call mapper on every record (via pmap).

  (mapper record) should return a sequence of [key value] pairs.

  The result is an aggregation of the form {k1 [v1 v2] k2 [v3] ...}: each key
  is associated with a vector of all values emitted for it, in emission order."
  [mapper records]
  (let [pairs (flatten1 (pmap mapper records))]
    ;; (fnil conj []) starts a fresh vector the first time a key is seen, so
    ;; values accumulate in the order the docstring describes (v1 before v2).
    (reduce (fn [acc [k v]] (update-in acc [k] (fnil conj []) v)) {} pairs)))

(defn reduce-stage
  "Apply reducer to every entry produced by the map stage.

  reducer is invoked as (reducer key values); whatever it returns becomes the
  value stored under that key in the resulting map."
  [reducer map-result]
  (let [reduce-entry (fn [[k vs]] [k (reducer k vs)])]
    ;; pmap evaluates the per-key reductions; into collects the [key result]
    ;; pairs back into a map.
    (into {} (pmap reduce-entry map-result))))

(defn map-reduce
  "Run the full pipeline over records: the map stage with mapper, followed by
  the reduce stage with reducer.

  Returns a map of the form {k1 v1 k2 v2 ...}."
  [mapper reducer records]
  (->> records
       (map-stage mapper)
       (reduce-stage reducer)))
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.