Commits

Matt Bone committed 9be0669

go version of this, other things.

Comments (0)

Files changed (7)

 *.elc
 *.pyc
 *~
+*.6
+memo_go/memoserver
 import socket
 import pickle
 
-from memo_server import PUT, GET
+from memo_server import PUT, GET, SHUTDOWN
 
 RECV_SIZE = 1024
 
 
         return value
 
+    def shutdown(self):
+        """Tells the remote memo queue server to shutdown."""
+        msg_parts = (SHUTDOWN)
+        #TODO just stream to the socket? does that work with pickle?
+        msg_parts_pickled = pickle.dumps(msg_parts)
+        self.s.send(msg_parts_pickled)
+        
+        self.s.close()        
+
 
     def __exit__(self, type, value, trace_back):
         self.s.close()
+include $(GOROOT)/src/Make.inc
+
+TARG=memoserver
+GOFILES=\
+	datastore.go\
+	memoserver.go\
+
+include $(GOROOT)/src/Make.cmd

memo_go/datastore.go

+package main
+
+import ( 
+	"container/list"
+	"sync"
+)
+
+
+type MemoQueue struct {
+	lock sync.RWMutex
+	table map[string](*list.List)
+}
+
+func NewMemoQueue() *MemoQueue {	
+	var memoqueue = new(MemoQueue)
+	memoqueue.table = make(map[string](*list.List))
+	return memoqueue
+}
+
+
+// Put a a value into the queue specified by key
+func (m *MemoQueue) Put(key string, value string) {
+
+	m.lock.Lock()
+
+	queue, found := m.table[key]
+	if !found {
+		var newlist *list.List = list.New()
+		m.table[key] = newlist
+		queue = newlist
+	} 
+	
+	queue.PushBack(value)
+
+	m.lock.Unlock()
+}
+
+
+// Get a value from the queue specified by key
+func (m *MemoQueue) Get(key string) (string, bool) {
+
+	m.lock.RLock()
+
+	/*do a check for membership and panic*/
+	queue, ok := m.table[key]
+	if !ok { return "", ok /* TODO find something better to return here than an empty string */ }
+
+	var first = queue.Front()
+	queue.Remove(first)
+
+	retVal, _ := first.Value.(string)
+
+	m.lock.RUnlock()
+
+	return retVal, true
+}

memo_go/memoserver.go

+package main
+
+import ( 
+	"fmt"
+	"http"
+	"flag"
+)
+
+
+var mqueue *MemoQueue
+
+
+func PutHandler(w http.ResponseWriter, r *http.Request) {
+
+	key := r.FormValue("key")
+	value := r.FormValue("value")
+
+	// TODO must be a better way to enforce the values being set...
+	if len(key) != 0 && len(value) != 0 {
+		mqueue.Put(key, value)
+		fmt.Fprintf(w, "ok")
+	}
+
+}
+
+
+func GetHandler(w http.ResponseWriter, r *http.Request) {
+	key := r.FormValue("key")
+
+	if len(key) != 0 {
+		value, ok := mqueue.Get(key)
+		if ok {
+			fmt.Fprintf(w, "%s", value)
+		}
+	}
+}
+
+func StatsHandler(w http.ResponseWriter, r *http.Request) {	
+	for queue, mylist := range mqueue.table {
+		fmt.Fprintf(w, "%s: %d<br/>", queue, mylist.Len())
+	}
+}
+
+
+func slam(queue string, max int, done chan int) {
+	for i := 0; i<max; i++ {
+		_, _, err := http.Get(fmt.Sprintf("http://localhost:8080/put?key=%s&value=%d", queue, i))
+		if(err != nil) {
+			//just retry incessantly if anything goes wrong
+			i = i - 1
+		}
+	}
+	done <- 1
+}
+
+func main() {	
+
+	var benchmark = flag.Bool("benchmark", false, "benchmark an already running server")
+
+	flag.Parse()
+
+	if(*benchmark) {
+		done := make(chan int)
+
+		go slam("foo", 10000, done)
+		go slam("bar", 10000, done)
+		go slam("foo", 10000, done)
+		go slam("bar", 10000, done)
+
+		<-done
+		<-done
+		<-done
+		<-done
+
+	} else {
+		mqueue = NewMemoQueue()
+
+		http.HandleFunc("/put", PutHandler)
+		http.HandleFunc("/get", GetHandler)
+		http.HandleFunc("/stats", StatsHandler)
+
+		http.ListenAndServe(":8080", nil)
+	}
+}
 import socket
 import threading
+import _thread as thread
 import pickle
 
 RECV_SIZE = 1024
 
 PUT = 'p'
 GET = 'g'
+SHUTDOWN = 's'
 
 class Updater(object):
 
-    def __init__(self, clientsocket, memo_queue, lock):
+    def __init__(self, serversocket, clientsocket, memo_queue, lock):
+        self.serversocket = serversocket
         self.clientsocket = clientsocket
         self.memo_queue = memo_queue
         self.lock = lock
         
         if msg_parts[0] == PUT:
             self.update_memo_queue(msg_parts[1], msg_parts[2])
-        if msg_parts[0] == GET:
+            
+        elif msg_parts[0] == GET:
             self.fetch_from_memo_queue(msg_parts[1])
+            
+        elif msg_parts[0] == SHUTDOWN:
+            self.serversocket.close()
+
+            
         self.clientsocket.close() #TODO should shutdown?
 
+
     def update_memo_queue(self, key, value):
         #very pessimistic piece of crap to begin
         self.lock.acquire()
         #hmmm
         #need some thread pool or evented server?
         #single threading this makes it faaaaast
-        updater = Updater(clientsocket, memo_queue, threading.Lock())
+        updater = Updater(serversocket, clientsocket, memo_queue, threading.Lock())
         #thread = threading.Thread(target=updater.run)
         #thread.start()
         updater.run()
 
 from memo_server import serve
 from memo_client import MemoServer
+import time
 
 def thread_a_server():
     """Start a memo server locally (in a thread) and return a
 
 #make these not-module level globals when there's good way to
 #stop and join() a server thread
-hostname, port, memo_queue = thread_a_server()
+
 
 class FunctionalTests(unittest.TestCase):
     """Functional tests for the memo client and server.
     """
 
     def setUp(self):
-        memo_queue.clear()
+        self.hostname, self.port, self.memo_queue = thread_a_server()
+        time.sleep(1)
 
 
-    def test_put(self):
-        with MemoServer(hostname, port) as memo_server:
-            memo_server.put("asdf", "jkl")
-            memo_server.put("asdf", "qwer")
+    def tearDown(self):
+        with MemoServer(self.hostname, self.port) as memo_server:
+            memo_server.shutdown()
 
-            memo_server.put("qwer", "123")
+
+    # def test_put(self):
+    #     with MemoServer(self.hostname, self.port) as memo_server:
+    #         memo_server.put("asdf", "jkl")
+    #         memo_server.put("asdf", "qwer")
+
+    #         memo_server.put("qwer", "123")
             
             
-        self.assertEquals(["jkl", "qwer"], memo_queue["asdf"])
-        self.assertEquals(["123"], memo_queue["qwer"])
+    #     self.assertEquals(["jkl", "qwer"], self.memo_queue["asdf"])
+    #     self.assertEquals(["123"], self.memo_queue["qwer"])
 
 
     def test_get(self):
-        memo_queue["123"] = ["a", "b"]
-        with MemoServer(hostname, port) as memo_server:
+        self.memo_queue["123"] = ["a", "b"]
+        with MemoServer(self.hostname, self.port) as memo_server:
             get1 = memo_server.get("123")
             get2 = memo_server.get("123")
             get3 = memo_server.get("123")