Commits

Liam Staskawicz committed a8d1408

prelim commit

correctly receiving RTP and RTCP packets from a unicast client. not sending anything yet.

  • Participants

Comments (0)

Files changed (9)

+
+# ignore exe's
+loop/loop
+
+# go-rtp
+
+An RTP lib for Go. Very incomplete, still exploring.
+
+# testing
+
+/Applications/VLC.app/Contents/MacOS/VLC -vvv MyFavoriteSong.wav --sout '#rtp{dst=127.0.0.1,port=5220}'

File loop/main.go

+package main
+
+import (
+	"log"
+
+	"bitbucket.org/liamstask/go-rtp/rtp"
+)
+
+const (
+	rtpAddr  = "127.0.0.1:5220"
+	rtcpAddr = "127.0.0.1:5221"
+)
+
+func main() {
+	log.Println("running...")
+
+	s := rtp.NewSession(rtpAddr, rtcpAddr)
+    s.Start()
+	select {}
+
+	log.Println("done")
+}

File rtp/handler.go

+package rtp
+
+// Handlers define how we pass incoming data to application code
+
+type RtpHandler interface {
+	HandleRtp(*RtpPacket)
+}
+
+type RtcpHandler interface {
+	HandleRtcp(*RtcpPacket)
+}
+
+type CtrlEventHandler interface {
+	HandleCtrlEvent(*CtrlEvent)
+}
+
+type RtpHandlerFunc func(*RtpPacket)
+
+func (f RtpHandlerFunc) HandleRtp(p *RtpPacket) {
+	f(p)
+}
+
+type RtcpHandlerFunc func(*RtcpPacket)
+
+func (f RtcpHandlerFunc) HandleRtcp(p *RtcpPacket) {
+	f(p)
+}
+
+type CtrlEventHandlerFunc func(*CtrlEvent)
+
+func (f CtrlEventHandlerFunc) HandleCtrlEvent(e *CtrlEvent) {
+	f(e)
+}

File rtp/interval.go

+package rtp
+
+import (
+	// "log"
+	"time"
+)
+
+// long running service to handle periodic transmission events
+func transmissionIntervalService(s *Session) {
+
+	interval := time.Duration(time.Second*2 + time.Millisecond*500)
+	ticker := time.NewTicker(interval)
+
+	for {
+		select {
+		case <-ticker.C:
+			// check for stale streams, etc
+		}
+	}
+}

File rtp/packetRtcp.go

+package rtp
+
+import (
+	"encoding/binary"
+	"net"
+)
+
+// RTCP packet types
+const (
+	RtcpSenderReport   = 200
+	RtcpReceiverReport = 201
+	RtcpSdes           = 202
+	RtcpBye            = 203
+)
+
+// SDES chunk types
+const (
+	SdesEnd       = iota // end of SDES list, null octet
+	SdesCname            // canonical name
+	SdesName             // user name
+	SdesEmail            // user's electronic mail address
+	SdesPhone            // user's phone number
+	SdesLoc              // geographic user location
+	SdesTool             // name of application or tool
+	SdesNote             // notice about the source
+	SdesPriv             // private extensions
+	SdesH323Caddr        // H.323 callable address
+	sdesMax
+)
+
+const (
+	rtcpFixedHeaderSize = 4
+)
+
+type RtcpPacket struct {
+	buf  []byte
+	from net.Addr
+}
+
+// each RTCP packet begins with a fixed header
+type RtcpFixedHeader struct {
+	Version    uint8
+	Padding    bool
+	RCount     uint8
+	PacketType uint8
+	Length     int // in bytes
+}
+
+type SenderReport struct {
+	Header  *RtcpFixedHeader
+	SyncSrc uint32
+
+	// sender info
+	NtpTimestamp uint64
+	RtpTimestamp uint32
+	PacketCount  uint32
+	OctetCount   uint32
+
+	ReceptionReports []ReceptionReport
+}
+
+// sub-struct within a SenderReport
+type ReceptionReport struct {
+	Ssrc             uint32
+	FracLost         uint8
+	PacketsLost      uint32
+	HighSeqNum       uint32
+	Jitter           uint32
+	LastSRTimestamp  uint32
+	DelaySinceLastSR uint32
+}
+
+type ReceiverReport struct {
+	Header  *RtcpFixedHeader
+	SyncSrc uint8
+}
+
+type SdesPacket struct {
+	Header *RtcpFixedHeader
+	Chunks []SdesChunk
+}
+
+type SdesChunk struct {
+	Ssrc  uint32
+	Items []SdesItem
+}
+
+type SdesItem struct {
+	Type uint8
+	Text string
+}
+
+type ByePacket struct {
+	Header *RtcpFixedHeader
+	Srcs   []uint32
+	Reason string
+}
+
+// Extract the fixed size header from this packet starting at pos
+func (p *RtcpPacket) Header(pos int) *RtcpFixedHeader {
+	h := &RtcpFixedHeader{}
+
+	b := p.buf[pos:]
+	b0 := b[0]
+	h.Version = (b0 & versionMask) >> versionShift
+	h.Padding = (b0 & paddingMask) != 0
+	h.RCount = b0 & receptionReportCountMask
+
+	h.PacketType = b[1]
+	// translate length to bytes
+	wordsMinusOne := binary.BigEndian.Uint16(b[2:])
+	h.Length = int((wordsMinusOne + 1) * 4)
+
+	return h
+}
+
+// Extract a sender report from this packet starting at pos.
+func (p *RtcpPacket) SenderReport(h *RtcpFixedHeader, pos int) *SenderReport {
+	s := &SenderReport{
+		Header: h,
+	}
+
+	// we expect pos to be pointing at the beginning of the packet
+	b := p.buf[pos+rtcpFixedHeaderSize:]
+	be := binary.BigEndian
+
+	s.SyncSrc = be.Uint32(b[0:])
+	s.NtpTimestamp = be.Uint64(b[4:])
+	s.RtpTimestamp = be.Uint32(b[12:])
+	s.PacketCount = be.Uint32(b[16:])
+	s.OctetCount = be.Uint32(b[20:])
+
+	pos = 24
+	for i := uint8(0); i < h.RCount; i++ {
+		rr := ReceptionReport{
+			Ssrc:             be.Uint32(b[pos:]),
+			FracLost:         b[pos+4],
+			PacketsLost:      be.Uint32(b[pos+4:]) & 0xffffff,
+			HighSeqNum:       be.Uint32(b[pos+8:]),
+			Jitter:           be.Uint32(b[pos+12:]),
+			LastSRTimestamp:  be.Uint32(b[pos+16:]),
+			DelaySinceLastSR: be.Uint32(b[pos+5:]),
+		}
+		s.ReceptionReports = append(s.ReceptionReports, rr)
+		pos += 24
+	}
+
+	return s
+}
+
+// Extract an SDES packet starting at pos
+func (p *RtcpPacket) SdesPacket(h *RtcpFixedHeader, pos int) *SdesPacket {
+	s := &SdesPacket{
+		Header: h,
+	}
+
+	// we expect pos to be pointing at the beginning of the packet
+	b := p.buf[pos+rtcpFixedHeaderSize:]
+
+	// zero or more chunks, each of which contain zero or more items
+	pos = 0
+	for i := uint8(0); i < h.RCount; i++ {
+		c := SdesChunk{
+			Ssrc: binary.BigEndian.Uint32(b[pos:]),
+		}
+		pos += 4
+
+		for pos < len(b) {
+			// XXX: test that we at least have 2 bytes for type and len
+			itemtype := b[pos]
+			if itemtype == SdesEnd {
+				break
+			}
+			textlen := int(b[pos+1])
+			// strings are utf8 encoded
+			// XXX: verify textlen is valid
+			c.Items = append(c.Items, SdesItem{
+				Type: itemtype,
+				Text: string(b[pos+2 : pos+2+textlen]),
+			})
+			pos += 2 + textlen
+		}
+		s.Chunks = append(s.Chunks, c)
+	}
+
+	return s
+}
+
+// Extract a BYE packet starting at pos
+func (p *RtcpPacket) ByePacket(h *RtcpFixedHeader, pos int) *ByePacket {
+	bp := &ByePacket{
+		Header: h,
+	}
+
+	// we expect pos to be pointing at the beginning of the packet
+	b := p.buf[pos+rtcpFixedHeaderSize:]
+
+	pos = 0
+	for i := uint8(0); i < h.RCount; i++ {
+		bp.Srcs = append(bp.Srcs, binary.BigEndian.Uint32(b[pos:]))
+		pos += 4
+	}
+
+	// Reason is optional, check if we have extra bytes
+	if pos < len(b) {
+		reasonlen := int(b[pos])
+		reasonend := pos + 1 + reasonlen
+		if reasonend < len(b) {
+			bp.Reason = string(b[pos+1 : reasonend])
+		}
+	}
+
+	return bp
+}

File rtp/packetRtp.go

+package rtp
+
+import (
+	"encoding/binary"
+	"log"
+	"net"
+)
+
+const (
+	versionShift  = 6
+	versionMask   = 0x3 << versionShift
+	paddingMask   = 0x1 << 5
+	extensionMask = 0x1 << 4
+	csrcCountMask = 0xf
+
+	markerMask      = 0x1 << 7
+	payloadTypeMask = 0x7f
+
+	receptionReportCountMask = 0x1f
+)
+
+// Fixed size header for all RTP packets
+type RtpHeader struct {
+	Version     uint8
+	Padding     bool
+	Extension   bool
+	CsrcCount   uint8 // 0 - 15
+	Marker      bool
+	PayloadType uint8
+	SeqNum      uint16 // init'd randomly to avoid plaintext attacks
+	Timestamp   uint32
+	SyncSrc     uint32
+	CsrcList    []uint32 // len == CsrcCount
+}
+
+type RtpPacket struct {
+	buf  []byte
+	from net.Addr
+}
+
+// Extract the fixed size header from the beginning of this packet
+func (p *RtpPacket) Header() *RtpHeader {
+	h := &RtpHeader{}
+
+	b0 := p.buf[0]
+	h.Version = (b0 & versionMask) >> versionShift
+	h.Padding = (b0 & paddingMask) != 0
+	h.Extension = (b0 & extensionMask) != 0
+	h.CsrcCount = (b0 & csrcCountMask)
+	if h.CsrcCount > 15 {
+		log.Println("rtp: CsrcCount > 15:", h.CsrcCount)
+	}
+
+	b1 := p.buf[1]
+	h.Marker = (b1 & markerMask) != 0
+	h.PayloadType = (b1 & payloadTypeMask)
+
+	be := binary.BigEndian
+	h.SeqNum = be.Uint16(p.buf[2:])
+	h.Timestamp = be.Uint32(p.buf[4:])
+	h.SyncSrc = be.Uint32(p.buf[8:])
+
+	if h.CsrcCount > 0 {
+		h.CsrcList = make([]uint32, h.CsrcCount)
+		for i := uint8(0); i < h.CsrcCount; i++ {
+			h.CsrcList[i] = be.Uint32(p.buf[12+i*4:])
+		}
+	}
+
+	return h
+}

File rtp/session.go

+package rtp
+
+import (
+	// "time"
+	"log"
+)
+
+type sessionState struct {
+	tp          int     // the last time an RTCP packet was transmitted
+	tc          int     // the current time
+	tn          int     // the next scheduled transmission time of an RTCP packet
+	pmembers    int     // the estimated number of session members at the time tn was last recomputed
+	members     int     // the most current estimate for the number of session members
+	senders     int     // the most current estimate for the number of senders in the session
+	rtcpBw      int     // The target RTCP bandwidth
+	weSent      bool    // true if the application has sent data since the 2nd previous RTCP report was transmitted
+	avgRtcpSize float32 // average compound RTCP packet size, in octets, over all RTCP packets sent and received
+	initial     int     // Flag that is true if the application has not yet sent an RTCP packet.
+}
+
+type Session struct {
+	RtpHandler       RtpHandler
+	RtcpHandler      RtcpHandler
+	CtrlEventHandler CtrlEventHandler
+	Transport        *TransportUDP  // XXX: this should likely be a Transport interface
+	state            sessionState
+	members          *MemberTable
+}
+
+type CtrlEvent struct {
+	EventType int    // Either a Stream event or a Rtcp* packet type event, e.g. RtcpSR, RtcpRR, RtcpSdes, RtcpBye
+	Ssrc      uint32 // the input stream's SSRC
+	Index     uint32 // and its index
+	Reason    string // Resaon string if it was available, empty otherwise
+}
+
+func NewSession(rtp, rtcp string) *Session {
+	s := &Session{
+		members: NewMemberTable(),
+	}
+	s.Transport = &TransportUDP{
+		Handler:  s,
+		RtpAddr:  rtp,
+		RtcpAddr: rtcp,
+	}
+	return s
+}
+
+func (s *Session) Start() error {
+
+	err := s.Transport.ListenAndServe()
+	if err != nil {
+		return err
+	}
+
+	go transmissionIntervalService(s)
+
+	return nil
+}
+
+// a new RTP packet has arrived from our transport
+func (s *Session) HandleRtp(p *RtpPacket) {
+	// XXX: check if packet is valid
+
+	// forward packet to application
+	if s.RtpHandler != nil {
+		s.RtpHandler.HandleRtp(p)
+	}
+}
+
+// a new RTCP packet has arrived from our transport.
+//
+// any RTCP packet can be compound, which means individual packets
+// can be stacked back to back in the same transport layer packet.
+func (s *Session) HandleRtcp(p *RtcpPacket) {
+
+	plen := len(p.buf)
+	s.state.UpdateAvgSize(plen)
+
+	for pos := 0; pos < plen; {
+		hdr := p.Header(pos)
+
+		switch hdr.PacketType {
+		case RtcpSenderReport:
+			sr := p.SenderReport(hdr, pos)
+			s.handleSenderReport(p, sr)
+
+		case RtcpReceiverReport:
+			log.Println("RtcpReceiverReport")
+
+		case RtcpSdes:
+			sd := p.SdesPacket(hdr, pos)
+			s.handleSdes(p, sd)
+
+		case RtcpBye:
+			bp := p.ByePacket(hdr, pos)
+			s.handleBye(p, bp)
+		}
+
+		pos += hdr.Length
+	}
+
+	// dispatch to application
+	if s.RtcpHandler != nil {
+		s.RtcpHandler.HandleRtcp(p)
+	}
+}
+
+// a SenderReport has been received.
+func (s *Session) handleSenderReport(p *RtcpPacket, sr *SenderReport) {
+	log.Println("SenderReport:", sr)
+
+    // XXX: update stats for relevant streams
+
+	for _, rr := range sr.ReceptionReports {
+		// pass RRs to application
+		log.Println(rr)
+	}
+}
+
+func (s *Session) handleSdes(p *RtcpPacket, sd *SdesPacket) {
+	log.Println("RtcpSdes:", sd)
+}
+
+func (s *Session) handleBye(p *RtcpPacket, b *ByePacket) {
+	log.Println("RtcpBye:", b)
+}
+
+// update avgRtcpSize per section 6.3.3
+func (ss *sessionState) UpdateAvgSize(sz int) {
+	ss.avgRtcpSize = (1.0/16.0)*float32(sz) + (15.0/16.0)*ss.avgRtcpSize
+}
+
+func (s *Session) reportCtrlEvent(code int, ssrc, index uint32) {
+	if s.CtrlEventHandler != nil {
+		e := &CtrlEvent{
+			EventType: code,
+			Ssrc:      ssrc,
+			Index:     index,
+		}
+		s.CtrlEventHandler.HandleCtrlEvent(e)
+	}
+}

File rtp/transportUdp.go

+package rtp
+
+import (
+	// "log"
+	"net"
+)
+
+type TransportHandler interface {
+	HandleRtp(*RtpPacket)
+	HandleRtcp(*RtcpPacket)
+}
+
+type TransportUDP struct {
+	Handler  TransportHandler
+	RtpAddr  string
+	RtcpAddr string
+	rtpConn  *net.UDPConn
+	rtcpConn *net.UDPConn
+}
+
+const (
+	maxPacket = 1500 // reasonable MTU size
+)
+
+// listen for both incoming rtp and rtcp data
+func (t *TransportUDP) ListenAndServe() error {
+
+	rtpAddr, err := net.ResolveUDPAddr("udp", t.RtpAddr)
+	if err != nil {
+		return err
+	}
+
+	rtcpAddr, err := net.ResolveUDPAddr("udp", t.RtcpAddr)
+	if err != nil {
+		return err
+	}
+
+	rtpConn, err := net.ListenUDP(rtpAddr.Network(), rtpAddr)
+	if err != nil {
+		return err
+	}
+
+	rtcpConn, err := net.ListenUDP(rtcpAddr.Network(), rtcpAddr)
+	if err != nil {
+		rtpConn.Close()
+		return err
+	}
+
+	t.rtpConn = rtpConn
+	t.rtcpConn = rtcpConn
+
+	go t.readRtpPacket()
+	go t.readRtcpPacket()
+
+	return nil
+}
+
+func ListenAndServeUDP(rtp, rtcp string, h TransportHandler) error {
+	t := &TransportUDP{
+		Handler:  h,
+		RtpAddr:  rtp,
+		RtcpAddr: rtcp,
+	}
+	return t.ListenAndServe()
+}
+
+func (t *TransportUDP) readRtpPacket() {
+
+	for {
+		// XXX: worth recycling packets?
+		//      do some profiling eventually...
+		p := &RtpPacket{
+			buf: make([]byte, maxPacket),
+		}
+		n, addr, err := t.rtpConn.ReadFromUDP(p.buf)
+		if err != nil {
+			if e, ok := err.(net.Error); ok && e.Timeout() {
+				continue // just a timeout, try again
+			}
+			break // real problem, bail for now
+		}
+
+		p.buf = p.buf[:n]
+		p.from = addr
+		t.Handler.HandleRtp(p)
+	}
+
+	t.rtpConn.Close()
+}
+
+func (t *TransportUDP) readRtcpPacket() {
+
+	for {
+		// XXX: worth recycling packets?
+		//      do some profiling eventually...
+		p := &RtcpPacket{
+			buf: make([]byte, maxPacket),
+		}
+		n, addr, err := t.rtcpConn.ReadFromUDP(p.buf)
+		if err != nil {
+			if e, ok := err.(net.Error); ok && e.Timeout() {
+				continue // just a timeout, try again
+			}
+			break // real problem, bail for now
+		}
+
+		p.buf = p.buf[:n]
+		p.from = addr
+		t.Handler.HandleRtcp(p)
+	}
+
+	t.rtcpConn.Close()
+}