Source

hgsubversion / hgsubversion / svnexternals.py

Full commit
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
import cStringIO

import os, re, shutil, stat, subprocess
from mercurial import util as hgutil
from mercurial.i18n import _

# Optional feature probe: the svnsubrepo wrapper defined at the end of this
# module is only created when the running Mercurial provides what it needs.
# On any failure, subrepo is set to None and that class is not defined.
try:
    from mercurial import subrepo
    # require svnsubrepo and hg >= 1.7.1
    # The two bare attribute accesses below are deliberate: they raise
    # AttributeError when the attribute is missing.
    subrepo.svnsubrepo
    hgutil.checknlink
except (ImportError, AttributeError), e:
    subrepo = None

import util

class externalsfile(dict):
    """Map svn directories to lists of externals entries.

    Keys are directory names, the '.' directory being stored under the
    empty string. Values are non-empty lists of definition lines.

    The serialized form produced by write() and consumed by read() is a
    sequence of sections, each one being a "[directory]" header followed
    by the definition lines prefixed with a single space:

        [.]
         definition line
        [some/dir]
         definition line
    """
    def __init__(self):
        super(externalsfile, self).__init__()
        self.encoding = 'utf-8'

    def __setitem__(self, key, value):
        """Set the definitions for directory key.

        value can be None, a string holding one definition per line, or
        a sequence of definition lines. An empty value removes the
        directory entry, if any.
        """
        if value is None:
            value = []
        elif isinstance(value, basestring):
            value = value.splitlines()
        if key == '.':
            key = ''
        if value:
            super(externalsfile, self).__setitem__(key, value)
        else:
            # An entry without definitions is not worth keeping
            self.pop(key, None)

    def write(self):
        """Return the serialized definitions, sections being sorted by
        directory name.
        """
        fp = cStringIO.StringIO()
        for target in sorted(self):
            lines = self[target]
            if not lines:
                continue
            fp.write('[%s]\n' % (target or '.'))
            for l in lines:
                fp.write(' ' + l + '\n')
        return fp.getvalue()

    def read(self, data):
        """Replace the current content with the definitions serialized
        in data.

        Blank lines, definition lines found before the first section and
        lines starting with neither '[' nor a space are ignored. Raise
        hgutil.Abort on a section header without closing bracket.
        """
        self.clear()
        target = None
        for line in cStringIO.StringIO(data):
            if not line.strip():
                continue
            if line.startswith('['):
                line = line.strip()
                if line[-1] != ']':
                    raise hgutil.Abort('invalid externals section name: %s' % line)
                target = line[1:-1]
                if target == '.':
                    target = ''
            elif line.startswith(' '):
                if target is None:
                    continue
                # Blank lines were filtered out above, so something is
                # left once the newline and leading space are removed.
                self.setdefault(target, []).append(line.rstrip('\n')[1:])

def diff(ext1, ext2):
    """Compare two externalsfile objects.

    Return a list of (dir, value1, value2) tuples, one per directory
    whose definitions differ. value1 is the newline-joined definitions
    of dir in ext1, or None if dir is not in ext1; value2 is the same
    for ext2. Directories of ext1 come first, followed by those found
    only in ext2.
    """
    def joined(exts, d):
        return '\n'.join(exts[d])

    changes = []
    for d in ext1:
        if d in ext2:
            if ext1[d] != ext2[d]:
                changes.append((d, joined(ext1, d), joined(ext2, d)))
        else:
            changes.append((d, joined(ext1, d), None))
    changes.extend([(d, None, joined(ext2, d))
                    for d in ext2 if d not in ext1])
    return changes

class BadDefinition(Exception):
    """Raised by parsedefinition() when a line matches no known syntax."""
    pass

# Older syntax: "PATH [-r REV] URL"
re_defold = re.compile(r'^\s*(.*?)\s+(?:-r\s*(\d+|\{REV\})\s+)?([a-zA-Z]+://.*)\s*$')
# Newer syntax: "[-r REV] URL[@PEG] PATH", where URL can also be relative
# to the repository root ("^/...")
re_defnew = re.compile(r'^\s*(?:-r\s*(\d+|\{REV\})\s+)?((?:[a-zA-Z]+://|\^/).*)\s+(\S+)\s*$')
# Matches sources starting with a URL scheme
re_scheme = re.compile(r'^[a-zA-Z]+://')

def parsedefinition(line):
    """Parse an external definition line.

    Return a tuple (path, rev, source, pegrev, norevline) where:
    - path is the target directory of the external
    - rev is the revision following -r as a string, possibly the
      '{REV}' placeholder, or None if there is none
    - source is the external source, without peg revision
    - pegrev is the peg revision string, or None. Peg revisions are
      only extracted from definitions in the newer syntax.
    - norevline is line with a numeric revision replaced by '{REV}',
      or line itself if there is no numeric revision

    Raise BadDefinition if line cannot be parsed.
    """
    # The parsing is probably not correct wrt path with whitespaces or
    # potential quotes. svn documentation is not really talkative about
    # these either.
    pegrev, revgroup = None, 1
    m = re_defnew.search(line)
    if m:
        rev, source, path = m.group(1, 2, 3)
        if '@' in source:
            head, tail = source.rsplit('@', 1)
            # An '@' followed by a '/' belongs to the URL itself, like
            # in scheme://user@host/path, and does not introduce a peg
            # revision.
            if '/' not in tail:
                source, pegrev = head, tail
    else:
        m = re_defold.search(line)
        if not m:
            raise BadDefinition()
        revgroup = 2
        path, rev, source = m.group(1, 2, 3)
    try:
        int(rev)
    except (TypeError, ValueError):
        # No revision at all, or a '{REV}' placeholder already
        norevline = line
    else:
        norevline = line[:m.start(revgroup)] + '{REV}' + line[m.end(revgroup):]
    return (path, rev, source, pegrev, norevline)

class RelativeSourceError(Exception):
    """Raised by resolvesource() when a source relative to the repository
    root ("^/...") has to be resolved but no svn root was supplied.
    """
    pass

def resolvesource(ui, svnroot, source):
    """Return the fully qualified URL of an external source.

    Sources starting with a URL scheme are returned unchanged. Sources
    relative to the repository root ("^/...") are appended to svnroot;
    RelativeSourceError is raised if svnroot is None. For any other
    form, a warning is emitted and None is returned.
    """
    if re_scheme.search(source):
        return source
    if source.startswith('^/'):
        if svnroot is None:
            raise RelativeSourceError()
        # Drop the caret, keep the slash
        return svnroot + source[1:]
    # Format after translating so the message catalog lookup can match,
    # and terminate the line since ui.warn() does not.
    ui.warn(_('ignoring unsupported non-fully qualified external: %r\n')
            % source)
    return None

def parsedefinitions(ui, repo, svnroot, exts):
    """Return a sorted list of (targetdir, revision, source, pegrev,
    norevline, base) tuples for the definitions in exts.

    targetdir is the canonical path of the external relative to the
    repository root, source is an svn project URL resolved against
    svnroot, and base is the directory holding the definition. Invalid
    definitions and those with an unsupported source are skipped with a
    warning. Fail if nested targetdirs are detected.
    """
    defs = []
    for base in sorted(exts):
        for line in exts[base]:
            try:
                path, rev, source, pegrev, norevline = parsedefinition(line)
            except BadDefinition:
                # Format after translating so the message catalog lookup
                # can match, and terminate the line since ui.warn() does
                # not.
                ui.warn(_('ignoring invalid external definition: %r\n')
                        % line)
                continue
            source = resolvesource(ui, svnroot, source)
            if source is None:
                continue
            wpath = hgutil.pconvert(os.path.join(base, path))
            wpath = hgutil.canonpath(repo.root, '', wpath)
            defs.append((wpath, rev, source, pegrev, norevline, base))
    # Check target dirs are not nested
    defs.sort()
    for i, d in enumerate(defs):
        for d2 in defs[i+1:]:
            if d2[0].startswith(d[0] + '/'):
                raise hgutil.Abort(_('external directories cannot nest:\n%s\n%s')
                                   % (d[0], d2[0]))
    return defs

def computeactions(ui, repo, svnroot, ext1, ext2):
    """Generate the (action, definition) pairs turning the externals
    serialized in ext1 into those serialized in ext2.

    action is 'u' for every definition found in ext2, whether or not
    its target directory was already in ext1, and 'd' for definitions
    whose target directory is only in ext1. definition is a tuple as
    returned by parsedefinitions().
    """
    def bytarget(data):
        # Map target directories to their parsed definition
        parsed = externalsfile()
        parsed.read(data)
        return dict((d[0], d)
                    for d in parsedefinitions(ui, repo, svnroot, parsed))

    before = bytarget(ext1)
    after = bytarget(ext2)
    for wpath in before:
        if wpath not in after:
            yield 'd', before[wpath]
        else:
            yield 'u', after[wpath]
    for wpath in after:
        if wpath in before:
            continue
        yield 'u', after[wpath]

def getsvninfo(svnurl):
    """Return a tuple (url, root, rev) for supplied svn URL or working
    directory path.

    The values are extracted from the output of "svn info --xml"; root
    has its trailing slashes removed. Raise hgutil.Abort if the command
    fails or if one of the values cannot be found in its output.
    """
    # Yes, this is ugly, but good enough for now
    cmd = ['svn', 'info', '--xml', svnurl]
    proc = subprocess.Popen(cmd, shell=(os.name == 'nt'),
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    output = proc.communicate()[0]
    if proc.returncode:
        raise hgutil.Abort(_('cannot get information about %s')
                         % svnurl)

    def extract(pattern, failure):
        # Return the first group of pattern in the command output
        found = re.search(pattern, output, re.S)
        if found is None:
            raise hgutil.Abort(failure % svnurl)
        return found.group(1)

    root = extract(r'<root>(.*)</root>',
                   _('cannot find SVN repository root from %s')).rstrip('/')
    url = extract(r'<url>(.*)</url>',
                  _('cannot find SVN repository URL from %s'))
    rev = extract(r'<entry[^>]+revision="([^"]+)"',
                  _('cannot find SVN revision from %s'))
    return url, root, rev

class externalsupdater:
    """Check out, update and remove externals working copies below the
    repository working directory, using the svn command line client.
    """
    def __init__(self, ui, repo):
        self.ui = ui
        self.repo = repo

    def update(self, wpath, rev, source, pegrev):
        """Make the external at wpath a working copy of source.

        An existing working copy of the same URL is updated in place if
        its revision differs from rev. A working copy of another URL is
        removed first, then source is checked out. When no peg revision
        is supplied, rev is used as peg revision for the checkout.
        """
        target = self.repo.wjoin(wpath)
        if rev:
            revargs = ['-r', rev]
        else:
            revargs = []
        if os.path.isdir(target):
            exturl, extroot, extrev = getsvninfo(target)
            # Comparing the source paths is not enough, but I don't
            # know how to compare path+pegrev. The following update
            # might fail if the path was replaced by another unrelated
            # one. It can be fixed manually by deleting the externals
            # and updating again.
            if source != exturl:
                self.delete(wpath)
            else:
                if extrev != rev:
                    self.ui.status(_('updating external on %s@%s\n') %
                                   (wpath, rev or 'HEAD'))
                    self.svn(['update'] + revargs,
                             os.path.join(self.repo.root, target))
                return
        parent, dest = os.path.split(target)
        parent = os.path.join(self.repo.root, parent)
        if not os.path.isdir(parent):
            os.makedirs(parent)
        if rev and not pegrev:
            pegrev = rev
        if pegrev:
            source = '%s@%s' % (source, pegrev)
        self.ui.status(_('fetching external %s@%s\n') % (wpath, rev or 'HEAD'))
        self.svn(['co'] + revargs + [source, dest], parent)

    def delete(self, wpath):
        """Remove the external directory at wpath.

        Return 1 if a directory was removed, None if there was none.
        """
        target = self.repo.wjoin(wpath)
        if not os.path.isdir(target):
            return
        self.ui.status(_('removing external %s\n') % wpath)

        def forceremove(function, path, excinfo):
            # read-only files cannot be unlinked under Windows: make
            # them writable and try again, let other errors through
            if function is not os.remove:
                raise
            mode = os.stat(path).st_mode
            if mode & stat.S_IWRITE:
                raise
            os.chmod(path, stat.S_IMODE(mode) | stat.S_IWRITE)
            os.remove(path)

        shutil.rmtree(target, onerror=forceremove)
        return 1

    def svn(self, args, cwd):
        """Run the svn client with args from directory cwd, forwarding
        its output to ui.note(). Raise hgutil.Abort if it exits with a
        non-zero status.
        """
        cmd = ['svn'] + args
        self.ui.debug(_('updating externals: %r, cwd=%s\n') % (cmd, cwd))
        proc = subprocess.Popen(cmd, cwd=cwd, shell=(os.name == 'nt'),
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        for line in proc.stdout:
            self.ui.note(line)
        if proc.wait() != 0:
            raise hgutil.Abort("subprocess '%s' failed" % ' '.join(cmd))

def updateexternals(ui, args, repo, **opts):
    """update repository externals
    """
    # args is either [], [changeset] or [svn path or URL, changeset].
    # Without changeset, repo[None] is used. The externals recorded in
    # the 'svn/externals' file under the repository metadata directory
    # are compared with the '.hgsvnexternals' file of the target
    # changeset, the externals working copies are updated accordingly
    # and the record is rewritten with the new definitions.
    # Raise hgutil.Abort on too many arguments or unknown actions.
    if len(args) > 2:
        raise hgutil.Abort(_('updateexternals expects at most one changeset'))
    node = None
    if len(args) == 2:
        svnurl = util.normalize_url(repo.ui.expandpath(args[0]))
        args = args[1:]
    else:
        svnurl = util.normalize_url(repo.ui.expandpath('default'))
    if args:
        node = args[0]

    svnroot = getsvninfo(svnurl)[1]

    # Retrieve current externals status
    extpath = repo.join('svn/externals')
    try:
        fp = open(extpath, 'rb')
        try:
            oldext = fp.read()
        finally:
            # Do not rely on garbage collection to release the handle
            fp.close()
    except IOError:
        oldext = ''
    newext = ''
    ctx = repo[node]
    if '.hgsvnexternals' in ctx:
        newext = ctx['.hgsvnexternals'].data()

    updater = externalsupdater(ui, repo)
    actions = computeactions(ui, repo, svnroot, oldext, newext)
    for action, ext in actions:
        if action == 'u':
            updater.update(ext[0], ext[1], ext[2], ext[3])
        elif action == 'd':
            updater.delete(ext[0])
        else:
            raise hgutil.Abort(_('unknown update actions: %r') % action)

    # Record the new status only once all actions succeeded, and make
    # sure it is flushed and closed before returning.
    fp = open(extpath, 'wb')
    try:
        fp.write(newext)
    finally:
        fp.close()

def getchanges(ui, repo, parentctx, exts):
    """Compute the bookkeeping files to change for new externals.

    exts is the new externals definitions as an externalsfile and
    parentctx the parent changectx. Depending on the
    hgsubversion.externals setting, the bookkeeping files are
    '.hgsvnexternals' ('svnexternals', the default), '.hgsub' and
    '.hgsubstate' ('subrepos') or nothing ('ignore'); any other value
    raises hgutil.Abort.

    Return a dictionary mapping the files which differ from parentctx
    to their new content as raw bytes, or to None if the file has to be
    removed.
    """
    mode = ui.config('hgsubversion', 'externals', 'svnexternals')
    if mode == 'ignore':
        files = {}
    elif mode == 'svnexternals':
        content = None
        if exts:
            content = exts.write()
        files = {'.hgsvnexternals': content}
    elif mode == 'subrepos':
        # XXX: clobbering the subrepos files is good enough for now
        files = dict.fromkeys(['.hgsub', '.hgsubstate'])
        if exts:
            sublines, statelines = [], []
            for d in sorted(parsedefinitions(ui, repo, '', exts)):
                path, rev, source, pegrev, norevline, base = d
                sublines.append('%s = [hgsubversion] %s:%s\n'
                                % (path, base, norevline))
                if rev is None:
                    rev = 'HEAD'
                statelines.append('%s %s\n' % (rev, path))
            files['.hgsub'] = ''.join(sublines)
            files['.hgsubstate'] = ''.join(statelines)
    else:
        raise hgutil.Abort(_('unknown externals modes: %s') % mode)

    # Only keep the files which really change compared to the parent
    updates = {}
    for fn, data in files.items():
        if data is None:
            # Removal only makes sense if the parent has the file
            if fn in parentctx:
                updates[fn] = None
        elif fn not in parentctx or parentctx[fn].data() != data:
            updates[fn] = data
    return updates

def parse(ui, ctx):
    """Return the externals definitions stored in ctx as a (possibly
    empty) externalsfile().

    Depending on the hgsubversion.externals setting, definitions are
    read from the '.hgsvnexternals' file ('svnexternals', the default),
    rebuilt from the subrepositories state ('subrepos') or left empty
    ('ignore'). Any other value raises hgutil.Abort.
    """
    mode = ui.config('hgsubversion', 'externals', 'svnexternals')
    if mode not in ('svnexternals', 'subrepos', 'ignore'):
        raise hgutil.Abort(_('unknown externals modes: %s') % mode)

    external = externalsfile()
    if mode == 'svnexternals':
        if '.hgsvnexternals' in ctx:
            external.read(ctx['.hgsvnexternals'].data())
    elif mode == 'subrepos':
        for path in ctx.substate:
            state = ctx.substate[path]
            src, rev = state[:2]
            # src looks like "base:definition with a {REV} placeholder"
            base, norevline = src.split(':', 1)
            if rev is None:
                rev = 'HEAD'
            definition = norevline.replace('{REV}', rev)
            external.setdefault(base.strip(), []).append(definition)
    return external

if subrepo:
    class svnsubrepo(subrepo.svnsubrepo):
        """svn subrepository whose source is stored in the form
        "base:definition", where definition is an external definition
        line as understood by parsedefinition().
        """
        def __init__(self, ctx, path, state):
            # Hand only the definition part of the source to the parent
            definition = state[0].split(':', 1)[1]
            super(svnsubrepo, self).__init__(ctx, path,
                                             (definition, state[1]))

        def get(self, state):
            """Fetch the subrepository from the resolved source URL,
            with the peg revision of the definition appended if any.
            """
            # Resolve source first
            definition = state[0].split(':', 1)[1]
            source, pegrev = parsedefinition(definition)[2:4]
            try:
                # Getting the root SVN repository URL is expensive.
                # Assume the externals is absolute.
                source = resolvesource(self._ui, None, source)
            except RelativeSourceError:
                default = self._ctx._repo.ui.expandpath('default')
                root = getsvninfo(util.normalize_url(default))[1]
                source = resolvesource(self._ui, root, source)
            if pegrev is not None:
                source = source + '@' + pegrev
            return super(svnsubrepo, self).get((source, state[1]))

        def dirty(self):
            """Return False if the working copy revision is compatible
            with the recorded one and the working copy is unchanged,
            True otherwise.
            """
            # You cannot compare anything with HEAD. Just accept it
            # can be anything.
            current = self._wcrev()
            recorded = self._state[1]
            if current != 'HEAD' and recorded != 'HEAD' and current != recorded:
                return True
            return bool(self._wcchanged()[0])

        def commit(self, text, user, date):
            """Commit through the parent class and return the new
            revision, or 'HEAD' if the recorded revision is 'HEAD'.
            """
            newrev = super(svnsubrepo, self).commit(text, user, date)
            # Keep unversioned externals unversioned
            if self._state[1] == 'HEAD':
                return 'HEAD'
            return newrev