Commits

Anonymous committed 133dafb

Refactoring USERADMIN and it's tests

  • Participants
  • Parent commits 3e98e18

Comments (0)

Files changed (2)

File modules/support/vectorlinux/USERADMIN.py

 import subprocess as sp
 from utils import get_popen
 
+class EmptyAttributeError(Exception):
+    def __init__(self, value):
+        self.value = value
+
+    def __str__(self):
+        return repr(self.value)
+        
 
 # __distro_groups__ is a dictionary of the default vectorlinux
 # user groups and their default value (True or False) for new user accounts.
     "wheel": ("Elite user", False)
 }
 
-def test_login_availability(login):
-    all=[]
-    for entry in pwd.getpwall():
-        all.append(entry.pw_name)
-    return login not in all
-
-def run_popen(cmd):
-    #print "running %s" %cmd
-    return sp.Popen(cmd.split(), stdout=sp.PIPE, stderr=sp.STDOUT)
-
-    
 class User(object):
     """ convinience methods for working with a single user account """
-    def __init__(self, login):
+    def __init__(self, login=None, new=False):
         self.login = login
-    
-    def get_home_dir(self):
-        return pwd.getpwnam(self.login)[5]
-    
-    def get_login_name(self):
-        return pwd.getpwnam(self.login)[0]
-    
-    def get_uid(self):
-        return pwd.getpwnam(self.login)[2]
-    
-    def get_shell(self):
-        return pwd.getpwnam(self.login)[-1]
-    
-    def get_fullname(self):
-        return pwd.getpwnam(self.login)[-3]
-    
-    def get_groups(self):
-        """ return a list of groups this user belons to """
-        ret = []
-        for line in grp.getgrall():
-            if self.login in line[-1]:
-                ret.append(line[0])
-        return ret
-    
-    def set_groups(self, groups=[]):
-        """ Modify the user's group membershipts to the value provided in groups
-        Return a tuple (exit_status, stdout+stderr)
-        """
-        grps =",".join(groups)
-        com = "/usr/sbin/usermod -G %s %s"% (grps, self.login)
-        proc = run_popen(com)
-        out = proc.communicate()[0]
-
-        return proc.returncode, out
+        uinfo = None
+        if not new:
+            uinfo = [p for p in pwd.getpwall() if p.pw_name == login][0]
+            self.homedir = uinfo.pw_dir
+            self.uid = uinfo.pw_uid
+            self.fullname = uinfo.pw_gecos
+            # get the group listing
+            self.groups = []
+            for line in grp.getgrall():
+                if self.login in line[-1]:
+                    self.groups.append(line[0])
+            if os.path.exists(os.path.join(self.homedir, '.face')):
+                self.faceicon = os.path.join(self.homedir, '.face')
+            else:
+                self.faceicon = None
+        else:
+            self.homedir = None
+            self.uid = None
+            self.fullname = None
+            self.groups = None
+            self.faceicon = None
 
 
 class UserModel(object):
         # Make a place holder for observables
         self.watchers = []
 
-    def user_exists(self, login):
-        """ Check if login exsits in the current user accounts """
-        allusers = self.list_all()
-        for user in allusers:
-            _login, homedir, uid = user
-            if login == _login:
+    def hasUser(self, login):
+        """ Check if we already have an account for login"""
+        for entry in pwd.getpwall():
+            if entry.pw_name == login:
                 return True
         return False
+
+    def getUser(self, login):
+        """ Return a User object for the user whose login == login """
+        for entry in pwd.getpwall():
+            if entry.pw_name == login:
+                user = User(login, new=False)
+                return user
+        return None
     
-    def add_observable(self, method):
+    def add_observer(self, method):
         """ Add method to the list of obserbables to be notified of any changes """
         self.watchers.append(method)
 
         """ Notify any observers of the changes in the current user setup """
         for f in self.watchers:
             f(self.list_all())
-    
-    def delete_user(self, uname):
+        return
+
+    def _check_user_object(self, usuario):
+        """ Run some checks on usuario"""
+        assert isinstance(usuario, User), "User object is of invalid type."
+        if not hasattr(usuario, 'login') or usuario.login is None:
+            raise EmptyAttributeError('Login attribute is empty.  Expected a string')
+        if not hasattr(usuario, 'password') or usuario.password is None:
+            raise EmptyAttributeError('Password attribute is empty.  Expected a string.')
+        if not hasattr(usuario, 'groups') or usuario.groups is None or type(usuario.groups) is not list:
+            raise EmptyAttributeError('Empty groups attribute.  Expected a list.')
+        if not hasattr(usuario, 'password') or usuario.password is None:
+            raise EmptyAttributeError('Empty password attribute.  Expected a string.')
+        assert len(usuario.password) >= 5, "Provided password for new user is too short."
+        # finally check that the user model does not have this user in the sytem
+        assert self.hasUser(usuario.login) is False, "An user account with this login name already exists."
+        return
+
+    def addUser(self, userobject=None):
+        """ Add the oserobject as a new user account on the system """
+        # Run some checks on the provided object
+        assert userobject not in ("", None), "Invalid user object argument."
+        self._check_user_object(userobject)
+        # if we still have not excepted, go ahead and create the account.
+        salt = userobject.password[0] + userobject.password[-1]
+        epass = crypt.crypt(userobject.password, salt)
+        if hasattr(userobject, 'fullname'):
+            # use get_popen
+            cm = ['/usr/sbin/useradd', '-m','-c', "%s"% userobject.fullname,
+                  '-s', '/bin/bash', '-g', 'users','-G', ','.join(userobject.groups),
+                  '-p', epass, userobject.login]
+        else:
+            cm = ['/usr/sbin/useradd', '-m', '-s', '/bin/bash', '-g', 'users',
+                  '-G', ','.join(userobject.groups), '-p', epass, userobject.login]
+        # Run the popen process
+        proc = get_popen(cm)
+        out, err = proc.communicate()
+        rval = proc.returncode
+        # make sure the process ended well.
+        assert rval == 0, "Error while adding user to system. %s"% err
+        # setup the home directory
+        self._setup_user_home(userobject.login, userobject.faceicon)
+        return self.notify()
+
+    def deleteUser(self, uname):
         """ remove a user account """
-        #cmd = "/usr/sbin/userdel -f -r %s"% uname
-        #proc = sp.Popen(cmd.split(), stdout=sp.PIPE, stderr=sp.STDOUT)
-        cmd = ["/usr/sbin/userdel", "-r", "-r", uname]
+        assert self.hasUser(uname), "User account does not exist"
+        
+        cmd = ["/usr/sbin/userdel", "-r", uname]
         proc = get_popen(cmd)
-        out = proc.communicate()[1]
+        out, err = proc.communicate()
         ret = proc.returncode
-        self.notify()
-        return ret, out
+        assert ret == 0, "Error while deleting user %s"% err
+        return self.notify()
 
-    def _setup_user_home(self, login, faceicon=None):
+    def _setup_user_home(scelf, login, faceicon=None):
         """ setup the default user home for login"""
         skelpath = "/etc/skel"
         targetdir = "/home/%s"% login
             targeticon = os.path.join(targetdir, ".face")
             shutil.copy(faceicon, targeticon)
             # fix the permissions for this
-            perm = "/bin/chown %s:users %s"%(login, targeticon)
-            proc = run_popen(perm)
+            perm = ['/bin/chown','%s:user'% login, targeticon]
+            proc = get_popen(perm)
             proc.communicate()
         
         #copy the default stuff from /etc/skel
         for skel in os.listdir(skelpath):
-            com = "/bin/cp -ar %s %s"% (os.path.join(skelpath, skel), targetdir)
-            proc = run_popen(com)
+            com = ['/bin/cp','-ar', os.path.join(skelpath, skel), targetdir]
+            # com = "/bin/cp -ar %s %s"% (os.path.join(skelpath, skel), targetdir)
+            proc = get_popen(com)
             proc.communicate()
             # fix permissions
-            perm = "/bin/chown %s:users %s"% (
-                login, os.path.join(targetdir, skel))
-            proc = run_popen(perm)
+            perm = ['/bin/chown','%s:users'% login, os.path.join(targetdir, skel)]
+            #perm = "/bin/chown %s:users %s"% (
+            #    login, os.path.join(targetdir, skel))
+            proc = get_popen(perm)
             proc.communicate()
         # ensure permissions for the entire home dir
-        com = "/bin/chmod 0700 %s"% targetdir
-        proc = run_popen(com)
+        com = ['/bin/chmod','0700', targetdir]
+        #com = "/bin/chmod 0700 %s"% targetdir
+        proc = get_popen(com)
         proc.communicate()
-        com = '/bin/chown -R %s %s'% (login, targetdir)
-        proc = run_popen(com)
+        com = ['/bin/chown', '-R', login, targetdir]
+        #com = '/bin/chown -R %s %s'% (login, targetdir)
+        proc = get_popen(com)
         proc.communicate()
         
         return proc.returncode
-
-
-    def create_new_account(self, uname, realname, password, groups=[], faceicon=None):
-        """ creates a new user account """
-        # password must be at least 5 characters long
-        assert len(password) >= 5, "Password is too short"
-        salt = password[0] + password[-1]
-        epass = crypt.crypt(password, salt)
         
-        # FIXME: Add some more checks on these operations?
-        if realname:
-            proc = sp.Popen(["/usr/sbin/useradd","-m","-c",realname,
-                             "-s","/bin/bash","-g","users",
-                             "-G",",".join(groups),
-                             "-p",epass, uname],
-                stdout=sp.PIPE, stderr=sp.STDOUT)
-        else:
-            proc = sp.Popen(["/usr/sbin/useradd", "-m","-s",
-                             "/bin/bash","-g","users",
-                             "-G", ",".join(groups),
-                             "-p", epass, uname],
-                stdout=sp.PIPE, stderr=sp.STDOUT)
-
-        out = proc.communicate()[0]
-        ret = proc.returncode
-        assert ret == 0, "Subprocess returned error when running useradd"
-        self._setup_user_home(uname, faceicon)
-        self.notify()
-        return ret, out
-        
-    def list_all(self):
+    def listUsers(self):
         """ return a list of tuples to represent current user accounts.
         (loginame, uid, user_home)
         """        
         ret = []
         for p in pwd.getpwall():
             if p[2] == 0 or p[2] >= 1000:
-                row = (p[0],p[2],p[5])
-                ret.append(row)
+                user = User(p[0], new=False)
+                ret.append(user)
         return ret
+

File modules/support/vectorlinux/tests/test_USERMANAGE.py

     @classmethod
     def tearDownClass(cls):
         pass
+
+    def tearDown(self):
+        if self.usermodel.hasUser(self.user.login):
+            self.usermodel.deleteUser(self.user.login)
     
     def setUp(self):
         self.usermodel = USERADMIN.UserModel()
         # create a dummy account
-        self._uname = 'fidencio'
-        self._rlname = 'Fidencio Hdz'
-        self._groups = ['sys', 'adm', 'slocate','power']
-        self._password = 'mypassword'
+        self.user = USERADMIN.User('fidencio', new=True)
+        self.user.fullname = 'Fidencio Hdz'
+        self.user.groups = ['sys', 'adm', 'slocate', 'power']
+        self.user.password = 'supersecretpassword'
         self.addCleanup(self._cleanup)
 
     def _cleanup(self):
-        if self.usermodel.user_exists(self._uname):
-            self.usermodel.delete_user(self._uname)
+        if self.usermodel.hasUser(self.user.login):
+            self.usermodel.deleteUser(self.user.login)
+        return
+
+    def test_Invalid_user_object(self):
+        # Make sure we raise an exception when invalid user argument is given to addUser
+        self.assertRaises(AssertionError,
+                          self.usermodel.addUser, self.user.login)
+        return
+
+    def test_missing_login_attribute(self):
+        user = self.user
+        user.login = None
+        return self.assertRaises(USERADMIN.EmptyAttributeError,
+                                 self.usermodel.addUser, user)
+
+    def test_missing_passsword_attribute(self):
+        user = self.user
+        user.password = None
+        return self.assertRaises(USERADMIN.EmptyAttributeError,
+                                 self.usermodel.addUser, user)
+
+    def test_short_password_attribute(self):
+        user = self.user
+        user.password = 'foo'
+        return self.assertRaises(AssertionError,
+                                 self.usermodel.addUser, user)
+
+    def test_missing_groups_attribute(self):
+        user = self.user
+        user.groups = None
+        return self.assertRaises(USERADMIN.EmptyAttributeError,
+                                 self.usermodel.addUser, user)
+
+    def test_wrong_groups_attribute(self):
+        user = self.user
+        user.groups = "foo,bar"
+        return self.assertRaises(USERADMIN.EmptyAttributeError,
+                                 self.usermodel.addUser, user)
+    
+    def test_Delete_Nonexistant_User(self):
+        # Try to delete a user account that doesn't exist.  This should except.
+        return self.assertRaises(AssertionError,
+                                 self.usermodel.deleteUser, self.user)
 
     def test_MakeUser(self):
-        """ Testing user creation capabilities ... """
-        ret, out = self.usermodel.create_new_account(
-            self._uname,
-            self._rlname,
-            self._password,
-            self._groups
-            )
-        self.assertEqual(ret,0)
-        # make sure the user was created
-        return self.assertTrue(self.usermodel.user_exists(self._uname))
+        # Create a new user account
+        self.usermodel.addUser(self.user)
+        self.assertTrue(self.usermodel.hasUser(self.user.login)), "User account was not successfully created."
+        # test to make sure the attributes of the created account match what we wanted.
+        return
 
-    def test_UserAttributes(self):
-        """ Checking attributes of the created user account ... """
-        # create the user
+    def test_Add_Existing_user(self):
+        # Try to add a user that already exists.  This should except
         self.test_MakeUser()
-        user = USERADMIN.User(self._uname)
-        self.assertEqual(self._uname, user.get_login_name())
-        self.assertEqual(self._rlname, user.get_fullname())
-        self.assertEqual(self._groups, user.get_groups())
+        return self.assertRaises(AssertionError,
+                          self.usermodel.addUser, self.user)
 
+    def test_user_attributes(self):
+        # Test the user attributes, with the resulting user account
+        self.test_MakeUser()
+        euser = self.usermodel.getUser(self.user.login)
+        self.assertEqual(self.user.login, euser.login), "User login mismatch."
+        self.assertTrue(euser.uid > 1000), "User ID is in invalid range."
+        self.assertEqual(self.user.groups, euser.groups), "User groups mismatch."
+        self.assertEqual(self.user.fullname, euser.fullname), "User full name mismatch."
+    
     def test_DeleteUser(self):
         """ Testing user deletion capabilities ... """
-        # create the account
         self.test_MakeUser()
-        code, out = self.usermodel.delete_user(self._uname)
-        return self.assertEqual(code, 0)
+        # make sure the user is created
+        self.assertTrue, self.usermodel.hasUser(self.user.login)
+        # delete the account
+        self.usermodel.deleteUser(self.user.login)
+        self.assertTrue(self.usermodel.hasUser(self.user.login) is False), "User account was not deleted."
 
 
 if __name__ =='__main__':