Commits

Martin Gergov committed cad39de

Made some progress with udt epoll reactor.

  • Participants
  • Parent commits f85df3c

Comments (0)

Files changed (5)

File tests/test_port.py

+import socket as s
+from twisted.trial import unittest
+from twisted.internet.protocol import DatagramProtocol
+from twisted.internet import reactor
+import udt4
+from udt4twisted import udt
+
+class Echo(DatagramProtocol):
+
+    def datagramReceived(self, data, (host, port)):
+        log.msg("received %r from %s:%d" % (data, host, port))
+        self.transport.write(data, (host, port))
+
+class TestPort(unittest.TestCase):
+    
+    def setUp(self):
+        udt4.startup()
+
+    def tearDown(self):
+        udt4.cleanup()
+    
+    def test_server(self):
+        p = udt.Port(5000, Echo())
+        p.startListening()
+        pass

File tests/test_top_api.py

         pass
     
     def test_listening(self):
-        reactor.listenUDT(9999, Echo())
+        udtServer = reactor.listenUDT(9999, Echo())
+        udtServer.stopListening()
 
     test_listening.todo = "Make this work!"
 

File twisted/plugins/udtplugin.py

+from twisted.application.reactors import Reactor
+from twisted.application.service import ServiceMaker
+
+udtepoll = Reactor('udtepoll', 'udt4twisted.udtepollreactor',
+                   'UDT epoll reactor.')
+
+# class UDTEpollServiceMaker(object):
+#     implements(IServiceMaker, IPlugin)
+#     tapname = "udtepoll"
+#     description = "blabla"
+
+#     def makeService(self, options):
+#         """
+#         Constructs the main service.
+#         """
+#         settings = load_settings(options["settings"])
+#         log.startLogging(open(settings["server"]["log"], 'w'),
+#                          setStdout=False)
+#         return internet.TCPServer(int(settings['server']['port']),
+#                                   P2PLivePBServer(Server(settings)))
+
+# serviceMaker = UDTEpollServiceMaker()

File udt4twisted/udt.py

 import struct
 import socket
 from zope.interface import implementer
+from twisted.internet import fdesc
 from twisted.internet import base, defer, address, udp
 from twisted.python import log, failure
 from twisted.internet import abstract, error, interfaces
 class Port(udp.Port):
     """
     UDT port, listening for packets.
-    This is inherited from udp port implementation.
     """
 
+    def __init__(self,
+                 port,
+                 proto,
+                 interface='localhost',
+                 maxPacketSize=8192,
+                 reactor=None):
+        """
+        Initialize with a numeric port to listen on.
+        """
+        base.BasePort.__init__(self, reactor)
+        self.port = port
+        self.protocol = proto
+        self.maxPacketSize = maxPacketSize
+        self.interface = socket.gethostbyname_ex(interface)[2][0]
+        self.setLogStr()
+        self._connectedAddr = None
+
+
     def getHandle(self):
         """
         Return a socket object.
         """
         return pyudt.UdtSocket
 
+    def createInternetSocket(self):
+        s = pyudt.UdtSocket(self.addressFamily, self.socketType)
+        s.setblocking(False)
+        #FIXME
+        #fdesc._setCloseOnExec(s.UDTSOCKET.udtsocket)
+        return s
+
+
     def _bindSocket(self):
         """
         TODO
 
         self.connected = 1
         self.socket = skt
-        self.fileno = self.socket.fileno
+        #lies, it's all lies
+        self.fileno = skt
 
-    def _connectToProtocol(self):
-        """
-        TODO
-        """
-        self.protocol.makeConnection(self)
-        self.startReading()
-

File udt4twisted/udtepollreactor.py

+"""
+An UDT::epoll() implementation.
+For use with the UDT protocol.
+"""
+from __future__ import division, absolute_import
+import errno
+from zope.interface import implementer
+from twisted.internet.interfaces import IReactorFDSet
+from twisted.python import log
+from twisted.internet import posixbase
+from twisted.internet.epollreactor import EPollReactor, _ContinuousPolling
+import udt4
+from udt4 import pyudt
+
+@implementer(IReactorFDSet)
+class _UDTContinuousPolling(_ContinuousPolling):
+    _POLL_DISCONNECTED = (udt4.ECONNLOST | udt4.UDT_EPOLL_ERR)
+    _POLL_IN = udt4.UDT_EPOLL_IN
+    _POLL_OUT = udt4.UDT_EPOLL_OUT
+
+
+@implementer(IReactorFDSet)
+class UDTEPollReactor(EPollReactor):
+    _POLL_DISCONNECTED = (udt4.ECONNLOST | udt4.UDT_EPOLL_ERR)
+    _POLL_IN = udt4.UDT_EPOLL_IN
+    _POLL_OUT = udt4.UDT_EPOLL_OUT
+    
+    def __init__(self):
+        """
+        Initialize epoll object.
+        """
+        self._poller = udt4.UDTepoll()
+        self._reads = {}
+        self._writes = {}
+        self._selectables = {}
+        self._continuousPolling = _UDTContinuousPolling(self)
+        posixbase.PosixReactorBase.__init__(self)
+        
+
+    def _add(self, xer, primary, other, selectables, event, antievent):
+        """
+        Private method for adding a descriptor from the event loop.
+
+        It takes care of adding it if  new or modifying it if already added
+        for another state (read -> read/write for example).
+        """
+        print "FFF",type(xer)
+        fd = xer.fileno()
+        if fd not in primary:
+            flags = event
+            # epoll_ctl can raise all kinds of IOErrors, and every one
+            # indicates a bug either in the reactor or application-code.
+            # Let them all through so someone sees a traceback and fixes
+            # something.  We'll do the same thing for every other call to
+            # this method in this file.
+            if fd in other:
+                flags |= antievent
+                self._poller.modify(fd, flags)
+            else:
+                
+                self._poller.add_ssock(fd, flags)
+
+            # Update our own tracking state *only* after the epoll call has
+            # succeeded.  Otherwise we may get out of sync.
+            primary[fd] = 1
+            selectables[fd] = xer
+
+    def _handleSocketSet(self, set):
+        print set
+        pass
+        #handle system sockets
+        # for fd, event in set[2:]:
+        #     try:
+        #         selectable = self._selectables[fd]
+        #     except KeyError:
+        #         pass
+        #     else:
+        #         log.callWithLogger(selectable, _drdw, selectable, fd, event)
+
+        #handle udt sockets
+
+    def doPoll(self, timeout):
+        """
+        Poll the poller for new events.
+        """
+        if timeout is None:
+            timeout = -1  # Wait indefinitely.
+
+        try:
+            # Limit the number of events to the number of io objects we're
+            # currently tracking (because that's maybe a good heuristic) and
+            # the amount of time we block to the value specified by our
+            # caller.
+            #l = self._poller.poll(timeout, len(self._selectables))
+            l = self._poller.wait(True, True, timeout, True, True)
+        except IOError as err:
+            if err.errno == errno.EINTR:
+                return
+            # See epoll_wait(2) for documentation on the other conditions
+            # under which this can fail.  They can only be due to a serious
+            # programming error on our part, so let's just announce them
+            # loudly.
+            raise
+
+        _drdw = self._doReadOrWrite
+
+        self._handleSocketSet(l)
+        
+
+    doIteration = doPoll
+
+
+def install():
+    """
+    Install the epoll() reactor.
+    """
+    p = UDTEPollReactor()
+    from twisted.internet.main import installReactor
+    installReactor(p)
+
+
+__all__ = ["UDTEPollReactor", "install"]