1. Martin Gergov
  2. udt4twisted

Commits

Martin Gergov  committed 2de9b0b

Client works, except when there is no server on the other side.
An epoll monitoring should be made on the connection with a customizable timeout.

  • Participants
  • Parent commits b9b5f0f
  • Branches implement_tcp_like_interface

Comments (0)

Files changed (4)

File examples/newclient.py

View file
+import sys
+sys.path.append("../")
+
+from udt4twisted import udtepollreactor
+udtepollreactor.install()
+
+from twisted.python import log
+from twisted.internet import reactor, defer
+from twisted.internet.protocol import DatagramProtocol, Protocol, ClientFactory
+from twisted.python import log
+from udt4twisted import udt
+log.startLogging(sys.stdout)
+
+class Echo(Protocol):
+    def connectionMade(self):
+        print "CONN MADE!"
+        self.transport.write("bomb!!!")
+
+    def dataReceived(self, data):
+        log.msg("received {0} from {1}".format(data, self.transport.getPeer()))
+        self.transport.write("bomb!!!", 200, False)
+
+    def connectionLost(self, reason):
+        print "CONN LOST!"
+
+class EchoFactory(ClientFactory):
+
+    def startedConnecting(self, connector):
+        print 'Started to connect.'
+
+    def buildProtocol(self, addr):
+        print 'Connected.'
+        return Echo()
+
+    def clientConnectionLost(self, connector, reason):
+        print 'Lost connection.  Reason:', reason
+
+    def clientConnectionFailed(self, connector, reason):
+        print 'Connection failed. Reason:', reason
+
+class Tester(object):
+    port = None
+    def connect_to_udt_port(self, res):
+        self.echo = EchoFactory()
+        self.port = reactor.connectUDT("127.0.0.1", 5000, self.echo)
+        print self.port
+
+    def print_hello(self, res):
+        print "Hello!"
+        return "hello"
+
+d = defer.Deferred()
+test = Tester()
+
+d.addCallback(test.print_hello)
+d.addCallback(test.connect_to_udt_port)
+
+reactor.callLater(0.3, d.callback, 1)
+reactor.run()

File examples/newserver.py

View file
 log.startLogging(sys.stdout)
 
 class Echo(Protocol):
+    counter=0
     def connectionMade(self):
-        print "CONN MADE!"
         self.transport.write("An apple a day keeps the doctor away\r\n")
 
     def dataReceived(self, data):
+        print self.counter
+        if self.counter == 100:
+            return
         log.msg("received {0} from {1}".format(data, self.transport.getPeer()))
-        self.transport.write(data, 200, False)
+        self.transport.write(data, 0, False)
+        self.counter += 1
 
     def connectionLost(self, reason):
         print "CONN LOST!"
     port = None
     def listen_to_udt_port(self, res):
         self.echo = EchoFactory()
-        self.port = reactor.tlistenUDT(5000, self.echo)
+        self.port = reactor.listenUDT(5000, self.echo)
         print self.port
 
     def print_hello(self, res):

File udt4twisted/udt.py

View file
 from twisted.python.compat import _PY3
 from twisted.internet import base, defer, address, udp, tcp
 from twisted.python import log, failure, _reflectpy3 as reflect
+from twisted.python._utilpy3 import unsignedID
 from twisted.python.runtime import platformType
 from twisted.internet import abstract, error, interfaces, main
 import udt4 as udt
         # in the result.
         return buffer(bObj, offset) + b"".join(bArray)
 
+# class Connector2(Port2):
 
-def find_key(dic, val):
-    """
-    From http://www.daniweb.com/software-development/python/ \
-    code/217019/search-a-python-dictionary-both-ways
-    return the key of dictionary dic given the value
-    """
-    return [k for k, v in dic.iteritems() if v == val][0]
+#     def doConnect(self, fd=None):
+#         if not fd:
+#             self.socket = self.createInternetSocket()
+#             self.fileno = self.socket.fileno
+#             self.socket.connect_ex(self._connectedAddr)
+#             self._connectDone()
+#             return
 
+#         self._read(fd, self._connectedAddr)
 
-@implementer(
-    interfaces.IListeningPort, interfaces.IUDPTransport,
-    interfaces.ISystemHandle)
-class Port2(udp.Port):
-    """
-    UDT port, listening for packets.
-    """
-    maxThroughput = 256 * 2048
-
-    def __init__(self,
-                 port,
-                 proto,
-                 interface='localhost',
-                 maxPacketSize=8192,
-                 reactor=None,
-                 backlog=50,
-                 ):
-        """
-        Initialize with a numeric port to listen on.
-        """
-        base.BasePort.__init__(self, reactor)
-        self.port = port
-        self.protocol = proto
-        self.maxPacketSize = maxPacketSize
-        if interface == '':
-            self.interface = "127.0.0.1"
-        else:
-            self.interface = socket.gethostbyname_ex(interface)[2][0]
-        self.setLogStr()
-        self._connectedAddr = None
-        self.backlog = backlog
-        self.addresses = {}
-
-
-    def getHandle(self):
-        """
-        Return a socket object.
-        """
-        return pyudt.UdtSocket
-
-    def createInternetSocket(self):
-        s = pyudt.UdtSocket(self.addressFamily,
-                            self.socketType,
-                            AI_PASSIVE)
-        s.setblocking(False)
-        #s.setsockopt(udt.
-        #FIXME
-        #fdesc._setCloseOnExec(s.UDTSOCKET.udtsocket)
-        return s
-
-    def _connectDone(self):
-        self.connected = 1
-        logPrefix = self._getLogPrefix(self.protocol)
-        self.logstr = "%s,client" % logPrefix
-        self.startReading()
-        self.protocol.makeConnection(self)
-
-
-    def connect(self, host, port):
-        """
-        Connect to a remote host.
-        """
-        if self._connectedAddr:
-            raise RuntimeError("already connected, reconnecting is not currently supported")
-        if not abstract.isIPAddress(host):
-            raise ValueError("please pass only IP addresses, not domain names")
-        self._connectedAddr = (host, port)
-        self.doConnect()
-
-    def _bindSocket(self):
-        """
-        Bind socket to an address.
-        """
-        try:
-            skt = self.createInternetSocket()
-            skt.bind((self.interface, self.port))
-        except socket.error as le:
-            raise error.CannotListenError(self.interface, self.port, le)
-
-        # Make sure that if we listened on port 0, we update that to
-        # reflect what the OS actually assigned us.
-        self._realPortNumber = skt.getsockname()[1]
-
-        log.msg("%s starting on %s" % (
-                self._getLogPrefix(self.protocol), self._realPortNumber))
-
-        self.connected = 1
-        self.socket = skt
-        self.fileno = skt.fileno
-
-    def startListening(self):
-        """
-        Create and bind my socket, and begin listening on it.
-
-        This is called on unserialization, and must be called after creating a
-        server to begin listening on the specified port.
-        """
-        udp.Port.startListening(self)
-        self.socket.listen(self.backlog)
-
-
-    def doRead(self, fd=None, addr=None):
-        """
-        Called when my socket is ready for reading.
-        """
-        try:
-            #print "recv from:", fd, " size:", self.maxPacketSize
-            data = udt.recvmsg(fd, self.maxPacketSize)
-            if addr == None:
-                addr = self.addresses[fd.UDTSOCKET][1]
-        except udt.UDTException as ue:
-            return ue
-        except socket.error as se:
-            pass
-        else:
-            #read += len(data)
-            try:
-                self.protocol.datagramReceived(data, addr)
-            except:
-                log.err()
-    def write(self, datagram, addr=None, ttl=-1, inorder=True):
-        """
-        Write datagram to address.
-        @type datagram: C{str}
-        @param datagram: The datagram to be sent.
-
-        @type addr: C{tuple} containing C{str} as first element and C{int} as
-            second element, or C{None}
-        @param addr: A tuple of (I{stringified dotted-quad IP address},
-            I{integer port number}); can be C{None} in connected mode.
-        """
-        if not self._connectedAddr:
-            assert addr != None
-            if not addr[0].replace(".", "").isdigit() and addr[0] != "<broadcast>":
-                warnings.warn("Please only pass IPs to write(), not hostnames",
-                              DeprecationWarning, stacklevel=2)
-        try:
-            if self._connectedAddr:
-                udt.sendmsg(self.socket.UDTSOCKET, datagram, len(datagram),
-                            ttl, inorder)
-                return
-            socket = [v[0] for k, v in self.addresses.iteritems() if  v[1] == addr][0]
-            return udt.sendmsg(socket, datagram, len(datagram), ttl, inorder)
-        except KeyError:
-            print "No key found!"
-        except udt.UDTException as ue:
-            print ue
-            if ue[0] == ECONNLOST:
-                del self.reactor._udtsockets[socket.UDTSOCKET]
-                self.stopListening()
-
-
-    def _connectToProtocol(self):
-        self.protocol.makeConnection(self)
-        self.startReading()
-
-
-
-class Connector2(Port2):
-
-    def doConnect(self, fd=None):
-        if not fd:
-            self.socket = self.createInternetSocket()
-            self.fileno = self.socket.fileno
-            self.socket.connect_ex(self._connectedAddr)
-            self._connectDone()
-            return
-
-        self._read(fd, self._connectedAddr)
-
-    def doRead(self, fd=None):
-        why = Port.doRead(self, self.socket.UDTSOCKET, self._connectedAddr)
-        #FIXME
-        if why:
-            if (why[0] == ECONNLOST):
-                self.stopListening()
-        return why
+#     def doRead(self, fd=None):
+#         why = Port.doRead(self, self.socket.UDTSOCKET, self._connectedAddr)
+#         #FIXME
+#         if why:
+#             if (why[0] == ECONNLOST):
+#                 self.stopListening()
+#         return why
 
 
 @implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
     """
     Superclass of all socket-based FileDescriptors.
 
-    This is an abstract superclass of all objects which represent a TCP/IP
-    connection based socket.
+    This is an abstract superclass of all objects which represent a UDT socket.
 
     @ivar logstr: prefix used when logging events related to this connection.
     @type logstr: C{str}
             l = self.writeSomeData(lazyByteSlice(self.dataBuffer, self.offset),
                                    self._ttl, self._inorder)
         else:
-            l = self.writeSomeData(self.dataBuffer,
-                                   self._ttl, self._inorder)
+            l = self.writeSomeData(self.dataBuffer, self._ttl, self._inorder)
 
         # There is no writeSomeData implementation in Twisted which returns
         # < 0, but the documentation for writeSomeData used to claim negative
         host, port = self.socket.getsockname()[:2]
         return self._addressType('UDP', host, port)
 
+class Client(Connection, tcp._BaseBaseClient):
+    """
+    A transport for a UDT protocol.
+    Do not create these directly; use L{IReactorTCP.connectTCP}.
+    """
+    _addressType = address.IPv4Address
+    _base = Connection
+    _commonConnection = Connection
+    addressFamily = socket.AF_INET
+    socketType = socket.SOCK_DGRAM
 
+    def _finishInit(self, whenDone, skt, error, reactor):
+        """
+        Called by subclasses to continue to the stage of initialization where
+        the socket connect attempt is made.
+
+        @param whenDone: A 0-argument callable to invoke once the connection is
+            set up.  This is C{None} if the connection could not be prepared
+            due to a previous error.
+
+        @param skt: The socket object to use to perform the connection.
+        @type skt: C{socket._socketobject}
+
+        @param error: The error to fail the connection with.
+
+        @param reactor: The reactor to use for this client.
+        @type reactor: L{twisted.internet.interfaces.IReactorTime}
+        """
+        if whenDone:
+            self._commonConnection.__init__(self, skt, None, reactor)
+            reactor.callLater(0, whenDone)
+        else:
+            reactor.callLater(0, self.failIfNotConnected, error)
+
+
+    def createInternetSocket(self):
+        s = pyudt.UdtSocket(self.addressFamily, self.socketType)
+        s.setblocking(False)
+        #FIXME
+        #fdesc._setCloseOnExec(s.fileno())
+        return s
+
+    def getHost(self):
+        """
+        Returns an L{IPv4Address}.
+
+        This indicates the address from which I am connecting.
+        """
+        return self._addressType('UDP', *self.socket.getsockname()[:2])
+
+
+    def getPeer(self):
+        """
+        Returns an L{IPv4Address}.
+
+        This indicates the address that I am connected to.
+        """
+        # an ipv6 realAddress has more than two elements, but the IPv6Address
+        # constructor still only takes two.
+        return self._addressType('UDP', *self.realAddress[:2])
+
+
+    def __repr__(self):
+        s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self))
+        return s
+
+
+    def __init__(self, host, port, bindAddress, connector, reactor=None):
+        # BaseClient.__init__ is invoked later
+        self.connector = connector
+        self.addr = (host, port)
+
+        whenDone = self.resolveAddress
+        err = None
+        skt = None
+
+        if abstract.isIPAddress(host):
+            self._requiresResolution = False
+        else:
+            self._requiresResolution = True
+        try:
+            skt = self.createInternetSocket()
+        except udt.UDTException as ue:
+            err = error.ConnectBindError(ue[0], ue[1])
+            whenDone = None
+        if whenDone and bindAddress is not None:
+            try:
+                bindinfo = bindAddress
+                skt.bind(bindinfo)
+            except udt.UDTException as ue:
+                err = error.ConnectBindError(ue[0], ue[1])
+                whenDone = None
+        self._finishInit(whenDone, skt, err, reactor)
+
+    def doConnect(self):
+        """
+        Initiate the outgoing connection attempt.
+        @note: Applications do not need to call this method; it will be invoked
+            internally as part of L{IReactorTCP.connectTCP}.
+        """
+        self.doWrite = self.doConnect
+        self.doRead = self.doConnect
+        if not hasattr(self, "connector"):
+            # this happens when connection failed but doConnect
+            # was scheduled via a callLater in self._finishInit
+            return
+
+        # err = self.socket.getsockopt(udt.UDT_STATE)
+        # if err == udt.UDTSTATUS_CONNECTED:
+        #     self.failIfNotConnected(error.getConnectError((
+        #                 err, err)))
+        #     return
+
+        # doConnect gets called twice.  The first time we actually need to
+        # start the connection attempt.  The second time we don't really
+        # want to (SO_ERROR above will have taken care of any errors, and if
+        # it reported none, the mere fact that doConnect was called again is
+        # sufficient to indicate that the connection has succeeded), but it
+        # is not /particularly/ detrimental to do so.  This should get
+        # cleaned up some day, though.
+        try:
+            #FIXME connection happens in background,
+            #epoll monitoring needed
+            connectResult = self.socket.connect_ex(self.realAddress)
+        except udt.UDTException as ue:
+            connectResult = ue[0]
+        if connectResult:
+            if connectResult == udt.EISCONN:
+                pass
+            elif connectResult in (EASYNCRCV,):
+                self.startReading()
+                self.startWriting()
+                return
+            else:
+                self.failIfNotConnected(error.getConnectError((
+                            connectResult, connectResult)))
+                return
+
+        # If I have reached this point without raising or returning, that means
+        # that the socket is connected.
+        del self.doWrite
+        del self.doRead
+        # we first stop and then start, to reset
+        #any references to the old doRead
+        self.stopReading()
+        self.stopWriting()
+        self._connectDone()
+
+    def _connectDone(self):
+        """
+        This is a hook for when a connection attempt has succeeded.
+
+        Here, we build the protocol from the
+        L{twisted.internet.protocol.ClientFactory} that was passed in, compute
+        a log string, begin reading so as to send traffic to the newly built
+        protocol, and finally hook up the protocol itself.
+        """
+        self.protocol = self.connector.buildProtocol(self.getPeer())
+        self.connected = 1
+        logPrefix = self._getLogPrefix(self.protocol)
+        self.logstr = "%s,client" % logPrefix
+        self.startReading()
+        self.protocol.makeConnection(self)
 
 class Connector(base.BaseConnector):
     """
     _addressType = address.IPv4Address
 
     def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
-        if isinstance(port, _portNameType):
-            try:
-                port = socket.getservbyname(port, 'tcp')
-            except socket.error as e:
-                raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
         self.host, self.port = host, port
-        if abstract.isIPv6Address(host):
-            self._addressType = address.IPv6Address
         self.bindAddress = bindAddress
         base.BaseConnector.__init__(self, factory, timeout, reactor)
 
         @return: a new L{Client}
         @rtype: L{Client}
         """
-        return Client(self.host, self.port, self.bindAddress, self, self.reactor)
+        return Client(self.host, self.port,
+                      self.bindAddress, self, self.reactor)
 
 
     def getDestination(self):
         """
         @see: L{twisted.internet.interfaces.IConnector.getDestination}.
         """
-        return self._addressType('TCP', self.host, self.port)
+        return self._addressType('UDP', self.host, self.port)

File udt4twisted/udtepollreactor.py

View file
 
         #handle UDT sockets
         uread, uwrite = l[:2]
+        #print uread, uwrite
         self._handleUDTSocketSet(uread, self._POLL_IN)
         self._handleUDTSocketSet(uwrite, self._POLL_OUT)
 
                         # disconnect us.
                         why = selectable.doWrite()
                         inRead = False
+                        if self._isUDT(selectable):
+                            #FIXME BIG ISSUE epoll keeps notifying for write
+                            #if client connects and doesn't recvmsg
+                            #bad things happen.
+                            gc.collect()
             except:
                 # Any exception from application code gets logged and will
                 # cause us to disconnect the selectable.
         p.startListening()
         return p
 
-    def tlistenUDT(self,
-                   port,
-                   factory,
-                   interface='',
-                   maxPacketSize=8192,
-                   backlog=50):
-        p = udt.Port(port, factory, interface,
-                      maxPacketSize, self, backlog)
-        p.startListening()
-        return p
-
     def connectUDT(self,
                    host,
                    port,
                    protocol,
-                   bindAddress = (0, "127.0.0.1"),
-                   maxPacketSize=8192):
+                   timeout=30,
+                   bindAddress=None):
 
-        c = udt.Connector(bindAddress[0], protocol, bindAddress[1],
-                     maxPacketSize, self, 0)
-        c.connect(host, port)
+        c = udt.Connector(host, port, protocol, timeout, bindAddress, self)
+        c.connect()
         return c