; clj2010 / src / clj2010.clj
;
; Full commit
(ns clj2010
  (:require [apricot-soup :as soup])
  (:use [clj-time.core :only (date-time plus minutes year month day)]
        [clj-time.coerce :only (to-long)]
        [clj-time.format :only (formatter unparse)]
        [incanter.core :only (view)]
        [incanter.charts :only (bar-chart)]
        [clojure.contrib.string :only (trim lower-case split)])

; " chouser:" -> "chouser"  (drop the trailing colon, then trim whitespace)
(defn fix-user
  "Clean up the raw user token captured from a log line.
  \" chouser:\" -> \"chouser\"

  Drops the last character of `user` (the trailing colon of the token) and
  trims the surrounding whitespace from what remains."
  [user]
  (trim (apply str (butlast user))))

(defn tokenize
  "Poor man's tokenizer.

  Returns a lazy seq of the lower-cased tokens found in `sentence`, where a
  token is a maximal run of ASCII letters, digits, apostrophes, underscores
  or hyphens. Everything else acts as a separator."
  [sentence]
  (map lower-case (re-seq #"[a-zA-Z0-9'_-]+" sentence)))

(defn str->long
  "Parse the decimal string `s` into a Long.
  Throws NumberFormatException when `s` is not a valid number."
  [s]
  (Long/valueOf s))

; Matches three hyphen-separated digit runs (e.g. "2010-01-01") and captures
; each run as a group. logfile-day reads the groups as year, month and day;
; log-files uses the pattern to pick log files out of a directory listing.
(def *date-re* #"(\d+)-(\d+)-(\d+)")

(defn logfile-day
  "Extract the day a log file covers from its path.
  \"logs/2010-01-01.html\" -> #<DateTime 2010-01-01T00:00:00.000Z>

  `logfile` must contain a date matching *date-re*; the three captured
  groups are parsed as year, month and day and passed to date-time."
  [logfile]
  ; Locals are named y/m/d so they do not shadow clj-time's year/month/day.
  (let [[_ y m d] (re-find *date-re* logfile)]
    (apply date-time (map str->long [y m d]))))

(defn add-time
  "Return `day` moved forward by `hour` hours and `minute` minutes."
  [day hour minute]
  (let [offset (+ minute (* hour 60))]   ; total offset expressed in minutes
    (plus day (minutes offset))))

(defn parse-text
  "Split the text of one log line into its parts.
  \"21:38 chouser: great, thanks!\" ->
      [\"21\" \"38\" \" chouser:\" \" great, thanks!\"]

  Returns a seq of [hour minute user text], all as strings. `user` keeps its
  leading whitespace and trailing colon, and is nil when nothing up to a
  colon follows the time. Returns an empty seq when `text` does not start
  with an HH:MM time.
  NOTE: any colon later in the line (e.g. inside a URL) is enough for the
  text before it to be captured as `user`."
  [text]
  (rest (re-find #"^(\d+):(\d+)([^:]+:)?(.*)" text)))

(defn process-p
  "Turn one <p>...</p> log record into {:time ... :tokens ... :user ...}.

  `day` is the date of the log file, `previous-log` is the record produced
  for the preceding <p> (or nil), and `p` is the element to process. When
  the line carries no user of its own, the user of `previous-log` is reused."
  [day previous-log p]
  (let [[hh mm raw-user body] (parse-text (soup/text p))
        timestamp (add-time day (str->long hh) (str->long mm))
        tokens (tokenize body)
        speaker (if raw-user
                  (fix-user raw-user)
                  (:user previous-log))]
    {:time timestamp
     :tokens tokens
     :user speaker}))

(defn process-logfile
  "Parse one log file into a seq of records (see process-p).

  `logfile` is the path of the file; its name must contain the date the log
  covers (see logfile-day). The file contents are handed to soup/$ with the
  `p` selector -- presumably yielding the <p> elements; confirm against
  apricot-soup."
  [logfile]
  (let [day (logfile-day logfile)
        pp (partial process-p day)]
    ; reductions threads each record into the next call as previous-log.
    ; The leading nil seeds the first call and is dropped again by rest.
    (rest (reductions pp (cons nil (soup/$ (slurp logfile) p))))))

(defn log-files
  "Return the paths (`root` + \"/\" + name) of the log files directly under
  the directory `root`. A file counts as a log file when its name contains
  a date matching *date-re*."
  [root]
  ; File is fully qualified because the ns form does not import java.io.File.
  (let [dir (java.io.File. root)
        ; re-find yields nil for non-matching names, so it works as the predicate.
        files (filter #(re-find *date-re* %) (.list dir))]
    (map #(str root "/" %) files)))

(defn load-data
  "Process every log file under `root` (in parallel via pmap) and return
  all resulting records as one flat seq."
  [root]
  (->> (log-files root)
       (pmap process-logfile)
       flatten))

(defn flatten1
  "Flatten one level: concatenate the collections in `lst` into a single
  lazy seq, leaving any deeper nesting untouched."
  [lst]
  (mapcat identity lst))

; mapper returns [[k1 v1] [k1 v2] [k2 v3] ...], we aggregate it to
; {k1 [v1 v2] k2 [v3] ...}
(defn map-stage
  "Apply `mapper` to every record (in parallel via pmap) and group the
  emitted [key value] pairs by key: {k1 (v2 v1) k2 (v3) ...}.
  Values are consed on, so each list is in reverse order of emission."
  [mapper records]
  (let [collect (fn [grouped [k v]]
                  (update-in grouped [k] #(cons v %)))]
    (->> records
         (pmap mapper)
         flatten1
         (reduce collect {}))))

(defn reduce-stage
  "Call (reducer key values) for every entry of `map-result` (in parallel
  via pmap) and return a map from each key to its reduced value."
  [reducer map-result]
  (let [ks (keys map-result)
        reduce-key (fn [k] (reducer k (map-result k)))]
    (zipmap ks (pmap reduce-key ks))))

(defn map-reduce
  "Run the map stage over `records` with `mapper`, then the reduce stage
  over its grouped output with `reducer`. Returns a map of key -> reduced value."
  [mapper reducer records]
  (->> records
       (map-stage mapper)
       (reduce-stage reducer)))

(defn month-only
  "Build a date-time from only the year and month of `time`."
  [time]
  (apply date-time ((juxt year month) time)))

(defn day-only
  "Build a date-time from only the year, month and day of `time`."
  [time]
  (apply date-time ((juxt year month day) time)))

; Job description for run-job: counts log records per month.
;   :map      emits one [month 1] pair per record
;   :reduce   sums the values emitted for a month
;   :x-format renders a month key with the "MMM" pattern for the x axis
(def numlogs
  {:title "# of logs by month"
   :x-label "Month"
   :y-label "Number of logs"
   :map (fn [{stamp :time}] [[(month-only stamp) 1]])
   :reduce (fn [_ counts] (reduce + counts))
   :x-format #(unparse (formatter "MMM") %)})

(defn run-job
  "Run the map-reduce described by `job` over `records` and return a bar
  chart of the result: keys in sorted order on the x axis (rendered with
  the job's :x-format), reduced values on the y axis."
  [job records]
  (let [{mapper :map, reducer :reduce, fmt :x-format
         :keys [title x-label y-label]} job
        totals (map-reduce mapper reducer records)
        sorted-keys (sort (keys totals))]
    (bar-chart (map fmt sorted-keys)
               (map totals sorted-keys)
               :title title
               :x-label x-label
               :y-label y-label)))

(defn -main
  "Entry point: load the records from the \"logs\" directory, run the
  numlogs job over them and display the resulting bar chart."
  []
  (let [records (load-data "logs")]
    ; map-reduce takes [mapper reducer records], so it cannot be called with
    ; a job map directly; run-job unpacks the job and builds the chart.
    (view (run-job numlogs records))))