Source

vasm / src / vasm / backend / useradmin.py

Full commit
#!/usr/bin/env python

#    This file is part of VASM.
#
#    VASM is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License v3 as published by
#    the Free Software Foundation.
#
#    VASM is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with VASM.  If not, see <http://www.gnu.org/licenses/>.

__author__ = "Moises Henriquez"
__author_email__ = "moc.liamg@xnl.E0M"[::-1]

import pwd
import grp
import crypt
import shutil
import os
import subprocess as sp
from utils import get_popen

class EmptyAttributeError(Exception):
    """Raised when a required attribute of a user object is missing or empty.

    Arguments:
        message: human readable description of the offending attribute.
    """
    def __init__(self, message):
        # Hand the message to the base class as well, so that str(error),
        # error.args and tracebacks carry the text instead of being empty.
        Exception.__init__(self, message)
        self.message = message

# __distro_groups__ maps each default vectorlinux user group name to a
# (description, default) tuple, where `default` tells whether new user
# accounts are members of that group (True) or not (False).

__distro_groups__ = {
    "disk": ("Access system storage devices", True),
    "lp": ("User is allowed to print", True),
    "floppy": ("User is allowed to use floppy drives", True),
    "audio": ("User is allowed to use audio devices in the system", True),
    "video": ("Use video devices in the system (TV card/webcam/etc)", True),
    "scanner": ("Access to scanning devices", True),
    "cdrom": ("Use system CD/DVD/BR devices", True),
    "games": ("Play games on this system", True),
    "slocate": ("Find files using `slocate` on the system.", True),
    "plugdev": ("Use removable media devices", True),
    "power": ("Shutdown/restart/suspend/hibernate the system", True),
    "netdev": ("Modify system network connections", True),
    "adm": ("System Administrator", False),
    "sys": ("Services Administrator", False),
    "wheel": ("Elite user", False),
}

class AccountError(Exception):
    """Raised when a command that modifies a user account reports failure."""

class User(object):
    """ Convenience methods for working with a single user account.

    Attributes:
        login: login name of the account.
        password: clear text password of an account that is about to be
            created.  Always None for accounts loaded from the system.
        homedir, uid, fullname, shell: values taken from the passwd entry
            of the account (None for new accounts).
        groups: names of the groups that list the login as a member
            (None for new accounts until the caller sets it).
        faceicon: path to the `.face` file in the home directory when that
            file exists, None otherwise.
    """
    def __init__(self, login=None, new=False):
        """ Load an existing account, or prepare an empty one.

        Arguments:
            login: login name of the account.
            new: when True nothing is read from the system and every
                attribute other than login starts out as None.

        Raises:
            IndexError: new is False and no account named login exists.
        """
        self.login = login
        # Only populated by callers for accounts that are about to be created.
        self.password = None
        if new:
            self.homedir = None
            self.uid = None
            self.fullname = None
            self.groups = None
            self.faceicon = None
            self.shell = None
            return

        try:
            uinfo = pwd.getpwnam(login)
        except (KeyError, TypeError):
            # IndexError is what this constructor has always raised for an
            # unknown login; keep the type, but say what went wrong.
            raise IndexError(
                "No user account named %r exists on this system." % (login,))
        self.homedir = uinfo.pw_dir
        self.uid = uinfo.pw_uid
        self.fullname = uinfo.pw_gecos
        self.shell = uinfo.pw_shell
        # Groups that explicitly list this login as a member.
        self.groups = [entry.gr_name for entry in grp.getgrall()
                       if self.login in entry.gr_mem]
        face = os.path.join(self.homedir, '.face')
        self.faceicon = face if os.path.exists(face) else None

    def save_groups_changes(self):
        """ Save the group membership changes for this account.

        Set self.groups to the wanted list of group names first, then call
        this method to apply it with /usr/sbin/usermod -G.

        Raises:
            AssertionError: groups or login has not been set.
            AccountError: usermod finished with a non-zero return code; the
                captured error output is the exception argument.
        """
        # Raised explicitly (rather than with `assert`) so the checks are not
        # stripped when python runs with -O.
        if self.groups is None:
            raise AssertionError(
                "Empty groups attribute.  Please set the groups value first.")
        if self.login is None:
            raise AssertionError(
                "User account does not exist in this system yet.  New account?")
        proc = get_popen([
            "/usr/sbin/usermod", "-G", ",".join(self.groups), self.login
            ])
        out, err = proc.communicate()
        # Any non-zero code is a failure.  NOTE(review): get_popen presumably
        # returns a subprocess.Popen, where a negative code means the command
        # was killed by a signal -- confirm against utils.get_popen.
        if proc.returncode != 0:
            raise AccountError(err)
        return

class UserModel(object):
    """ Model to be used for creating and managing user accounts.

    Accounts are created and removed by running /usr/sbin/useradd and
    /usr/sbin/userdel through get_popen.  Callables registered with
    add_observer are called after every successful change.
    """

    # The 64 characters that are valid inside a crypt(3) salt.
    _SALT_CHARS = ("abcdefghijklmnopqrstuvwxyz"
                   "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                   "0123456789./")

    def __init__(self):
        # Make a place holder for observables
        self.watchers = []

    def hasUser(self, login):
        """ Return True when the system already has an account for login. """
        return any(entry.pw_name == login for entry in pwd.getpwall())

    def getUser(self, login):
        """ Return a User object for the account whose login name is login,
        or None when there is no such account. """
        if self.hasUser(login):
            return User(login, new=False)
        return None

    def add_observer(self, method):
        """ Add method to the list of observers to be notified of any changes.

        method is called with this model as its only argument. """
        self.watchers.append(method)

    def notify(self):
        """ Notify any observers of the changes in the current user setup """
        for watcher in self.watchers:
            watcher(self)
        return

    def _check_user_object(self, usuario):
        """ Validate usuario before it is used to create a new account.

        Raises:
            AssertionError: usuario is not a User, the password has fewer
                than 5 characters, or the login name is already taken.
            EmptyAttributeError: login, password or groups is missing/empty,
                or groups is not a list.
        """
        # The checks raise explicitly instead of using `assert` so that they
        # still run under python -O.  AssertionError is kept as the type.
        if not isinstance(usuario, User):
            raise AssertionError("User object is of invalid type.")
        if not getattr(usuario, 'login', None):
            raise EmptyAttributeError('Login attribute is empty.  Expected a string')
        if getattr(usuario, 'password', None) is None:
            raise EmptyAttributeError('Password attribute is empty.  Expected a string.')
        if not isinstance(getattr(usuario, 'groups', None), list):
            raise EmptyAttributeError('Empty groups attribute.  Expected a list.')
        if len(usuario.password) < 5:
            raise AssertionError("Provided password for new user is too short.")
        # finally check that the system does not have this user already
        if self.hasUser(usuario.login):
            raise AssertionError("An user account with this login name already exists.")
        return

    def _make_salt(self, length=16):
        """ Return a random crypt(3) salt that is length characters long. """
        # 256 is a multiple of 64, so the modulo keeps every salt character
        # equally likely.
        raw = bytearray(os.urandom(length))
        return "".join(self._SALT_CHARS[value % 64] for value in raw)

    def _encrypt_password(self, password):
        """ Return the crypt(3) hash of password for use with `useradd -p`.

        The salt is random: it is stored in clear text at the start of the
        hash, so it must never be derived from the password itself.
        SHA-512 ($6$) is tried first; traditional DES is the fallback for
        C libraries that do not implement it.
        """
        try:
            hashed = crypt.crypt(password, "$6$%s$" % self._make_salt(16))
        except OSError:
            # Newer python versions raise instead of returning None.
            hashed = None
        if not hashed or not hashed.startswith("$6$"):
            hashed = crypt.crypt(password, self._make_salt(2))
        return hashed

    def addUser(self, userobject=None):
        """ Add userobject as a new user account on the system.

        Arguments:
            userobject: a User with login, password and groups set;
                fullname and faceicon are optional.

        Raises:
            AssertionError: userobject is invalid or useradd failed.
            EmptyAttributeError: a required attribute is missing.
        """
        # Run some checks on the provided object
        if userobject in ("", None):
            raise AssertionError("Invalid user object argument.")
        self._check_user_object(userobject)
        # if we still have not excepted, go ahead and create the account.
        epass = self._encrypt_password(userobject.password)
        cm = ['/usr/sbin/useradd', '-m']
        # User objects always carry a fullname attribute (None when unset),
        # so test the value; otherwise the comment would become "None".
        fullname = getattr(userobject, 'fullname', None)
        if fullname:
            cm.extend(['-c', "%s" % fullname])
        cm.extend(['-s', '/bin/bash', '-g', 'users',
                   '-G', ','.join(userobject.groups),
                   '-p', epass, userobject.login])
        # Run the popen process
        proc = get_popen(cm)
        out, err = proc.communicate()
        # make sure the process ended well.
        if proc.returncode != 0:
            raise AssertionError("Error while adding user to system. %s" % err)
        # setup the home directory
        self._setup_user_home(userobject.login,
                              getattr(userobject, 'faceicon', None))
        return self.notify()

    def deleteUser(self, uname):
        """ Remove a user account with `userdel -r`.

        Arguments:
            uname: login name for the account to be deleted.

        Raises:
            AssertionError: the account does not exist or userdel failed.
        """
        if not self.hasUser(uname):
            raise AssertionError("User account does not exist")

        proc = get_popen(["/usr/sbin/userdel", "-r", uname])
        out, err = proc.communicate()
        if proc.returncode != 0:
            raise AssertionError("Error while deleting user %s" % err)
        return self.notify()

    def _setup_user_home(self, login, faceicon=None):
        """ Setup the default user home for login.

        Copies faceicon (when given) to `.face`, copies every entry of
        /etc/skel into /home/<login> and fixes ownership and permissions.

        Returns the return code of the final recursive chown; the other
        commands are run on a best effort basis.
        """
        skelpath = "/etc/skel"
        targetdir = "/home/%s" % login

        def run(command):
            # Run command to completion and hand back its return code.
            proc = get_popen(command)
            proc.communicate()
            return proc.returncode

        # FIXME: Add some checks on these operations?

        # start with the face icon
        if faceicon is not None:
            targeticon = os.path.join(targetdir, ".face")
            shutil.copy(faceicon, targeticon)
            # The group is `users`, the same one used for every other file
            # in the home directory and for the account's primary group.
            run(['/bin/chown', '%s:users' % login, targeticon])

        # copy the default stuff from /etc/skel
        for skel in os.listdir(skelpath):
            run(['/bin/cp', '-ar', os.path.join(skelpath, skel), targetdir])
            # fix permissions
            run(['/bin/chown', '%s:users' % login,
                 os.path.join(targetdir, skel)])
        # ensure permissions for the entire home dir
        run(['/bin/chmod', '0700', targetdir])
        return run(['/bin/chown', '-R', login, targetdir])

    def listUsers(self):
        """ Return a list of User objects, one for root (uid 0) and one for
        every account whose uid is 1000 or greater.
        """
        return [User(entry.pw_name, new=False)
                for entry in pwd.getpwall()
                if entry.pw_uid == 0 or entry.pw_uid >= 1000]