Commits

Anonymous committed d5bab30

Adding module for managing user accounts

Comments (0)

Files changed (1)

vinstall/backend/users.py

+#!/bin/env python
+# coding: utf8
+
+#    This file is part of vinstall.
+#
+#    vinstall is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License v3 as published by
+#    the Free Software Foundation.
+#
+#    vinstall is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with vinstall.  If not, see <http://www.gnu.org/licenses/>.
+
+__author__ = "Moises Henriquez"
+__author_email__ = "moc.liamg@xnl.E0M"[::-1]
+
+"""Api for managing and creating user accounts on VectorLinux"""
+import pwd
+import grp
+import crypt
+import shutil
+import os
+import subprocess as sp
+import unittest
+from utils import Chroot
+#from vinstall.backend.utils import Chroot
+
+class User(object):
+    DEFAULT_MEMBERSHIP=('disk','lp',
+                        'floppy','audio','video',
+                        'cdrom','games','slocate',
+                        'plugdev','netdev','scanner',
+                        'users', 'wheel')
+
+    def __init__(self):
+        self._data = None
+        self._root = "/"
+        self.login = None
+        self.password = None
+        self.fullname = None
+
+    def _setup_home(self):
+        pass
+
+    def _encrypt_password(self, passwd):
+        """Return the encrypted password"""
+        salt = passwd[-1] + passwd[1]
+        return crypt.crypt(passwd, salt)
+
+    def create(self):
+        """Create the user account on the system"""
+        assert self._system_data is None, "User account already exists"
+        assert self.password is not None, "Password attribute has not been set"
+        assert self.login is not None, "Login attribute has not been set"
+        epass = self._encrypt_password(self.password)
+        if self.fullname:
+            cmd = ["/usr/sbin/useradd","-m","-c","%s"% self.fullname,
+                  "-s","/bin/bash","-g","users",
+                  "-G", ",".join(self.DEFAULT_MEMBERSHIP), "-p", epass,
+                  self.login]
+        else:
+            cmd = ["/usr/sbin/useradd","-m", "-s", "/bin/bash", "-g", "users",
+                  "-G", ",".join(self.DEFAULT_MEMBERSHIP), "-p", epass,
+                  self.login]
+
+        # Launch the command
+        # FIXME:  Is subprocess.call the right method to use?
+        # subprocess.Popen offers stdout, stderr and returnvalue.
+        sp.check_call(cmd)        
+
+        self._setup_home()
+        pass
+
+    def change_password(self, newpass):
+        """Change a users password"""
+        assert len(newpass) > 4, "Invalid password"
+        # Not needed for the installer.  Only for VASM
+        pass
+
+    def delete(self):
+        """delete this user account"""
+        # if self._data is None, this user account does not yet exist.
+
+        assert self._system_data is not None, "User not in system"
+        cmd = [ "/usr/sbin/userdel", "-r", self.login]
+        sp.check_call(cmd)
+
+    @property
+    def _system_data(self):
+        """Return the system data related to this account"""
+        ret = [ u for u in pwd.getpwall() if u.pw_name == self.login ]
+        if ret:
+            return ret[0]
+
+    @property
+    def _next_available_uid(self):
+        """Find the next available uid value"""
+        with Chroot(self._root):
+            ids = sorted([ u.pw_uid for u in pwd.getpwall() if u.pw_uid >= 1000 ])
+            # FIXME: Is this fail-proof ?
+            if ids:
+                return max(ids) + 1
+            return 1000
+
+    @property
+    def _next_available_gid(self):
+        """Find the next available gid value"""
+        with Chroot(self._root):
+            gids = sorted([ g.gr_gid for g in grp.getgrall() if g.gr_gid >= 1000 ])
+            if gids: return max(gids) + 1
+            return 1000
+
+
+    @property
+    def uid(self):
+        """Return the uid value for this user account"""
+        if self._data:
+            return self._data.pw_uid
+        else:
+            # Find the next available uid
+            return self._next_available_uid
+
+    @property
+    def home(self):
+        if self._data:
+            return self._data.pw_dir
+        return os.path.join("/home",self.login)
+    
+    @property
+    def gid(self):
+        """Return the gid value for this user or the next available value"""
+        if self._data:
+            return self._data.pw_gid
+        return self._next_available_gid
+
+
+    @property
+    def groups(self):
+        if self._data:
+            # account exists, read the group list
+            return [ g.gr_name for g in grp.grpgrall() if self.login in g.gr_mem]
+        return []
+
+    @classmethod
+    def all(cls, root="/"):
+        with Chroot(root):
+            for entity in pwd.getpwall():
+                if entity.pw_uid >= 1000:
+                    account = cls()
+                    account._root = root
+                    account._data = entity
+                    account.login = entity.pw_name
+                    yield account
+
+
+class Tests(unittest.TestCase):
+    def setUp(self):
+        self.fakeuser = User()
+        self.root = "/mnt/TARGET"
+        self.fakeuser.login = "fakeuser"
+        self.fakeuser.password = "fakepass"
+        self.fakeuser._root = self.root
+
+    def test_invalid_password(self):
+        self.assertRaises(AssertionError,
+                          self.fakeuser.change_password,
+                          '')
+        for x in xrange(0,4):
+            self.assertRaises(AssertionError,
+                              self.fakeuser.change_password,
+                              "o"*x
+                              )
+            return
+
+    def test_pasword_encryption(self):
+        """Make sure password encryption works"""
+        return self.assertNotEqual(self.fakeuser.password,
+                                   self.fakeuser._encrypt_password(
+                self.fakeuser.password))
+
+    def test_delete_non_existing_account(self):
+        return self.assertRaises(AssertionError,
+                                 self.fakeuser.delete)
+
+    def test_user_created(self):
+        self.fakeuser.create()
+        ruser = [ u for u in User.all() if u.login == self.fakeuser.login][0]
+        self.assertEqual(ruser.login, self.fakeuser.login)
+        # This should raise an exception because it already exists
+        self.assertRaises(AssertionError,
+                          self.fakeuser.create)
+        # Delete the account
+        self.fakeuser.delete()
+        
+    
+    def test_groups_retval(self):
+        return self.assertIsInstance(self.fakeuser.groups, list)
+
+    def test_uid_retval(self):
+        return self.assertIsInstance(self.fakeuser.uid, int)
+
+    def test_gid_retval(self):
+        return self.assertIsInstance(self.fakeuser.gid, int)
+
+    def test_home_retval(self):
+        return self.assertIsInstance(self.fakeuser.home, str)
+
+
+if __name__ == '__main__':
+    assert os.getuid() == 0, "Must be root to run these tests"
+    unittest.main()