Source

vinstall / vinstall / backend / users.py

#!/bin/env python
# coding: utf8

#    This file is part of vinstall.
#
#    vinstall is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License v3 as published by
#    the Free Software Foundation.
#
#    vinstall is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with vinstall.  If not, see <http://www.gnu.org/licenses/>.

"""API for managing and creating user accounts on VectorLinux."""

# NOTE: the docstring above must remain the first statement of the module
# (only comments may precede it); otherwise it is a no-op string expression
# and the module's __doc__ stays None.

__author__ = "Moises Henriquez"
# The address is stored reversed; the [::-1] slice restores it at import time.
__author_email__ = "moc.liamg@xnl.E0M"[::-1]


import pwd
import grp
import crypt
import random
import string
import os
import unittest
from vinstall.backend import sp
from utils import Chroot


class User(object):
    """A user account that can be created, modified and inspected.

    Typical use: instantiate, set ``login`` (plus ``password`` and optionally
    ``fullname``), then call :meth:`create`.  Accounts that already exist can
    be enumerated with :meth:`all`.

    Account changes are made by running the shadow tools (``groupadd``,
    ``useradd``, ``usermod``, ``userdel``) through ``sp.check_call``; lookups
    go through the ``pwd`` and ``grp`` modules.
    """

    # Supplementary groups passed to useradd for every account made by create().
    DEFAULT_MEMBERSHIP = ('disk', 'lp',
                          'floppy', 'audio', 'video',
                          'cdrom', 'games', 'slocate',
                          'plugdev', 'netdev', 'scanner',
                          'users', 'wheel')
    # Group list for the root account (not referenced inside this class).
    DEFAULT_MEMBERSHIP_ROOT = ('root', 'bin', 'daemon', 'sys', 'adm', 'disk',
                               'wheel')

    def __init__(self):
        # passwd entry for this account; only populated by all().
        self._data = None
        # Directory handed to Chroot() for uid/gid allocation lookups.
        self._root = "/"
        self.login = None
        self.password = None
        self.fullname = None

    def encrypt_password(self, passwd):
        """Return ``passwd`` hashed by ``crypt.crypt`` with a random salt.

        The salt is ``$6$`` followed by six random characters from
        ``[A-Za-z0-9./]`` (``$6$`` selects the SHA-512 scheme in glibc's
        crypt).
        """
        alphabet = string.ascii_letters + string.digits + "./"
        # Draw the salt from the OS entropy source instead of the default,
        # predictable Mersenne Twister generator.
        rng = random.SystemRandom()
        salt = "$6$%s$" % "".join(rng.choice(alphabet) for _ in range(6))
        return crypt.crypt(passwd, salt)

    def create(self):
        """Create the user account on the system.

        First adds a group named after the login (gid taken from
        :attr:`gid`), then adds the user with that group as its primary
        group, ``DEFAULT_MEMBERSHIP`` as supplementary groups, ``/bin/bash``
        as shell and the hashed ``password``.  ``fullname``, when set, becomes
        the account comment.
        """
        epass = self.encrypt_password(self.password)
        sp.check_call(
            ["/usr/sbin/groupadd", "-g", str(self.gid), self.login])

        cmd = ["/usr/sbin/useradd", "-m"]
        if self.fullname:
            # Only pass a comment field when there is something to put in it.
            cmd += ["-c", "%s" % self.fullname]
        cmd += ["-s", "/bin/bash", "-g", self.login,
                "-G", ",".join(self.DEFAULT_MEMBERSHIP), "-p", epass,
                self.login]
        sp.check_call(cmd)

    def change_password(self, newpass):
        """Hash ``newpass`` and set it as this user's password."""
        epass = self.encrypt_password(newpass)
        cmd = ["/usr/sbin/usermod", "-p", epass, self.login]
        return sp.check_call(cmd)

    def delete(self):
        """Delete this user account (``userdel -r``)."""
        cmd = ["/usr/sbin/userdel", "-r", self.login]
        return sp.check_call(cmd)

    def set_initial_group(self, groupname="users"):
        """Set the initial (primary) group for this account.

        This is normally 'users' for human user accounts.
        """
        cmd = ["/usr/sbin/usermod", "-g", groupname, self.login]
        return sp.check_call(cmd)

    def set_supplementary_groups(self, grouplist=()):
        """Set the supplementary group memberships for this user.

        ``grouplist`` is an iterable of group names; it replaces the current
        supplementary group list (``usermod -G``).  The default is an
        immutable empty tuple rather than a shared mutable list.
        """
        cmd = ["/usr/sbin/usermod", "-G", ",".join(grouplist), self.login]
        return sp.check_call(cmd)

    def add_to_group(self, group):
        """Add this user account to ``group``, keeping existing memberships."""
        # usermod's -a (append) flag is only meaningful together with -G;
        # without -G the group name is never passed as a group list.
        cmd = ["/usr/sbin/usermod", "-a", "-G", group, self.login]
        return sp.check_call(cmd)

    def _system_data(self):
        """Return the passwd entry whose name equals ``login``, else None."""
        # NOTE(review): unlike the uid/gid allocators below, this lookup is
        # not wrapped in Chroot(self._root) -- confirm that is intended.
        return next(
            (entry for entry in pwd.getpwall()
             if entry.pw_name == self.login),
            None)

    def _next_available_uid(self):
        """Return one more than the highest uid >= 1000, or 1000 if none."""
        with Chroot(self._root):
            taken = [u.pw_uid for u in pwd.getpwall() if u.pw_uid >= 1000]
        return max(taken) + 1 if taken else 1000

    def _next_available_gid(self):
        """Return one more than the highest gid >= 1000, or 1000 if none."""
        with Chroot(self._root):
            taken = [g.gr_gid for g in grp.getgrall() if g.gr_gid >= 1000]
        return max(taken) + 1 if taken else 1000

    @property
    def uid(self):
        """Return the uid of this account.

        Falls back to 0 for ``root`` and otherwise to the next free uid when
        the account has no passwd entry yet.
        """
        entry = self._system_data()  # look up once instead of twice
        if entry:
            return entry.pw_uid
        if self.login == "root":
            return 0
        return self._next_available_uid()

    @property
    def home(self):
        """Return the home directory of this account.

        Uses the passwd entry when there is one; otherwise ``/root`` for
        ``root`` and ``/home/<login>`` for everybody else.
        """
        entry = self._system_data()  # look up once instead of twice
        if entry:
            return entry.pw_dir
        if self.login == "root":
            return "/root"
        return os.path.join("/home", self.login)

    @property
    def gid(self):
        """Return the gid value for this user or the next available value."""
        # Consult the live passwd database as well as the cached entry, the
        # same way uid and home do, so an existing account that was not
        # obtained through all() still reports its real primary gid.
        entry = self._data or self._system_data()
        if entry:
            return entry.pw_gid
        if self.login == "root":
            return 0
        return self._next_available_gid()

    @property
    def groups(self):
        """Return the names of groups listing ``login`` as a member.

        Returns an empty list when the account does not exist.
        """
        if self._data or self._system_data():
            # account exists, read the group list
            return [g.gr_name for g in grp.getgrall()
                    if self.login in g.gr_mem]
        return []

    @classmethod
    def all(cls, root="/"):
        """Yield a User for every passwd entry with uid >= 1000.

        ``root`` is handed to ``Chroot`` for the duration of the passwd read
        and stored on each yielded account as ``_root``.
        """
        # Snapshot the entries inside the Chroot block and yield afterwards,
        # so the context is not left open while the caller is consuming the
        # generator.
        with Chroot(root):
            entries = [e for e in pwd.getpwall() if e.pw_uid >= 1000]
        for entity in entries:
            account = cls()
            account._root = root
            account._data = entity
            account.login = entity.pw_name
            yield account


class UserTestCase(unittest.TestCase):
    """Integration test that creates and removes a real account via User."""

    def setUp(self):
        """Prepare an uncreated account description for each test."""
        self.root = "/"
        self.fakeuser = User()
        self.fakeuser.login = "fakeuser"
        self.fakeuser.password = "fakepass"
        self.fakeuser._root = self.root

    def test_user_created(self):
        """A created account must be listed by User.all()."""
        self.fakeuser.create()
        try:
            logins = [u.login for u in User.all(self.root)]
            # Membership check instead of indexing [0], so a missing account
            # is reported as a test failure rather than an IndexError.
            self.assertTrue(self.fakeuser.login in logins)
        finally:
            # Always remove the account, even when the check above fails, so
            # a failing run does not leave "fakeuser" behind on the system.
            self.fakeuser.delete()


if __name__ == '__main__':
    # The tests create and delete a real system account, so refuse to run
    # without root.  An explicit check is used instead of `assert`, which is
    # stripped when Python runs with -O.
    if os.getuid() != 0:
        raise SystemExit("Must be root to run these tests")
    unittest.main()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.