Commits

Evgeniy Tatarkin committed c13343f

timeout on twisted downloader

Comments (0)

Files changed (4)

pomp/contrib/twistedtools.py

 
 class TwistedDownloader(BaseDownloader):
 
-    def __init__(self, reactor, middlewares=None):
+    def __init__(self, reactor, timeout=5, middlewares=None):
         super(TwistedDownloader, self).__init__(middlewares=middlewares)
         self.reactor = reactor
         self.agent = Agent(self.reactor)
+        self.timeout = timeout
 
     def get(self, requests):
         responses = []
             request.data
         )
 
+        # Set a timeout on the request:
+        # on timeout the deferred is cancelled and errbacks with CancelledError
+        watchdog = self.reactor.callLater(self.timeout, d.cancel)
+        def _reset_timeout(res):
+            if watchdog.active():
+                watchdog.cancel()
+            return res
+        d.addBoth(_reset_timeout)
+
         d.addCallback(self._handle_response)
 
         # deferred result of twisted request processing
         return d
 
 
-
 class TwistedHttpRequest(BaseHttpRequest):
 
     def __init__(self, url, data=None, headers=None, method='GET'):

pomp/core/base.py

 
     def __init__(self, request, exception):
         self.request = request
-        self.execption = exception
+        self.exception = exception
 
     def __str__(self):
-        return 'Exception on %s - %s' % (self.request, self.execption)
+        return 'Exception on %s - %s' % (self.request, self.exception)

tests/mockserver.py

+import time
 import json
 import logging
 import multiprocessing
            for key, value in environ.iteritems()]
     return ret
 
+
 def sitemap_app(environ, start_response):
     #setup_testing_defaults(environ)
 
     start_response(status, headers)
 
     requested_url = environ['PATH_INFO']
-    response = sitemap_app.sitemap.get(requested_url)
+    if requested_url == '/sleep':
+        time.sleep(2)
+        response = 'Done'
+    else:
+        response = sitemap_app.sitemap.get(requested_url)
     log.debug('Requested url: %s, response: %s', requested_url, response)
     ret = [response.encode('utf-8')]
     return ret 

tests/test_contrib_twisted.py

 
         done_defer.addCallback(check)
         return done_defer 
+
+
+    @deferred(timeout=1.0)
+    def test_timeout(self):
+
+        req_resp_midlleware = RequestResponseMiddleware(prefix_url=self.httpd.location)
+        collect_middleware = CollectRequestResponseMiddleware()
+
+        downloader = TwistedDownloader(reactor,
+            timeout=0.5,
+            middlewares=[collect_middleware]
+        )
+
+        downloader.middlewares.insert(0, req_resp_midlleware)
+
+        pomp = Pomp(
+            downloader=downloader,
+            pipelines=[PrintPipeline()],
+        )
+
+        DummyCrawler.ENTRY_URL = '/sleep'
+
+        done_defer = defer.Deferred()
+        d = pomp.pump(DummyCrawler())
+
+        d.add_callback(done_defer.callback)
+
+        def check(x):
+            assert len(collect_middleware.exceptions) == 1
+            e = collect_middleware.exceptions[0]
+            assert isinstance(e, BaseDownloadException)
+            assert isinstance(e.exception, defer.CancelledError)
+
+        done_defer.addCallback(check)
+        return done_defer