Anonymous avatar Anonymous committed c1dab69

replace poll in shortcuts.Pool to select

Comments (0)

Files changed (4)

 curl_multi_fdset = init_func(native, 'curl_multi_fdset',
     errcheck=curl_multi_errcheck)
 
+class curl_msg(object):
+    def __init__(self, msg):
+        self.__msg = msg
+
+    def __getattr__(self, name):
+        return getattr(self.__msg, name)
+
+    @property
+    def easy_handle(self):
+        return self.__msg.easy_handle.contents.value
+
+    def __repr__(self):
+        return super(curl_msg, self).__repr__()
+        
 # info_read
-curl_multi_info_read = init_func(native, 'curl_multi_info_read',
+_curl_multi_info_read = init_func(native, 'curl_multi_info_read',
     restype=ctypes.POINTER(const.CURLMsg),
     errcheck=curl_multi_errcheck)
 
+def curl_multi_info_read(handle):
+    number = ctypes.c_int()
+    msg = _curl_multi_info_read(handle, ctypes.byref(number))
+    if msg:
+        return curl_msg(msg.contents), number.value
+
 
 # curl_multi_init
 curl_multi_init = init_func(native, 'curl_multi_init',

pylibcurl/multi.py

                 or
             return None
         """
-        number = ctypes.c_int()
-        message = lib.curl_multi_info_read(self._handle, ctypes.byref(number))
+        return lib.curl_multi_info_read(self._handle)
 
-        if not message:
-            return None
-
-        msg = message.contents
-        #msg.easy_handle = Curl(msg.easy_handle) # put curl instance of Curl
-
-        return msg, number.value
 
     def setopt(self, **kwargs):
         """

pylibcurl/shortcuts.py

     def __init__(self, maxconnects, qsize=0, **kwargs):
         self._multi = Multi(**kwargs)
         self._multi.maxconnects = maxconnects
+        # TODO: fix bug with pipelining Pool not work
+        #self._multi.pipelining = 1
+
+
         self._queue = Queue(qsize)
         self._size = maxconnects
 
         self._data = {}
-        self._sockets = set()
+        
+        # sockets
         self._complete = set() # pair(curl, socket)
+        self._read_sockets = set()
+        self._write_sockets = set()
+        self._error_sockets = set()
 
 
         def socket_cb(curl, socket, event):
+            r = self._read_sockets
+            w = self._write_sockets
+            complete = self._complete
+
             if event == const.CURL_POLL_REMOVE:
-                self._complete.add((curl, socket))
-            elif socket not in self._sockets:
-                self._sockets.add(socket)
+                complete.add((curl, socket))
+                if socket in r:
+                    r.remove(socket)
+
+                if socket in w:
+                    w.remove(socket)
+            else:
+                if event == const.CURL_POLL_IN:
+                    r.add(socket)
+                    if socket in w:
+                        w.remove(socket)
+                elif event == const.CURL_POLL_OUT:
+                    w.add(socket)
+                    if socket in r:
+                        r.remove(socket)
+                elif event == const.CURL_POLL_INOUT:
+                    r.add(socket)
+                    w.add(socket)
             
         # TODO: timer
         def timer_cb(m, timeout):
 
     def _do_remove(self):
         # remove complete curls and sockets
-        for curl, sock in self._complete:
+        for curl, sock in self._complete.copy():
             self._multi.remove_handle(curl)
-            self._sockets.remove(sock)
             
             # call callback
             url, header, body, callback = self._data[curl]
         self._do_add()
         running = 1
 
+        r = self._read_sockets
+        w = self._write_sockets
+        e = self._error_sockets
+        do_action = self._do_socket_action
+
         while True:
             
             while running:
-                (r, w, e) = select.select(self._sockets, self._sockets, self._sockets, 1)
+                rr, ww, ee = select.select(r, w, e, 1.0)
 
-                sockets = set(r + w + e)
-                for sock in sockets:
-                    running = self._do_socket_action(sock, 0)
                 
+                for socket in rr:
+                    running = do_action(socket, const.CURL_CSELECT_IN)
+                
+                for socket in ww:
+                    running = do_action(socket, const.CURL_CSELECT_OUT)
+                
+                for socket in ee:
+                    running = do_action(socket, const.CURL_CSELECT_ERR)
                 
                 self._do_remove()
                 self._do_add()

tests/http_multi.py

             'http://docs.djangoproject.com/en/dev/ref/contrib/admin/actions/',
             'http://docs.python.org/library/select.html#module-select',
             'http://docs.djangoproject.com/en/1.2/ref/forms/widgets/',
+
+
+            'http://curl.haxx.se/libcurl/c/curl_multi_socket_action.html',
+            'http://www.codeproject.com/KB/IP/ScalableClientServer.aspx',
+            'http://habrahabr.ru/blogs/twitter/108230/',
+            'http://habrahabr.ru/company/intel/blog/108231/',
+            'http://docs.python.org/library/select.html#module-select',
+            'http://docs.python.org/using/windows.html',
+            'http://chardet.feedparser.org/download/',
+            'http://pycurl.cvs.sourceforge.net/viewvc/pycurl/pycurl/tests/test_multi_socket_select.py?revision=1.1&view=markup',
+            'http://docs.djangoproject.com/en/dev/ref/class-based-views/#django.views.generic.detail.DetailView',
         ]
 
 
             writefunction=writefunction,
         )
 
+    def dump_info_read(self):
+        while True:
+            obj = self._multi.info_read()
+            if not obj:
+                break
+            msg, number_in_queue = obj
+
+            print 'msg', msg.msg
+            print 'easy_handle', msg.easy_handle
+            print 'whatever', msg.data.whatever
+            print 'result', msg.data.result
+
     def tearDown(self):
         self._multi.close()
 
                 print 'curl socket: %s event: %s' % (socket, event)
                 print
 
+    
             if event == const.CURL_POLL_REMOVE:
-                while True:
-                    obj = m.info_read()
-                    if not obj:
-                        break
-                    msg, number_in_queue = obj
-
-                    if self.debug:
-                        print msg.easy_handle
-                        print 'result: %s' % msg.data.result
-                
+                pass
             else:
                 if socket not in sockets: 
                     sockets.add(socket)
         while code == const.CURLM_CALL_MULTI_SOCKET:
             code, running = m.socket_action(const.CURL_SOCKET_TIMEOUT, 0)
         
-        #return 
         while running:
             events = epoll.poll(1)
             for sock, event in events:
                     code = const.CURLM_CALL_MULTI_SOCKET
                     while code == const.CURLM_CALL_MULTI_SOCKET:
                         code, running = m.socket_action(sock, mask)
-                        #print code, running
 
 
     def test_pool(self):
 
 
             
+    def test_socket_action_select(self):
+        sockets = set()
+        r = set()
+        w = set()
+        e = set()
+        
+        def socket_cb(curl, socket, event):
+            if event == const.CURL_POLL_REMOVE:
+                sockets.remove(socket)
+                if socket in r:
+                    r.remove(socket)
 
+                if socket in w:
+                    w.remove(socket)
+            else:
+                sockets.add(socket)
+                if event == const.CURL_POLL_IN:
+                    r.add(socket)
+                    if socket in w:
+                        w.remove(socket)
+                elif event == const.CURL_POLL_OUT:
+                    w.add(socket)
+                    if socket in r:
+                        r.remove(socket)
+                elif event == const.CURL_POLL_INOUT:
+                    r.add(socket)
+                    w.add(socket)
+
+        
+        def timer_cb(m, timeout):
+            if timeout == 0:
+                code, running = m.socket_action(const.CURL_SOCKET_TIMEOUT, 0)
+
+        def do_action(m, socket, event_bitmask=0):
+            code = const.CURLM_CALL_MULTI_SOCKET
+            while code:
+                code, running = self._multi.socket_action(socket, event_bitmask)
+            return running
+
+        self._multi.socketfunction = socket_cb
+        self._multi.timerfunction = timer_cb
+
+
+        for url in self.urls:
+            c = Curl(
+                url=url,
+                verbose=self.debug,
+                **self._defautl_curl_settings
+            )
+            self._multi.add_handle(c)
+
+
+        code, running = self._multi.socket_action(const.CURL_SOCKET_TIMEOUT, 0)
+        while running:
+            rr, ww, ee = select.select(r, w, e, 1.0)
+
+            
+            for socket in rr:
+                running = do_action(self._multi, socket, const.CURL_CSELECT_IN)
+            
+            for socket in ww:
+                running = do_action(self._multi, socket, const.CURL_CSELECT_OUT)
+            
+            for socket in ee:
+                running = do_action(self._multi, socket, const.CURL_CSELECT_ERR)
+            
+            
+
+            
 
 
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.