Commits

Lynn Rees committed c481b2e

-more async

  • Parent commits 8634ca2

Files changed (1)

 
     '''HTTP and FTP crawling, reporting, and checking'''
     
-    import os as _os
+    import os as _os    
     import urllib as _ulib
     import asyncore as _async
     import urlparse as _uparse    
     from os import path as _path 
     from ftplib import FTP as _ftp
     import mimetools as _mimetools
-    import traceback as _traceback    
+    import traceback as _traceback
+    from time import time as _time    
     from socket import AF_INET as _afinet
     from time import strftime as _formtime
-    from time import localtime as _localtime    
+    from time import localtime as _localtime
     from ftplib import error_perm as _ftperr
     from cStringIO import StringIO as _stringio
     from socket import SOCK_STREAM as _sockstream
     except ImportError: pass
     
     _bdsig, _bfsig, _session, _newparser = None, None, None, None
-    _badurl, _badhtm, _visited, _badhtms = None, None, {}, []
     _bqsig = None
     # HTML tags with URLs
     _urltags = {'a':1, 'img':1, 'link':1, 'script':1, 'iframe':1, 'object':1,
         else: self.width = None
         if depth: self.depth = depth
         else: self.depth = None
+
+    def _defasync(self):        
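+        # Bind instance attributes to locals so the classes defined below can
+        # close over them without repeated self lookups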
         urlparse, stringio = self._uparse.urlparse, self._stringio
         sockstream, afinet = self._sockstream, self._afinet
         rcodes, okcodes = self._rcodes, self._okcodes
         badurl, badhtm = self._badurl, self._badhtm
         sperror, visited = self._sperror, self._visited
-        badhtms, async, traceback = self._badhtms, self._async, self._traceback 
-        mimetools = self._mimetools
+        badhtms, async, traceback = self.badhtms, self._async, self._traceback
+        badurls = self.badurls
+        mimetools, time = self._mimetools, self._time
         
         class AsyncHttp(async.dispatcher_with_send):
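+            # Non-blocking HTTP client: sends a GET over asyncore and streams
+            # the response into the supplied consumer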
             
-            def __init__(self, url, consumer, base=None, oldurl=None, nurl=None):
+            def __init__(self, url, consumer, base=None, old=None, new=None):
                 async.dispatcher_with_send.__init__(self)
-                self.url, self.consumer = url, consumer
-                if base: self.base = base
+                if base: self.base, consumer.base = base, base
                 else: self.base = None
-                if oldurl: self.oldurl = oldurl
-                else: self.oldurl = None
-                if nurl: self.nurl = nurl
-                else: self.nurl = self.url
-                getstring = 'GET %s HTTP/1.0\r\nHost: %s\r\n\r\n'
+                if old: self.old, consumer.old = old, old
+                else: self.old = None
+                if new: self.new = new
+                else: self.new = url
+                consumer.url, consumer.new = url, self.new
                 scheme, host, path, params, query, fragment = urlparse(url)
                 assert scheme == 'http', 'only supports HTTP requests'
                 if not path: path = '/'
                 if params: path = ';'.join([path, params])
                 if query: path = '?'.join([path, query])
-                self.request = getstring % (path, host)
-                self.host, self.port, self.status = host, 80, None
-                self.header, self.data = None, ''
+                self.url, self.consumer, self.path = url, consumer, path
+                self.host, self.port, self.status = host, 80, None                
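+                # The timestamp and byte counters let a managing poll loop
+                # enforce per-connection time and size limits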
+                self.header, self.data, self.timestamp = None, '', time()
+                self.bytes_in, self.bytes_out = 0, 0
                 self.create_socket(afinet, sockstream)
-                self.connect((host, self.port))
+                self.connect((self.host, self.port))
 
             def handle_connect(self):
-                self.send(self.request)
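+                # Build and send the GET request once the socket connects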
+                request = 'GET %s HTTP/1.0\r\nHost: %s\r\n\r\n'
+                request = request % (self.path, self.host)
+                self.send(request)
+                self.bytes_out = self.bytes_out + len(request)
 
             def handle_error(self):
-                traceback.print_exc()
                 self.close()
+                traceback.print_exc()                
 
             def handle_expt(self):
                 self.close()
 
             def handle_read(self):
                 data = self.recv(2048)
+                self.bytes_in = self.bytes_in + len(data)
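+                # Buffer incoming data until the blank line ending the HTTP
+                # headers arrives, then parse the status line and header block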
                 if not self.header:
-                    self.data = ''.join([self.data, data])
-                    try: i = self.data.index('\r\n\r\n')
-                    except ValueError: return
-                    else:
-                        fp = stringio(self.data[:i+4])
-                        status = fp.readline()
-                        self.status = status.split(' ', 2)
-                        self.header = mimetools.Message(fp)
-                        data = self.data[i+4:]
-                        self.data = ''
-                        self.type = self.header['Content-Type']
-                        self.consumer.url = self.url
-                        self.consumer.type = self.type
-                        self.consumer.nurl = self.nurl
-                        if self.base: self.consumer.base = self.base
-                        if self.oldurl: self.consumer.oldurl = self.oldurl
-                        try: http_header = self.consumer.http_header
-                        except AttributeError: pass
-                        else: http_header(self)
-                        if not self.connected: return
-                try: self.consumer.feed(data)
+                    self.data = ''.join([self.data, data])                    
+                    header = self.data.split('\r\n\r\n', 1)
+                    if len(header) <= 1: return
+                    header, data = header
+                    fp = stringio(header)
+                    self.status = fp.readline().split(' ', 2)
+                    self.header = mimetools.Message(fp)
+                    self.type = self.header['Content-Type']
+                    self.consumer.type, self.data = self.type, ''  
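+                    # Give the consumer a look at the status and headers first;
+                    # Redir uses this hook to follow redirects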
+                    try: http_header = self.consumer.http_header
+                    except AttributeError: pass
+                    else: http_header(self)
+                    if not self.connected: return
+                try:
+                    if data: self.consumer.feed(data)
                 except sperror:
+                    self.close()
                     visited[self.url] = 1
                     if badhtm: badhtms[self.url] = 1
                 
                 self.consumer.close()
                 self.close()
 
+        class Manager:
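+            # Throttles asyncore connections: starts at most max_con requests
+            # at once and closes channels that exceed max_size bytes or
+            # max_time seconds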
+
+            max_con = 20
+            max_size = 100000000
+            max_time = 40
+
+            def __init__(self):
+                self.queue = []
+
+            def add(self, new, consumer, base, old):
+                self.queue.append((new, consumer, base, old))
+
+            def poll(self, timeout=0.1):
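+                # Start queued requests in batches capped at max_con, then poll
+                # asyncore until every channel has closed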
+                while self.queue:
+                    if len(self.queue) > self.max_con:
+                        while self.queue and len(async.socket_map) < self.max_con:
+                            AsyncHttp(*self.queue.pop(0))
+                    else:
+                        while self.queue: AsyncHttp(*self.queue.pop(0))
+                    count, length = 0, 0
+                    while async.socket_map:
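+                        # If the socket map stays the same size for many
+                        # consecutive passes, assume a channel has stalled and
+                        # drop one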
+                        if len(async.socket_map) == length:
+                            count += 1
+                            if count == 500:
+                                async.socket_map.popitem()
+                                count = 0
+                        else: count = 0
+                        for channel in async.socket_map.values():
+                            if channel.bytes_in > self.max_size:
+                                channel.close()
+                            if time() - channel.timestamp > self.max_time:
+                                channel.close()
+                        async.poll(timeout)
+                        length = len(async.socket_map)
+                    
         class Redir:
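+            # Consumer wrapper that follows HTTP redirect responses by
+            # reissuing the request against the redirect target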
             
             def __init__(self, consumer, stash):
                 self.consumer, self.url, self.type = consumer, None, None
-                self.badurl, self.oldurl, self.base = None, None, None
-                self.nurl, self.stash = None, stash
+                self.badurl, self.old, self.base = None, None, None
+                self.new, self.stash = None, stash
                 
             def http_header(self, request):
                 if request.status is None or request.status[1] not in rcodes:
                     elif 'uri' in request.header:
                         url = request.header['uri']
                     request.close()
-                    AsyncHTTP(url, self, self.base, self.oldurl, self.url)
+                    AsyncHttp(url, self, self.base, self.old, self.url)
 
         class ExRedir(Redir):
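+            # Redirect-aware consumer that stashes extracted child URLs and
+            # records bad URLs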
 
                 if self.consumer.badurl:
                     visited[self.url] = 1
                     if badurl:
-                        badurls.append((self.base, self.oldurl, self.url))
+                        badurls.append((self.base, self.old, self.url))
                 elif self.consumer.urls:
-                    self.stash[(self.nurl, self.url)] = self.consumer.urls
-                else: self.stash[(self.nurl, self.url)] = []
+                    self.stash[(self.new, self.url)] = self.consumer.urls
+                else: self.stash[(self.new, self.url)] = []
                 self.consumer.close()
 
         class BadRedir(Redir):
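+            # Redirect-aware consumer used when probing deliberately bogus URLs
+            # to collect "bad URL" signature content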
                 self.consumer.close()
 
         self._BadRedir, self._ExRedir = BadRedir, ExRedir
-        self._AsyncHttp = AsyncHttp
+        self._AsyncHttp, self._Manager = AsyncHttp, Manager
 
     def _ftpopen(self, base, name='anonymous', password=None, attempts=3):
         '''Returns FTP client session
         base = self.base
         BadUrl, BadRedir = self._BadUrl, self._BadRedir
         urljoin, AsyncHttp = self._uparse.urljoin, self._AsyncHttp
-        stash = []
+        stash, async, time = [], self._async, self._time
         # Generate random string of jibber
         from string import letters, digits
         from random import choice, randint
         AsyncHttp(urljoin(base, '%s/' % ru), BadRedir(BadUrl(), stash))
         # Builds signature of a bad URL for a query
         AsyncHttp(urljoin(base, '%s?%s' % (ru, ru)), BadRedir(BadUrl(), stash))
-        # Loop
-        self._async.loop()
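+        # Poll manually instead of calling async.loop so hung channels can be
+        # timed out and closed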
+        max_time = 40
+        while async.socket_map:
+            for channel in async.socket_map.values():
+                if time() - channel.timestamp > max_time:
+                    channel.close()
+            async.poll(0.01)
         # Store signatures
         self._bfsig.extend(stash[0])
         self._bdsig.extend(stash[1])
         # Return badurl marker and list of child URLS
         return urlget.badurl, urlget.urls
 
-    def _webopen(self, newurl, base, oldurl):
+    def _webopen(self, new, base, old):
         '''Returns real URL and extracted child URLs
 
         Arguments:
-        newurl -- newly resolved URL
+        new -- newly resolved URL
         base -- referring URL
-        oldurl - original URL'''
+        old -- original URL'''
         try:
             # If webspiders can access URL, open it
-            if self._robot.can_fetch('*', newurl):
-                url = self._ulib.urlopen(newurl)
+            if self._robot.can_fetch('*', new):
+                url = self._ulib.urlopen(new)
             # Otherwise, mark as visited and abort
             else:
-                self._visited[newurl] = 1
+                self._visited[new] = 1
                 return 0, 0
         # If HTTP error, log bad URL and abort
         except IOError:
-            self._visited[newurl] = 1
-            if self._badurl: self.badurls.append((base, oldurl, newurl))
+            self._visited[new] = 1
+            if self._badurl: self.badurls.append((base, old, new))
             return 0, 0
         # Get real URL
-        newurl = url.geturl()
+        new = url.geturl()
         # URLs with mimetype 'text/html' are scanned for child URLs
         if url.headers.type == 'text/html':
             # Feed parser
             try: badurl, urls = self._webparser(contents)
             # Log URL if SGML parser can't parse it 
             except self._sperror:
-                self._visited[newurl] = 1
-                if self._badhtm: self.badhtms[newurl] = 1
+                self._visited[new] = 1
+                if self._badhtm: self.badhtms[new] = 1
                 return 0, 0
             url.close()
             # Return URL and extracted urls if it's good
-            if not badurl: return newurl, urls
+            if not badurl: return new, urls
             # If the URL is bad (after BadUrl), stop processing and log URL
             else:
-                self._visited[newurl] = 1
-                if self._badurl: self.badurls.append((base, oldurl, newurl))
+                self._visited[new] = 1
+                if self._badurl: self.badurls.append((base, old, new))
                 return 0, 0
         # Return URL of non-HTML resources and empty list
         else:
             url.close()
-            return newurl, []
+            return new, []
 
     def _genverify2(self, urllist, base):
         '''Returns a full URL relative to a base URL
         outs, redirs, supported = self.outs, self.redirs, self._supported
         redir, other, out = self._redir, self._other, self._out
         cache, toprocess, robot = self._cache, [], self._robot
-        AsyncHttp, ExRedir = self._AsyncHttp, self._ExRedir
-        UrlExtract, stash = self._UrlExtract, {}
+        ExRedir = self._ExRedir
+        UrlExtract, stash, manager = self._UrlExtract, {}, self._Manager()
         # Strip file off base URL for joining
-        newbase = base.replace(base.split('/')[-1], '')
+        try: newbase = base.replace(base.split('/')[-1], '')
+        except ValueError: newbase = base
         # Handle any child URLs
         for url in urllist:
             if url not in visited:
                 # Remove whitespace from URL
                 if url.find(' ') != -1:
-                    visited[url], nurl = 1, url.replace(' ', '')
+                    visited[url], new = 1, url.replace(' ', '')
                     if url in visited: continue
                 # Remove fragments i.e. 'http:foo/bar#frag'
                 if url.find('#') != -1:
-                    visited[url], nurl = 1, urldefrag(url)[0]
+                    visited[url], new = 1, urldefrag(url)[0]
                     if url in visited: continue
                 # Process full URLs i.e. 'http://foo/bar'
                 if url.find(':') != -1:
                     elif not urlseg[2] and urlseg[1] == sb:
                         visited[url] = 1
                         continue
-                    nurl = url
+                    new = url
                 # Handle relative URLs i.e. ../foo/bar
                 elif url.find(':') == -1:
                     # Join root domain and relative URL
-                    visited[url], nurl = 1, urljoin(newbase, url)
-                    if nurl in visited: continue
-                toprocess.append((nurl, base, url))
-        for nurl, base, url in toprocess:
-            if robot.can_fetch('*', nurl):
+                    visited[url], new = 1, urljoin(newbase, url)
+                    if new in visited: continue
+                toprocess.append((new, base, url))
+        for new, base, url in toprocess:
+            if robot.can_fetch('*', new):
                 consumer = ExRedir(UrlExtract(), stash)
-                AsyncHttp(nurl, consumer, base, url)
-            else: visited[nurl] = 1
-        self._async.loop()
-        print stash
-        for turl, nurl in stash:
+                manager.add(new, consumer, base, url)
+            else: visited[new] = 1
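+        # Run all queued requests to completion before walking the stash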
+        manager.poll()
+        for turl, new in stash:
             if turl not in visited:
-                visited[nurl], visited[turl]  = 1, 1
+                visited[new], visited[turl]  = 1, 1
                 # If URL resolved to a different URL, process it
-                if turl != nurl:
+                if turl != new:
                     urlseg = urlsplit(turl)
                     # If URL is not in root domain, block it
                     if urlseg[1] not in sb:
                         # Log as a redirected internal URL
-                        if redir: redirs[(nurl, turl)] = base
+                        if redir: redirs[(new, turl)] = base
                         continue
                     # Block duplicate root URLs
                     elif not urlseg[2] and urlseg[1] == sb: continue
                 if len(turl.split('/')) >= depth: continue
                 # Otherwise return URL
                 else:
-                    newurls, rawurls = {}, stash.get((turl, nurl))
+                    newurls, rawurls = {}, stash.get((turl, new))
                     if rawurls:
                         for rawurl in rawurls:
                             if rawurl not in visited: newurls[rawurl] = 1
                         cache[turl] = newurls
-                    print len(visited)
                     yield turl, base
 
     def _urlverify(self, url, base, newbase):
         if url not in visited:
             # Remove whitespace from URL
             if url.find(' ') != -1:
-                visited[url], nurl = 1, url.replace(' ', '')
+                visited[url], new = 1, url.replace(' ', '')
                 if url in visited: return 0, 0
             # Remove fragments i.e. 'http:foo/bar#frag'
             if url.find('#') != -1:
-                visited[url], nurl = 1, urldefrag(url)[0]
+                visited[url], new = 1, urldefrag(url)[0]
                 if url in visited: return 0, 0
             # Process full URLs i.e. 'http://foo/bar'
             if url.find(':') != -1:
                 elif not urlseg[2] and urlseg[1] == sb:
                     visited[url] = 1
                     return 0, 0
-                nurl = url
+                new = url
             # Handle relative URLs i.e. ../foo/bar
             elif url.find(':') == -1:
                 # Join root domain and relative URL
-                visited[url], nurl = 1, urljoin(newbase, url)
-                if nurl in visited: return 0, 0
+                visited[url], new = 1, urljoin(newbase, url)
+                if new in visited: return 0, 0
             # Test URL by attempting to open it
-            turl, rawurls = webopen(nurl, base, url)
+            turl, rawurls = webopen(new, base, url)
             if turl and turl not in visited:
-                visited[nurl], visited[turl]  = 1, 1
+                visited[new], visited[turl]  = 1, 1
                 # If URL resolved to a different URL, process it
-                if turl != nurl:
+                if turl != new:
                     urlseg = urlsplit(turl)
                     # If URL is not in root domain, block it
                     if urlseg[1] not in sb:
                         # Log as a redirected internal URL
-                        if redir: redirs[(nurl, turl)] = base
+                        if redir: redirs[(new, turl)] = base
                         return 0, 0
                     # Block duplicate root URLs
                     elif not urlseg[2] and urlseg[1] == sb: return 0, 0
         else: self.width = width
         # sgmlop crashes Python after too many iterations
         #if width > 5000:
-        self._parserpick(1)
+        self._defasync()
+        self._parserpick(1)        
         #else: self._parserpick() 
         # Use global base if present
         if not base: base = self.base
             if newurls:
                 # Cache URLs individually if threads are desired
                 if thread:
-                    for newurl in newurls: cache[newurl] = base
+                    for new in newurls: cache[new] = base
                 # Cache in group if no threads
                 else: cache[base] = newurls
             # Make base URL, get split, and put in verified URL list
 redireport = _inst.redireport
 othereport = _inst.othereport
 badurlreport = _inst.badurlreport
-badhtmreport = _inst.badhtmreport
+badhtmreport = _inst.badhtmreport