Source

hgsubversion / tests / test_util.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
import StringIO
import difflib
import errno
import gettext
import os
import shutil
import stat
import subprocess
import sys
import tarfile
import tempfile
import unittest
import urllib

_rootdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, _rootdir)

from mercurial import cmdutil
from mercurial import commands
from mercurial import context
from mercurial import dispatch as dispatchmod
from mercurial import hg
from mercurial import i18n
from mercurial import node
from mercurial import scmutil
from mercurial import ui
from mercurial import util
from mercurial import extensions

try:
    from mercurial import obsolete
except ImportError:
    obsolete = None

try:
    SkipTest = unittest.SkipTest
except AttributeError:
    try:
        from unittest2 import SkipTest
    except ImportError:
        try:
            from nose import SkipTest
        except ImportError:
            SkipTest = None

from hgsubversion import util
from hgsubversion import svnwrap

# Documentation for Subprocess.Popen() says:
#   "Note that on Windows, you cannot set close_fds to true and
#   also redirect the standard handles by setting stdin, stdout or
#   stderr."
canCloseFds = 'win32' not in sys.platform

if not 'win32' in sys.platform:
    def kill_process(popen_obj):
        os.kill(popen_obj.pid, 9)
else:
    import ctypes
    from ctypes.wintypes import BOOL, DWORD, HANDLE, UINT

    def win_status_check(result, func, args):
        if result == 0:
            raise ctypes.WinError()
        return args

    def WINAPI(returns, func, *params):
        assert len(params) % 2 == 0

        func.argtypes = tuple(params[0::2])
        func.resvalue = returns
        func.errcheck = win_status_check

        return func

    # dwDesiredAccess
    PROCESS_TERMINATE = 0x0001

    OpenProcess = WINAPI(HANDLE, ctypes.windll.kernel32.OpenProcess,
        DWORD, 'dwDesiredAccess',
        BOOL, 'bInheritHandle',
        DWORD, 'dwProcessId',
    )

    CloseHandle = WINAPI(BOOL, ctypes.windll.kernel32.CloseHandle,
        HANDLE, 'hObject'
    )

    TerminateProcess = WINAPI(BOOL, ctypes.windll.kernel32.TerminateProcess,
        HANDLE, 'hProcess',
        UINT, 'uExitCode'
    )

    def kill_process(popen_obj):
        phnd = OpenProcess(PROCESS_TERMINATE, False, popen_obj.pid)
        TerminateProcess(phnd, 1)
        CloseHandle(phnd)

# Fixtures that need to be pulled at a subdirectory of the repo path
subdir = {'truncatedhistory.svndump': '/project2',
          'fetch_missing_files_subdir.svndump': '/foo',
          'empty_dir_in_trunk_not_repo_root.svndump': '/project',
          'project_root_not_repo_root.svndump': '/dummyproj',
          'project_name_with_space.svndump': '/project name',
          'non_ascii_path_1.svndump': '/b\xC3\xB8b',
          'non_ascii_path_2.svndump': '/b%C3%B8b',
          'subdir_is_file_prefix.svndump': '/flaf',
          }

FIXTURES = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                        'fixtures')

def getlocalpeer(repo):
    localrepo = getattr(repo, 'local', lambda: repo)()
    if isinstance(localrepo, bool):
        localrepo = repo
    return localrepo

def repolen(repo):
    """Naively calculate the amount of available revisions in a repository.

    this is usually equal to len(repo) -- except in the face of
    obsolete revisions.
    """
    # kind of nasty way of calculating the length, but fortunately,
    # our test repositories tend to be rather small
    return len([r for r in repo])

def _makeskip(name, message):
    if SkipTest:
        def skip(*args, **kwargs):
            raise SkipTest(message)
        skip.__name__ = name
        return skip

def requiresmodule(mod):
    """Skip a test if the specified module is not None."""
    def decorator(fn):
        if fn is None:
            return
        if mod is not None:
            return fn
        return _makeskip(fn.__name__, 'missing required feature')
    return decorator


def requiresoption(option):
    '''Skip a test if commands.clone does not take the specified option.'''
    def decorator(fn):
        for entry in cmdutil.findcmd('clone', commands.table)[1][1]:
            if entry[1] == option:
                return fn
        # no match found, so skip
        if SkipTest:
            return _makeskip(fn.__name__,
                             'test requires clone to accept %s' % option)
        # no skipping support, so erase decorated method
        return
    if not isinstance(option, str):
        raise TypeError('requiresoption takes a string argument')
    return decorator

def requiresreplay(method):
    '''Skip a test in stupid mode.'''
    def test(self, *args, **kwargs):
        if self.stupid:
            if SkipTest:
                raise SkipTest(message)
        else:
            return method(self, *args, **kwargs)

    test.__name__ = method.__name__
    return test

def filtermanifest(manifest):
    return [f for f in manifest if f not in util.ignoredfiles]

def fileurl(path):
    path = os.path.abspath(path).replace(os.sep, '/')
    drive, path = os.path.splitdrive(path)
    if drive:
        # In svn 1.7, the swig svn wrapper returns local svn URLs
        # with an uppercase drive letter, try to match that to
        # simplify svn info tests.
        drive = '/' + drive.upper()
    url = 'file://%s%s' % (drive, path)
    return url

def testui(stupid=False, layout='auto', startrev=0):
    u = ui.ui()
    bools = {True: 'true', False: 'false'}
    u.setconfig('ui', 'quiet', bools[True])
    u.setconfig('extensions', 'hgsubversion', '')
    u.setconfig('hgsubversion', 'stupid', bools[stupid])
    u.setconfig('hgsubversion', 'layout', layout)
    u.setconfig('hgsubversion', 'startrev', startrev)
    return u

def dispatch(cmd):
    cmd = getattr(dispatchmod, 'request', lambda x: x)(cmd)
    return dispatchmod.dispatch(cmd)

def rmtree(path):
    # Read-only files cannot be removed under Windows
    for root, dirs, files in os.walk(path):
        for f in files:
            f = os.path.join(root, f)
            try:
                s = os.stat(f)
            except OSError, e:
                if e.errno == errno.ENOENT:
                    continue
                raise
            if (s.st_mode & stat.S_IWRITE) == 0:
                os.chmod(f, s.st_mode | stat.S_IWRITE)
    shutil.rmtree(path)

def _verify_our_modules():
    '''
    Verify that hgsubversion was imported from the correct location.

    The correct location is any location within the parent directory of the
    directory containing this file.
    '''

    for modname, module in sys.modules.iteritems():
        if not module or not modname.startswith('hgsubversion.'):
            continue

        modloc = module.__file__
        cp = os.path.commonprefix((os.path.abspath(__file__), modloc))
        assert cp.rstrip(os.sep) == _rootdir, (
            'Module location verification failed: hgsubversion was imported '
            'from the wrong path!'
        )

def hgclone(ui, source, dest, update=True, rev=None):
    if getattr(hg, 'peer', None):
        # Since 1.9 (d976542986d2)
        src, dest = hg.clone(ui, {}, source, dest, update=update, rev=rev)
    else:
        src, dest = hg.clone(ui, source, dest, update=update, rev=rev)
    return src, dest

def svnls(repo_path, path, rev='HEAD'):
    path = repo_path + '/' + path
    path = util.normalize_url(fileurl(path))
    args = ['svn', 'ls', '-r', rev, '-R', path]
    p = subprocess.Popen(args,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    stdout, stderr = p.communicate()
    if p.returncode:
        raise Exception('svn ls failed on %s: %r' % (path, stderr))
    entries = [e.strip('/') for e in stdout.splitlines()]
    entries.sort()
    return entries

def svnpropget(repo_path, path, prop, rev='HEAD'):
    path = repo_path + '/' + path
    path = util.normalize_url(fileurl(path))
    args = ['svn', 'propget', '-r', str(rev), prop, path]
    p = subprocess.Popen(args,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    stdout, stderr = p.communicate()
    if p.returncode:
        raise Exception('svn ls failed on %s: %r' % (path, stderr))
    return stdout.strip()


def _obsolete_wrap(cls, name):
    origfunc = getattr(cls, name)

    if not name.startswith('test_') or not origfunc:
        return

    if not obsolete:
        wrapper = _makeskip(name, 'obsolete not available')
    else:
        def wrapper(self, *args, **opts):
            self.assertFalse(obsolete._enabled, 'obsolete was already active')

            obsolete._enabled = True

            try:
                    origfunc(self, *args, **opts)
                    self.assertTrue(obsolete._enabled, 'obsolete remains active')
            finally:
                obsolete._enabled = False

    if not wrapper:
        return

    wrapper.__name__ = name + ' obsolete'
    wrapper.__module__ = origfunc.__module__

    if origfunc.__doc__:
        firstline = origfunc.__doc__.strip().splitlines()[0]
        wrapper.__doc__ = firstline + ' (obsolete)'

    assert getattr(cls, wrapper.__name__, None) is None

    setattr(cls, wrapper.__name__, wrapper)


def _stupid_wrap(cls, name):
    origfunc = getattr(cls, name)

    if not name.startswith('test_') or not origfunc:
        return

    def wrapper(self, *args, **opts):
        self.assertFalse(self.stupid, 'stupid mode was already active')

        self.stupid = True

        try:
            origfunc(self, *args, **opts)
        finally:
            self.stupid = False

    wrapper.__name__ = name + ' stupid'
    wrapper.__module__ = origfunc.__module__

    if origfunc.__doc__:
        firstline = origfunc.__doc__.strip().splitlines()[0]
        wrapper.__doc__ = firstline + ' (stupid)'

    assert getattr(cls, wrapper.__name__, None) is None

    setattr(cls, wrapper.__name__, wrapper)

class TestMeta(type):
    def __init__(cls, *args, **opts):
        if cls.obsolete_mode_tests:
            for origname in dir(cls):
                _obsolete_wrap(cls, origname)

        if cls.stupid_mode_tests:
            for origname in dir(cls):
                _stupid_wrap(cls, origname)

        return super(TestMeta, cls).__init__(*args, **opts)

class TestBase(unittest.TestCase):
    __metaclass__ = TestMeta

    obsolete_mode_tests = False
    stupid_mode_tests = False

    stupid = False

    def setUp(self):
        _verify_our_modules()

        # the Python 2.7 default of 640 is obnoxiously low
        self.maxDiff = 4096

        self.oldenv = dict([(k, os.environ.get(k, None),) for k in
                           ('LANG', 'LC_ALL', 'HGRCPATH',)])
        self.oldt = i18n.t
        os.environ['LANG'] = os.environ['LC_ALL'] = 'C'
        i18n.t = gettext.translation('hg', i18n.localedir, fallback=True)

        self.oldwd = os.getcwd()
        self.tmpdir = tempfile.mkdtemp(
            'svnwrap_test', dir=os.environ.get('HGSUBVERSION_TEST_TEMP', None))
        self.hgrc = os.path.join(self.tmpdir, '.hgrc')
        os.environ['HGRCPATH'] = self.hgrc
        scmutil._rcpath = None
        rc = open(self.hgrc, 'w')
        rc.write('[ui]\nusername=test-user\n')
        for l in '[extensions]', 'hgsubversion=':
            print >> rc, l

        self.repocount = 0
        self.wc_path = '%s/testrepo_wc' % self.tmpdir
        self.svn_wc = None

        self.config_dir = self.tmpdir
        svnwrap.common._svn_config_dir = self.config_dir
        self.setup_svn_config('')

        # Previously, we had a MockUI class that wrapped ui, and giving access
        # to the stream. The ui.pushbuffer() and ui.popbuffer() can be used
        # instead. Using the regular UI class, with all stderr redirected to
        # stdout ensures that the test setup is much more similar to usage
        # setups.
        self.patch = (ui.ui.write_err, ui.ui.write)
        setattr(ui.ui, self.patch[0].func_name, self.patch[1])

    def setup_svn_config(self, config):
        c = open(self.config_dir + '/config', 'w')
        try:
            c.write(config)
        finally:
            c.close()

    def _makerepopath(self):
        self.repocount += 1
        return '%s/testrepo-%d' % (self.tmpdir, self.repocount)

    def tearDown(self):
        for var, val in self.oldenv.iteritems():
            if val is None:
                del os.environ[var]
            else:
                os.environ[var] = val
        i18n.t = self.oldt
        rmtree(self.tmpdir)
        os.chdir(self.oldwd)
        setattr(ui.ui, self.patch[0].func_name, self.patch[0])

        _verify_our_modules()

    def ui(self, layout='auto'):
        return testui(self.stupid, layout)

    def load_svndump(self, fixture_name):
        '''Loads an svnadmin dump into a fresh repo. Return the svn repo
        path.
        '''
        path = self._makerepopath()
        assert not os.path.exists(path)
        subprocess.call(['svnadmin', 'create', path,],
                        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        inp = open(os.path.join(FIXTURES, fixture_name))
        proc = subprocess.Popen(['svnadmin', 'load', path,], stdin=inp,
                                stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        proc.communicate()
        return path

    def load_repo_tarball(self, fixture_name):
        '''Extracts a tarball of an svn repo and returns the svn repo path.'''
        path = self._makerepopath()
        assert not os.path.exists(path)
        os.mkdir(path)
        tarball = tarfile.open(os.path.join(FIXTURES, fixture_name))
        # This is probably somewhat fragile, but I'm not sure how to
        # do better in particular, I think it assumes that the tar
        # entries are in the right order and that directories appear
        # before their contents.  This is a valid assummption for sane
        # tarballs, from what I can tell.  In particular, for a simple
        # tarball of a svn repo with paths relative to the repo root,
        # it seems to work
        for entry in tarball:
            tarball.extract(entry, path)
        return path

    def fetch(self, repo_path, subdir=None, layout='auto',
            startrev=0, externals=None, noupdate=True, dest=None, rev=None,
            config=None):
        if layout == 'single':
            if subdir is None:
                subdir = 'trunk'
        elif subdir is None:
            subdir = ''
        projectpath = repo_path
        if subdir:
            projectpath += '/' + subdir

        cmd = [
            'clone',
            '--layout=%s' % layout,
            '--startrev=%s' % startrev,
            fileurl(projectpath),
            self.wc_path,
            ]
        if self.stupid:
            cmd.append('--stupid')
        if noupdate:
            cmd.append('--noupdate')
        if rev is not None:
            cmd.append('--rev=%s' % rev)
        config = dict(config or {})
        if externals:
            config['hgsubversion.externals'] = str(externals)
        for k,v in reversed(sorted(config.iteritems())):
            cmd[:0] = ['--config', '%s=%s' % (k, v)]

        r = dispatch(cmd)
        assert not r, 'fetch of %s failed' % projectpath

        return hg.repository(testui(), self.wc_path)

    def load_and_fetch(self, fixture_name, *args, **opts):
        if fixture_name.endswith('.svndump'):
            repo_path = self.load_svndump(fixture_name)
        elif fixture_name.endswith('tar.gz'):
            repo_path = self.load_repo_tarball(fixture_name)
        else:
            assert False, 'Unknown fixture type'

        return self.fetch(repo_path, *args, **opts), repo_path

    def _load_fixture_and_fetch(self, *args, **kwargs):
        repo, repo_path = self.load_and_fetch(*args, **kwargs)
        return repo

    def add_svn_rev(self, repo_path, changes):
        '''changes is a dict of filename -> contents'''
        if self.svn_wc is None:
            self.svn_wc = os.path.join(self.tmpdir, 'testsvn_wc')
            subprocess.call([
                'svn', 'co', '-q', fileurl(repo_path),
                self.svn_wc
            ],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

        for filename, contents in changes.iteritems():
            # filenames are / separated
            filename = filename.replace('/', os.path.sep)
            filename = os.path.join(self.svn_wc, filename)
            open(filename, 'w').write(contents)
            # may be redundant
            subprocess.call(['svn', 'add', '-q', filename],
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        subprocess.call([
            'svn', 'commit', '-q', self.svn_wc, '-m', 'test changes'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    # define this as a property so that it reloads anytime we need it
    @property
    def repo(self):
        return hg.repository(testui(), self.wc_path)

    def pushrevisions(self, expected_extra_back=0):
        before = repolen(self.repo)
        self.repo.ui.setconfig('hgsubversion', 'stupid', str(self.stupid))
        res = commands.push(self.repo.ui, self.repo)
        after = repolen(self.repo)
        self.assertEqual(expected_extra_back, after - before)
        return res

    def svnco(self, repo_path, svnpath, rev, path):
        path = os.path.join(self.wc_path, path)
        subpath = os.path.dirname(path)
        if not os.path.isdir(subpath):
            os.makedirs(subpath)
        svnpath = fileurl(repo_path + '/' + svnpath)
        args = ['svn', 'co', '-r', rev, svnpath, path]
        p = subprocess.Popen(args,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        stdout, stderr = p.communicate()
        if p.returncode:
            raise Exception('svn co failed on %s: %r' % (svnpath, stderr))

    def commitchanges(self, changes, parent='tip', message='automated test'):
        """Commit changes to mercurial directory

        'changes' is a sequence of tuples (source, dest, data). It can look
        like:
        - (source, source, data) to set source content to data
        - (source, dest, None) to set dest content to source one, and mark it as
        copied from source.
        - (source, dest, data) to set dest content to data, and mark it as copied
        from source.
        - (source, None, None) to remove source.
        """
        repo = self.repo
        parentctx = repo[parent]

        changed, removed = [], []
        for source, dest, newdata in changes:
            if dest is None:
                removed.append(source)
            else:
                changed.append(dest)

        def filectxfn(repo, memctx, path):
            if path in removed:
                raise IOError(errno.ENOENT,
                              "File \"%s\" no longer exists" % path)
            entry = [e for e in changes if path == e[1]][0]
            source, dest, newdata = entry
            if newdata is None:
                newdata = parentctx[source].data()
            copied = None
            if source != dest:
                copied = source
            return context.memfilectx(path=dest,
                                      data=newdata,
                                      islink=False,
                                      isexec=False,
                                      copied=copied)

        ctx = context.memctx(repo,
                             (parentctx.node(), node.nullid),
                             message,
                             changed + removed,
                             filectxfn,
                             'an_author',
                             '2008-10-07 20:59:48 -0500')
        nodeid = repo.commitctx(ctx)
        repo = self.repo
        hg.clean(repo, nodeid)
        return nodeid

    def assertchanges(self, changes, ctx):
        """Assert that all 'changes' (as in defined in commitchanged())
        went into ctx.
        """
        for source, dest, data in changes:
            if dest is None:
                self.assertTrue(source not in ctx)
                continue
            self.assertTrue(dest in ctx)
            if data is None:
                data = ctx.parents()[0][source].data()
            self.assertEqual(ctx[dest].data(), data)
            if dest != source:
                copy = ctx[dest].renamed()
                self.assertEqual(copy[0], source)

    def assertMultiLineEqual(self, first, second, msg=None):
        """Assert that two multi-line strings are equal. (Based on Py3k code.)
        """
        try:
            return super(TestBase, self).assertMultiLineEqual(first, second,
                                                              msg)
        except AttributeError:
            pass

        self.assert_(isinstance(first, str),
                     ('First argument is not a string'))
        self.assert_(isinstance(second, str),
                     ('Second argument is not a string'))

        if first != second:
            diff = ''.join(difflib.unified_diff(first.splitlines(True),
                                                second.splitlines(True),
                                                fromfile='a',
                                                tofile='b'))
            msg = '%s\n%s' % (msg or '', diff)
            raise self.failureException, msg

    def getgraph(self, repo):
        """Helper function displaying a repository graph, especially
        useful when debugging comprehensive tests.
        """
        # Could be more elegant, but it works with stock hg
        _ui = ui.ui()
        _ui.setconfig('extensions', 'graphlog', '')
        extensions.loadall(_ui)
        graphlog = extensions.find('graphlog')
        templ = """\
changeset: {rev}:{node|short}
branch:    {branches}
tags:      {tags}
summary:   {desc|firstline}
files:     {files}

"""
        _ui.pushbuffer()
        graphlog.graphlog(_ui, repo, rev=None, template=templ)
        return _ui.popbuffer()

    def draw(self, repo):
        sys.stdout.write(self.getgraph(repo))