Martin Gergov avatar Martin Gergov committed 61d19cb

System socket now should work, because real listening can be handled

Comments (0)

Files changed (3)

examples/slisten.py

+import sys
+sys.path.append("../")
+
+from udt4twisted import udtepollreactor
+udtepollreactor.install()
+
+from twisted.internet import reactor, defer
+from twisted.internet.protocol import DatagramProtocol
+from twisted.python import log
+
+class Echo(DatagramProtocol):
+
+    def datagramReceived(self, data, (host, port)):
+        log.msg("received %r from %s:%d" % (data, host, port))
+        self.transport.write(data, (host, port))
+
+
+
+class Tester(object):
+    port = None
+    def listen_to_some_port(self, res):
+        self.port = reactor.listenUDP(5000, Echo())
+        print self.port
+    
+    def print_hello(self, res):
+        print "Hello!"
+        return "hello"
+
+
+    def bye(self):
+        print self.port
+        self.port.stopListening()
+        reactor.stop()
+        print "Bye!"
+
+d = defer.Deferred()
+test = Tester()
+
+d.addCallback(test.print_hello)
+d.addCallback(test.listen_to_some_port)
+
+reactor.callLater(0.3, d.callback, 1)
+reactor.callLater(100, test.bye)
+reactor.run()

tests/test_port.py

         self.transport.write(data, (host, port))
 
 class TestPort(unittest.TestCase):
-    
-    def setUp(self):
-        udt4.startup()
-
-    def tearDown(self):
-        udt4.cleanup()
 
     def test_udt_port_handling(self):
         #p = udt.Port(5000, Echo())

udt4twisted/udtepollreactor.py

 from zope.interface import implementer
 from twisted.internet.interfaces import IReactorFDSet
 from twisted.python import log
-from twisted.internet import posixbase
+from twisted.internet import posixbase, base
 from twisted.internet.epollreactor import EPollReactor, _ContinuousPolling
 import udt4
 from udt4 import pyudt
         """
         Initialize epoll object.
         """
+        udt4.startup()
         self._poller = udt4.UDTepoll()
         self._reads = {}
         self._writes = {}
                 flags |= antievent
                 self._poller.modify(fd, flags)
             else:
+                print "ADDING:", fd
                 self._poller.add_ssock(fd, flags)
 
             # Update our own tracking state *only* after the epoll call has
             else:
                 del selectables[fd]
                 # See comment above _control call in _add.
+                print "REMOVING:", fd
                 self._poller.remove_ssock(fd)
-            del primary[fd]    
+            del primary[fd]
 
-
-    def _handleSocketSet(self, set):
-        #print set
-        pass
-        #handle system sockets
-        # for fd, event in set[2:]:
-        #     try:
-        #         selectable = self._selectables[fd]
-        #     except KeyError:
-        #         pass
-        #     else:
-        #         log.callWithLogger(selectable, _drdw, selectable, fd, event)
-
-        #handle udt sockets
+    def _handleSystemSocketSet(self, set, event):
+        _drdw = self._doReadOrWrite
+        for fd in set:
+            try:
+                selectable = self._selectables[fd]
+            except KeyError:
+                pass
+            else:
+                log.callWithLogger(selectable, _drdw, selectable, fd,
+                                   event)
 
     def doPoll(self, timeout):
         """
             timeout = -1  # Wait indefinitely.
 
         try:
-            # Limit the number of events to the number of io objects we're
-            # currently tracking (because that's maybe a good heuristic) and
-            # the amount of time we block to the value specified by our
-            # caller.
-            #l = self._poller.poll(timeout, len(self._selectables))
-            l = self._poller.wait(True, True, timeout, True, True)
+            #doesn't work with floats :/
+            l = self._poller.wait(True, True, int(timeout*1000), True, True)
         except IOError as err:
             if err.errno == errno.EINTR:
                 return
-            # See epoll_wait(2) for documentation on the other conditions
-            # under which this can fail.  They can only be due to a serious
-            # programming error on our part, so let's just announce them
-            # loudly.
             raise
 
-        _drdw = self._doReadOrWrite
+        #handle system sockets
+        sread, swrite = l[2:]
+        
 
-        self._handleSocketSet(l)
+        self._handleSystemSocketSet(sread, self._POLL_IN)
+        self._handleSystemSocketSet(swrite, self._POLL_OUT)
+
+
+    def stop(self):
+        """
+        Destroy UDT library and stop reactor.
+        """
+        posixbase.PosixReactorBase.stop(self)
+        udt4.cleanup()
         
 
     doIteration = doPoll
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.