Commits

izazi committed dd097ab

Completed the time-slice-based generation of data into Cassandra, with an Avro file as the source

  • Participants
  • Parent commits 1410b91

Comments (0)

Files changed (6)

+#!/bin/sh
+hadoop fs -rm data/document.avro
+hadoop fs -copyFromLocal ./data/document.avro data/document.avro

File data/document.avro

Binary file added.

File data/rain.txt

-doc_id	text
-doc01	A rain shadow is a dry area on the lee back side of a mountainous area.
-doc02	This sinking, dry air produces a rain shadow, or area in the lee of a mountain with less rain and cloudcover.
-doc03	A rain shadow is an area of dry land that lies on the leeward (or downwind) side of a mountain.
-doc04	This is known as the rain shadow effect and is the primary cause of leeward deserts of mountain ranges, such as California's Death Valley.
-doc05	Two Women. Secrets. A Broken Land. [DVD Australia]
+doc_id	time	text
+doc01	1365318767732	A rain shadow is a dry area on the lee back side of a mountainous area.
+doc02	1365318767732	This sinking, dry air produces a rain shadow, or area in the lee of a mountain with less rain and cloudcover.
+doc03	1365318767732	A rain shadow is an area of dry land that lies on the leeward (or downwind) side of a mountain.
+doc04	1365318767732	This is known as the rain shadow effect and is the primary cause of leeward deserts of mountain ranges, such as California's Death Valley.
+doc05	1365318767732	Two Women. Secrets. A Broken Land. [DVD Australia]
   :url "http://example.com/FIXME"
   :source-paths ["src/clj"]
   :test-paths   ["test/clj"]
+  :resource-paths ["src/resources"]
   :license {:name "Eclipse Public License"
             :url "http://www.eclipse.org/legal/epl-v10.html"}
   :dependencies [[org.clojure/clojure "1.4.0"]
   				[cascalog "1.10.1"]
-  				[org.apache.cassandra/cassandra-all "1.1.5"]
+  				[org.apache.cassandra/cassandra-all "1.1.5" 
+           :exclusions [org.apache.cassandra.deps/avro]]
   				[clojurewerkz/cassaforte "1.0.0-beta11-SNAPSHOT"]
           [quintona/cascading-cassandra "0.0.7-SNAPSHOT"]
+          [clj-time "0.5.0"]
+          [cascading.avro/avro-scheme "2.2-SNAPSHOT"]
           [cascalog-more-taps "0.3.0"]
                 [org.apache.httpcomponents/httpclient "4.2.3"]]
   :profiles { :dev {:dependencies [[org.apache.hadoop/hadoop-core "0.20.2-dev"]

File src/clj/tfidf_cascalog/core.clj

   (:use cascalog.api
         clojure.test
         [midje sweet cascalog]
-        [cascalog.more-taps :only (hfs-delimited)])
+        [cascalog.more-taps :only (hfs-delimited)]
+        [clj-time.core :only (today in-minutes interval)]
+            [clj-time.coerce :only (from-long to-long)]
+            [clj-time.local :only (local-now)])
   (:require [cascalog.io :as io]
             [clojure.string :as s]
-            [cascalog.ops :as c])
+            [cascalog.ops :as c]
+            [cascalog.tap :as tap]
+            )
   (:import [cascading.tuple Fields]
            [cascading.scheme Scheme]
+           [cascading.avro AvroScheme]
+           [org.apache.avro Schema]
            [com.ifesdjeen.cascading.cassandra CassandraTap CassandraScheme]
            [org.apache.cassandra.utils ByteBufferUtil]
            [org.apache.cassandra.thrift Column]))
 
 (def storm_keyspace "storm")
 
+;; Cascalog filter operation: keeps a tuple only when its document timestamp
+;; is at least 60 minutes older than the current time.
+;;   doc-time - document timestamp, expected to be epoch milliseconds (a long).
+;; Returns true to keep the tuple, false to drop it.
+(deffilterop timing-correct? [doc-time]
+  ;; Compare raw epoch milliseconds rather than building a clj-time interval:
+  ;; an interval whose start lies after its end throws, so a single
+  ;; future-dated document would abort the whole job. With this arithmetic a
+  ;; future-dated document simply yields a negative age and is filtered out.
+  (let [age-ms (- (to-long (local-now)) doc-time)]
+    ;; 60 min * 60 s * 1000 ms; same cut-off as truncating the age to whole
+    ;; minutes and requiring at least 60 of them.
+    (>= age-ms (* 60 60 1000))))
+
 ;; Cascalog mapcat operation: splits one line of text into tokens, emitting
 ;; one output tuple per token.
 ;;   line - the string to tokenise.
 (defmapcatop split [line]
   ;; Delimiters are runs of square brackets, backslashes, parentheses,
   ;; commas, periods and whitespace.
   (s/split line #"[\[\]\\\(\),.)\s]+"))
 
 ;; Builds the Cascalog query that turns raw documents into cleaned word tuples.
 ;;   rain - generator of document tuples (doc id, timestamp and line of text
 ;;          in the added version of the query).
 ;;   stop - generator of stop words.
 ;; Each line is tokenised with `split`, every token is trimmed and
 ;; lower-cased, words matched by `stop` are excluded (`:> false`), and only
 ;; tuples whose ?time passes `timing-correct?` are kept.
 (defn etl-docs-gen [rain stop]
-  (<- [?doc-id ?word]
-      (rain ?doc-id ?line)
+  (<- [?doc-id ?time ?word]
+      (rain ?doc-id ?time ?line)
       (split ?line :> ?word-dirty)
       ((c/comp s/trim s/lower-case) ?word-dirty :> ?word)
-      (stop ?word :> false)))
+      (stop ?word :> false)
+      (timing-correct? ?time)))
 
 (defn D [src]
   (let [src  (select-fields src ["?doc-id"])]
 
 ;; Document-frequency query over `src`, yielding [?key ?df-count-str].
 ;;   ?key          - the word, converted with `str`.
 ;;   ?df-count-str - the result of `c/distinct-count` over ?doc-id and
 ;;                   ?df-word, converted to a string.
 ;; In the added version ?time is bound from `src` but not otherwise used here.
 (defn DF [src]
   (<- [?key ?df-count-str]
-      (src ?doc-id ?df-word)
+      (src ?doc-id ?time ?df-word)
       (c/distinct-count ?doc-id ?df-word :> ?df-count)
       (str ?df-word :> ?key)
-      (str ?df-count :> ?df-count-str)))
+      (str ?df-count :> ?df-count-str))
+  )
 
 ;; Term-frequency query over `src`, yielding [?key ?tf-count-str].
 ;;   ?key          - ?doc-id and ?tf-word concatenated with `str`.
 ;;   ?tf-count-str - the `c/count` aggregate for that key, converted to a string.
 ;; In the added version ?time is bound from `src` but not otherwise used here.
 (defn TF [src]
   (<- [?key ?tf-count-str]
-      (src ?doc-id ?tf-word)
+      (src ?doc-id ?time ?tf-word)
       (c/count ?tf-count)
       (str ?doc-id ?tf-word :> ?key)
       (str ?tf-count :> ?tf-count-str)))
 ;; Returns the tap for term-frequency output by delegating to `create-tap`
 ;; with the name "tf" and the given Cassandra address.
 ;; NOTE(review): "tf" is presumably the target column family - confirm
 ;; against `create-tap`.
 (defn create-tf-tap [cassandra-ip]
   (create-tap "tf" cassandra-ip))
 
+(defn load-schema
+  "Parses the Avro schema stored in the classpath resource document.avsc and
+  returns it as an org.apache.avro.Schema.
+  Throws java.io.FileNotFoundException when the resource is not on the
+  classpath."
+  []
+  (let [stream (.getResourceAsStream (clojure.lang.RT/baseLoader) "document.avsc")]
+    ;; getResourceAsStream returns nil for a missing resource; fail with a
+    ;; clear message instead of an opaque error from inside the parser.
+    (when (nil? stream)
+      (throw (java.io.FileNotFoundException.
+               "document.avsc not found on the classpath")))
+    ;; with-open guarantees the stream is closed even when parsing fails.
+    (with-open [in stream]
+      (Schema/parse in))))
+
 (defn execute [in stop cassandra-ip]
   (cc/connect! cassandra-ip)
   (sch/set-keyspace storm_keyspace)
-  (let [input (hfs-delimited in :skip-header? true)
+  (let [input (tap/hfs-tap (AvroScheme. (load-schema)) in)
         stop (hfs-delimited stop :skip-header? true)
         src  (etl-docs-gen input stop)]
     (?- (create-d-tap cassandra-ip)

File src/resources/document.avsc

+{"namespace": "storm.cookbook",
+ "type": "record",
+ "name": "Document",
+ "fields": [
+     {"name": "docid", "type": "string"},
+     {"name": "time",  "type": "long"},
+     {"name": "line", "type": "string"}
+ ]
+}