Commits

Anonymous committed 12d0b50

add loader scripts

Comments (0)

Files changed (3)

apps/rabbit_publisher/src/rabbit_publisher_server.erl

 -export([send/1]).
 -record(state, {bunnyc_pid, qname}).
 
+send(Message) -> gen_server:call(?MODULE, {send, Message}).
+
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
                                   QName, []),
     {ok, #state{bunnyc_pid=Pid, qname=QName}}.
 
-send(Message) ->
-    gen_server:call(?MODULE, {send, Message}).
-
 handle_call({send, Message}, _From, State=#state{bunnyc_pid=Pid, 
                                                  qname=QName}) ->
-    bunnyc:publish(Pid, QName, Message),
-    {reply, ok, State}.
+    {reply, bunnyc:publish(Pid, QName, Message), State}.
 
-handle_cast(_Msg, State) ->
-    {noreply, State}.
+handle_cast(_Msg, State) -> {noreply, State}.
+handle_info(_Info, State) -> {noreply, State}.
+terminate(_Reason, _State) -> ok.
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
 
-handle_info(_Info, State) ->
-    {noreply, State}.
 
-terminate(_Reason, _State) ->
-    ok.
 
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
 
-
-
-

scripts/load_stockdata.py

+import csv, json
+import riak
+
+DATA_FILE="goog.csv"
+
+def munge(r):
+    return dict(
+        date=r['Date'],
+        volume=r['Volume'],
+        adj_close=r['Adj Close'],
+        open=r['Open'],
+        high=r['High'],
+        low=r['Low'])
+
+def main():
+    client = riak.RiakClient()
+    bucket = client.bucket("goog_prices")
+    fp = file(DATA_FILE, "rbU")
+    reader = csv.reader(fp)
+    h = reader.next()
+    count = 0
+    for line in reader:
+        record = munge(dict(zip(h, line)))
+        obj = bucket.new(record['date'])
+        obj.set_data(json.dumps(record))
+        obj.store()
+        count += 1
+    print "stored %s objects" % count
+
+if __name__ == "__main__":
+    main()

scripts/rabbit_consumer.py

+from amqplib import client_0_8 as amqp
+from sqlite3 import dbapi2 as sqlite
+import json
+
+AMQP_HOST = "localhost:5672"
+AMQP_USER = "guest"
+AMQP_PASS = "guest"
+AMQP_VHOST = "/"
+AMQP_QUEUE = "riak.publish"
+
+DB_CONN = None
+
+def connect():
+    ''' Connect to RabbitMQ '''
+    return amqp.Connection(
+        host=AMQP_HOST,
+        userid=AMQP_USER,
+        password=AMQP_PASS,
+        virtual_host=AMQP_VHOST,
+        insist=False)
+
+def save_obj(obj):
+    ''' Save "obj" to database '''
+    c = DB_CONN.cursor()
+    c.execute("INSERT INTO goog_prices VALUES "
+              "(%s,%s,%s,%s,%s,%s)" %
+              (obj['date'],
+               obj['open'],
+               obj['high'],
+               obj['low'],
+               obj['volume'],
+               obj['adj_close']))
+    DB_CONN.commit()
+
+def recv_callback(msg):
+    ''' Called on receipt of a RabbitMQ message '''
+    obj = json.loads(json.loads(json.loads(msg.body.strip())))
+    save_obj(obj)
+
+def setup_db():
+    DB_CONN.execute("CREATE TABLE goog_prices "
+                    "(date, open, high, low, volume, close)")
+
+
+def main():
+    global DB_CONN
+    DB_CONN = sqlite.connect("goog_prices")
+    setup_db()
+    amqp_conn = connect()
+    amqp_chan = amqp_conn.channel()
+    amqp_chan.basic_consume(
+        queue=AMQP_QUEUE, 
+        no_ack=True,
+        callback=recv_callback, 
+        consumer_tag="testtag")
+    while True: amqp_chan.wait()
+
+
+if __name__ == "__main__":
+    main()