Source

clojurewise / metrics / src / db.clj

Full commit
(ns ^{:doc "Database layer"}
  db
  (:require [clojure.contrib.sql :as sql] 
            [clojure.contrib.logging :as log]
            fs))

; Base path of the database: ".metrics" inside the user's home directory.
; H2 will create two files based on this path.
(def db-path
  (-> (fs/home)
      (fs/join ".metrics")))

; Connection spec used by every sql/with-connection call in this namespace:
; an H2 database located at db-path.
(def *db*
  {:classname   "org.h2.Driver"
   :subprotocol "h2"
   :subname     db-path
   :create      true})

; Metrics endpoints that get a placeholder row when the database is first
; created (see create).
(def *default-urls*
  ["http://server1.mycompany.com/metrics"
   "http://server2.mycompany.com/metrics"
   "http://server3.mycompany.com/metrics"])

(defn- current-time
  "Current time in milliseconds, as reported by System.currentTimeMillis."
  []
  (. System (currentTimeMillis)))

(defn- metrics->records
  "Turn a sequence of [name value] pairs into row maps
  {:url U :metric M :value V :time T}. Every row produced by one call
  carries the same timestamp, taken once before the rows are built."
  [url metrics]
  (let [timestamp (current-time)
        ->record  (fn [[metric-name metric-value]]
                    {:url url
                     :metric metric-name
                     :value metric-value
                     :time timestamp})]
    (map ->record metrics)))

(defn insert
  "Store `metrics`, a sequence of [name value] pairs, as rows for `url`.
  All rows of one call are written inside a single transaction and share
  one timestamp. When `metrics` is empty nothing is written and no
  database connection is opened."
  [url metrics]
  (let [records (metrics->records url metrics)]
    (when (seq records)
      (sql/with-connection *db*
        (sql/transaction
          ; insert-records performs its inserts eagerly, so wrapping the
          ; call in dorun is unnecessary.
          (apply sql/insert-records :metrics records))))))

(defn- query
  "Run `query` (a vector of SQL text followed by its parameters) against
  *db* and return the fully realized sequence obtained by applying `func`
  to each result row."
  [query func]
  (sql/with-connection *db*
    (sql/with-query-results rows query
      ; Realize everything before leaving the query/connection scope.
      (->> rows
           (map func)
           doall))))

(defn fetch
  "Return at most `size` samples of `metric` for `url`, newest first.
  Each sample is a [time value] vector."
  [url metric size]
  (let [row->sample (juxt :time :value)
        statement ["SELECT time, value FROM metrics
     WHERE url = ? AND metric = ?
     ORDER BY time DESC
     LIMIT ?"
                   url metric size]]
    (query statement row->sample)))

(defn urls
  "Distinct urls that currently have rows in the metrics table."
  []
  (let [statement ["SELECT DISTINCT url FROM metrics"]]
    (query statement :url)))

(defn metrics
  "Distinct metric names stored for `url`, leaving out the 'dummy'
  placeholder metric."
  [url]
  (let [statement ["SELECT DISTINCT metric FROM metrics
      WHERE url = ? AND metric != 'dummy'" url]]
    (query statement :metric)))

(defn- create
  "Create the metrics table and its three indexes, then insert one
  \"dummy\" row for every url in *default-urls*."
  []
  (sql/with-connection *db*
    (sql/transaction
      (sql/create-table :metrics
        [:url "VARCHAR(255)"]
        [:metric "VARCHAR(255)"]
        [:value :long]
        [:time :long])
      ; NOTE: the index names are misspelled ("metircs"); they are kept
      ; unchanged because they are part of the stored schema.
      (sql/do-commands
        "CREATE INDEX metircs_url ON metrics(url)"
        "CREATE INDEX metircs_metric ON metrics(metric)"
        "CREATE INDEX metircs_time ON metrics(time)")
      ; Presumably seeded so that urls lists the defaults right away;
      ; metrics filters the 'dummy' rows out again. Note that insert opens
      ; its own connection for each call.
      (doseq [default-url *default-urls*]
        (insert default-url [["dummy" "0"]])))))

; FIXME: This should be in sync with *update-interval* and *num-values*
(defn- clean-interval
  "Interval in milliseconds (one hour). Used both as the maximum age of
  rows kept by cleanup and as the pause between cleanup runs."
  []
  (let [millis-per-minute (* 60 1000)]
    (* 60 millis-per-minute)))

(defn- cleanup
  "Delete every metrics row whose time is more than one clean-interval
  before the current time."
  []
  (let [cutoff (- (current-time) (clean-interval))]
    (sql/with-connection *db*
      (sql/delete-rows :metrics ["time < ?" cutoff]))))

(defn- cleanup-thread
  "Run cleanup in an endless loop, sleeping one clean-interval between
  runs. An exception thrown by cleanup is logged and does not end the
  loop. Does not return normally."
  []
  (try
    (cleanup)
    ; Pass the exception as the throwable argument (not as the message) so
    ; the logging backend can record its stack trace.
    (catch Exception e
      (log/error "Cleanup of old metrics failed" e)))
  (Thread/sleep (clean-interval))
  (recur))

(defn initialize
  "Try to create the database schema, then start the background cleanup
  loop. A failure to create the schema is logged and otherwise ignored.
  Returns the future that runs the cleanup loop."
  []
  (try
    (create)
    ; Best effort: presumably the usual cause is that the table already
    ; exists. Log a message plus the throwable so the real cause (and its
    ; stack trace) stays visible if it is something else.
    (catch Exception e
      (log/info "Metrics schema was not created (it may already exist)" e)))
  (future (cleanup-thread)))