Marcin Kuzminski committed 0c43c66

Moved locking of commit stats into the task itself to remove race conditions when the lock was not removed before starting another task.
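
In outline, the pattern this commit moves to (a minimal sketch; some_task and do_work are hypothetical names, while DaemonLock, LockHeld, __get_lockkey and run_task are the real helpers from rhodecode.lib.celerylib) is to acquire and release the lock inside the task body, so the lock is guaranteed to be gone before the task schedules its successor:

    def some_task(arg):
        lockkey = __get_lockkey('some_task', arg)
        try:
            lock = DaemonLock(lockkey)
        except LockHeld:
            #another worker already holds the lock for these arguments
            return 'Task with key %s already running' % lockkey
        try:
            do_work(arg)  #hypothetical payload
        finally:
            lock.release()  #released *before* the follow-up task starts
        run_task(some_task, arg)  #safe: no stale lock left behind

With the old @locked_task decorator the wrapper still held the lock while the task body scheduled its successor, so the successor could start first and collide with the not-yet-released lock.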

Files changed (2)

rhodecode/lib/celerylib/__init__.py

 except KeyError:
     CELERY_ON = False
 
+
 class ResultWrapper(object):
     def __init__(self, task):
         self.task = task
     def result(self):
         return self.task
 
+
 def run_task(task, *args, **kwargs):
     if CELERY_ON:
         try:
             t = task.apply_async(args=args, kwargs=kwargs)
             log.info('running task %s:%s', t.task_id, task)
             return t
+
         except socket.error, e:
             if e.errno == 111:
                 log.debug('Unable to connect to celeryd. Sync execution')
     return ResultWrapper(task(*args, **kwargs))
 
 
+def __get_lockkey(func, *fargs, **fkwargs):
+    params = list(fargs)
+    params.extend(['%s-%s' % ar for ar in fkwargs.items()])
+
+    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
+
+    lockkey = 'task_%s' % \
+        md5(func_name + '-' + '-'.join(map(str, params))).hexdigest()
+    return lockkey
+
+
 def locked_task(func):
     def __wrapper(func, *fargs, **fkwargs):
-        params = list(fargs)
-        params.extend(['%s-%s' % ar for ar in fkwargs.items()])
-
-        lockkey = 'task_%s' % \
-            md5(str(func.__name__) + '-' + \
-                '-'.join(map(str, params))).hexdigest()
+        lockkey = __get_lockkey(func, *fargs, **fkwargs)
         log.info('running task with lockkey %s', lockkey)
         try:
             l = DaemonLock(lockkey)
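
For illustration, a usage sketch of the new helper (hypothetical repo name and timestamps; the digest is abbreviated here): identical function name and arguments always hash to the same key, which is how a second invocation collides with the first via LockHeld:

    >>> __get_lockkey('get_commits_stats', 'myrepo', 0, 2147483647)
    'task_<32 hex digits>'
    >>> #same arguments produce the same key, so a concurrent
    >>> #DaemonLock(lockkey) attempt raises LockHeld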

rhodecode/lib/celerylib/tasks.py

 from pylons import config
 from pylons.i18n.translation import _
 
-from rhodecode.lib.celerylib import run_task, locked_task, str2bool
+from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
+    __get_lockkey, LockHeld, DaemonLock
 from rhodecode.lib.helpers import person
 from rhodecode.lib.smtp_mailer import SmtpMailer
 from rhodecode.lib.utils import OrderedDict, add_cache
 from rhodecode.model import init_model
 from rhodecode.model import meta
-from rhodecode.model.db import RhodeCodeUi
+from rhodecode.model.db import RhodeCodeUi, Statistics, Repository
 
 from vcs.backends import get_repo
 
 
 
 @task(ignore_result=True)
-@locked_task
 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
     try:
         log = get_commits_stats.get_logger()
     except:
         log = logging.getLogger(__name__)
 
-    from rhodecode.model.db import Statistics, Repository
+    lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
+                            ts_max_y)
+    log.info('running task with lockkey %s', lockkey)
+    try:
+        lock = DaemonLock(lockkey)
 
-    #for js data compatibilty
-    akc = lambda k: person(k).replace('"', "")
+        #for js data compatibility cleans the key for person from '
+        akc = lambda k: person(k).replace('"', "")
 
-    co_day_auth_aggr = {}
-    commits_by_day_aggregate = {}
-    repos_path = get_repos_path()
-    p = os.path.join(repos_path, repo_name)
-    repo = get_repo(p)
+        co_day_auth_aggr = {}
+        commits_by_day_aggregate = {}
+        repos_path = get_repos_path()
+        p = os.path.join(repos_path, repo_name)
+        repo = get_repo(p)
+        repo_size = len(repo.revisions)
+        #return if repo has no revisions
+        if repo_size < 1:
+            lock.release()
+            return True
 
-    skip_date_limit = True
-    parse_limit = int(config['app_conf'].get('commit_parse_limit'))
-    last_rev = 0
-    last_cs = None
-    timegetter = itemgetter('time')
+        skip_date_limit = True
+        parse_limit = int(config['app_conf'].get('commit_parse_limit'))
+        last_rev = 0
+        last_cs = None
+        timegetter = itemgetter('time')
 
-    sa = get_session()
+        sa = get_session()
 
-    dbrepo = sa.query(Repository)\
-        .filter(Repository.repo_name == repo_name).scalar()
-    cur_stats = sa.query(Statistics)\
-        .filter(Statistics.repository == dbrepo).scalar()
+        dbrepo = sa.query(Repository)\
+            .filter(Repository.repo_name == repo_name).scalar()
+        cur_stats = sa.query(Statistics)\
+            .filter(Statistics.repository == dbrepo).scalar()
 
-    if cur_stats is not None:
-        last_rev = cur_stats.stat_on_revision
+        if cur_stats is not None:
+            last_rev = cur_stats.stat_on_revision
 
-    #return if repo is empty
-    if not repo.revisions:
-        return True
+        if last_rev == repo.get_changeset().revision and repo_size > 1:
+            #pass silently without any work if the current parsing state
+            #(from the db marker) is already at the last revision and
+            #we're not on the first revision
+            lock.release()
+            return True
 
-    if last_rev == repo.get_changeset().revision and len(repo.revisions) > 1:
-        #pass silently without any work if we're not on first revision or
-        #current state of parsing revision(from db marker) is the last revision
-        return True
+        if cur_stats:
+            commits_by_day_aggregate = OrderedDict(json.loads(
+                                        cur_stats.commit_activity_combined))
+            co_day_auth_aggr = json.loads(cur_stats.commit_activity)
 
-    if cur_stats:
-        commits_by_day_aggregate = OrderedDict(
-                                       json.loads(
-                                        cur_stats.commit_activity_combined))
-        co_day_auth_aggr = json.loads(cur_stats.commit_activity)
+        log.debug('starting parsing %s', parse_limit)
+        lmktime = mktime
 
-    log.debug('starting parsing %s', parse_limit)
-    lmktime = mktime
+        last_rev = last_rev + 1 if last_rev > 0 else last_rev
 
-    last_rev = last_rev + 1 if last_rev > 0 else last_rev
+        for cs in repo[last_rev:last_rev + parse_limit]:
+            last_cs = cs  # remember last parsed changeset
+            k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
+                          cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
 
-    for cs in repo[last_rev:last_rev + parse_limit]:
-        last_cs = cs  # remember last parsed changeset
-        k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
-                      cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
+            if akc(cs.author) in co_day_auth_aggr:
+                try:
+                    l = [timegetter(x) for x in
+                         co_day_auth_aggr[akc(cs.author)]['data']]
+                    time_pos = l.index(k)
+                except ValueError:
+                    time_pos = False
 
-        if akc(cs.author) in co_day_auth_aggr:
-            try:
-                l = [timegetter(x) for x in
-                     co_day_auth_aggr[akc(cs.author)]['data']]
-                time_pos = l.index(k)
-            except ValueError:
-                time_pos = False
+                if time_pos >= 0 and time_pos is not False:
 
-            if time_pos >= 0 and time_pos is not False:
+                    datadict = \
+                        co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
 
-                datadict = co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
+                    datadict["commits"] += 1
+                    datadict["added"] += len(cs.added)
+                    datadict["changed"] += len(cs.changed)
+                    datadict["removed"] += len(cs.removed)
 
-                datadict["commits"] += 1
-                datadict["added"] += len(cs.added)
-                datadict["changed"] += len(cs.changed)
-                datadict["removed"] += len(cs.removed)
+                else:
+                    if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
+
+                        datadict = {"time": k,
+                                    "commits": 1,
+                                    "added": len(cs.added),
+                                    "changed": len(cs.changed),
+                                    "removed": len(cs.removed),
+                                   }
+                        co_day_auth_aggr[akc(cs.author)]['data']\
+                            .append(datadict)
 
             else:
                 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
+                    co_day_auth_aggr[akc(cs.author)] = {
+                                        "label": akc(cs.author),
+                                        "data": [{"time":k,
+                                                 "commits":1,
+                                                 "added":len(cs.added),
+                                                 "changed":len(cs.changed),
+                                                 "removed":len(cs.removed),
+                                                 }],
+                                        "schema": ["commits"],
+                                        }
 
-                    datadict = {"time": k,
-                                "commits": 1,
-                                "added": len(cs.added),
-                                "changed": len(cs.changed),
-                                "removed": len(cs.removed),
-                               }
-                    co_day_auth_aggr[akc(cs.author)]['data']\
-                        .append(datadict)
+            #gather all data by day
+            if k in commits_by_day_aggregate:
+                commits_by_day_aggregate[k] += 1
+            else:
+                commits_by_day_aggregate[k] = 1
 
-        else:
-            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
-                co_day_auth_aggr[akc(cs.author)] = {
-                                    "label": akc(cs.author),
-                                    "data": [{"time":k,
-                                             "commits":1,
-                                             "added":len(cs.added),
-                                             "changed":len(cs.changed),
-                                             "removed":len(cs.removed),
-                                             }],
-                                    "schema": ["commits"],
-                                    }
+        overview_data = sorted(commits_by_day_aggregate.items(),
+                               key=itemgetter(0))
 
-        #gather all data by day
-        if k in commits_by_day_aggregate:
-            commits_by_day_aggregate[k] += 1
-        else:
-            commits_by_day_aggregate[k] = 1
+        if not co_day_auth_aggr:
+            co_day_auth_aggr[akc(repo.contact)] = {
+                "label": akc(repo.contact),
+                "data": [0, 1],
+                "schema": ["commits"],
+            }
 
-    overview_data = sorted(commits_by_day_aggregate.items(), key=itemgetter(0))
-    if not co_day_auth_aggr:
-        co_day_auth_aggr[akc(repo.contact)] = {
-            "label": akc(repo.contact),
-            "data": [0, 1],
-            "schema": ["commits"],
-        }
+        stats = cur_stats if cur_stats else Statistics()
+        stats.commit_activity = json.dumps(co_day_auth_aggr)
+        stats.commit_activity_combined = json.dumps(overview_data)
 
-    stats = cur_stats if cur_stats else Statistics()
-    stats.commit_activity = json.dumps(co_day_auth_aggr)
-    stats.commit_activity_combined = json.dumps(overview_data)
+        log.debug('last revision %s', last_rev)
+        leftovers = len(repo.revisions[last_rev:])
+        log.debug('revisions to parse %s', leftovers)
 
-    log.debug('last revison %s', last_rev)
-    leftovers = len(repo.revisions[last_rev:])
-    log.debug('revisions to parse %s', leftovers)
+        if last_rev == 0 or leftovers < parse_limit:
+            log.debug('getting code trending stats')
+            stats.languages = json.dumps(__get_codes_stats(repo_name))
 
-    if last_rev == 0 or leftovers < parse_limit:
-        log.debug('getting code trending stats')
-        stats.languages = json.dumps(__get_codes_stats(repo_name))
+        try:
+            stats.repository = dbrepo
+            stats.stat_on_revision = last_cs.revision if last_cs else 0
+            sa.add(stats)
+            sa.commit()
+        except:
+            log.error(traceback.format_exc())
+            sa.rollback()
+            lock.release()
+            return False
 
-    try:
-        stats.repository = dbrepo
-        stats.stat_on_revision = last_cs.revision if last_cs else 0
-        sa.add(stats)
-        sa.commit()
-    except:
-        log.error(traceback.format_exc())
-        sa.rollback()
-        return False
-    if len(repo.revisions) > 1:
-        run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
+        #final release
+        lock.release()
 
-    return True
+        #execute another task if celery is enabled
+        if len(repo.revisions) > 1 and CELERY_ON:
+            run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
+        return True
+    except LockHeld:
+        log.info('LockHeld')
+        return 'Task with key %s already running' % lockkey
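
As a side note, the lock above is released by hand at four separate exit points; a context-manager sketch (hypothetical helper, not part of this commit, assuming the same DaemonLock/LockHeld API) would centralize that:

    from contextlib import contextmanager

    @contextmanager
    def task_lock(lockkey):
        lock = DaemonLock(lockkey)  #raises LockHeld if already taken
        try:
            yield lock
        finally:
            lock.release()  #runs on every exit path, errors included

The task body would then run under "with task_lock(lockkey):" and any return or exception would still drop the lock.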
 
 
 @task(ignore_result=True)
     """
     Sends an email with defined parameters from the .ini files.
 
-
     :param recipients: list of recipients, if this is empty the defined email
         address from field 'email_to' is used instead
     :param subject: subject of the mail
 
 @task(ignore_result=True)
 def create_repo_fork(form_data, cur_user):
+    from rhodecode.model.repo import RepoModel
+    from vcs import get_backend
+
     try:
         log = create_repo_fork.get_logger()
     except:
         log = logging.getLogger(__name__)
 
-    from rhodecode.model.repo import RepoModel
-    from vcs import get_backend
-
     repo_model = RepoModel(get_session())
     repo_model.create(form_data, cur_user, just_db=True, fork=True)
     repo_name = form_data['repo_name']