Commits

Ross Light committed 0b81286

Make server side more usable

  • Participants
  • Parent commits f826589

Comments (0)

Files changed (1)

 	"bytes"
 	"encoding/binary"
 	"http"
+	"io"
 	"net"
 	"os"
 	"strconv"
 	Handler http.Handler
 }
 
+func (srv *Server) ListenAndServe() os.Error {
+	addr := srv.Addr
+	if addr == "" {
+		addr = ":http"
+	}
+	l, err := net.Listen("tcp", addr)
+	if err != nil {
+		return err
+	}
+	return srv.Serve(l)
+}
+
 func (srv *Server) Serve(l net.Listener) os.Error {
 	defer l.Close()
 	handler := srv.Handler
 }
 
 type session struct {
-	c       net.Conn
-	handler http.Handler
-	in, out chan Frame
-	streams map[uint32]*stream
+	c           net.Conn
+	handler     http.Handler
+	in, out     chan Frame
+	streams     map[uint32]*serverStream
+	streamsLock sync.RWMutex
 }
 
 func newSession(c net.Conn, h http.Handler) (s *session, err os.Error) {
 		handler: h,
 		in:      make(chan Frame),
 		out:     make(chan Frame),
-		streams: make(map[uint32]*stream),
+		streams: make(map[uint32]*serverStream),
 	}
 	return
 }
 }
 
 func (sess *session) handleControl(frame ControlFrame) {
-	// TODO
+	switch frame.Type {
+	case TypeSynStream:
+		sess.streamsLock.Lock()
+		defer sess.streamsLock.Unlock()
+		if stream, err := newServerStream(sess, frame); err == nil {
+			if _, exists := sess.streams[stream.id]; !exists {
+				sess.streams[stream.id] = stream
+				go stream.bufferReads()
+				go sess.handler.ServeHTTP(stream, stream.Request())
+			}
+		}
+	}
 }
 
 func (sess *session) handleData(frame DataFrame) {
+	sess.streamsLock.RLock()
+	defer sess.streamsLock.RUnlock()
+
 	st, found := sess.streams[frame.StreamID]
 	if !found {
 		// TODO: Error?
 	}
 }
 
-type stream struct {
+type serverStream struct {
 	id      uint32
 	session *session
 	closed  bool
 
-	requestHeaders  http.Header
+	headerReader   *HeaderReader
+	requestHeaders http.Header
+
+	headerWriter    *HeaderWriter
 	responseHeaders http.Header
-	headerWriter    *HeaderWriter
 	wroteHeader     bool
 
 	readChan   chan DataFrame
 	lastWrite Frame
 }
 
-func (st *stream) bufferReads() {
+func newServerStream(sess *session, frame ControlFrame) (st *serverStream, err os.Error) {
+	if frame.Type != TypeSynStream {
+		err = os.NewError("Server stream must be created from a SynStream frame")
+		return
+	}
+	st = &serverStream{
+		session:         sess,
+		headerReader:    NewHeaderReader(),
+		headerWriter:    NewHeaderWriter(-1),
+		responseHeaders: make(http.Header),
+	}
+	if frame.Flags&FlagFin == 0 {
+		// Request body will follow
+		st.readChan = make(chan DataFrame)
+		st.readBuffer = new(bytes.Buffer)
+	}
+	// Read frame data
+	data := bytes.NewBuffer(frame.Data)
+	err = binary.Read(data, binary.BigEndian, &st.id)
+	if err != nil {
+		return
+	}
+	_, err = io.ReadFull(data, make([]byte, 6)) // skip associated stream ID and priority
+	if err != nil {
+		return
+	}
+	st.requestHeaders, err = st.headerReader.Decode(data.Bytes())
+	return
+}
+
+func (st *serverStream) bufferReads() {
 	for frame := range st.readChan {
 		st.readLock.Lock()
 		st.readBuffer.Write(frame.Data)
 	st.readChan = nil
 }
 
-func (st *stream) Read(p []byte) (n int, err os.Error) {
+// Request returns the request data associated with the serverStream.
+func (st *serverStream) Request() (req *http.Request) {
+	// TODO
+	req = &http.Request{
+		Method:     st.requestHeaders.Get("method"),
+		RawURL:     st.requestHeaders.Get("url"),
+		Proto:      st.requestHeaders.Get("version"),
+		Header:     st.requestHeaders,
+		Body:       st,
+		RemoteAddr: st.session.c.RemoteAddr().String(),
+	}
+	return
+}
+
+func (st *serverStream) Read(p []byte) (n int, err os.Error) {
 	st.readLock.Lock()
 	defer st.readLock.Unlock()
 	if st.readBuffer.Len() == 0 {
 }
 
 // Header returns the current response headers.
-func (st *stream) Header() http.Header { return st.responseHeaders }
+func (st *serverStream) Header() http.Header { return st.responseHeaders }
 
-func (st *stream) Write(p []byte) (n int, err os.Error) {
+func (st *serverStream) Write(p []byte) (n int, err os.Error) {
 	if st.closed {
-		err = os.NewError("Write on closed stream")
+		err = os.NewError("Write on closed serverStream")
 		return
 	}
 	if !st.wroteHeader {
 	return
 }
 
-func (st *stream) WriteHeader(code int) {
+func (st *serverStream) WriteHeader(code int) {
 	if st.wroteHeader {
 		return
 	}
 	})
 }
 
-func (st *stream) writeFrame(frame Frame) {
+func (st *serverStream) writeFrame(frame Frame) {
 	if st.lastWrite != nil {
 		st.session.out <- st.lastWrite
 	}
 	st.lastWrite = frame
 }
 
-func (st *stream) Flush() os.Error {
+func (st *serverStream) Flush() os.Error {
 	if st.lastWrite != nil {
 		st.session.out <- st.lastWrite
 		st.lastWrite = nil
 	return nil
 }
 
-func (st *stream) Close() (err os.Error) {
+func (st *serverStream) Close() (err os.Error) {
 	if st.lastWrite != nil {
 		switch frame := st.lastWrite.(type) {
 		case DataFrame: