Source

aquchinchay / src / aquchinchay / csp / server.go

Full commit
package csp

import (
	"aquchinchay/clients"
	"aquchinchay/config"
	"crypto/rand"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// CSPServer holds the state shared by the CSP HTTP handlers in this file.
//
// NOTE(review): the handlers read and write Sessions without any locking,
// and net/http serves each request on its own goroutine — confirm whether
// access needs to be synchronized.
type CSPServer struct {
	// Sessions maps a session ID (as produced by newSessionID) to the
	// session created for it during the handshake.
	Sessions map[string]CSPSession

	// ClientFactory is called once per handshake to obtain the remote
	// client that backs the new session.
	ClientFactory func() clients.RemoteClient

	// staticDirectory is the filesystem directory served under
	// /csp/static/.
	staticDirectory string
}

// newSessionID returns a fresh random session identifier: 64 bytes read from
// crypto/rand, rendered as exactly 128 lowercase hexadecimal characters.
//
// It panics if the random source reports an error, because an ID derived
// from an unfilled buffer would be predictable.
func (server *CSPServer) newSessionID() string {
	id := make([]byte, 64)
	if _, err := rand.Read(id); err != nil {
		panic(fmt.Sprintf("csp: reading random bytes for session ID: %v", err))
	}
	// Applying %x to the whole slice zero-pads every byte to two digits.
	// Formatting byte by byte drops leading zeros, which makes the ID length
	// vary and lets different byte sequences produce the same string.
	return fmt.Sprintf("%x", id)
}

// handshake returns the HTTP handler that opens a new session.
//
// For each request it creates a client via server.ClientFactory, wraps it in
// a new session, stores that session in server.Sessions under a fresh ID,
// lets the session process the request form, and replies with the JSON
// object {"session": "<id>"} passed through session.prepareResponse. The
// response Content-type is taken from the session's "ct" variable.
func handshake(server *CSPServer) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		// FormValue parses the query string and request body as a side
		// effect, which populates r.Form for handleRequest below. Calling
		// it for every method (not only GET and POST) keeps r.Form from
		// being nil. The "d" payload itself is not consumed here.
		_ = r.FormValue("d")

		client := server.ClientFactory()
		session := NewCSPSession(client)
		id := server.newSessionID()
		server.Sessions[id] = session
		session.handleRequest(r.Form)

		responseJSON, err := json.Marshal(map[string]string{"session": id})
		if err != nil {
			// Stop here: writing the error and then the normal response
			// would produce a garbled body with a 200 status.
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-type", session.Vars["ct"])
		fmt.Fprint(w, session.prepareResponse(string(responseJSON)))
	}
}

// send returns the HTTP handler that delivers client data to a session.
//
// The session is looked up by the "s" form value; the packet batch is
// decoded from the "d" form value and its data handed to session.Send. On
// success the handler replies with session.prepareResponse("OK"). Requests
// naming a session that is not in server.Sessions get a 404.
func send(server *CSPServer) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		sessionID := r.FormValue("s")
		session, ok := server.Sessions[sessionID]
		if !ok {
			// A missing key would otherwise yield a zero-value session
			// that was never set up by the handshake.
			http.Error(w, "unknown session", http.StatusNotFound)
			return
		}
		session.handleRequest(r.Form)
		batch := packetBatchFromJSON(r.FormValue("d"))
		session.Send(batch.data())
		w.Header().Set("Content-type", session.Vars["ct"])
		fmt.Fprint(w, session.prepareResponse("OK"))
	}
}

// comet returns the HTTP handler that delivers server-to-client packets.
//
// The session is looked up by the "s" form value and given the request form
// to process. Depending on session.IsStreaming() the request is then served
// by cometStream or cometPoll. Requests naming a session that is not in
// server.Sessions get a 404.
func comet(server *CSPServer) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		sessionID := r.FormValue("s")
		session, ok := server.Sessions[sessionID]
		if !ok {
			// A missing key would otherwise yield a zero-value session
			// that was never set up by the handshake.
			http.Error(w, "unknown session", http.StatusNotFound)
			return
		}
		session.handleRequest(r.Form)
		if session.IsStreaming() {
			server.cometStream(session, w)
			return
		}
		server.cometPoll(session, w)
	}
}

// close returns the HTTP handler that terminates a session.
//
// The session is looked up by the "s" form value, closed, and removed from
// server.Sessions so it can no longer be reached by later requests. Requests
// naming a session that is not in server.Sessions get a 404. On success no
// body is written.
//
// Note that this package-level function shadows the builtin close within
// the package.
func close(server *CSPServer) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		sessionID := r.FormValue("s")
		session, ok := server.Sessions[sessionID]
		if !ok {
			// A missing key would otherwise yield a zero-value session
			// that was never set up by the handshake.
			http.Error(w, "unknown session", http.StatusNotFound)
			return
		}
		session.Close()
		// Drop the entry so closed sessions do not accumulate in the map.
		delete(server.Sessions, sessionID)
	}
}

// cometPoll answers a comet request with a single packet batch.
//
// If the session already has packets, they are batched and returned at
// once. Otherwise the call waits for one value on the session client's
// channel, for at most session.Duration() seconds, and returns a batch with
// that one packet, or an empty batch if the wait timed out. The batch JSON
// is passed through session.prepareBatch before being written.
//
// NOTE(review): session is received by value, so (assuming CSPSession is a
// struct type) the NextPacketId increment below only changes this local
// copy — confirm how the packet counter is meant to persist.
func (server *CSPServer) cometPoll(session CSPSession, w http.ResponseWriter) {
	fmt.Println("comet polling")

	var pending CSPPacketBatch
	switch {
	case session.hasPackets():
		fmt.Println("comet has packets")
		pending = session.batchPackets()
	default:
		wait := time.Duration(session.Duration()) * time.Second
		expired := time.After(wait)
		fmt.Println("comet waiting for packets")
		pending = newPacketBatch()
		select {
		case payload := <-session.Client.Channel():
			next := newPacket(session.NextPacketId, payload)
			session.NextPacketId++
			pending.addPacket(next)
		case <-expired:
			// Nothing arrived in time; reply with the empty batch.
		}
	}

	encoded := pending.toJSON()
	w.Header().Set("Content-type", session.Vars["ct"])
	fmt.Fprint(w, session.prepareBatch(encoded))
}

// cometStream answers a comet request by writing packet batches to the
// response as they become available, for session.Duration() seconds.
//
// Any packets the session already holds are written first as one batch.
// After that, every value received from the session client's channel is
// written as its own single-packet batch, one JSON document per line. The
// response is flushed after each write so the client receives packets as
// they are produced rather than when the handler returns. The call returns
// when the deadline passes or the client channel is closed.
//
// NOTE(review): session is received by value, so (assuming CSPSession is a
// struct type) the NextPacketId increments below only change this local
// copy — confirm how the packet counter is meant to persist.
func (server *CSPServer) cometStream(session CSPSession, w http.ResponseWriter) {
	fmt.Println("comet streaming")
	w.Header().Set("Content-type", session.Vars["ct"])

	// net/http buffers response writes; without an explicit flush the
	// "streamed" packets would sit in the buffer until it fills or the
	// handler returns.
	flusher, canFlush := w.(http.Flusher)
	flush := func() {
		if canFlush {
			flusher.Flush()
		}
	}

	if session.hasPackets() {
		fmt.Println("comet has packets")
		batch := session.batchPackets()
		packetJSON := batch.toJSON()
		fmt.Fprintln(w, packetJSON)
		flush()
	}

	deadline := time.Now().Add(time.Duration(session.Duration()) * time.Second)
	// One timer for the whole stream instead of a new one per iteration.
	timer := time.NewTimer(deadline.Sub(time.Now()))
	defer timer.Stop()

	for time.Now().Before(deadline) {
		fmt.Println("comet waiting for packets")
		select {
		case data, ok := <-session.Client.Channel():
			if !ok {
				// A closed channel would yield zero values forever.
				return
			}
			packet := newPacket(session.NextPacketId, data)
			session.NextPacketId++
			batch := newPacketBatch()
			batch.addPacket(packet)
			packetJSON := batch.toJSON()
			fmt.Fprintln(w, packetJSON)
			flush()
			fmt.Println("comet streamed packet")
		case <-timer.C:
			return
		}
	}
}

// NewServer builds a CSPServer with an empty session table, a client
// factory obtained from clients.MakeClient for serverConfig.ClientConfig,
// and the static directory named in globalConfig. It does not start
// listening.
func NewServer(globalConfig config.GlobalConfig, serverConfig config.ServiceConfig) CSPServer {
	return CSPServer{
		Sessions:        make(map[string]CSPSession),
		ClientFactory:   clients.MakeClient(serverConfig.ClientConfig),
		staticDirectory: globalConfig.StaticDirectory,
	}
}

// start registers the CSP endpoints on a new ServeMux and begins serving
// HTTP on address in a background goroutine.
//
// Routes: /csp/handshake, /csp/send, /csp/comet and /csp/close map to the
// handlers of the same name; /csp/static/ serves files from the server's
// static directory. The method returns immediately. The error returned by
// http.ListenAndServe is sent on resultChan, so the caller must receive from
// it (or give it a buffer) for the goroutine to finish.
func (server *CSPServer) start(address string, resultChan chan error) {
	handler := http.NewServeMux()
	handler.HandleFunc("/csp/handshake", handshake(server))
	handler.HandleFunc("/csp/send", send(server))
	handler.HandleFunc("/csp/comet", comet(server))
	// The close handler is defined alongside the others but was never
	// routed, leaving clients no way to end a session.
	handler.HandleFunc("/csp/close", close(server))
	handler.Handle("/csp/static/",
		http.StripPrefix("/csp/static/",
			http.FileServer(http.Dir(server.staticDirectory))))
	go func() {
		resultChan <- http.ListenAndServe(address, handler)
	}()
}

// StartServer creates a server with NewServer and starts it listening on
// serverConfig.ListenerAddress. The listener's terminal error is delivered
// on resultChan. The returned CSPServer is a copy of the value the handlers
// operate on.
func StartServer(globalConfig config.GlobalConfig, serverConfig config.ServiceConfig, resultChan chan error) CSPServer {
	srv := NewServer(globalConfig, serverConfig)
	listenOn := serverConfig.ListenerAddress
	srv.start(listenOn, resultChan)
	return srv
}