Commits

Tarek Ziadé committed f20df61

now using 10 threads to speed up the mirroring work

Comments (0)

Files changed (1)

pep381client/__init__.py

 from xml.etree import ElementTree
 import xml.parsers.expat
 import sqlite
+import threading
 from httplib import (IncompleteRead, BadStatusLine, CannotSendRequest,
                      ResponseNotReady)
 import socket
     return int(time.time())
 
 # Main class
+class Worker(threading.Thread):
+    def __init__(self, sync, queue):
+        self.sync = sync
+        self.queue = queue
+        threading.Thread.__init__(self)
+
+    def run(self):
+        while not queue.empty():
+            project = queue.get()
+            try:
+                self.sync._synchronize(project)
+            except Exception, e:
+                print "Failed on %s" % project
+                print str(e)
+
 
 class Synchronization:
     "Picklable status of a mirror"
         self.skip_file_contents = False
 
         self.filter_external_hrefs = True
+        self.lock = threading.RLock()
 
     def defaults(self):
         # Fill fields that may not exist in the pickle
                 setattr(self, field, value)
 
     def store(self):
-        with open(os.path.join(self.homedir, "status"), "wb") as f:
-            cPickle.dump(self, f, cPickle.HIGHEST_PROTOCOL)
-            self.storage.commit()
+        with self.lock:
+            with open(os.path.join(self.homedir, "status"), "wb") as f:
+                cPickle.dump(self, f, cPickle.HIGHEST_PROTOCOL)
+                self.storage.commit()
 
     @staticmethod
     def load(homedir, storage=None):
         if not self.quiet:
             print "Starting the mirror sync."
 
+        # preparing a queue
         # sort projects to allow for repeatable runs
+        queue = Queue.Queue()
         for project in sorted(self.projects_to_do):
             if not project:
-                # skip empty project names resulting from PyPI-wide changes
                 continue
-            if not self.quiet:
-                print "Synchronizing", project.encode('utf-8')
+            queue.put(project)
 
-            try:
-                data = self.copy_simple_page(project)
-            except ValueError:
-                if not self.quiet:
-                    print ("Could not copy simple page for %s -- will retry"
-                           % project)
-                continue
+        # launching some threads to do the job
+        if not self.quiet:
+            print "Creating 10 workers"
 
-            if not data:
-                self.delete_project(project)
-                self.store()
-                continue
+        workers = [Worker(self, queue) for i in range(10)]
+        for worker in workers:
+            worker.start()
 
-            try:
-                files = set(self.get_package_files(data))
-            except xml.parsers.expat.ExpatError, e:
-                # not well-formed, skip for now
-                if not self.quiet:
-                    print "Page for %s cannot be parsed: %r" % (project, e)
-                raise
+        # waiting for them to end the job
+        for worker in workers:
+            worker.join()
 
-            copy_failed = False
-            for file in files:
-                if not self.quiet:
-                    print "Copying", file
-                try:
-                    self.maybe_copy_file(project, file)
-                except IOError:
-                    # we need to try again
-                    copy_failed = True
-
-            if copy_failed:
-                continue
-
-            # files start with /; remove it
-            relfiles = set(p[1:] for p in files)
-            for file in self.storage.files(project)-relfiles:
-                    self.remove_file(file)
-            self.complete_projects.add(project)
-            self.projects_to_do.remove(project)
-            self.store()
         self.update_timestamp(self.last_started)
         self.last_completed = self.last_started
         self.last_started = 0
         self.store()
 
+    def _synchronize(project):
+
+        if not self.quiet:
+            print "Synchronizing", project.encode('utf-8')
+
+        try:
+            data = self.copy_simple_page(project)
+        except ValueError:
+            if not self.quiet:
+                print ("Could not copy simple page for %s -- will retry"
+                        % project)
+            return
+
+        if not data:
+            self.delete_project(project)
+            self.store()
+            return
+
+        try:
+            files = set(self.get_package_files(data))
+        except xml.parsers.expat.ExpatError, e:
+            # not well-formed, skip for now
+            if not self.quiet:
+                print "Page for %s cannot be parsed: %r" % (project, e)
+            return
+
+        copy_failed = False
+        for file in files:
+            if not self.quiet:
+                print "Copying", file
+            try:
+                self.maybe_copy_file(project, file)
+            except IOError:
+                # we need to try again
+                copy_failed = True
+
+        if copy_failed:
+            return
+
+        # files start with /; remove it
+        relfiles = set(p[1:] for p in files)
+        for file in self.storage.files(project)-relfiles:
+                self.remove_file(file)
+
+        self.complete_projects.add(project)
+        self.projects_to_do.remove(project)
+        self.store()
+
     def update_timestamp(self, when):
         with open(os.path.join(self.homedir, "web", "last-modified"), "wb") as f:
             f.write(time.strftime("%Y%m%dT%H:%M:%S\n", time.gmtime(when)))
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.