Commits

Anonymous committed 3e98e18

Refactoring fstab and related tests

Comments (0)

Files changed (3)

modules/support/vectorlinux/FSTAB.py

 
 import parted
 import os
-import logging
-
-log = logging.getLogger('vasm')
 
 def list_all_system_partitions():
     """ return a list of all partition paths"""
     for dev in parted.getAllDevices():
         if dev.type == 1 and dev.getSize() > 0.0:
             drives.append(dev.path)
-    return drives
+    return drives    
 
-def get_partition_path(uuid):
-    """ Get a partition's path given it's uuid"""
-    uuiddir="/dev/disk/by-uuid"
-    for uid in os.listdir(uuiddir):
-        if uid == uuid:
-            lname = os.readlink(os.path.join(uuiddir, uid))
-            return os.path.join("/dev/",
-                                lname.split("/")[-1])
+class FstabEntry(object):
+    """ Object representing an entry in /etc/fstab """
+    def __init__(self, device = None):
+        self.device = device
+        self.uuid = None
+        self.mountpoint = None
+        self.filesystem = None
+        self.options = None
 
-def get_partition_uuid(path):
-    """ Get a partition's uuid given it's node path """
-    uuiddir = "/dev/disk/by-uuid"
-    if "/" in path:
-        part = os.path.split(path)[-1]
-    else:
-        part = path
-    for entry in os.listdir(uuiddir):
-        lname = os.readlink(os.path.join(uuiddir, entry))
-        if lname.endswith(part):
-            return entry
-    return None
+    def find_default_options(self):
+        """ Returns the default fstab options for the partitions filesystem """
+        assert self.filesystem is not None, "Unknown filesystem."
+        opts = {
+            "ext2":"defaults 0 0",
+            "ext3":"defaults 0 0",
+            "ext4":"defaults 0 0",
+            "iso9660":"noauto,ro,user 0 0",
+            "msdos":"defaults 0 0",
+            "swap":"sw 0 0",
+            "linux-swap":"sw 0 0",
+            "linux-swap(v1)":"sw 0 0"
+        }
+        if opts.has_key(self.filesystem):
+            return opts[self.filesystem]
+        elif 'swap' in self.filesystem:
+            return opts['swap']
+        else:
+            return 'defaults 0 0'
 
-def get_partition_filesystem(partition):
-    """ Returns pertition's filesystem """
-    if partition is None: return "unknown"
-    devname = partition[:len("/dev/sda")]
-    pdev = parted.Device(devname)
-    pdisk = parted.Disk(pdev)
-    ppart = None
-    for part in pdisk.partitions:
-        if part.path == partition:
-            ppart = part
-            break
-    if ppart:
+    def find_uuid(self):
+        """ Find the uuid for the specified device """
+        assert self.device is not None, "You must specify a device node first."
+        #assert "/" not in self.device, "Invalid device node."
+        uuiddir = "/dev/disk/by-uuid"
+        partition = os.path.split(self.device)[-1]
+        for entry in os.listdir(uuiddir):
+            lname = os.readlink(os.path.join(uuiddir, entry))
+            if lname.endswith(partition):
+                return entry
+        return None
+
+    def find_path(self):
+        """ Find the partition path.  This is useful if fstab provides UUID but not path """
+        assert self.uuid is not None, "Cannot find path if uuid is not known"
+        uuiddir = "/dev/disk/by-uuid"
+        for uid in os.listdir(uuiddir):
+            if uid == self.uuid:
+                lname = os.readlink(os.path.join(uuiddir, uid))
+                return os.path.join("/dev/", lname.split("/")[-1])
+        return None
+        
+    def find_filesystem(self):
+        """ Find the partition's filesystem as reported by parted """
+        assert self.device is not None, "You must specify a device node first."
+        devicename = self.device[:len("/dev/sda")]
+        pdev = parted.Device(devicename)
+        pdisk = parted.Disk(pdev)
+        ppart = [p for p in pdisk.partitions if p.path == self.device ][0]
         if "swap" in ppart._fileSystem.type:
             ret = "swap"
         else:
             ret = ppart._fileSystem.type
-    else:
-        ret = "Unknown"
-    return ret
 
-def get_default_fstab_options(fstype):
-    """ Returns the default fstab options for the given filesystem type """
-    OPTIONS={
-        "ext2": "defaults 0 0",
-        "ext3": "defaults 0 0",
-        "ext4": "defaults 0 0",
-        "iso9660":"noauto,ro,user 0 0",
-        "msdos": "defaults 0 0",
-        "swap": "sw 0 0",
-        "linux-swap": "sw 0 0",
-        "linux-swap(v1)": "sw 0 0"
-    }
-    if fstype in OPTIONS:
-        return OPTIONS[fstype]
-    return "defaults 0 0"
-    
+        return ret
 
 class FstabModel(object):
     """ Data model for working with /etc/fstab """
     def add_observer(self, method):
         """ Add an observer to the model"""
         self.observers.append(method)
-    
-    def delete_entry(self, partition):
-        """ Removes an entry from /etc/fstab"""
-        if partition.startswith("/"):
-            uuid = "UUID=%s" % get_partition_uuid(partition)
-            path = partition
-        elif partition.startswith("UUID="):
-            uuid = partition
-            path = get_partition_path(uuid)
+        return
+
+    def removeEntry(self, device_path=""):
+        """ Remove the fstab entry with the device_path. """
+        assert device_path not in ("", None), "Invalid device path"
+        assert self.hasEntry(device_path), "%s is not listed in %s"% (device_path, self.dataobject)
+        ndata = []
+        with open(self.dataobject) as data:
+            for line in data:
+                if not line.startswith("#"):
+                    entry = self.get_entry_from_line(line)
+                    if entry.device == device_path:
+                        continue
+                ndata.append(line)
+        f = open(self.dataobject, 'w')
+        f.writelines(''.join(ndata))
+        f.close()
+        return self.notify()
+
+    def addEntry(self, entry):
+        """ Add a new entry to fstab.
+        Arguments:
+            entry - An FstabEntry() object """
+        assert isinstance(entry, FstabEntry), "entry argument is of invalid type"
+        assert hasattr(entry, 'mountpoint'), "Missing mountpoint attribute."
+        sep = " "*4
+        if entry.uuid is None:
+            entry.uuid = entry.find_uuid()
+        if entry.filesystem is None:
+            entry.filesyatem = entry.find_filesystem()
+
+        # Use UUID when possible
+        if entry.uuid is None:
+            part = entry.device
         else:
-            path = partition
-            uuid=None
+            part = "UUID=%s"% entry.uuid
 
-        f = open(self.dataobject, "r")
-        data = f.readlines()
-        f.close()
-        for line in data:
-            # look for a match in path or uuid
-            if uuid is not None:
-                if line.startswith(path) or line.startswith(uuid):
-                    data.remove(line)
-            else:
-                if line.startswith(path):
-                    data.remove(line)
+        if entry.options is None:
+            options = entry.find_default_options()
+        else:
+            options = entry.options
 
+        line = sep.join((
+            part,
+            entry.mountpoint,
+            entry.filesystem,
+            options
+            ))
+        data = [line]
         f = open(self.dataobject, 'w')
         f.writelines(''.join(data))
         f.close()
-        log.debug("%s entry has been removed from %s"% (partition, self.dataobject))
         return self.notify()
-    
-    def add_entry(self, partition="", mpoint="", fsystem="", options="defaults 0 0"):
-        """ Adds an entry to the current fstab setup"""
-        # raise exceptions when invalid entries are parsed
-        assert partition != "", "You must specify a partition path"
-        assert mpoint != "" , "You must specify a mountpoint for %s"% partition
-        assert fsystem != "", "You must specify the filesystem type for %s"% partition
-        
-        # We want to add by uuid if possible rather than by partition path.
-        uuid = get_partition_uuid(partition)
-        if uuid is None:
-            part = partition
-        else:
-            part = "UUID=%s"% uuid        
-        line = "%s    %s    %s    %s\n"%(part,
-                                       mpoint,
-                                       fsystem,
-                                       options)
-        
-        f = open(self.dataobject, "r")
-        data = f.readlines()
-        f.close()
-        data.append(line)
-        # write the changes
-        f = open(self.dataobject, 'w')
-        f.writelines(''.join(data))
-        f.close()
-        log.debug('%s has been added to %s'% (partition, self.dataobject))
-        self.notify()
-        return
-    
-    def get_active_listing(self):
-        """ Return a list of tuples containing the currently activated
-        mount points"""
+
+    def listEntries(self):
+        """ Return a list of FstabEntry objects representing each entry in fstab """
         ret = []
-        f = open(self.dataobject, 'r')
-        data = f.readlines()
-        f.close()
-        for line in data:
-            if not line.strip():
-                continue
-            if not line.startswith("#"):
-                # this is not a comment
-                sp = line.split()
-                # we want 3 sections... device|target|options
-                dev = sp[0]
-                if dev.startswith("UUID="):
-                    uid = dev.split("=")[-1]
-                    uuid = uid
-                    dev = get_partition_path(uid)
-                else:
-                    uuid = get_partition_uuid(dev) or "N/A"
-                tgt = sp[1]
-                fst = sp[2]
-                opts = ' '.join(sp[3::])
-                entry = (dev, fst, tgt, opts.strip(), uuid)
+        sep = " "*4
+        with open(self.dataobject) as f:
+            for line in f:
+                if not line.strip():
+                    continue
+                if line.startswith("#"):
+                    continue
+                entry = self.get_entry_from_line(line)
                 ret.append(entry)
         return ret
+
+    def get_entry_from_line(self, line):
+        """ Create a valid entry from the provide line """
+        sp = line.split()
+        sep = " "*4
+        entry = FstabEntry()
+        dev = sp[0].strip()
+        if dev.startswith("UUID="):
+            entry.uuid = dev.split("=")[-1].strip()
+            entry.device = entry.find_path()
+        else:
+            entry.device = dev.strip()
+            entry.uuid = entry.find_uuid()
+        entry.mountpoint = sp[1].strip()
+        entry.filesystem = sp[2].strip()
+        entry.options = sep.join(sp[3::]).strip()
+
+        return entry
+
+    def hasEntry(self, device_path=""):
+        """ Check if fstab has an entry for device_path """
+        assert device_path not in ("",None), "Invalid device path"
+        entries = self.listEntries()
+        for item in entries:
+            if item.device == device_path:
+                return True
+
+        return False

modules/support/vectorlinux/tests/test_FSTAB.py

         self.datapath = '/tmp/test_fstab'
         copy2('/etc/fstab', self.datapath)
         self.datamodel = FSTAB.FstabModel(self.datapath)
-        self.fake_entry = {"partition":"/dev/sdz1",
-                           "mountpoint":"/fake_mount_point",
-                           "filesystem":"ext2",
-                           "options":"defaults 1 0"}
+        self.fake_entry = FSTAB.FstabEntry()
+        self.fake_entry.device = "/dev/sdz1"
+        self.fake_entry.mountpoint = "fake_mount_point"
+        self.fake_entry.filesystem = "ext2"
+        self.fake_entry.options = "defaults 1 0"
         return
 
-    def test_addEntryInvalidValues(self):
+    def test_check_for_entry(self):
+        # Check if an entry exists
+        # We have not yet added fake_entry, so this should return False
+        self.assertTrue(self.datamodel.hasEntry(self.fake_entry.device) is False)
+
+    def test_add_entry(self):
+        # Check if we can add an entry
+        self.datamodel.addEntry(self.fake_entry)
+        # now check if we have the entry there
+        self.assertTrue(self.datamodel.hasEntry(self.fake_entry.device))
+
+    def test_remove_entry(self):
+        # Check if we can remove an entry
+        self.datamodel.addEntry(self.fake_entry)
+        self.assertTrue(self.datamodel.hasEntry(self.fake_entry.device))
+        self.datamodel.removeEntry(self.fake_entry.device)
+        self.assertTrue(self.datamodel.hasEntry(self.fake_entry.device) is False)
+
+    def test_invalid_entry_argument(self):
+        # Make sure it raises an exception if we give it a string
         self.assertRaises(AssertionError,
-                          self.datamodel.add_entry,
-                          partition="",
-                          mpoint="",
-                          fsystem="",
-                          )
+                          self.datamodel.addEntry, 'foo')
+
+    def test_incomplete_entry_attributes(self):
+        # make sure we raise an exception when the attributes of the entry are not complete
+        entry = FSTAB.FstabEntry() # empty entry
         self.assertRaises(AssertionError,
-                          self.datamodel.add_entry,
-            partition=self.fake_entry['partition'],
-            mpoint="",
-            fsystem="")
-        self.assertRaises(AssertionError,
-                          self.datamodel.add_entry,
-            partition=self.fake_entry['partition'],
-            mpoint = self.fake_entry['mountpoint'],
-            fsystem = "")
-
-    def test_addEntry(self):
-        self.datamodel.add_entry(
-            partition=self.fake_entry['partition'],
-            mpoint=self.fake_entry['mountpoint'],
-            fsystem=self.fake_entry['filesystem'],
-            options=self.fake_entry['options']
-            )
-        # verify that it went in
-        listing = self.datamodel.get_active_listing()
-        readback = [item for item in self.datamodel.get_active_listing() \
-                    if item[0] == self.fake_entry['partition' ] and \
-                    item[2] == self.fake_entry['mountpoint']][0]
-        self.assertTrue(readback is not None)
-        self.assertEqual(readback[0], self.fake_entry['partition'])
-        self.assertEqual(readback[1], self.fake_entry['filesystem'])
-        self.assertEqual(readback[2], self.fake_entry['mountpoint'])
-        self.assertEqual(readback[3], self.fake_entry['options'])
-            
+                          self.datamodel.addEntry, entry)
+        
     def tearDown(self):
         """ Remove the file when we're done """
         return os.remove(self.datapath)

modules/support/vectorlinux/tests/test_SKEL.py

 __author__ = "Moises Henriquez"
 __author_email__ = "moc.liamg@xnl.E0M"[::-1]
 
+
 class TestSkel(unittest.TestCase):
     def setUp(self):
         # create a fake user account.