Source

vinstall / vinstall / backend / users.py

Full commit
#!/bin/env python
# coding: utf8

#    This file is part of vinstall.
#
#    vinstall is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License v3 as published by
#    the Free Software Foundation.
#
#    vinstall is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with vinstall.  If not, see <http://www.gnu.org/licenses/>.

__author__ = "Moises Henriquez"
__author_email__ = "moc.liamg@xnl.E0M"[::-1]

"""Api for managing and creating user accounts on VectorLinux"""
import pwd
import grp
import crypt
import os
import subprocess as sp
import unittest
from utils import Chroot
#from vinstall.backend.utils import Chroot


class User(object):
    DEFAULT_MEMBERSHIP=('disk','lp',
                        'floppy','audio','video',
                        'cdrom','games','slocate',
                        'plugdev','netdev','scanner',
                        'users', 'wheel')
    DEFAULT_MEMBERSHIP_ROOT=('root','bin','daemon','sys','adm','disk','wheel')

    def __init__(self):
        self._data = None
        self._root = "/"
        self.login = None
        self.password = None
        self.fullname = None

    def _setup_home(self):
        """Setup the default home directory for this user"""
        # This should not be needed, but just in case someone
        # decides to call this directly.
        assert self._system_data is not None, "User account not in system"
        assert self.login is not None, "Does this account exist yet?"
        SKELPATH = "/etc/skel"
        TARGETDIR= self.home

        topdir, dirnames, files = os.walk(SKELPATH).next()
        
        # Copy the skel files to the home dir
        # FIXME:  Does useradd do this by default on VL?
        for item in os.listdir(SKELPATH):
            fcopy = ["/bin/cp", "-aru", os.path.join(SKELPATH, item), 
                   os.path.join(TARGETDIR, item)]
            fchown = ["/bin/chown", self.login, os.path.join(TARGETDIR, item)]
            fchgrp = ["/bin/chgrp", self.login, os.path.join(TARGETDIR, item)]
            for fun in (fcopy, fchown, fchgrp):
                sp.check_call(fun)



        # Set permissions to the items in home directory.
        cmd = ["/bin/chmod", "0700", TARGETDIR]
        sp.check_call(cmd)

        cmd = ["/bin/chown", "-R", self.login, TARGETDIR ]
        return sp.check_call(cmd)

    def _encrypt_password(self, passwd):
        """Return the encrypted password"""
        salt = passwd[-1] + passwd[1]
        return crypt.crypt(passwd, salt)

    def create(self):
        """Create the user account on the system"""
        assert self._system_data is None, "User account already exists"
        assert self.password is not None, "Password attribute has not been set"
        assert self.login is not None, "Login attribute has not been set"
        epass = self._encrypt_password(self.password)

        # Add the group
        cmd = ["/usr/sbin/groupadd", "-g", str(self.uid), self.login ]
        sp.check_call(cmd)
        if self.fullname:
            cmd = ["/usr/sbin/useradd","-m","-c","%s"% self.fullname,
                  "-s","/bin/bash", "-g", self.login,
                  "-G", ",".join(self.DEFAULT_MEMBERSHIP), "-p", epass,
                  self.login]
        else:
            cmd = ["/usr/sbin/useradd","-m", "-s", "/bin/bash", "-g", self.login,
                  "-G", ",".join(self.DEFAULT_MEMBERSHIP), "-p", epass,
                   self.login]

        # Launch the command
        # FIXME:  Is subprocess.call the right method to use?
        # subprocess.Popen offers stdout, stderr and returnvalue.
        sp.check_call(cmd)        

        return self._setup_home()

    def change_password(self, newpass):
        """Change a users password"""
        assert len(newpass) > 4, "Invalid password"
        assert self._system_data is not None, "User does not exist in the system."
        # Not needed for the installer.  Only for VASM
        epass = self._encrypt_password(newpass)
        assert epass != newpass, "Password cannot be encrypted correctly"
        cmd = ["/usr/sbin/usermod", "-p", epass, self.login]
        return sp.check_call(cmd)

    def delete(self):
        """delete this user account"""
        # if self._data is None, this user account does not yet exist.

        assert self._system_data is not None, "User not in system"
        cmd = [ "/usr/sbin/userdel", "-r", self.login]
        return sp.check_call(cmd)

    def set_initial_group(self, grouname="users"):
        """Set the initial group for this account.  This is normally 
        'users' for human user accounts"""
        assert self._system_data is not None, "User does not exist in the system."
        cmd = [ "/usr/sbin/usermod", "-g", groupname, self.login]
        return sp.check_call(cmd)

    def set_supplementary_groups(self, grouplist=[]):
        """Set the supplementary group memberships for this user"""
        assert self._system_data is not None, "User does not exist in the system."
        assert isinstance(grouplist, list), "grouplist argument must be a list"
        cmd = [ "/usr/sbin/usermod", "-G", ",".join(grouplist), self.login ]
        return sp.check_call(cmd)

    def add_to_group(self, group):
        """Add this user account to the specified group """
        assert self._system_data is not None, "User not in system yet"
        assert isinstance(group, str), "Group argument must be a string"
        allgroups = [ g.gr_name for g in grp.getgrall() ]
        assert group in allgroups, "Group %s does not exist in the system."% group
        
        cmd = [ "/usr/sbin/usermod", "-a", group, self.login ]
        return sp.check_call(cmd)

    @property
    def _system_data(self):
        """Return the system data related to this account"""
        assert self.login is not None, "Login property must be set first"
        ret = [ u for u in pwd.getpwall() if u.pw_name == self.login ]
        if ret:
            return ret[0]

    @property
    def _next_available_uid(self):
        """Find the next available uid value"""
        with Chroot(self._root):
            ids = sorted([u.pw_uid for u in pwd.getpwall() if u.pw_uid >= 1000])
            # FIXME: Is this fail-proof ?
            if ids:
                return max(ids) + 1
            return 1000

    @property
    def _next_available_gid(self):
        """Find the next available gid value"""
        with Chroot(self._root):
            gids = sorted([ g.gr_gid for g in grp.getgrall() if g.gr_gid >= 1000 ])
            if gids: return max(gids) + 1
            return 1000

    @property
    def uid(self):
        """Return the uid value for this user account"""
        if self._system_data:
            return self._system_data.pw_uid
        elif self.login == "root":
            return 0
        else:
            # Find the next available uid
            return self._next_available_uid

    @property
    def home(self):
        if self._system_data:
            return self._system_data.pw_dir
        if self.login == "root":
            return "/root"
        return os.path.join("/home",self.login)
    
    @property
    def gid(self):
        """Return the gid value for this user or the next available value"""
        if self._data:
            return self._data.pw_gid
        if self.login == "root":
            return 0
        return self._next_available_gid

    @property
    def groups(self):
        if self._data or self._system_data:
            # account exists, read the group list
            return [ g.gr_name for g in grp.grpgrall() if self.login in g.gr_mem]
        return []

    @classmethod
    def all(cls, root="/"):
        with Chroot(root):
            for entity in pwd.getpwall():
                if entity.pw_uid >= 1000:
                    account = cls()
                    account._root = root
                    account._data = entity
                    account.login = entity.pw_name
                    yield account


class Tests(unittest.TestCase):
    def setUp(self):
        self.fakeuser = User()
        self.root = "/"
        self.fakeuser.login = "fakeuser"
        self.fakeuser.password = "fakepass"
        self.fakeuser._root = self.root

    def test_invalid_password(self):
        self.assertRaises(AssertionError,
                          self.fakeuser.change_password,
                          '')
        for x in xrange(0,4):
            self.assertRaises(AssertionError,
                              self.fakeuser.change_password,
                              "o"*x
                              )
            return

    def test_pasword_encryption(self):
        """Make sure password encryption works"""
        return self.assertNotEqual(self.fakeuser.password,
                                   self.fakeuser._encrypt_password(
                self.fakeuser.password))

    def test_delete_non_existing_account(self):
        return self.assertRaises(AssertionError,
                                 self.fakeuser.delete)

    def test_add_user_to_nonexistant_group(self):
        return self.assertRaises(AssertionError, self.fakeuser.add_to_group,
                                 "fakenewgroup")

    def test_add_nonexistant_user_to_group(self):
        return self.assertRaises(AssertionError, self.fakeuser.add_to_group,
                                 "wheel")

    def test_set_supplementary_groups_argument(self):
        return self.assertRaises(AssertionError,
                                 self.fakeuser.set_supplementary_groups,
                                 "foo, bar, nogroup")
    
    def test_setup_home_for_invalid_user(self):
        return self.assertRaises(AssertionError,
                                 self.fakeuser._setup_home)

    def test_user_created(self):
        self.fakeuser.create()
        ruser = [ u for u in User.all() if u.login == self.fakeuser.login][0]
        self.assertEqual(ruser.login, self.fakeuser.login)
        # This should raise an exception because it already exists
        self.assertRaises(AssertionError,
                          self.fakeuser.create)
        # Delete the account
        self.fakeuser.delete()
        
    
    def test_groups_retval(self):
        return self.assertIsInstance(self.fakeuser.groups, list)

    def test_uid_retval(self):
        return self.assertIsInstance(self.fakeuser.uid, int)

    def test_gid_retval(self):
        return self.assertIsInstance(self.fakeuser.gid, int)

    def test_home_retval(self):
        return self.assertIsInstance(self.fakeuser.home, str)


if __name__ == '__main__':
    assert os.getuid() == 0, "Must be root to run these tests"
    unittest.main()