
Petar Marić committed a23d5cf

Added parallelism to metaTED crawler, leading to substantial performance improvements
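
The change follows the standard submit/as_completed pattern from concurrent.futures. As a self-contained illustration of that pattern (the fetch() worker below is hypothetical; the real code submits get_talk_info() calls):

    from concurrent import futures  # stdlib on Python 3; the 'futures' backport on Python 2

    def fetch(url):
        # Hypothetical stand-in for get_talk_info(); must be a module-level
        # function so ProcessPoolExecutor can pickle it for the workers.
        return len(url)

    if __name__ == '__main__':
        urls = ['http://example.com/a', 'http://example.com/b']
        with futures.ProcessPoolExecutor(max_workers=4) as executor:
            future_to_url = dict(
                (executor.submit(fetch, url), url)
                for url in urls
            )
            # as_completed() yields each future as it finishes,
            # not in submission order
            for future in futures.as_completed(future_to_url):
                url = future_to_url[future]
                if future.exception() is not None:
                    print('failed %s: %s' % (url, future.exception()))
                else:
                    print('%s -> %s' % (url, future.result()))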

  • Parent commits 944ebf4


Files changed (3)

File metaTED/crawler/get_downloadable_talks.py

 import logging
+from concurrent import futures
+from multiprocessing import cpu_count
+from metaTED.cache import cached_storage
 from metaTED.crawler.get_talk_info import get_talk_info, ExternallyHostedDownloads, NoDownloadsFound
 from metaTED.crawler.get_talks_urls import get_talks_urls
 
 _PAGINATE_BY = 20
 
 
-def get_downloadable_talks():
+class NoDownloadableTalksFound(Exception):
+    pass
+
+
+def get_downloadable_talks(num_workers=None):
     talks_urls = get_talks_urls()
-    num_urls = len(talks_urls)
+    
+    talks_info = cached_storage.get('talks_infos', {})
     downloadable_talks = []
-    for index, talk_url in enumerate(talks_urls):
-        try:
-            if index % _PAGINATE_BY == 0:
-                logging.info(
-                    "Getting download information on %d of %d talks...",
-                    index+1,
-                    num_urls
-                )
-            downloadable_talks.append(get_talk_info(talk_url))
-        except ExternallyHostedDownloads, e:
-            logging.info(
-                "Downloads for '%s' are not hosted by TED, skipping",
-                talk_url
+    new_talks_urls = []
+    for talk_url in talks_urls:
+        if talk_url in talks_info:
+            downloadable_talks.append(talks_info[talk_url])
+        else:
+            new_talks_urls.append(talk_url)
+    
+    if not new_talks_urls:
+        logging.info('No new talk urls found')
+    else:
+        num_new_talks = len(new_talks_urls)
+        logging.info("Found %d new talk url(s)", num_new_talks)
+        
+        if num_workers is None:
+            num_workers = 2*cpu_count() # Network IO is the bottleneck
+        with futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
+            future_to_url = dict(
+                (executor.submit(get_talk_info, talk_url), talk_url)
+                for talk_url in new_talks_urls
             )
-        except NoDownloadsFound, e:
-            logging.error("No downloads for '%s', skipping", talk_url)
-        except Exception, e:
-            logging.error("Skipping '%s', reason: %s", talk_url, e)
+            
+            for index, future in enumerate(futures.as_completed(future_to_url), start=1):
+                if index % _PAGINATE_BY == 1:
+                    logging.info(
+                        "Getting download information on %d of %d talks...",
+                        index,
+                        num_new_talks
+                    )
+                
+                talk_url = future_to_url[future]
+                if future.exception() is not None:
+                    e = future.exception()
+                    if isinstance(e, ExternallyHostedDownloads):
+                        logging.info(
+                            "Downloads for '%s' are not hosted by TED, skipping",
+                            talk_url
+                        )
+                    elif isinstance(e, NoDownloadsFound):
+                        logging.error("No downloads for '%s', skipping", talk_url)
+                    else:
+                        logging.error("Skipping '%s', reason: %s", talk_url, e)
+                else:
+                    talk_info = future.result()
+                    downloadable_talks.append(talk_info)
+                    talks_info[talk_url] = talk_info
+        
+        cached_storage['talks_infos'] = talks_info
+    
+    if not downloadable_talks:
+        raise NoDownloadableTalksFound('No downloadable talks found')
+    
     logging.info(
         "Found %d downloadable talks in total",
         len(downloadable_talks)

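For callers, the only visible API changes are the optional num_workers argument and the new NoDownloadableTalksFound exception. A minimal usage sketch, assuming the function ends by returning the accumulated downloadable_talks list (the tail of the function falls outside the diff context above):

    import logging

    from metaTED.crawler.get_downloadable_talks import (
        get_downloadable_talks,
        NoDownloadableTalksFound
    )

    try:
        # num_workers is optional; omitting it defaults to 2 * cpu_count()
        talks = get_downloadable_talks(num_workers=8)
    except NoDownloadableTalksFound:
        logging.error("Nothing to download")
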
File metaTED/crawler/get_talk_info.py

 from os.path import splitext
 from urlparse import urljoin, urlsplit
 from metaTED import SITE_URL
-from metaTED.cache import cached_storage
 from metaTED.crawler.get_talks_urls import TALKS_LIST_URL
 
 
     )
 
 
-def _get_talk_info(talk_url):
+def get_talk_info(talk_url):
     document = html.parse(talk_url)
     file_base_name = _clean_up_file_name(
         document.find('/head/title').text.split('|')[0].strip(),
         'theme': _guess_theme(talk_url, document),
         'qualities': qualities,
     }
-
-
-def get_talk_info(talk_url):
-    talks_info = cached_storage.get('talks_infos', {})
-    logging.debug("Searching cache for talk info on '%s'...", talk_url)
-    if talk_url in talks_info:
-        logging.debug("Found the cached version of '%s' talk info", talk_url)
-        return talks_info[talk_url]
-    
-    # Cache miss
-    logging.debug(
-        "Failed to find the cached version of '%s' talk info, calculating.",
-        talk_url
-    )
-    info = _get_talk_info(talk_url)
-    talks_info[talk_url] = info
-    cached_storage['talks_infos'] = talks_info
-    return info

File requirements.txt

+futures>=2.1
 Jinja2>=2.1
 lxml>=2.2
 shove>=0.2.2
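
The new futures>=2.1 requirement is the PyPI backport of Python 3.2's concurrent.futures module for Python 2, providing the "from concurrent import futures" import used by the crawler above; on Python 3 the module ships in the standard library and the backport is unnecessary.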