edgewall / Trac

Commits

Christian Boos  committed 10f02f6

0.10.4dev: ported r4965 to stable (''improve concurrent resyncs, #4685 and #4043'')

  • Participants
  • Parent commit: ee70a83
  • Branch: 0.10-stable

Comments (0)

Files changed (5)

File trac/scripts/admin.py

View file
 
     ## Resync
     def do_resync(self, line):
-        print 'Resyncing repository history...'
+        from trac.versioncontrol.cache import CACHE_METADATA_KEYS
+        print 'Resyncing repository history... '
+        print '(this will take a time proportional to the number of your ' \
+              'changesets)'
         cnx = self.db_open()
         cursor = cnx.cursor()
         cursor.execute("DELETE FROM revision")
         cursor.execute("DELETE FROM node_change")
-        cursor.execute("DELETE FROM system WHERE name='repository_dir'")
+        cursor.executemany("DELETE FROM system WHERE name=%s",
+                           [(k,) for k in CACHE_METADATA_KEYS])
+        cursor.executemany("INSERT INTO system (name, value) VALUES (%s, %s)",
+                           [(k, '') for k in CACHE_METADATA_KEYS])
         repos = self.__env.get_repository() # this will do the sync()
+        cursor.execute("SELECT count(rev) FROM revision")
+        for cnt, in cursor:
+            print cnt, 'revisions cached.',
         print 'Done.'
 
     ## Wiki

File trac/versioncontrol/api.py

View file
         """Close the connection to the repository."""
         raise NotImplementedError
 
-    def clear(self):
-        """Clear any data that may have been cached in instance properties."""
+    def clear(self, youngest_rev=None):
+        """Clear any data that may have been cached in instance properties.
+
+        `youngest_rev` can be specified as a way to force the value
+        of the `youngest_rev` property (''will change in 0.12'').
+        """
         pass
 
     def get_changeset(self, rev):
         The way revisions are sequenced is version control specific.
         By default, one assumes that the revisions are sequenced in time
         (... which is ''not'' correct for most VCS, including Subversion).
+
+        (Deprecated, will not be used anymore in Trac 0.12)
         """
         cursor = db.cursor()
         cursor.execute("SELECT rev FROM revision ORDER BY time DESC LIMIT 1")

File trac/versioncontrol/cache.py

View file
               'D': Changeset.DELETE, 'E': Changeset.EDIT,
               'M': Changeset.MOVE}
 
+CACHE_REPOSITORY_DIR = 'repository_dir'
+CACHE_YOUNGEST_REV = 'youngest_rev'
+
+CACHE_METADATA_KEYS = (CACHE_REPOSITORY_DIR, CACHE_YOUNGEST_REV)
+
 
 class CachedRepository(Repository):
 
         Repository.__init__(self, repos.name, authz, log)
         self.db = db
         self.repos = repos
-        try:
-            self.sync()
-        except TracError:
-            raise
-        except Exception, e: # most probably 2 concurrent resync attempts
-            log.warning('Error during sync(): %s' % e) 
+        self.sync()
 
     def close(self):
         self.repos.close()
                        "WHERE time >= %s AND time < %s "
                        "ORDER BY time", (start, stop))
         for rev, in cursor:
-            if self.authz.has_permission_for_changeset(rev):
-                yield self.get_changeset(rev)
+            try:
+                if self.authz.has_permission_for_changeset(rev):
+                    yield self.get_changeset(rev)
+            except NoSuchChangeset:
+                pass # skip changesets currently being resync'ed
 
     def sync(self):
         cursor = self.db.cursor()
 
-        # -- repository used for populating the cache
-        cursor.execute("SELECT value FROM system WHERE name='repository_dir'")
-        for previous_repository_dir, in cursor:
-            if previous_repository_dir != self.name:
+        cursor.execute("SELECT name, value FROM system WHERE name IN (%s)" %
+                       ','.join(["'%s'" % key for key in CACHE_METADATA_KEYS]))
+        metadata = {}
+        for name, value in cursor:
+            metadata[name] = value
+        
+        # -- check that we're populating the cache for the correct repository
+        repository_dir = metadata.get(CACHE_REPOSITORY_DIR)
+        if repository_dir:
+            if repository_dir != self.name:
                 self.log.info("'repository_dir' has changed from %r to %r"
                               % (previous_repository_dir, self.name))
                 raise TracError("The 'repository_dir' has changed, "
                                 "a 'trac-admin resync' operation is needed.")
-            break
-        else: # no 'repository_dir' stored yet, assume everything's OK
-            cursor.execute("INSERT INTO system (name,value) "
-                           "VALUES ('repository_dir',%s)", (self.name,))
+        elif repository_dir is None: # no 'repository_dir' stored yet
+            cursor.execute("INSERT INTO system (name,value) VALUES (%s,%s)",
+                           (CACHE_REPOSITORY_DIR, self.name,))
+        else: # 'repository_dir' cleared by a resync
+            cursor.execute("UPDATE system SET value=%s WHERE name=%s",
+                           (self.name,CACHE_REPOSITORY_DIR))
 
+        # -- retrieve the youngest revision cached so far
+        if CACHE_YOUNGEST_REV not in metadata:
+            # ''upgrade'' using the legacy `get_youngest_rev_in_cache` method
+            self.youngest = self.repos.get_youngest_rev_in_cache(self.db) or ''
+            cursor.execute("INSERT INTO system (name, value) VALUES (%s, %s)",
+                           (CACHE_YOUNGEST_REV, self.youngest))
+            self.log.info('Upgraded cache metadata (youngest_rev=%s)' %
+                          self.youngest_rev)
+        else:
+            self.youngest = metadata[CACHE_YOUNGEST_REV]
+
+        if self.youngest:
+            self.youngest = self.repos.normalize_rev(self.youngest)
+        else:
+            self.youngest = None
+
+        # -- retrieve the youngest revision in the repository
         self.repos.clear()
-        youngest_stored = self.repos.get_youngest_rev_in_cache(self.db)
+        repos_youngest = self.repos.youngest_rev
 
-        if youngest_stored != str(self.repos.youngest_rev):
+        # -- compare them and try to resync if different
+        self.log.info("Check for sync [%s] vs. cached [%s]" %
+                      (self. youngest, repos_youngest))
+        if self.youngest != repos_youngest:
+            if self.youngest:
+                next_youngest = self.repos.next_rev(self.youngest)
+            else:
+                next_youngest = None
+                try:
+                    next_youngest = self.repos.oldest_rev
+                    next_youngest = self.repos.normalize_rev(next_youngest)
+                except TracError:
+                    pass
+
+            if next_youngest is None: # nothing to cache yet
+                return
+
+            # 0. first check if there's no (obvious) resync in progress
+            cursor.execute("SELECT rev FROM revision WHERE rev=%s",
+                           (str(next_youngest),))
+            for rev, in cursor:
+                # already there, but in progress, so keep ''previous''
+                # notion of 'youngest'
+                self.repos.clear(youngest_rev=self.youngest)
+                return
+
+            # 1. prepare for resyncing
+            #    (there still might be a race condition at this point)
+
             authz = self.repos.authz
             self.repos.authz = Authorizer() # remove permission checking
 
             kindmap = dict(zip(_kindmap.values(), _kindmap.keys()))
             actionmap = dict(zip(_actionmap.values(), _actionmap.keys()))
-            self.log.info("Syncing with repository (%s to %s)"
-                          % (youngest_stored, self.repos.youngest_rev))
-            if youngest_stored:
-                current_rev = self.repos.next_rev(youngest_stored)
-            else:
-                try:
-                    current_rev = self.repos.oldest_rev
-                    current_rev = self.repos.normalize_rev(current_rev)
-                except TracError:
-                    current_rev = None
-            while current_rev is not None:
-                changeset = self.repos.get_changeset(current_rev)
-                cursor.execute("INSERT INTO revision (rev,time,author,message) "
-                               "VALUES (%s,%s,%s,%s)", (str(current_rev),
-                               changeset.date, changeset.author,
-                               changeset.message))
-                for path,kind,action,base_path,base_rev in changeset.get_changes():
-                    self.log.debug("Caching node change in [%s]: %s"
-                                   % (current_rev, (path, kind, action,
-                                      base_path, base_rev)))
-                    kind = kindmap[kind]
-                    action = actionmap[action]
-                    cursor.execute("INSERT INTO node_change (rev,path,"
-                                   "node_type,change_type,base_path,base_rev) "
-                                   "VALUES (%s,%s,%s,%s,%s,%s)",
-                                   (str(current_rev), path, kind, action,
-                                   base_path, base_rev))
-                current_rev = self.repos.next_rev(current_rev)
-            self.db.commit()
-            self.repos.authz = authz # restore permission checking
+
+            try:
+                while next_youngest is not None:
+                    
+                    # 1.1 Attempt to resync the 'revision' table
+                    self.log.info("Trying to sync revision [%s]" %
+                                  next_youngest)
+                    cset = self.repos.get_changeset(next_youngest)
+                    try:
+                        cursor.execute("INSERT INTO revision "
+                                       " (rev,time,author,message) "
+                                       "VALUES (%s,%s,%s,%s)",
+                                       (str(next_youngest), cset.date,
+                                        cset.author, cset.message))
+                    except Exception, e: # *another* 1.1. resync attempt won 
+                        self.log.warning('Revision %s already cached: %s' %
+                                         (next_youngest, e))
+                        # also potentially in progress, so keep ''previous''
+                        # notion of 'youngest'
+                        self.repos.clear(youngest_rev=self.youngest)
+                        return
+
+                    # 1.2. now *only* one process was able to get there
+                    #      (i.e. there *shouldn't* be any race condition here)
+
+                    for path,kind,action,bpath,brev in cset.get_changes():
+                        self.log.debug("Caching node change in [%s]: %s"
+                                       % (next_youngest,
+                                          (path,kind,action,bpath,brev)))
+                        kind = kindmap[kind]
+                        action = actionmap[action]
+                        cursor.execute("INSERT INTO node_change "
+                                       " (rev,path,node_type,change_type, "
+                                       "  base_path,base_rev) "
+                                       "VALUES (%s,%s,%s,%s,%s,%s)",
+                                       (str(next_youngest),
+                                        path, kind, action, bpath, brev))
+
+                    # 1.3. iterate (1.1 should always succeed now)
+                    self.youngest = next_youngest                    
+                    next_youngest = self.repos.next_rev(next_youngest)
+
+                    # 1.4. update 'youngest_rev' metadata (minimize failures at 0.)
+                    cursor.execute("UPDATE system SET value=%s WHERE name=%s",
+                                   (str(self.youngest), CACHE_YOUNGEST_REV))
+                    self.db.commit()
+            finally:
+                # 3. restore permission checking (after 1.)
+                self.repos.authz = authz
 
     def get_node(self, path, rev=None):
         return self.repos.get_node(path, rev)
         return self.repos.oldest_rev
 
     def get_youngest_rev(self):
-        return self.repos.get_youngest_rev_in_cache(self.db)
+        return self.youngest
 
     def previous_rev(self, rev):
         return self.repos.previous_rev(rev)

File trac/versioncontrol/svn_fs.py

View file
         assert self.scope[0] == '/'
         self.clear()
 
-    def clear(self):
+    def clear(self, youngest_rev=None):
         self.youngest = None
+        if youngest_rev is not None:
+            self.youngest = self.normalize_rev(youngest_rev)
         self.oldest = None
 
     def __del__(self):

File trac/versioncontrol/tests/cache.py

View file
     def setUp(self):
         self.db = InMemoryDatabase()
         self.log = logger_factory('test')
+        cursor = self.db.cursor()
+        cursor.execute("INSERT INTO system (name, value) VALUES (%s,%s)",
+                       ('youngest_rev', ''))
 
     def test_initial_sync_with_empty_repos(self):
         changeset = Mock(Changeset, 0, '', '', 42000,
                            "VALUES ('1',%s,%s,%s,%s,%s)",
                            [('trunk', 'D', 'A', None, None),
                             ('trunk/README', 'F', 'A', None, None)])
+        cursor.execute("UPDATE system SET value='1' WHERE name='youngest_rev'")
 
         changes = [('trunk/README', Node.FILE, Changeset.EDIT, 'trunk/README', 1)]
         changeset = Mock(Changeset, 2, 'Update', 'joe', 42042,
         repos = Mock(Repository, 'test-repos', None, self.log,
                      get_changeset=lambda x: changeset,
                      get_youngest_rev=lambda: 2,
-                     next_rev=lambda x: int(x) == 1 and 2 or None)
+                     get_oldest_rev=lambda: 0,
+                     normalize_rev=lambda x: x,                    
+                     next_rev=lambda x: x and int(x) == 1 and 2 or None)
         cache = CachedRepository(self.db, repos, None, self.log)
         cache.sync()
 
                            "VALUES ('1',%s,%s,%s,%s,%s)",
                            [('trunk', 'D', 'A', None, None),
                             ('trunk/README', 'F', 'A', None, None)])
+        cursor.execute("UPDATE system SET value='1' WHERE name='youngest_rev'")
 
         repos = Mock(Repository, 'test-repos', None, self.log,
                      get_changeset=lambda x: None,
                      get_youngest_rev=lambda: 1,
-                     next_rev=lambda x: None, normalize_rev=lambda rev: rev)
+                     get_oldest_rev=lambda: 0,
+                     next_rev=lambda x: None,
+                     normalize_rev=lambda rev: rev)
         cache = CachedRepository(self.db, repos, None, self.log)
         self.assertEqual('1', cache.youngest_rev)
         changeset = cache.get_changeset(1)