Commits

Anonymous committed 4ee48bf

Added tests for FSTAB and SERVICES. Continuing code review

  • Participants
  • Parent commits 289fdc6

Comments (0)

Files changed (6)

modules/support/vectorlinux/DATESET.py

 
     def get_value(self):
         """ Read self.descriptor and return the current setting """
+        ret = ""
         with open(self.descriptor) as f:
             for line in f:
                 if self._line_sets_value(line):
-                    return line.strip()
+                    ret = line.strip()
+        # make sure we have a valid setting in there, otherwise, we wont be able
+        # to set the value later
+        assert ret != ""
+        return ret
 
-    def set_value(self, value):
+    def set_value(self, value=""):
         """ Save the provided value to /etc/hardwareclock """
+        # Raise exception if value is not valid
+        assert value in ("UTC",
+                         "LOCALTIME",
+            "utc",
+            "localtime",
+            "Utc",
+            "Localtime"), "Invalid hardware clock identifier"
         newdata = []
         with open(self.descriptor) as f:
             for line in f:
         self.tzlink = "/etc/localtime-copied-from"
         self.timezones_base_dir = "/usr/share/zoneinfo"
 
-    def get_current_timezone(self):
+    def get_zone(self):
         """ Return a string representing the current timezone """
         # To get the value, we must read /etc/localtime-copied-from.
         # If this excepts, then no timezone is set.
 
     def _remove_tz_link(self):
         """ remove the symlink that sets the timezone """
-        return  os.remove(self.tzlink)
+        return os.remove(self.tzlink)
 
     def _check_faulty_rtcports(self):
         """ Check /proc/ioports for unknown rtc ports """
                 break
         return ret
 
-    def _active_zone(self):
+    def _activate_zone(self):
         """ Active the selected zone """
         if self._check_faulty_rtcports():
             clock_opt = "--directisa"
 
     def set_zone(self, zone=None):
         """ Set the timezone to `zone` """
+        assert zone not in ("",None), "Empty values are not allowed for setting the time zone"
         # first we remove the current setting
         self._remove_tz_link()
         

modules/support/vectorlinux/FSTAB.py

         lname = os.readlink(os.path.join(uuiddir, entry))
         if lname.endswith(part):
             return entry
+    return None
 
 def get_partition_filesystem(partition):
     """ Returns pertition's filesystem """
 
 class FstabModel(object):
     """ Data model for working with /etc/fstab """
-    def __init__(self):
+    def __init__(self, dataobject = '/etc/fstab'):
         self.observers = []
+        self.dataobject = dataobject
     
     def notify(self):
         """ Notify all observers that there have been changes """
         else:
             path = partition
             uuid=None
-        f = open("/etc/fstab", "r")
+
+        f = open(self.dataobject, "r")
         data = f.readlines()
         f.close()
         for line in data:
             else:
                 if line.startswith(path):
                     data.remove(line)
-        try:
-            f = open("/etc/fstab", "w")
-            f.writelines("".join(data))
-            f.close()
-            log.debug("%s entry has been removed from /etc/fstab"% partition)
-        except:
-            _log = ("Unable to remove %s from /etc/fstab.", " ",
-                "Check file permissions and contents of /etc/fstab.")
-            log.fatal(" ".join(_log))
-            return 1 # ???
+
+        f = open(self.dataobject, 'w')
+        f.writelines(''.join(data))
+        f.close()
+        log.debug("%s entry has been removed from %s"% (partition, self.dataobject))
         return self.notify()
     
-    def add_entry(self, partition, mpoint, fsystem, options):
+    def add_entry(self, partition="", mpoint="", fsystem="", options="defaults 0 0"):
         """ Adds an entry to the current fstab setup"""
+        # raise exceptions when invalid entries are parsed
+        assert partition != "", "You must specify a partition path"
+        assert mpoint != "" , "You must specify a mountpoint for %s"% partition
+        assert fsystem != "", "You must specify the filesystem type for %s"% partition
+        
         # We want to add by uuid if possible rather than by partition path.
-        if len(os.listdir("/dev/disk/by-uuid")) > 0:
-            # we can add uuid
-            if partition.startswith('/'):
-                part = "UUID=%s" % get_partition_uuid(partition)
+        uuid = get_partition_uuid(partition)
+        if uuid is None:
+            part = partition
         else:
-            part = partition
-        
+            part = "UUID=%s"% uuid        
         line = "%s    %s    %s    %s\n"%(part,
                                        mpoint,
                                        fsystem,
                                        options)
         
-        f = open("/etc/fstab", "r")
+        f = open(self.dataobject, "r")
         data = f.readlines()
         f.close()
         data.append(line)
-        try:
-            f = open("/etc/fstab", "w")
-            f.writelines("".join(data))
-            f.close()
-            log.debug("%s has been added to /etc/fstab."% partition)
-        except:
-            msg = ("Unable to add entry to /etc/fstab.", " ", 
-                "Check file permissions and contents on /etc/fstab")
-            log.fatal(" ".join(msg))
-            return 1
-        # Notify all the observers that there has been a change.
-        return self.notify()
+        # write the changes
+        f = open(self.dataobject, 'w')
+        f.writelines(''.join(data))
+        f.close()
+        log.debug('%s has been added to %s'% (partition, self.dataobject))
+        self.notify()
+        return
     
     def get_active_listing(self):
         """ Return a list of tuples containing the currently activated
         mount points"""
         ret = []
-        try:
-            f = open("/etc/fstab", "r")
-            data = f.readlines()
-            f.close()
-        except:
-            log.fatal("Unable to open /etc/fstab for reading.")
-            return None
+        f = open(self.dataobject, 'r')
+        data = f.readlines()
+        f.close()
         for line in data:
             if not line.strip():
                 continue
                 entry = (dev, fst, tgt, opts.strip(), uuid)
                 ret.append(entry)
         return ret
-

modules/support/vectorlinux/SERVICES.py

 
 class ServiceModel(object):
     """ Class for working with system services """
-    def __init__(self):
+    def __init__(self, initrdpath = '/etc/rc.d/init.d/'):
         self.observers = []
-        self.initdpath = "/etc/rc.d/init.d/"
+        self.initdpath = initrdpath
     
     def notify(self):
         """ Trigger all observers to run """
 
     def enable_service(self, service, runlevel):
         """ enable service for runlevel"""
-        #cmd = "/sbin/service -s %s %s"%(service, str(runlevel))
-        #proc = sp.Popen(cmd.split(), stdout=sp.PIPE, stderr=sp.STDOUT)
+        assert str(runlevel) not in ("0", "6"), "Invalid runlevel"
+        assert service in self.list_all_services(), "Unknown Service"
+        
         cmd = ["/sbin/service", "-s", service, str(runlevel)]
         proc = get_popen(cmd)
         output = proc.communicate()[1]
     
     def disable_service(self, service, runlevel):
         """ Disable service from runlevel """
-        #cmd = "/sbin/service -r %s %s"% (service, str(runlevel))
-        #proc = sp.Popen(cmd.split(), stdout=sp.PIPE, stderr=sp.STDOUT)
+        assert str(runlevel) not in ("0", "6"), "Invalid runlevel"
+        assert service in self.list_all_services(), "Unknown service."
+
         cmd = ["/sbin/service", "-r", service, str(runlevel)]
         proc = get_popen(cmd)
         output = proc.communicate()[1]

modules/support/vectorlinux/tests/test_DATESET.py

+
 #!/usr/bin/env python
 # -*- encoding: utf-8 -*-
 
 
 
 import unittest
+import jailedtests
+import os
+import sys
+mydir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+if not mydir in sys.path: sys.path.append(mydir)
 import DATESET
 
+class ClockTest(jailedtests.JailedTest):
+    """ Test the functionality to set the hardware clock """
+    def setUp(self):
+        self.clock = DATESET.HWClock()
+        super(ClockTest, self).setUp()
+        self.dummyvalue = "localtime"
 
-class DateTimeTest(unittest.TestCase):
+    def test_setClock(self):
+        # This will except if something goes wrong
+        self.clock.set_value(self.dummyvalue)
+        gotval = self.clock.get_value()
+        return self.assertEqual(gotval, self.dummyvalue)
+
+    def test_invalid_values(self):
+        self.assertRaises(AssertionError,
+                          self.clock.set_value, "foo")
+        self.assertRaises(AssertionError,
+                          self.clock.set_value, "")
+        
+
+class TimeZoneTest(jailedtests.JailedTest):
+    """ Test timezone setting and getting """
+    def setUp(self):
+        self.zone = DATESET.TimeZone()
+        super(TimeZoneTest, self).setUp()
+        self.zonesbase = "/usr/share/zoneinfo/"
+        self.tz = "US/Central"
+
+    def test_setEmtyZone(self):
+        self.assertRaises(AssertionError,
+                          self.zone.set_zone, "")
+        return self.assertRaises(AssertionError,
+                                 self.zone.set_zone, None)
+
+    def test_setzone(self):
+        ret = self.zone.set_zone(self.tz)
+        return self.assertTrue(ret)
+
+    def test_getzone(self):
+        zone = self.zone.get_zone()
+        return self.assertEqual(zone, self.tz)
+
+    def test_verify_with_filesystem(self):
+        rlink = os.readlink("/etc/localtime-copied-from")
+        return self.assertEqual(rlink, os.path.join(self.zonesbase, self.tz))
+
+
+class DateTimeTest(jailedtests.JailedTest):
+    """ Test the functionality to set and get the system time """
 
     def setUp(self):
-
+        super(DateTimeTest, self).setUp()
         self.date_time = DATESET.DateTime()
 
         # save current time for restoring it later
         self.time = self.now.hour, self.now.minute, self.now.second
 
     def tearDown(self):
-
+        super(DateTimeTest, self).tearDown()
         dt = self.date_time
         dt.year, dt.month, dt.day = self.date
         dt.hour, dt.minute, dt.second = self.time

modules/support/vectorlinux/tests/test_FSTAB.py

+#!/usr/bin/env python
+
+#    This file is part of VASM.
+#
+#    VASM is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License v3 as published by
+#    the Free Software Foundation.
+#
+#    VASM is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with VASM.  If not, see <http://www.gnu.org/licenses/>.
+
+__author__ = "Moises Henriquez"
+__author_email__ = "moc.liamg@xnl.E0M"[::-1]
+
+import unittest
+import os
+import sys
+mydir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+if not mydir in sys.path: sys.path.append(mydir)
+import FSTAB
+from shutil import copy2
+
+class FstabTest(unittest.TestCase):
+    def setUp(self):
+        # create a copy of /etc/fstab so we can play with it
+        self.datapath = '/tmp/test_fstab'
+        copy2('/etc/fstab', self.datapath)
+        self.datamodel = FSTAB.FstabModel(self.datapath)
+        self.fake_entry = {"partition":"/dev/sdc1",
+                           "mountpoint":"/fake_mount_point",
+                           "filesystem":"ext2",
+                           "options":"defaults 1 0"}
+        return
+
+    def test_addEntryInvalidValues(self):
+        self.assertRaises(AssertionError,
+                          self.datamodel.add_entry,
+                          partition="",
+                          mpoint="",
+                          fsystem="",
+                          )
+        self.assertRaises(AssertionError,
+                          self.datamodel.add_entry,
+            partition=self.fake_entry['partition'],
+            mpoint="",
+            fsystem="")
+        self.assertRaises(AssertionError,
+                          self.datamodel.add_entry,
+            partition=self.fake_entry['partition'],
+            mpoint = self.fake_entry['mountpoint'],
+            fsystem = "")
+
+    def test_addEntry(self):
+        self.datamodel.add_entry(
+            partition=self.fake_entry['partition'],
+            mpoint=self.fake_entry['mountpoint'],
+            fsystem=self.fake_entry['filesystem'],
+            options=self.fake_entry['options']
+            )
+        # verify that it went in
+        listing = self.datamodel.get_active_listing()
+        readback = [item for item in self.datamodel.get_active_listing() if item[0] == self.fake_entry['partition']][0]
+        self.assertTrue(readback is not None)
+        self.assertEqual(readback[0], self.fake_entry['partition'])
+        self.assertEqual(readback[1], self.fake_entry['filesystem'])
+        self.assertEqual(readback[2], self.fake_entry['mountpoint'])
+        self.assertEqual(readback[3], self.fake_entry['options'])
+            
+    def tearDown(self):
+        """ Remove the file when we're done """
+        return os.remove(self.datapath)
+
+
+if __name__ == "__main__":
+    assert os.geteuid() == 0, "You must be root to run tests on /etc/fstab """
+    unittest.main()
+

modules/support/vectorlinux/tests/test_SERVICES.py

+#!/usr/bin/env python
+
+#    This file is part of VASM.
+#
+#    VASM is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License v3 as published by
+#    the Free Software Foundation.
+#
+#    VASM is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with VASM.  If not, see <http://www.gnu.org/licenses/>.
+
+__author__ = "Moises Henriquez"
+__author_email__ = "moc.liamg@xnl.E0M"[::-1]
+
+import jailedtests
+import os
+import sys
+mydir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+if not mydir in sys.path: sys.path.append(mydir)
+import SERVICES as services
+
+class ServicesTests(jailedtests.JailedTest):
+    def setUp(self):
+        super(ServicesTests, self).setUp()
+        self.datamodel = services.ServiceModel()
+        self.fake_service = self.datamodel.list_all_services()[0]
+        self.fake_runlevel = 4
+
+    def test_EnableService(self):
+        code, output = self.datamodel.enable_service(self.fake_service,
+                                                     self.fake_runlevel)
+        # subprocess will return > 0 if this fails
+        self.assertEqual(code, 0)
+        # this return True when service is enabled, False otherwise
+        # should be enabled now
+        val = self.datamodel.get_service_runlevel_status(self.fake_service,
+                                                         self.fake_runlevel)
+        self.assertTrue(val)
+
+    def test_DisableService(self):
+        code, output = self.datamodel.disable_service(self.fake_service,
+                                                      self.fake_runlevel)
+        # Subprocess will return > 0 if this fails
+        self.assertEqual(code, 0)
+        # Check the status of this service now.  Should be disabled
+        val = self.datamodel.get_service_runlevel_status(self.fake_service,
+                                                         self.fake_runlevel)
+        self.assertEqual(val, False)
+
+    def test_InvalidRunlevel_enable(self):
+        self.assertRaises(AssertionError,
+                          self.datamodel.enable_service,
+                          self.fake_service, 0)
+        self.assertRaises(AssertionError,
+                          self.datamodel.enable_service,
+            self.fake_service, 6)
+
+    def test_InvalidService_enable(self):
+        """ Try to enable an unknown service."""
+        return self.assertRaises(AssertionError,
+                          self.datamodel.enable_service,
+            "foo", 2)
+
+    def test_InvalidRunlevel_disable(self):
+        """ Try to disable services for an invalid runlevel """
+        self.assertRaises(AssertionError,
+                          self.datamodel.disable_service,
+            self.fake_service, 0)
+        self.assertRaises(AssertionError,
+                          self.datamodel.disable_service,
+            self.fake_service, 6)
+        return
+
+    def test_InvalidService_disable(self):
+        """ Try to disable an invalid service """
+        return self.assertRaises(AssertionError,
+                                 self.datamodel.disable_service,
+            "foo", 2)
+        
+if __name__ == '__main__':
+    assert os.geteuid() == 0, "You must be root to run tests on system services"
+    unittest = jailedtests.unittest
+    unittest.main()