Commits

holger krekel  committed 8385edc

initial import (flat from subversion)

  • Participants

Comments (0)

Files changed (35)

+
+uconf provides a API to system level information
+about users, their email addresses, password hashes etc.pp.
+You usually run it as the root user to have full information.
+
+The package is licensed under the MIT license.
+
+from setuptools import setup
+
+setup(
+    name='uconf',
+    description='linux/unix access to system level user/group information',
+    long_description = open('README.txt').read(),
+    version='0.8.0.dev0',
+    license='MIT license',
+    platforms=['unix', 'linux', ],
+    author='holger krekel',
+    author_email='holger at merlinux.eu',
+    classifiers=['Development Status :: 6 - Mature',
+                 'Intended Audience :: Developers',
+                 'License :: OSI Approved :: MIT License',
+                 'Operating System :: POSIX',
+                 'Operating System :: Microsoft :: Windows',
+                 'Operating System :: MacOS :: MacOS X',
+                 'Topic :: Software Development :: Libraries',
+                 'Topic :: Utilities',
+                 'Programming Language :: Python',
+    ],
+    entry_points = {'console_scripts': [
+        'uconf.system.loadusers = uconf.cmdline.loadusers:main',
+        'uconf.system.dumpusers = uconf.cmdline.dumpusers:main',
+    ]},
+    packages=['uconf',],
+    install_requires = ['py>=1.4',],
+    zip_safe=False,
+)
+
+

File uconf/__init__.py

+"""
+uconf provides a nice API to system level information
+about users, their email addresses, password hashes etc.pp.
+You usually run it as the root user to have full information.
+"""
+from py.apipkg import initpkg
+__version__ = "0.7.0-alpha1",
+
+initpkg(__name__,
+    exportdefs={
+    'readconfig':         '.config:readconfig',
+    'readaliases':        '.misc:readaliases',
+    'Password':           '.password:Password',
+
+    'system': {
+        'hasuser':     '._system:hasuser',
+        'hasgroup':    '._system:hasgroup',
+        'ensuregroup': '._system:ensuregroup',
+        'delgroup':    '._system:delgroup',
+        'deluser':     '._system:deluser',
+        'nextuid':     '._system:nextuid',
+        'adduser':     '._system:adduser',
+        'changeuser':  '._system:changeuser',
+        'listuser':    '._system:listuser',
+        'usersbyname': '._system:usersbyname',
+        'listgroup':   '._system:listgroup',
+        'Group':       '._system:SystemGroup',
+        'User':        '._system:SystemUser',
+    },
+
+    'roundup': {
+        'ensure_users': '.roundup2.ensure_users',
+        'setsyspath': '.roundup2.setsyspath',
+        'openinstance': '.roundup2.openinstance',
+    },
+   
+    'cmdline': {
+        'loaduserconfig': {
+            'main':   '.loaduserconfig:main',
+        },
+        'loadusers': {
+            'main':   '._cmdline.loadusers:main',
+        },
+        'dumpusers': {
+            'main':   '._cmdline.dumpusers:main',
+        },
+    },
+    'svnconf': {
+        'parser':      '.svnconfparser.svnconfparser:svnconf',
+        'pwdtrigger':  '.svnconfparser.svnconfparser:pwdtrigger',
+        'authz':       '.svnconfparser.svnconfparser:authz',
+    },
+})
+

File uconf/_cmdline/__init__.py

Empty file added.

File uconf/_cmdline/_find.py

+#!/usr/bin/python 
+
+#
+# find and import a close-by package 
+#
+import sys
+import os
+from os.path import dirname as opd, exists, join, basename, abspath
+
+rootpackagename = 'uconf' 
+
+def search(current): 
+    while 1:
+        last = current
+        initpy = join(current, '__init__.py')
+        if not exists(initpy):
+            pdir = join(current, rootpackagename) 
+            # recognize package and ensure it is importable
+            if exists(pdir) and exists(join(pdir, '__init__.py')):
+                #for p in sys.path:
+                #    if p == current:
+                #        return True
+                if current != sys.path[0]:  # if we are already first, then ok
+                    print >>sys.stderr, "inserting into sys.path:", current
+                    sys.path.insert(0, current)
+                return True
+        current = opd(current)
+        if last == current:
+            return False
+
+if not search(abspath(os.curdir)):
+    if not search(opd(abspath(sys.argv[0]))):
+        if not search(opd(__file__)):
+            pass # let's hope it is just on sys.path 
+
+globals()[rootpackagename] = __import__(rootpackagename) 
+
+if __name__ == '__main__': 
+    print rootpackagename, "is at", globals()[rootpackagename].__file__ 

File uconf/_cmdline/dumpusers.py

+#!/usr/bin/python
+
+import uconf
+import py
+from uconf.rangestr import rangestr
+import sys
+
+minuid = 500
+
+def main(args=None):
+    if args is None:
+        args = py.std.sys.argv[1:]
+    else:
+        args = map(str, args)
+
+    parser = py.compat.optparse.OptionParser()
+    Option = py.compat.optparse.Option
+    options = [Option("--minuid", action="store", type="int", default=minuid,
+                      dest="minuid",
+                      help="minimum userid"),
+               Option("--group", action="store", type="string", default="",
+                      dest="groups",
+                      help="user must belong to this group(s), delmit groups with ','")]
+    parser.add_options(options)
+    option, args = parser.parse_args(args)
+    if len(args):
+        users = [uconf.system.User(x) for x in args]
+    else:
+        users = uconf.system.listuser()
+        
+    users = [user for user in users if user.uid >= option.minuid]
+    
+    if option.groups:
+        requested_groups = option.groups.split(',')
+        users = [user for user in users if True in
+                 [requested_group in user.groups for requested_group in requested_groups]]
+    
+    users.sort(lambda x,y: cmp(x.uid, y.uid))
+    for user in users:
+        print user.repr_config()
+    

File uconf/_cmdline/loadusers.py

+#!/usr/bin/python
+
+def main():
+    import uconf
+    uconf.cmdline.loaduserconfig.main()

File uconf/_cmdline/rsync_from_thoth.py

+#!/usr/bin/python 
+import py
+mypath = py.magic.autopath()
+import uconf 
+import sys
+#py.magic.invoke(assertion=1)
+
+rsync = py.path.local.sysfind('rsync') 
+
+if __name__ == '__main__':
+    config = uconf.UserConfig(py.path.local(sys.argv[1]))
+    if len(sys.argv) > 2:
+        onlyuser = sys.argv[2]
+    else:
+        onlyuser = None
+    for user in config.getlist():
+        if onlyuser and onlyuser != user.name: 
+            continue
+        if not uconf.system.hasuser(user.name):
+            print "user %r DOES NOT exist, skipping" % user.name 
+            continue
+        print "preparing", user
+        homedir = user.homedir 
+        assert homedir.check(dir=1)
+        assert uconf.system.hasuser(user.name) 
+        for group in user.groups: 
+            assert uconf.system.hasgroup(group), "group %r missing" % group
+        print "Rsyncing %s from codespeak" % homedir
+        rsync.sysexec('-az', 'codespeak.net:%s/' % homedir, 
+                      str(homedir))
+        print "fixing ownership in %s" % homedir
+        for x in user.homedir.visit(py.path.checker(link=0)):
+            if x.group() in ('users', 'svnusers', 'code', 'pypy'): 
+                x.chown(user.name, user.name) 
+        user.homedir.chmod(0755)
+        break
+    py.process.cmdexec('pam_pwd_trigger')

File uconf/_cmdline/thoth_to_code1.py

+import py
+py.magic.autopath() 
+import uconf
+import sys
+
+if __name__ == '__main__': 
+    source = py.path.local(sys.argv[1])
+    dest = py.path.local(sys.argv[2])
+
+    config = uconf.UserConfig(source) 
+    newlist = []
+    for user in config.getlist():
+        for obsoletegroup in 'cvsusers', 'users': 
+            if obsoletegroup in user.groups: 
+                user.groups.remove(obsoletegroup)
+        if user.name not in user.groups: 
+            user.groups.insert(0, user.name) 
+            #print "%s has groups %s" % (user.name, user.groups) 
+        newlist.append(user) 
+    newlist.sort()  
+    dest.write("\n".join([x.repr_config() for x in newlist]))

File uconf/_cmdline/uconf.roundup.loadusers

+#!/usr/bin/python 
+usage = """
+invoke
+
+    %s tracker_instance_home_directory [path/to/userconfig.ini]
+
+to synchronize users from the userconfig.ini towards the roundup instance. 
+by default the userconfig.ini file resides in the tracker instance home 
+directory. 
+
+""" % __file__
+
+import sys
+from _find import uconf
+import py
+
+parser = py.compat.optparse.OptionParser(usage=usage)
+parser.add_option("-e", "--defaultmaildomain", dest="defaultmaildomain",
+                  help="for missing user emails use NAME@defaultmaildomain") 
+parser.add_option("-o", "--defaultorganisation", dest="defaultorganisation",
+                  help="for missing user organisations") 
+parser.add_option("--debug", action="store_true", dest="debug",
+                  default=False,
+                  help="print debug output and don't delete files")
+
+if __name__ == '__main__': 
+    options, args = parser.parse_args()
+    if len(args) == 0:
+        parser.error("please supply a TRACKER_INSTANCE_DIRECTORY") 
+    tracker_home = py.path.local(args[0])
+    uconf.roundup.setsyspath(tracker_home) 
+    
+    if len(args) > 1:
+        userconfig_path = py.path.local(args[1])
+    else:
+        userconfig_path = tracker_home / 'userconfig.ini'
+    assert userconfig_path.check(), (
+        "userconfig file does not exist %s" % userconfig_path)
+  
+    tracker = uconf.roundup.openinstance(tracker_home) 
+    userlist = uconf.readconfig(userconfig_path)
+    db = tracker.open('admin')
+    try: 
+        uconf.roundup.ensure_users(db, userlist, 
+                                   defaultorganisation=options.defaultorganisation,
+                                   defaultmaildomain=options.defaultmaildomain) 
+        db.commit()
+    finally: 
+        db.close()

File uconf/_system.py

+
+import py
+import uconf
+import pwd, grp
+from uconf.user import User
+from py._path.cacheutil import AgingCache
+from uconf.rangestr import rangestr
+
+from log import log
+
+entrycache = AgingCache(maxseconds=-1) # 60) # NO CACHING
+
+def hasuser(uid):
+    if isinstance(uid, int):
+        try:
+            pwd.getpwuid(uid)
+        except KeyError:
+            return False
+        else:
+            return True
+    else:
+        try:
+            pwd.getpwnam(uid)
+        except KeyError:
+            return False
+        else:
+            return True
+
+def hasgroup(gid):
+    if isinstance(gid, int):
+        try:
+            grp.getgrgid(gid)
+        except KeyError:
+            return False
+        else:
+            return True
+    else:
+        try:
+            grp.getgrnam(gid)
+        except KeyError:
+            return False
+        else:
+            return True
+
+def nextuid(start=500, end=None):
+    i = start
+    while not end or i < end:
+        try:
+            pwd.getpwuid(i)
+        except KeyError:
+            return i
+        else:
+            i += 1
+    raise ValueError, "did not find new UID"
+
+def newuid(uidrange):
+    ids = usersbyid()
+    for i in uidrange:
+        if i not in ids:
+            return i
+
+def newgid(gidrange):
+    ids = groupsbyid()
+    for i in gidrange:
+        if i not in ids:
+            return i
+
+def ensuregroup(name, gid=None):
+    if not uconf.system.hasgroup(name):
+        if gid is None:
+            execcmd('groupadd %s' % name)
+        else:
+            execcmd('groupadd -g %d %s' % (gid, name))
+
+def delgroup(name):
+    if not uconf.system.hasgroup(name):
+        raise ValueError, "group %s does not exist" % name
+    if isinstance(name, int):
+        name = grp.getgrgid(gid).gr_name
+    execcmd('groupdel %s' % name)
+
+def deluser(name, removehomedir=False):
+    if not uconf.system.hasuser(name):
+        raise ValueError, "user %s does not exist" % name
+    flag = removehomedir and "-r" or ""
+    cmd = "userdel %s %s" %(flag, name)
+    execcmd(cmd)
+
+def changeuser(user, gidrange=rangestr("1000-")):
+    #if not uconf.system.hasuser(user.name):
+    #    print "user %r does not exist" % user.name
+
+    sysuser = uconf.system.User(user.name)
+    assert sysuser.name == user.name
+    for group in user.groups:
+        if not uconf.system.hasgroup(group):
+            uconf.system.ensuregroup(group, gid=newgid(gidrange))
+
+    primarygroup = user.groups[0]
+    secondarygroups = user.groups[1:]
+    if getattr(user, 'password', None) is None:
+        user.password = sysuser.password
+
+    l = ['usermod -c "%s"' % user.realname,
+         '-d %s' % user.homedir,
+         '-g %s' % primarygroup,
+         '-p %r' % user.password.hashspec,
+         '-s %s' % user.shell,
+        ]
+    if secondarygroups:
+        l.append("-G %s" % ",".join(secondarygroups))
+    l.append(user.name)
+    execcmd(*l)
+    ensure_homedir_layout(user)
+
+def ensure_homedir_layout(user):
+    if user.homedir.check(file=1):
+       log.warn("home directory %s is a file - skipping setup of layout" %(user.homedir,))
+       return
+    sshdir = user.homedir.ensure('.ssh', dir=1)
+    authorized_keys = sshdir.ensure('authorized_keys')
+    if user.password.clear:
+        sshdir.join('password').write(user.password.clear)
+    if user.ssh_authorized_keys:
+        content = "\n".join(user.ssh_authorized_keys)
+        authorized_keys.write(content)
+    if user.email:
+        user.homedir.join('.forward').write(user.email)
+        #try:
+        #    content = mailaliases.read().rstrip()
+        #    content = "%s\n%s: %s\n" %(content, user.name, user.email)
+        #    mailaliases.write(content)
+        #except (py.error.ENOENT, py.error.EACCES):
+        #    pass
+        user.homedir.join('.forward').chown(user.name, user.primarygroup.name)
+
+    # fixup perms
+    user.homedir.chown(user.name, user.primarygroup.name)
+    sshdir.chown(user.name, user.primarygroup.name, rec=1)
+    sshdir.chmod(0700, rec=1)
+    forward = user.homedir.join('.forward')
+    if forward.check():
+        forward.chmod(0700)
+
+def adduser(user, uidrange=None, gidrange=None):
+    if uconf.system.hasuser(user.name):
+        raise EnvironmentError("userid already exists", user.name)
+
+    for group in user.groups:
+        if not uconf.system.hasgroup(group):
+            log.warn("warning, implicitely creating group %r" % group)
+            uconf.system.ensuregroup(group)
+
+
+    if uidrange is None:
+        uidrange = rangestr("1000-")
+    if gidrange is None:
+        gidrange = rangestr("1000-")
+    uid = newuid(uidrange)
+    primarygroup = user.groups[0]
+    groups = user.groups[1:]
+    if user.password is not None:
+        password = user.password
+    else:
+        password = uconf.Password()
+    l = ['useradd -c "%s"' % user.realname,
+         '-u %d' % uid,
+         '-g %s' % primarygroup,
+         '-p %r' % password.hashspec,
+         '-s %s' % user.shell,
+        ]
+    if groups:
+        l.append("-G %s" % ",".join(groups))
+    l.append(user.name)
+    execcmd(*l)
+
+    if user.homedir.check(exists=1, dir=0):
+        log.warn("skipping creation of ssh-keys for", user)
+        return password
+
+    try:
+        authorized_keys = user.homedir.ensure(dir=1)
+        sshdir = user.homedir.ensure('.ssh', dir=1)
+        authorized_keys = sshdir.ensure('authorized_keys')
+        if password.clear:
+            sshdir.join('password').write(password.clear)
+        if user.ssh_authorized_keys:
+            content = "\n".join(user.ssh_authorized_keys)
+            authorized_keys.write(content)
+        if user.email:
+            user.homedir.join('.forward').write(user.email)
+            try:
+                content = mailaliases.read().rstrip()
+                content = "%s\n%s: %s\n" %(content, user.name, user.email)
+                mailaliases.write(content)
+            except (py.error.ENOENT, py.error.EACCES):
+                pass
+
+        # fixup perms
+        user.homedir.chown(user.name, primarygroup, rec=1)
+        sshdir.chmod(0700, rec=1)
+        forward = user.homedir.join('.forward')
+        if forward.check():
+            forward.chmod(0700)
+    except:
+        execcmd("userdel %s" % user.name)
+        raise
+    return password
+
+    #f = open(mailfn, 'a')
+    #print >>f, "%s: %s" %(username, email)
+    #f.close()
+    #print "username/email info written to", mailfn
+
+
+def execcmd(*parts):
+    cmd = " ".join(parts)
+    log.shell(cmd)
+    py.process.cmdexec(cmd)
+
+class SystemGroup(object):
+    def __init__(self, key):
+        if not hasgroup(key):
+            raise ValueError("system group %r does not exist" % (key,))
+        if isinstance(key, str):
+            self._name = key
+        elif isinstance(key, int):
+            self._id = key
+        else:
+            raise TypeError("%r is not string nor int" % (key,))
+
+    def __repr__(self):
+        return "<SystemGroup %r>" % (self.name)
+
+    def __eq__(self, other):
+        if isinstance(other, str):
+            return self.name == other
+        elif isinstance(other, int):
+            return self.id == other
+        elif isinstance(other, self.__class__):
+            return self.name == other.name and self.id == other.id
+        return False
+
+    def __hash__(self):
+        return self.id
+    def __cmp__(self, other):
+        return cmp(self.name, other.name)
+
+    def name(self):
+        try:
+            return self._name
+        except AttributeError:
+            self._setentry(grp.getgrgid(self._id))
+            return self._name
+    name = property(name)
+
+    def id(self):
+        try:
+            return self._id
+        except AttributeError:
+            self._setentry(grp.getgrnam(self._name))
+            return self._id
+    id = property(id)
+
+    def _setentry(self, entry):
+        self._name = entry.gr_name
+        self._id = entry.gr_gid
+        self._members = [SystemUser(x) for x in entry.gr_mem]
+
+    def members(self):
+        try:
+            return self._members
+        except AttributeError:
+            self.name, self.id # trigger load of entry
+            mems = [x.name for x in self._members]
+            for user in uconf.system.listuser():
+                if user.name not in mems:
+                    if user.groups and user.groups[0] == self.name:
+                        self._members.append(user)
+            return self._members
+    members = property(members)
+
+class SystemUser(User):
+    _attrs = ('name', 'email', 'realname', 'shell', 'password',
+               'homedir', 'ssh_authorized_keys', 'groups')
+
+    def __init__(self, name, **kw):
+        self.name = name
+        self.__dict__.update(kw)
+
+    for name in ('uid', 'groups', 'homedir', 'shell', 'realname'):
+        exec py.code.Source("""
+            def %(name)s():
+                def fget(self):
+                    try:
+                        return self._%(name)s
+                    except AttributeError:
+                        self._loadentry()
+                        return self._%(name)s
+                return property(fget)
+            %(name)s = %(name)s()
+        """ % locals()).compile()
+
+    def check(self):
+        return hasuser(self.name)
+
+    def _loadentry(self):
+        entry = pwd.getpwnam(self.name)
+        self._uid = entry.pw_uid
+        try:
+            self._groups = [grp.getgrgid(entry.pw_gid).gr_name]
+        except:
+            self._groups = []
+        self._homedir = py.path.local(entry.pw_dir)
+        self._shell = py.path.local(entry.pw_shell)
+        self._realname = entry.pw_gecos
+        for g in entrycache.getorbuild('grall', grp.getgrall):
+            if g.gr_name not in self._groups:
+                if self.name in g.gr_mem:
+                    self.groups.append(g.gr_name)
+
+    def password(self):
+        try:
+            return self._password
+        except AttributeError:
+            self._password = uconf.Password(hashspec=gethashspec(self.name))
+            return self._password
+    password = property(password)
+
+    def email():
+        def fget(self):
+            try:
+                return self._email
+            except AttributeError:
+                self._email = getemail(self)
+                return self._email
+        return property(fget)
+    email = email()
+
+    def _get_ssh_authorized_keys(self):
+        try:
+            return self._ssh_authorized_keys
+        except AttributeError:
+            self._ssh_authorized_keys = parse_authorized_keys(self)
+            return self._ssh_authorized_keys
+
+    ssh_authorized_keys = property(_get_ssh_authorized_keys)
+
+    def __repr__(self):
+        return "<SystemUser %r>" % self.name
+    __str__ = __repr__
+
+emails = {}
+
+mailaliases = py.path.local('/etc/mail/user')
+
+def readsystem():
+    if not emails and mailaliases.check(file=1):
+        try:
+            for x in mailaliases.readlines():
+                if x.strip() and x.lstrip()[0] != '#':
+                    username, email = x.split(':')
+                    emails[username.strip()] = email.strip()
+        except (py.error.EPERM, py.error.EACCES):
+            pass
+
+def getemail(user):
+    f = user.homedir.join('/.forward')
+    if f.check(file=1):
+        for a in f.read().split(','):
+            if '@' in a:
+                return a.strip().lstrip('\\')
+    readsystem()
+    try:
+        return emails[user.name]
+    except KeyError:
+        return ""
+
+def parse_authorized_keys(user):
+    f = user.homedir.join('/.ssh/authorized_keys')
+    if f.check(file=1):
+        return f.read().strip().split('\n')
+    return []
+
+def listuser(filterfunc=None):
+    l = []
+    for entry in entrycache.getorbuild('pwall', pwd.getpwall):
+        user =  SystemUser(entry[0])
+        if filterfunc and not filterfunc(user):
+            continue
+        l.append(user)
+    return l
+
+def usersbyname():
+    return dict([(x.name, x) for x in listuser()])
+
+def usersbyid():
+    return dict([(x.uid, x) for x in listuser()])
+
+def groupsbyid():
+    return dict([(x.id, x) for x in listgroup()])
+
+def listgroup(filterfunc=None):
+    l = []
+    for entry in entrycache.getorbuild('grall', grp.getgrall):
+        group =  SystemGroup(entry.gr_name)
+        group._setentry(entry)
+        if filterfunc and not filterfunc(group):
+            continue
+        l.append(group)
+    return l
+
+
+def gethashspec(name):
+    entry = pwd.getpwnam(name)
+    if entry.pw_passwd == 'x':
+        for x in py.path.local('/etc/shadow').readlines():
+            l = x.split(':')
+            if l[0] == name:
+                return l[1]
+        else:
+            return None #
+            # raise ValueError("%r: password not found" % self.name)
+    else:
+        return entry.pw_passwd
+
+

File uconf/config.py

+
+import py
+import ConfigParser
+from uconf.user import User
+import uconf
+
+def getlist(userlistpath = None):
+    if userlistpath is None:
+        userlist = config.userlistpath
+    userlistpath = py.path.local(userlistpath)
+    assert userlistpath.check(file=1)
+
+    parser = UserConfig(userlistpath)
+    return parser.getuserlist()
+
+def readconfig(path):
+    """ return list of User object from configuration file. (ini-format)"""
+    path = py.path.local(path)
+    assert path.check(file=1)
+    
+    parser = ConfigParser.ConfigParser()
+    parser.read(str(path))
+    l = [UserFromConfig(parser, name)
+            for name in parser.sections()
+                if name != 'default']
+    return l
+
+class UserFromConfig(User):
+    __attrs__ = ('name', 'email', 'realname', 'shell', 'hashspec',
+                 'organisation', 'homedir', 'ssh_authorized_keys',
+                 'groups')
+    def __init__(self, parser, name):
+        self.name = name
+        self._parser = parser
+
+    def __str__(self):
+        return "User(%r)" % self.name
+    __repr__ = __str__
+
+    def __getattr__(self, name):
+        if name not in self.__attrs__:
+            raise AttributeError, name
+        try:
+            return self._parser.get(self.name, name)
+        except ConfigParser.NoSectionError:
+            raise ValueError, "User not found: %s" % self.name
+        except ConfigParser.NoOptionError:
+            try:
+                return self._parser.get('default', name)
+            except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
+                return None
+
+    def password(self):
+        try:
+            return self._password
+        except AttributeError:
+            if self.hashspec is not None:
+                self._password = uconf.Password(hashspec=self.hashspec)
+            else:
+                self._password = None
+            return self._password
+    password=property(password)
+        
+    def _get_homedir(self):
+        try:
+            return py.path.local(self.__getattr__('home'))
+        except AttributeError:
+            return py.path.local('/home/%s' % self.name)
+    homedir = property(_get_homedir)
+
+    def _get_groups(self):
+        try:
+            return self._groups
+        except AttributeError:
+            groups = self.__getattr__('groups')
+            groups = [x.strip() for x in groups.split(',')]
+            self._groups = groups
+            return groups
+    groups = property(_get_groups)
+
+    def _get_ssh_authorized_keys(self):
+        try:
+            key = self.__getattr__('ssh_authorized_keys')
+        except AttributeError:
+            return None
+        else:
+            keys = [x.strip() for x in key.split(',')]
+            return keys
+    ssh_authorized_keys = property(_get_ssh_authorized_keys)
+
+    #def _getdict(self):
+    #    for name in self.__attrs__:
+    #        setattr(self, name, getattr(self, name))
+    #    dictdescr = object.__dict__
+    #    return dictdescr.__get__(self)
+    #__dict__ = property(_getdict)

File uconf/doc.txt

+User configuration 
+==================
+
+uconf works on /etc/uconf/user.list and is configured
+by /etc/uconf/uconf.cfg 
+
+which contains all current user information including
+plain text passwords.  this relies on unix permissions. 
+
+Commands::
+    uconf list [name1] [...]
+        display information about users in user.db 
+
+    uconf install [--notify-email] [name1] [name2] [name3]
+        installs the given users on the system, 
+        if they already exit a warning is printed 
+        and the user information is not modified 
+
+    # later
+    uconf fixperm [name1] [name2] [name3]
+        fixes permissions for the given users 
+
+    uconf [check]
+        checks if all system users appear in uconf.db 
+        and vice versa 
+
+    uconf read [name1] [name2] ... 
+        loads system information into uconf.db taking 
+        the the system state as the origin 
+
+    uconf dump filename 
+        dumps the current database into the given filename 
+
+    uconf load filename 
+        load the given database into the uconf.db 
+
+format of user.list
+-------------------
+
+[username]
+realname = X Y Z 
+email = xyz@a.b.c 
+shell = /bin/sh 
+home = /home/username 
+hashedpassword = 1923871982379123
+ssh_authorized_key = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx \
+                     xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx \
+                     xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx \
+                     xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

File uconf/loaduserconfig.py

+
+import sys
+import py
+import uconf
+import optparse
+from uconf.rangestr import rangestr
+
+from log import log
+
+def newgid():
+    for gid in SECGROUPRANGE:
+        if not uconf.system.hasgroup(gid):
+            return gid
+    else:
+        raise ValueError("could not get GID")
+
+def newuid():
+    for uid in USERRANGE:
+        if not uconf.system.hasuser(uid) and not uconf.system.hasgroup(uid):
+            return uid
+    else:
+        raise ValueError("could not get userid/groupid pair")
+
+def ensure_secondary_groups(users):
+    newgroups = {}
+    for user in users:
+        groups = user.groups
+        primary = groups[0]
+        assert primary == user.name
+        for group in groups[1:]:
+            newgroups.setdefault(group, []).append(user.name)
+    sysgroups = dict([(x.name, x) for x in uconf.system.listgroup()])
+    log.debug("new groups")
+    for group, members in newgroups.items():
+        if group not in sysgroups: # and group not in IGNOREGROUPS:
+            gid = newgid()
+            py.process.cmdexec("groupadd -g %d %s" %(gid, group))
+            log.debug("   [%d]" % gid, group, ":", " ".join(members))
+
+if __name__ == 'XXX__main__':
+    sysusers = uconf.system.listuser()
+    sysusers = dict([(x.name, x) for x in sysusers])
+    users = uconf.readconfig('user.dump')
+    #ensure_secondary_groups(users)
+    users.sort()
+    for user in users:
+        if user.name != py.std.sys.argv[1]:
+            continue
+        if user.name in sysusers:
+            print user.name, "already exists"
+            uid = sysusers[user.name].uid
+            sysuser = sysusers[user.name]
+            newgroups = []
+            for group in user.groups[1:]:
+                if group not in sysuser.groups:
+                    newgroups.append(group)
+            if newgroups:
+                print "adding groups to user", user.name, newgroups
+                newgroups = user.groups[1:]
+                py.process.cmdexec("usermod -G %s %s" % (
+                    ",".join(newgroups), user.name))
+            if len(sysuser.password.hashspec) < 2:
+                print "password missing", user.name
+                py.process.cmdexec("usermod -p '%s' %s" %(user.password.hashspec, user.name))
+        else:
+            try:
+                group = uconf.system.Group(user.name)
+            except ValueError:
+                gid = uid = newuid()
+                py.process.cmdexec("groupadd -g %d %s" %(gid, user.name))
+            else:
+                gid = uid = group.id
+                assert not uconf.system.hasuser(uid)
+            realname = user.realname
+            name = user.name
+            homedir = '/home/%s' % name
+            group = user.groups[0]
+            groups = ",".join(user.groups[1:])
+            assert group == name
+            if groups.strip():
+                groups = "-G " + groups
+            #py.process.cmdexec("groupadd -g %d %s" %(uid, user.name))
+            cmd = ("useradd -c %(realname)r -d %(homedir)s "
+                   "-g %(gid)s %(groups)s -u %(uid)s %(name)s" % locals())
+            #print cmd
+            py.process.cmdexec(cmd)
+            homedir = py.path.local(homedir)
+            homedir.ensure(dir=1)
+            homedir.chown(name, name)
+
+            print "   [%d] %s (secgroups: %s)" %(uid, user.name, groups)
+
+class UserError(Exception):
+    """signalling a user error"""
+
+class MainCommand(object):
+    def __init__(self, options, args):
+        self.option = options
+        self.args = args
+        self.uidrange = rangestr(self.option.uidrange)
+        self.gidrange = rangestr(self.option.gidrange)
+
+    def run(self):
+        try:
+            self.execute()
+        except UserError, e:
+            print >>py.std.sys.stderr, "ERROR", e.args[0]
+            raise SystemExit(1)
+
+    def execute(self):
+        if len(self.args) != 1:
+            raise UserError("need exactly one user configuration file argument, got", self.args)
+        config = py.path.local(self.args[0])
+        if not config.check(file=1):
+            raise UserError("%s does not exist" % config)
+        log.debug("loading config file", config)
+        userlist = uconf.readconfig(config)
+        self.synctosystem(userlist)
+
+    def synctosystem(self, userlist):
+        sysusers = uconf.system.usersbyname()
+        for user in userlist:
+            if not user.name in sysusers:
+                uconf.system.adduser(user, self.uidrange, self.gidrange)
+            else:
+                log.debug("modifying existing user", user.name)
+                if not self.option.delgroups:
+                    sysuser = sysusers[user.name]
+                    for group in sysuser.groups:
+                        if group not in user.groups:
+                            user.groups.append(group)
+                uconf.system.changeuser(user)
+            ensure_ssh_key(user)
+
+def ensure_ssh_key(user):
+    log.debug("checking ssh-keys for", user.name)
+
+    ssh_keys = user.ssh_authorized_keys
+    if ssh_keys:
+        ssh_keys = filter(None, ssh_keys)
+    if not ssh_keys:
+        log.debug("user", user.name, "has no ssh-keys defined")
+        return
+
+    p = user.homedir.join('.ssh', 'authorized_keys')
+    lines = []
+    if p.check():
+        lines = p.readlines(cr=0)
+    while ssh_keys:
+        key = ssh_keys.pop()
+        if key not in lines:
+            lines.append(key)
+    if not lines:
+        log.debug("all keys are already in", p)
+
+    content = "\n".join(lines)
+    log.debug("would write to", p, "content", repr(content))
+    p.write(content)
+
+def main(args=None):
+    if args is None:
+        args = py.std.sys.argv[1:]
+    else:
+        args = map(str, args)
+    parser = optparse.OptionParser()
+    Option = optparse.Option
+    options = [Option("--ignoregroups", action="store", type="string", default="",
+                      help="commat separate groups to ignore"),
+               Option("--delgroups", action="store_true", default=False,
+                      dest="delgroups",
+                      help="delete existing groups when they don't exist in the config"),
+               Option("--uidrange", action="store", type="string", default="1000-10000",
+                      dest="uidrange",
+                      help="userid range when creating new users"),
+               Option("--gidrange", action="store", type="string", default="1000-10000",
+                      dest="gidrange",
+                      help="group id range when creating new users"),
+          ]
+    parser.add_options(options)
+    option, args = parser.parse_args(args)
+    cmd = MainCommand(option, args)
+    cmd.run()

File uconf/log.py

+
+import py
+
+log = py.log.Producer("uconf")
+py.log.setconsumer("uconf", py.log.STDERR)

File uconf/md5crypt.py

+
+#########################################################
+# md5crypt.py
+#
+# 0423.2000 by michal wallace http://www.sabren.com/
+# based on perl's Crypt::PasswdMD5 by Luis Munoz (lem@cantv.net)
+# based on /usr/src/libcrypt/crypt.c from FreeBSD 2.2.5-RELEASE
+#
+# MANY THANKS TO
+#
+#  Carey Evans - http://home.clear.net.nz/pages/c.evans/
+#  Dennis Marti - http://users.starpower.net/marti1/
+#
+#  For the patches that got this thing working!
+#
+#########################################################
+"""md5crypt.py - Provides interoperable MD5-based crypt() function
+
+SYNOPSIS
+
+	import md5crypt.py
+
+	cryptedpassword = md5crypt.md5crypt(password, salt);
+
+DESCRIPTION
+
+unix_md5_crypt() provides a crypt()-compatible interface to the
+rather new MD5-based crypt() function found in modern operating systems.
+It's based on the implementation found on FreeBSD 2.2.[56]-RELEASE and
+contains the following license in it:
+
+ "THE BEER-WARE LICENSE" (Revision 42):
+ <phk@login.dknet.dk> wrote this file.  As long as you retain this notice you
+ can do whatever you want with this stuff. If we meet some day, and you think
+ this stuff is worth it, you can buy me a beer in return.   Poul-Henning Kamp
+
+apache_md5_crypt() provides a function compatible with Apache's
+.htpasswd files. This was contributed by Bryan Hart <bryan@eai.com>.
+
+"""
+
+MAGIC = '$1$'			# Magic string
+ITOA64 = "./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+
+import md5
+
+def to64 (v, n):
+    ret = ''
+    while (n - 1 >= 0):
+        n = n - 1
+	ret = ret + ITOA64[v & 0x3f]
+	v = v >> 6
+    return ret
+
+
+def apache_md5_crypt (pw, salt):
+    # change the Magic string to match the one used by Apache
+    return unix_md5_crypt(pw, salt, '$apr1$')
+
+
+def unix_md5_crypt(pw, salt, magic=None):
+    
+    if magic==None:
+        magic = MAGIC
+
+    # Take care of the magic string if present
+    if salt[:len(magic)] == magic:
+        salt = salt[len(magic):]
+        
+
+    # salt can have up to 8 characters:
+    import string
+    salt = string.split(salt, '$', 1)[0]
+    salt = salt[:8]
+
+    ctx = pw + magic + salt
+
+    final = md5.md5(pw + salt + pw).digest()
+
+    for pl in range(len(pw),0,-16):
+        if pl > 16:
+            ctx = ctx + final[:16]
+        else:
+            ctx = ctx + final[:pl]
+
+
+    # Now the 'weird' xform (??)
+
+    i = len(pw)
+    while i:
+        if i & 1:
+            ctx = ctx + chr(0)  #if ($i & 1) { $ctx->add(pack("C", 0)); }
+        else:
+            ctx = ctx + pw[0]
+        i = i >> 1
+
+    final = md5.md5(ctx).digest()
+    
+    # The following is supposed to make
+    # things run slower. 
+
+    # my question: WTF???
+
+    for i in range(1000):
+        ctx1 = ''
+        if i & 1:
+            ctx1 = ctx1 + pw
+        else:
+            ctx1 = ctx1 + final[:16]
+
+        if i % 3:
+            ctx1 = ctx1 + salt
+
+        if i % 7:
+            ctx1 = ctx1 + pw
+
+        if i & 1:
+            ctx1 = ctx1 + final[:16]
+        else:
+            ctx1 = ctx1 + pw
+            
+            
+        final = md5.md5(ctx1).digest()
+
+
+    # Final xform
+                                
+    passwd = ''
+
+    passwd = passwd + to64((int(ord(final[0])) << 16)
+                           |(int(ord(final[6])) << 8)
+                           |(int(ord(final[12]))),4)
+
+    passwd = passwd + to64((int(ord(final[1])) << 16)
+                           |(int(ord(final[7])) << 8)
+                           |(int(ord(final[13]))), 4)
+
+    passwd = passwd + to64((int(ord(final[2])) << 16)
+                           |(int(ord(final[8])) << 8)
+                           |(int(ord(final[14]))), 4)
+
+    passwd = passwd + to64((int(ord(final[3])) << 16)
+                           |(int(ord(final[9])) << 8)
+                           |(int(ord(final[15]))), 4)
+
+    passwd = passwd + to64((int(ord(final[4])) << 16)
+                           |(int(ord(final[10])) << 8)
+                           |(int(ord(final[5]))), 4)
+
+    passwd = passwd + to64((int(ord(final[11]))), 2)
+
+
+    return magic + salt + '$' + passwd
+
+
+## assign a wrapper function:
+md5crypt = unix_md5_crypt

File uconf/misc.py

+
+import py
+
+def readaliases(path): 
+    """ return dictionary with key-value items
+        from reading the given path in alias-format
+    """
+    d = {}
+    l = []
+    for i, line in py.builtin.enumerate(path.readlines(cr=0)): 
+        line = line.strip()
+        if not line or line.startswith('#'): 
+            continue
+        try:  
+            key, value = line.split(':')
+        except TypeError: 
+            raise ValueError("%s:%d format error" %(path, i))
+        else: 
+            value = value.strip() 
+            key = key.strip()
+            if key not in d: 
+                l.append((key, value)) 
+                d[key] = value 
+    return l 

File uconf/password.py

+import py
+from md5crypt import md5crypt 
+
+def newpassword():
+    output = py.process.cmdexec('passook')
+    p = output.strip().lower()
+    return p
+
+def assert_hashspec(clear, hashspec):
+    if hashspec.startswith('$1$'):
+        i = hashspec.find('$', 3)
+        assert i!=-1
+        salt = hashspec[3:i]
+        rehashspec = py.process.cmdexec('openssl passwd -1 -salt %s %s' % 
+                                    (salt, clear))
+        rehashspec = rehashspec.strip()
+        assert rehashspec == hashspec 
+
+class Password:
+    def __init__(self, clear=None, hashspec=None):
+        if not clear and not hashspec: 
+            clear = newpassword()
+            hashspec = py.process.cmdexec('openssl passwd -1 %s' % clear).strip()
+        elif clear and hashspec:
+            assert_hashspec(clear, hashspec) 
+        elif clear and not hashspec:
+            hashspec = py.process.cmdexec('openssl passwd -1 %s' % clear).strip()
+        self.hashspec = hashspec
+        self.clear = clear 
+
+    def salt(self): 
+        try:
+            tag, salt, hash = filter(None, self.hashspec.split('$'))
+        except ValueError: 
+            return None 
+        return salt 
+    salt = property(salt) 
+
+    def hash(self): 
+        tag, salt, hash = filter(None, self.hashspec.split('$'))
+        return hash 
+    hash = property(hash) 
+
+    def validate(self, clear): 
+        return md5crypt(clear, self.salt) == self.hashspec 
+
+    def __str__(self):
+        return "Password(%r, %r)" %(self.clear, self.hashspec) 
+
+    __repr__ = __str__
+            
+

File uconf/rangestr.py

+import itertools
+import sys
+
+class rangestr(object):
+    def __init__(self, spec):
+        parts = spec.strip().split('-')
+        self.start = int(parts[0])
+        if len(parts) == 2:
+            if parts[1]:
+                self.end = int(parts[1])
+            else:
+                self.end = None
+        else:
+            raise ValueError("range %r is not in 'X-[Y]' format" %(spec,))
+
+    def __len__(self):
+        if self.end is None:
+            return sys.maxint
+        return (self.end-self.start)
+        
+    def __iter__(self):
+        count = itertools.count(self.start)
+        if self.end is None:
+            return count
+        return itertools.islice(count, self.end-self.start)

File uconf/roundup2.py

+#!/usr/bin/python 
+
+import sys
+
+import py 
+import uconf
+
+keep = 'admin', 'anonymous' 
+
+log = py.log.get(
+    report = py.log.STDOUT, 
+    error = py.log.STDERR, 
+    warn = py.log.STDERR,
+    debug = None)
+
+def setsyspath(tracker_home):
+    """ determine roundup version from the given trackerhome
+        and try to determine a suitable roundup installation path
+        and prepend it to the python sys.path list. 
+    """
+    v = py.path.local(tracker_home).join('ROUNDUP_VERSION')
+    if not v.check():
+        log.warn("did not find %s" %(v,))
+    else:
+        ver = v.read().strip()
+        syspath = py.path.local('/www/roundup/roundup-%s' %(ver,))
+        if not syspath.check():
+            log.error("cannot locate roundup version: %s" %(syspath,))
+        else:
+            assert syspath.join('roundup').check()
+            sys.path.insert(0, str(syspath))
+
+def openinstance(tracker_home): 
+    from roundup import instance
+    tracker = instance.open(str(tracker_home))
+    return tracker 
+    
+def ensure_users(db, userlist, 
+                 autonosydefaults=None, 
+                 defaultorganisation="", 
+                 defaultmaildomain=None): 
+    Keyword = db.getclass('keyword')
+    if autonosydefaults is None: 
+        autonosydefaults = {}
+    try: 
+        lines = []
+        def action(msg):
+            lines.append(msg)
+            log.report(msg) 
+
+        for user in userlist: 
+            log.report("processing", user)
+            if not user.email: 
+                if not defaultmaildomain: 
+                    log.error("user %r does not have an email address "
+                          "and we have no defaultmaildomain!" %(user.name,))
+                    continue
+                user.email = "%s@%s" %(user.name, defaultmaildomain)
+            try: 
+                uid = db.user.lookup(user.name) 
+            except KeyError: 
+                if user.name not in autonosydefaults: 
+                    log.warn("user %r does not have autonosydeafults" %(user.name,))
+                    keyword = []
+                else: 
+                    keyword = autonosydefaults[user.name]
+                    keyword = filter(None, keyword.split(','))
+                    keyword = map(Keyword.lookup, keyword)
+                
+                orga = user.organisation or defaultorganisation 
+                db.user.create(username=user.name, realname=user.realname, 
+                               address=user.email, roles='User', 
+                               keyword=keyword,
+                               organisation=defaultorganisation)
+                action('ADD %s - %s - %s' % (user.name, user.realname, defaultorganisation))
+                uid = db.user.lookup(user.name) 
+    
+            if user.realname != db.user.get(uid, 'realname'): 
+                db.user.set(uid, realname=user.realname) 
+                action('FIX %s -%s' %(user.name, user.realname))
+            #if orga and not db.user.get(uid, 'defaultorganisationorganisation'): 
+            #    db.user.set(uid, organisation=defaultorganisationorga) 
+            #    lines.append('FIX %s -%s' %(user.name, orga))
+            if user.organisation and user.organisation != db.user.get(uid, 'organisation'):
+                db.user.set(uid, organisation=user.organisation)
+                action('FIX %s -%s' %(user.name, user.organisation))
+            if user.email and not db.user.get(uid, 'address'): 
+                db.user.set(uid, address=user.email)
+                action('FIX %s -%s' %(user.name, user.email))
+            if 'wheel' in user.groups: 
+                roles = db.user.get(uid, 'roles') .split(',')
+                if 'admin' not in roles: 
+                    roles.append('admin')
+                    db.user.set(uid, roles=",".join(roles) )
+                    action('FIX %s - added admin role' %(user.name,))
+
+        existing = dict([(x.name,x) for x in userlist])
+        for uid in db.user.list(): 
+            username = db.user.get(uid, 'username')
+            #if username not in existing and username not in keep: 
+            #    #db.user.retire(uid)
+            #    action('SHOULD RETIRE? %s' % username)
+            
+        if lines: 
+            msg = '''Subject: %s user database maintenance\n\n''' % db.config.TRACKER_NAME
+            msg += '\n'.join(lines) 
+            #smtp = py.std.smtplib.SMTP(db.config.MAILHOST)
+            #addr = db.config.ADMIN_EMAIL 
+            #smtp.sendmail(addr, addr, msg) 
+            log.debug("XXX would send mail:", msg)
+        db.commit()
+    finally: 
+        db.close() 

File uconf/svnconfparser/__init__.py

Empty file added.

File uconf/svnconfparser/svnconfparser.py

+import py
+import re
+import uconf
+from ConfigParser import ConfigParser as cparser
+
+def enumlines(data, rstrip=True, nonempty=False):
+    for lineno, line in py.builtin.enumerate(data):
+        i = line.find('#')
+        if i != -1:
+            line = line[:i]
+        if rstrip:
+            line = line.rstrip()
+        if not nonempty and not line.strip():
+            continue
+        yield lineno, line
+
+_authz_inheritance_tree = {}
+class resource(object):
+    def __init__(self, path, specs):
+        if path != '/' and path.endswith('/'):
+            path = path[:-1]
+        self.path = path
+        self.specs = specs.copy()
+
+    def __repr__(self):
+        return "<resource path=\"%s\" specs=\"%s\">" % (self.path,
+                                                        self.specs)
+
+    def userlist(self, name):
+        if name not in self.specs:
+            return []
+        l = []
+        for username in self.specs[name]:
+            #print "examining", line
+            if username.startswith('@'):
+                group = self._getgroup(username[1:])
+                l += [user.name for user in group.members]
+            elif username:
+                l.append(username)
+        return l
+
+    def _users(self):
+        for spec in self.specs:
+            for username in self.userlist(spec):
+                yield username
+    users = property(_users)
+
+    def asauthz(self):
+        path = self.path
+        lines = []
+        for mode, access in [('readwrite', 'rw'),
+                             ('readonly', 'r'),
+                             ('noaccess', '')]:
+            ulist = self.userlist(mode)
+            if ulist:
+                for username in ulist:
+                    if username == 'ANONYMOUS':
+                        username = '*'
+                    lines.append("%s = %s" % (username, access))
+        if lines:
+            lines.insert(0, "[%s]" % (path,))
+            lines.append("")
+        return '\n'.join(lines)
+
+    def ashtaccessconf(self, root, realm, htpasswd):
+        if root.endswith('/'):
+            root = root[:-1]
+        if 'ANONYMOUS' in self.userlist('noaccess'):
+            # don't inherit, ANONYMOUS really means '*' (this is how it
+            # ends up in the authz files, too, behaviour should be consistent
+            # this way)
+            users = ([], [])
+        else:
+            users = self._inherited_users()
+        for i, mode in enumerate(['readonly', 'readwrite']):
+            ulist = self.userlist(mode)
+            if ulist:
+                users[i][:] = list(ulist)
+        if 'ANONYMOUS' in users[0] + users[1]:
+            # we allow access to everyone, and reset inheritance
+            _authz_inheritance_tree[self.path] = ([], [])
+            return ''
+        _authz_inheritance_tree[self.path] = users
+        users = users[0] + users[1]
+        require = users and 'user %s' % (' '.join(users),) or 'valid-user'
+        return (
+            '<Location %s%s>\n'
+            '    AuthType Basic\n'
+            '    AuthName %s\n'
+            '    AuthUserFile %s\n'
+            '    require %s\n'
+            '</Location>\n' % (root, self.path, realm, htpasswd, require))
+
+    def _getgroup(self, name):
+        return uconf.system.Group(name)
+
+    def _inherited_users(self):
+        path = self.path.split('/')
+        while 1:
+            if len(path) <= 1:
+                break
+            path = path[:-1]
+            ppath = '/'.join(path)
+            users = _authz_inheritance_tree.get(ppath)
+            if users is not None:
+                return users
+        return ([], [])
+
+class svnconf(object):
+    """an svnaccess file
+
+        a more user-friendly file format for declaring authorization rules
+        (than the default authz format)
+    
+        basically all this does is parse the file to 'resources'
+    """
+    def __init__(self, path):
+        self.path = path
+        self._data = None
+
+    def _lines(self, rstrip=True, nonempty=True):
+        if self._data is None:
+            self._data = self.path.readlines(cr=0)
+        return enumlines(self._data, rstrip, nonempty)
+    lines = property(_lines)
+
+    def _resources(self):
+        specs = {}
+        path = None
+        indentation = ''
+        pathjustchanged = False
+        for i, line in self._lines(nonempty=False):
+            if line == line.lstrip(): # resource finished
+                assert line.find(':') == -1, (
+                       "colon in path: %s:%d" % (self.path, i+1))
+                if path is not None:
+                    yield resource(path, specs)
+                specs.clear()
+                path = line
+                pathjustchanged = True
+            else:
+                parts = line.split(':')
+                if pathjustchanged:
+                    pathjustchanged = False
+                    indentation = re.match('^\s*', line).group(0)
+                else:
+                    assert (line.startswith(indentation) and
+                            line[len(indentation)] not in ' \t\n'), (
+                            'indentation error: %s:%d' % (self.path, i+1))
+                assert len(parts) == 2, (
+                        "not in X:Y format: %s:%d" % (self.path, i+1))
+                assert path is not None, (
+                        "unknown path: %s:%d" % (self.path, i+1))
+                userlist = filter(None,
+                                  map(str.strip, parts[1].split(',')))
+                specs.setdefault(parts[0].strip(), []).extend(userlist)
+        if path is not None:
+            yield resource(path, specs)
+    resources = property(_resources)
+
+    def toauthz(self):
+        return authz(self.resources)
+
+    def tohtaccessconf(self, root, realm, htpasswd):
+        return htaccessconf(self.resources, root, realm, htpasswd)
+
+class pwdtsection(object):
+    group = None
+    specs = None
+
+    def __init__(self, group, specs):
+        self.group = group
+        self.update(specs)
+    
+    def update(self, specs):
+        assert sorted(specs.keys()) == ['sysgroups', 'writeformat',
+                                        'writepath']
+        if not isinstance(specs['sysgroups'], list):
+            specs['sysgroups'] = [s.strip() for s in
+                                  specs['sysgroups'].split(',')]
+        self.specs = specs.copy()
+
+    def __getattr__(self, name):
+        return self.specs[name]
+
+    def __repr__(self):
+        return "<%s group=\"%s\" specs=%r>" % (type(self).__name__,
+                                               self.group,
+                                               str(self.specs))
+
+# XXX this should be merged with /admin/bin/pwdtrigger at some point, I guess,
+# I decided not to use pwdtrigger because I couldn't even get its tests to
+# run... and it didn't serve my goals exactly either (but obviously that can
+# be changed, do need working tests though)
+class pwdtrigger(object):
+    """read from and write to the pwdtrigger file"""
+    def __init__(self, path):
+        self.path = path
+        self._data = None
+        self._parsed = None
+
+    def _parse(self):
+        if self._data is None:
+            self._data = cparser()
+            self._data.readfp(self.path.open('r'))
+        data = self._data
+        for group in data.sections():
+            s = pwdtsection(group, dict(data.items(group)))
+            yield s
+
+    def _sections(self):
+        if not self._parsed:
+            self._parsed = list(self._parse())
+        return self._parsed
+    sections = property(_sections)
+
+    def set(self, group, sysgroups, writeformat, writepath):
+        """set the specs for a group, overwriting if appropriate"""
+        sections = self.sections
+        specs = {'sysgroups': sysgroups,
+                 'writeformat': writeformat,
+                 'writepath': writepath}
+        for i, s in enumerate(sections):
+            if s.group == group:
+                s.update(specs)
+                break
+        else:
+            sections.append(pwdtsection(group, specs))
+
+    def __str__(self):
+        ret = []
+        for s in self.sections:
+            ret += ['[%s]' % (s.group,),
+                    'sysgroups = %s' % (', '.join(s.sysgroups),),
+                    'writeformat = %s' % (s.writeformat,),
+                    'writepath = %s' % (s.writepath,),
+                    '']
+        return '\n'.join(ret)
+
+    def save(self):
+        self.path.write(str(self))
+
+class authz(object):
+    """an authz file
+
+        has api to build and represent itself
+    """
+
+    debug = True
+    
+    add_group_command = '/usr/bin/groupadd -f %s'
+    add_users_command = '/usr/bin/gpasswd -M %s'
+    _log = py.std.sys.stderr
+    
+    def __init__(self, resources):
+        self.resources = list(resources)
+
+    def updatesystemgroups(self):
+        """adds groups on system level and adds users to them"""
+        for resource in self.resources:
+            group = self.pathtogroup(resource.path)
+            users = sorted(list(r for r in resource.users if r != 'ANONYMOUS'))
+            if users:
+                self._system(self.add_group_command % (group,), True)
+                self._system(self.add_users_command % ("%s" %
+                                                       (','.join(users),),))
+    
+    def updatepwdtrigger(self, pwdtrigger):
+        for resource in self.resources:
+            group = self.pathtogroup(resource.path)
+            pwdtrigger.set(group, [group], 'apache_auth',
+                           '/etc/apache2/conf/viewvc.%s.passwd' % (group,))
+
+    def pathtogroup(self, path):
+        return 'viewvc%s' % (path.replace('/', '_'),)
+
+    def write(self, path):
+        path.write(str(self))
+
+    def _system(self, cmd, ignoreerrors=False):
+        self._log.write('executing %r\n' % (cmd,))
+        if not self.debug:
+            try:
+                py.process.cmdexec(cmd)
+            except py.__.process.cmdexec.ExecutionFailed:
+                if not ignoreerrors:
+                    raise
+
+    def __str__(self):
+        """returns a serialized version of self"""
+        return '\n'.join([b.asauthz() for b in self.resources])
+
+class htaccessconf(object):
+    """an apache config file that controls access to directories
+    
+        can be included into other conf files
+
+        contains <Directory> sections for all paths, containing 
+    """
+
+    def __init__(self, resources, root, realm, htpasswd):
+        self.resources = list(resources)
+        self.root = root
+        self.realm = realm
+        self.htpasswd = htpasswd
+
+    def write(self, path):
+        path.write(str(self))
+        
+    def __str__(self):
+        """returns a serialized version of self"""
+        return '\n'.join([b.ashtaccessconf(self.root, self.realm, self.htpasswd)
+                          for b in self.resources])
+

File uconf/svnconfparser/test/__init__.py

+#

File uconf/svnconfparser/test/test_svnconfparser.py

+import py
+from StringIO import StringIO
+import ConfigParser
+from uconf.__.svnconfparser import svnconfparser
+
+# some helper stuff, fakes
+class _member(object):
+    def __init__(self, name):
+        self.name = name
+
+class _group(object):
+    def __init__(self, *members):
+        self.members = (_member(m) for m in members)
+
+def _getgroup(self, name):
+    if name == 'foo':
+        return _group('bar', 'baz')
+    elif name == 'baz':
+        return _group('qux', 'quux')
+    return _group()
+
+class FakeUconfBase:
+    def setup_class(cls):
+        cls._org_getgroup = svnconfparser.resource._getgroup
+        svnconfparser.resource._getgroup = _getgroup
+
+    def teardown_class(cls):
+        svnconfparser.resource._getgroup = cls._org_getgroup
+
+# the actual tests
+def test_enumlines_simple():
+    lines = [
+        'foo',
+        'bar',
+        'baz',
+    ]
+    assert list(svnconfparser.enumlines(lines)) == [
+        (0, 'foo'),
+        (1, 'bar'),
+        (2, 'baz'),
+    ]
+
+def test_enumlines_comments():
+    lines = [
+        'foo # a foo',
+        '# a bar',
+        'bar',
+    ]
+    assert list(svnconfparser.enumlines(lines, nonempty=False)) == [
+        (0, 'foo'),
+        (2, 'bar'),
+    ]
+
+def test_enumlines_comments_nonempty():
+    lines = [
+        'foo # a foo',
+        '# a bar',
+        'bar',
+    ]
+    assert list(svnconfparser.enumlines(lines, nonempty=True)) == [
+        (0, 'foo'),
+        (1, ''),
+        (2, 'bar'),
+    ]
+
+def test_enumlines_rstrip():
+    assert list(svnconfparser.enumlines(['foo '])) == [(0, 'foo')]
+    assert list(svnconfparser.enumlines(['foo '], rstrip=False)) == [(0, 'foo ')]
+
+class TestResource(FakeUconfBase):
+    def setup_method(self, method):
+        svnconfparser._authz_inheritance_tree = {}
+
+    def test_userlist_username(self):
+        b = svnconfparser.resource('/foo', {'readonly': ['foo']})
+        assert b.userlist('readonly') == ['foo']
+
+    def test_userlist_groupname(self):
+        b = svnconfparser.resource('/foo', {'readonly': ['@foo']})
+        assert b.userlist('readonly') == ['bar', 'baz']
+
+    def test_userlist_combined(self):
+        b = svnconfparser.resource('/foo/', {'readonly': ['foo', 'bar', '@baz']})
+        assert b.userlist('readonly') == ['foo', 'bar', 'qux', 'quux']
+
+    def test_userlist_nospec(self):
+        b = svnconfparser.resource('/foo', {'readonly': ['foo']})
+        assert b.userlist('readwrite') == []
+    
+    def test_users(self):
+        b = svnconfparser.resource('/foo', {'readonly': ['foo'],
+                                            'readwrite': ['@baz']})
+        users = sorted(list(b.users))
+        assert users == ['foo', 'quux', 'qux']
+
+    def test_asauthz_one(self):
+        b = svnconfparser.resource('/foo', {'readonly': ['foo']})
+        assert b.asauthz() == '\n'.join([
+            '[/foo]',
+            'foo = r',
+            '',
+        ])
+
+    def test_asauthz_multiple(self):
+        b = svnconfparser.resource('/foo', {'readonly': ['foo', 'bar'],
+                                            'readwrite': ['baz', 'qux'],
+                                            'noaccess': ['quux']})
+        assert b.asauthz() == '\n'.join([
+            '[/foo]',
+            'baz = rw',
+            'qux = rw',
+            'foo = r',
+            'bar = r',
+            'quux = ',
+            '',
+        ])
+
+    def test_asauthz_multiple_noaccess_anonymous(self):
+        b = svnconfparser.resource('/foo', {'readonly': ['foo', 'bar'],
+                                            'readwrite': ['baz', 'qux'],
+                                            'noaccess': ['ANONYMOUS']})
+        print b.asauthz()
+        assert b.asauthz() == '\n'.join([
+            '[/foo]',
+            'baz = rw',
+            'qux = rw',
+            'foo = r',
+            'bar = r',
+            '* = ',
+            '',
+        ])
+
+    def test_asauthz_anon(self):
+        b = svnconfparser.resource('/foo', {'readonly': ['ANONYMOUS']})
+        assert b.asauthz() == '\n'.join([
+            '[/foo]',
+            '* = r',
+            '',
+        ])
+
+    def test_ashtaccessconf_one(self):
+        b = svnconfparser.resource('/foo', {'readonly': ['foo']})
+        assert b.ashtaccessconf('/viewvc', 'MyRealm', '/var/www/.htpasswd') ==\
+            '\n'.join([
+                '<Location /viewvc/foo>',
+                '    AuthType Basic',
+                '    AuthName MyRealm',
+                '    AuthUserFile /var/www/.htpasswd',
+                '    require user foo',
+                '</Location>',
+                '',
+            ])
+    
+    def test_ashtaccesscont_multiple(self):
+        b = svnconfparser.resource('/foo/bar/', {'readonly': ['foo', 'bar'],
+                                 'readwrite': ['baz', 'qux'],
+                                 'noaccess': ['quux']})
+        htac = b.ashtaccessconf('/root', 'svn-access',
+                                '/var/www/.htpasswd')
+        print htac
+        expected = '\n'.join([
+            '<Location /root/foo/bar>',
+            '    AuthType Basic',
+            '    AuthName svn-access',
+            '    AuthUserFile /var/www/.htpasswd',
+            '    require user foo bar baz qux',
+            '</Location>',
+            '',
+        ])
+        print expected
+        assert htac == expected
+
+    def test_ashtaccessconf_noanon(self):
+        b = svnconfparser.resource('/foo', {'noaccess': ['ANONYMOUS']})
+        assert b.ashtaccessconf('/root', 'MyRealm', '/foo') == '\n'.join([
+            '<Location /root/foo>',
+            '    AuthType Basic',
+            '    AuthName MyRealm',
+            '    AuthUserFile /foo',
+            '    require valid-user',
+            '</Location>',
+            '',
+        ])
+
+    def test_ashtaccessconf_anon(self):
+        b = svnconfparser.resource('/foo', {'readonly': ['ANONYMOUS']})
+        assert b.ashtaccessconf('/root', 'MyRealm', '/foo') == ''
+
+    def test_ashtaccessconf_noroot(self):
+        b = svnconfparser.resource('/foo', {'readonly': ['foo']})
+        assert b.ashtaccessconf('', 'MyRealm', '/foo') == '\n'.join([
+            '<Location /foo>',
+            '    AuthType Basic',
+            '    AuthName MyRealm',
+            '    AuthUserFile /foo',
+            '    require user foo',
+            '</Location>',
+            '',
+        ])
+
+    def test_ashtaccessconf_rootwithslash(self):
+        b = svnconfparser.resource('/foo', {'readonly': ['foo']})
+        assert b.ashtaccessconf('/root/', 'MyRealm', '/foo') == \
+            '\n'.join([
+                '<Location /root/foo>',
+                '    AuthType Basic',
+                '    AuthName MyRealm',
+                '    AuthUserFile /foo',
+                '    require user foo',
+                '</Location>',
+                '',
+            ])
+
+    def test_ashtaccessconf_inheritance(self):
+        b1 = svnconfparser.resource('/foo', {'readonly': ['foo']})
+        b2 = svnconfparser.resource('/foo/bar', {'readwrite': ['bar']})
+        b1.ashtaccessconf('/root', 'MyRealm', '/foo')
+        htac = b2.ashtaccessconf('/root', 'MyRealm', '/foo')
+        assert htac == '\n'.join([
+            '<Location /root/foo/bar>',
+            '    AuthType Basic',
+            '    AuthName MyRealm',
+            '    AuthUserFile /foo',
+            '    require user foo bar',
+            '</Location>',
+            '',
+        ])
+
+    def test_ashtaccessconf_inheritance_override(self):
+        b1 = svnconfparser.resource('/foo', {'readonly': ['foo']})
+        b2 = svnconfparser.resource('/foo/bar', {'readonly': ['bar']})
+        b1.ashtaccessconf('/root', 'MyRealm', '/foo')
+        htac = b2.ashtaccessconf('/root', 'MyRealm', '/foo')
+        assert htac == '\n'.join([
+            '<Location /root/foo/bar>',
+            '    AuthType Basic',
+            '    AuthName MyRealm',
+            '    AuthUserFile /foo',
+            '    require user bar',
+            '</Location>',
+            '',
+        ])
+
+    def test_ashtaccessconf_inheritance_anonymous_override(self):
+        b1 = svnconfparser.resource('/foo', {'readonly': ['foo']})
+        b2 = svnconfparser.resource('/foo/bar', {'readonly': ['ANONYMOUS']})
+        b3 = svnconfparser.resource('/foo/bar/baz', {'readonly': ['bar']})
+        b1.ashtaccessconf('/root', 'MyRealm', '/foo')
+        b2.ashtaccessconf('/root', 'MyRealm', '/foo')
+        htac = b3.ashtaccessconf('/root', 'MyRealm', '/foo')
+        assert htac == '\n'.join([
+            '<Location /root/foo/bar/baz>',
+            '    AuthType Basic',
+            '    AuthName MyRealm',
+            '    AuthUserFile /foo',
+            '    require user bar',
+            '</Location>',
+            '',
+        ])
+
+    def test_ashtaccessconf_inheritance_anonymous_noaccess_override(self):