Commits

Martin Gergov committed c6b8f0e

Stabilazing..

  • Participants
  • Parent commits 61d19cb

Comments (0)

Files changed (5)

examples/sockrecv.py

+from udt4 import socket, listen, UDTepoll, bind, recvmsg, UDTException
+from udt4 import startup, cleanup, accept, UDT_EPOLL_IN, UDT_EPOLL_OUT
+from udt4 import EASYNCRCV, sendmsg
+from socket import AF_INET, SOCK_DGRAM, AI_PASSIVE
+import udt4
+from udt4.pyudt import UdtSocket
+
+startup()
+s = UdtSocket(AF_INET, SOCK_DGRAM, AI_PASSIVE)
+print s.UDTSOCKET
+s.setblocking(False)
+s.bind(("127.0.0.1", 5000))
+s.listen(20)
+poller = UDTepoll()
+poller.add_usock(s.UDTSOCKET, UDT_EPOLL_IN)
+try:
+    while True:
+        sets = poller.wait(True, True, 1000, False, False)
+        print sets
+        for socket in sets[0]:
+            print socket
+            if (socket.UDTSOCKET == s.UDTSOCKET.UDTSOCKET):
+                sock, host = accept(socket)
+                print sock, host
+                sock = UdtSocket(_sock=sock)
+                sock.setblocking(False)
+                poller.add_usock(sock.UDTSOCKET, UDT_EPOLL_OUT)
+        for socket in sets[1]:
+            try:
+                print recvmsg(socket, 10)
+            except UDTException as e:
+                continue
+            else:
+                print "Break now!"
+                sendmsg(socket.UDTSOCKET, "bagger"*1000, 6*1000+1, -1, True)
+                poller.remove_usock(socket.UDTSOCKET, UDT_EPOLL_IN)
+                break
+                    
+except KeyboardInterrupt:
+    exit("Bye!")
+poller.remove_usock(s.UDTSOCKET)
+
+cleanup()

examples/testudt.py

+import sys
+sys.path.append("../")
+
+from udt4twisted import udtepollreactor
+udtepollreactor.install()
+
+from twisted.python import log
+from twisted.internet import reactor, defer
+from twisted.internet.protocol import DatagramProtocol
+from twisted.python import log
+log.startLogging(sys.stdout)
+
+class Echo(DatagramProtocol):
+
+    def startProtocol(self):
+        log.msg("CLIENT!!!")
+
+    def datagramReceived(self, data, (host, port)):
+        log.msg("received %r from %s:%d" % (data, host, port))
+        self.transport.write(data, (host, port))
+
+
+
+class Tester(object):
+    port = None
+    def listen_to_udt_port(self, res):
+        self.port = reactor.listenUDT(5000, Echo())
+        print self.port
+    
+    def print_hello(self, res):
+        print "Hello!"
+        return "hello"
+
+
+    def bye(self):
+        print self.port
+        self.port.stopListening()
+        reactor.stop()
+        print "Bye!"
+
+d = defer.Deferred()
+test = Tester()
+
+d.addCallback(test.print_hello)
+d.addCallback(test.listen_to_udt_port)
+
+reactor.callLater(0.3, d.callback, 1)
+reactor.callLater(100, test.bye)
+reactor.run()

tests/test_top_api.py

     def test_udp_handling(self):
         p = reactor.listenUDP(5000, Echo())
         p.stopListening()
-
-
-    test_listening.todo = "Make this work!"
-
-    
-        
-    

udt4twisted/udt.py

 from twisted.internet import abstract, error, interfaces
 import udt4 as udt
 from   udt4 import pyudt
+from socket import AI_PASSIVE
+from errno import EWOULDBLOCK, EINTR, EMSGSIZE, ECONNREFUSED, EAGAIN
+_sockErrReadIgnore = [EAGAIN, EINTR, EWOULDBLOCK]
+_sockErrReadRefuse = [ECONNREFUSED]
+
 
 @implementer(
     interfaces.IListeningPort, interfaces.IUDPTransport,
     """
     UDT port, listening for packets.
     """
+    maxThroughput = 256 * 2048
 
     def __init__(self,
                  port,
                  proto,
                  interface='localhost',
                  maxPacketSize=8192,
-                 reactor=None):
+                 reactor=None,
+                 backlog=50):
         """
         Initialize with a numeric port to listen on.
         """
         self.port = port
         self.protocol = proto
         self.maxPacketSize = maxPacketSize
-        self.interface = socket.gethostbyname_ex(interface)[2][0]
+        if interface == '':
+            self.interface = "0.0.0.0"
+        else:
+            self.interface = socket.gethostbyname_ex(interface)[2][0]
         self.setLogStr()
         self._connectedAddr = None
+        self.backlog = backlog
+        self.addresses = {}
 
 
     def getHandle(self):
         return pyudt.UdtSocket
 
     def createInternetSocket(self):
-        s = pyudt.UdtSocket(self.addressFamily, self.socketType)
+        s = pyudt.UdtSocket(self.addressFamily,
+                            self.socketType,
+                            AI_PASSIVE)
         s.setblocking(False)
         #FIXME
         #fdesc._setCloseOnExec(s.UDTSOCKET.udtsocket)
 
         self.connected = 1
         self.socket = skt
-        #lies, it's all lies
-        self.fileno = skt
+        self.fileno = skt.fileno
 
+    def startListening(self):
+        """
+        Create and bind my socket, and begin listening on it.
+
+        This is called on unserialization, and must be called after creating a
+        server to begin listening on the specified port.
+        """
+        udp.Port.startListening(self)
+        self.socket.listen(self.backlog)
+
+    def doRead(self, fd=None):
+        """
+        Called when my socket is ready for reading.
+        """
+        read = 0
+        while read < self.maxThroughput:
+            try:
+                data = udt.recvmsg(sock, self.maxThroughput)
+                #data, addr = self.socket.recvmsg(self.maxThroughput)
+            except udt.UDTException as ue:
+                return
+            except socket.error as se:
+                no = se.args[0]
+                if no in _sockErrReadIgnore:
+                    return
+                if no in _sockErrReadRefuse:
+                    if self._connectedAddr:
+                        self.protocol.connectionRefused()
+                    return
+                raise
+            else:
+                read += len(data)
+                try:
+                    self.protocol.datagramReceived(data, addr)
+                except:
+                    log.err()
+
+
+    def _connectToProtocol(self):
+        self.protocol.makeConnection(self)
+        self.startReading()
+

udt4twisted/udtepollreactor.py

 """
 from __future__ import division, absolute_import
 import errno
+import sys
 from zope.interface import implementer
+from zope.interface import Interface, Attribute
 from twisted.internet.interfaces import IReactorFDSet
 from twisted.python import log
 from twisted.internet import posixbase, base
 from twisted.internet.epollreactor import EPollReactor, _ContinuousPolling
 import udt4
-from udt4 import pyudt
+from udt4 import pyudt, accept, sendmsg
+from udt4.pyudt import UdtSocket
+from udt4twisted import udt
+
+class IReactorUDT(Interface):
+    """
+    UDT transport interface.
+    """
+    def listenUDT(port,
+                  protocol,
+                  interface='',
+                  maxPacketSize=8192,
+                  backlog=50):
+        """
+        Connects a given DatagramProtocol to the given numeric UDT port.
+
+        @return: object which provides L{IListeningPort}.
+        """
 
 @implementer(IReactorFDSet)
 class _UDTContinuousPolling(_ContinuousPolling):
     _POLL_OUT = udt4.UDT_EPOLL_OUT
 
 
-@implementer(IReactorFDSet)
+@implementer(IReactorFDSet, IReactorUDT)
 class UDTEPollReactor(EPollReactor):
     _POLL_DISCONNECTED = (udt4.ECONNLOST | udt4.UDT_EPOLL_ERR)
     _POLL_IN = udt4.UDT_EPOLL_IN
         self._selectables = {}
         self._continuousPolling = _UDTContinuousPolling(self)
         posixbase.PosixReactorBase.__init__(self)
+        self.addSystemEventTrigger("after", "shutdown", udt4.cleanup)
         
 
     def _add(self, xer, primary, other, selectables, event, antievent):
         It takes care of adding it if  new or modifying it if already added
         for another state (read -> read/write for example).
         """
-        fd = xer.fileno()
+        if isinstance(xer, udt.Port):
+            fd = xer.socket.UDTSOCKET
+        else:
+            fd = xer.fileno()
         if fd not in primary:
             flags = event
             # epoll_ctl can raise all kinds of IOErrors, and every one
                 flags |= antievent
                 self._poller.modify(fd, flags)
             else:
-                print "ADDING:", fd
-                self._poller.add_ssock(fd, flags)
+                
+                if isinstance(xer, udt.Port):
+                    print "ADDING:", fd, "With flags:", flags
+                    self._poller.add_usock(fd, flags)
+                else:
+                    print "ADDING:", fd, "With flags:", flags
+                    self._poller.add_ssock(fd, flags)
 
             # Update our own tracking state *only* after the epoll call has
             # succeeded.  Otherwise we may get out of sync.
             primary[fd] = 1
             selectables[fd] = xer
+
+    def addReader(self, reader):
+        """
+        Add a FileDescriptor for notification of data available to read.
+        """
+        try:
+            self._add(reader, self._reads, self._writes, self._selectables,
+                      self._POLL_IN, self._POLL_OUT)
+        except IOError as e:
+            if e.errno == errno.EPERM:
+                # epoll(7) doesn't support certain file descriptors,
+                # e.g. filesystem files, so for those we just poll
+                # continuously:
+                self._continuousPolling.addReader(reader)
+            else:
+                raise
+
+
+    def addWriter(self, writer):
+        """
+        Add a FileDescriptor for notification of data available to write.
+        """
+        try:
+            self._add(writer, self._writes, self._reads, self._selectables,
+                      self._POLL_OUT, self._POLL_IN)
+        except IOError as e:
+            if e.errno == errno.EPERM:
+                # epoll(7) doesn't support certain file descriptors,
+                # e.g. filesystem files, so for those we just poll
+                # continuously:
+                self._continuousPolling.addWriter(writer)
+            else:
+                raise
+
+    def removeReader(self, reader):
+        """
+        Remove a Selectable for notification of data available to read.
+        """
+        if self._continuousPolling.isReading(reader):
+            self._continuousPolling.removeReader(reader)
+            return
+        self._remove(reader, self._reads, self._writes, self._selectables,
+                     self._POLL_IN, self._POLL_OUT)
+
+
+    def removeWriter(self, writer):
+        """
+        Remove a Selectable for notification of data available to write.
+        """
+        if self._continuousPolling.isWriting(writer):
+            self._continuousPolling.removeWriter(writer)
+            return
+        self._remove(writer, self._writes, self._reads, self._selectables,
+                      self._POLL_OUT, self._POLL_IN)
             
             
     def _remove(self, xer, primary, other, selectables, event, antievent):
         It does the inverse job of _add, and also add a check in case of the fd
         has gone away.
         """
-        fd = xer.fileno()
+        if isinstance(xer, udt.Port):
+            fd = xer.socket.UDTSOCKET
+        else:
+            fd = xer.fileno()
+            
         if fd == -1:
             for fd, fdes in selectables.items():
                 if xer is fdes:
                 # See comment above modify call in _add.
                 self._poller.modify(fd, flags)
             else:
+                flags = event
                 del selectables[fd]
                 # See comment above _control call in _add.
-                print "REMOVING:", fd
-                self._poller.remove_ssock(fd)
+                if isinstance(xer, udt.Port):
+                    print "REMOVING:", fd, "With flags:", flags
+                    self._poller.remove_usock(fd, flags)
+                else:
+                    #FIXME
+                    try:
+                        print "REMOVING:", fd, "With flags:", flags
+                        self._poller.remove_ssock(fd, flags)
+                    except TypeError:
+                        pass
             del primary[fd]
 
     def _handleSystemSocketSet(self, set, event):
             except KeyError:
                 pass
             else:
+                print "FD:", fd, "SELECT:", selectable
+                log.callWithLogger(selectable, _drdw, selectable, fd,
+                                   event)
+    def _handleUDTReadSocketSet(self, set, event):
+        _drdw = self._doReadOrWrite
+        for fd in set:
+            try:
+                selectable = self._selectables[fd]
+                socket, addr = accept(fd)
+                print socket, addr
+                selectable.addresses[fd] = addr
+                socket = UdtSocket(_sock=socket)
+                socket.setblocking(False)
+                self._poller.add_usock(socket.UDTSOCKET, event)
+            except KeyError:
+                pass
+            else:
+                pass
+
+    def _handleUDTWriteSocketSet(self, set, event):
+        _drdw = self._doReadOrWrite
+        
+        for fd in set:
+            try:
+                print "Hello!?!"
+                selectable = self._selectables[fd]
+            except KeyError:
+                pass
+            else:
+                print "FD:", fd, "SELECT:", selectable
                 log.callWithLogger(selectable, _drdw, selectable, fd,
                                    event)
 
         #handle system sockets
         sread, swrite = l[2:]
         
-
         self._handleSystemSocketSet(sread, self._POLL_IN)
         self._handleSystemSocketSet(swrite, self._POLL_OUT)
 
+        #handle UDT sockets
+        uread, uwrite = l[:2]
+        print uread, uwrite
+        self._handleUDTReadSocketSet(uread, self._POLL_IN)
+        self._handleUDTWriteSocketSet(uwrite, self._POLL_IN)
 
-    def stop(self):
+    def _doReadOrWrite(self, selectable, fd, event):
         """
-        Destroy UDT library and stop reactor.
+        fd is available for read or write, do the work and raise errors if
+        necessary.
         """
-        posixbase.PosixReactorBase.stop(self)
-        udt4.cleanup()
+        why = None
+        inRead = False
+        if event & self._POLL_DISCONNECTED and not (event & self._POLL_IN):
+            if fd in self._reads:
+                inRead = True
+                why = CONNECTION_DONE
+            else:
+                why = CONNECTION_LOST
+        else:
+            try:
+                if selectable.fileno() == -1:
+                    why = _NO_FILEDESC
+                else:
+                    if event & self._POLL_IN:
+                        # Handle a read event.
+                        if isinstance(fd, udt4.UDTSOCKET):
+                            why = selectable.doRead(fd)
+                        else:
+                            why = selectable.doRead()
+                        inRead = True
+                    if not why and event & self._POLL_OUT:
+                        # Handle a write event, as long as doRead didn't
+                        # disconnect us.
+                        why = selectable.doWrite()
+                        inRead = False
+            except:
+                # Any exception from application code gets logged and will
+                # cause us to disconnect the selectable.
+                why = sys.exc_info()[1]
+                log.err()
+        if why:
+            self._disconnectSelectable(selectable, why, inRead)
+
+
+
+    def listenUDT(self,
+                  port,
+                  protocol,
+                  interface='',
+                  maxPacketSize=8192,
+                  backlog=50):
+        
+        p = udt.Port(port, protocol, interface,
+                     maxPacketSize, self, backlog)
+        p.startListening()
+        return p
         
 
     doIteration = doPoll