Commits

Anonymous committed c41c8ea

Fix overlapping partition errors, refactored tests, added logging

Comments (0)

Files changed (2)

vinstall/backend/partitioning.py

 import parted
 import unittest, tempfile, os
 from vinstall.backend import media
+from vinstall.core import log
 
 __author__ = "Moises Henriquez"
 __email__ = "moc.liamg@xnl.e0m"[::-1]
 __version__ = "0.1"
 __date__ = "2012-06-05"
 
+LOG = log.get_logger(__file__)
+
+class PartitionDim(object):
+    def __init__(self, device=None, region=None):
+        self.device = device
+        self.start = 0
+        self.end = 0
+        self.length = 0
+        self.region = region
+        self.partition_type = parted.PARTITION_NORMAL
 
 class DiskPartitioner(object):
     """ Class used to modify the partition table on the given disk
 
     """
+    
     def __init__(self, disk):
         """ Arguments:
 
 
         """
         self.disk = disk
+        self.partition_cache = []
 
     def has_partition_table(self):
         """Find if there is a partition table. PTs start at position 0x1BE and
         """
         self.disk._disk = parted.freshDisk(self.disk._device, "msdos")
         self.disk._disk.has_partition_table = True
+        LOG.debug("New partition table created on %s"% self.disk)
 
     def delete_all_partitions(self):
         """ Delete all partitions from drive
 
         """
-        #self.disk._disk.deleteAllPartitions()
+        LOG.debug("Deleting all cached and existing partitions in %s"% self.disk)
+        self.partition_cache = []
+        return (self.disk._disk.deleteAllPartitions(), self.disk._disk.commit())
+        #return self.disk._disk.commit()
         self.disk._disk = parted.freshDisk(self.disk._device, "msdos")
         self.disk._disk.has_partition_table = True
 
         
         """
         return len(self.disk._disk.partitions)
+    
+    def append_partition(self, size=0, units='MB'):
+        """Add a partition to the partition cache"""
+        dprimaries = self.disk._disk.getPrimaryPartitions()
+        tprimaries = len(dprimaries) + len(
+            [p for p in self.partition_cache if p.partition_type == parted.PARTITION_NORMAL])
+        assert tprimaries < 3, "Too many primary partitions."
+        mypsize = parted.sizeToSectors(size, units, self.disk._device.sectorSize)
+        
+        free_space = self.disk._disk.getFreeSpacePartitions()
+        if not free_space: return
+        ## FIXME:  Raise exception instead
+        partdim = PartitionDim()
+        # Iterate over existing cached partitions and add up the end of them.
+        if self.partition_cache:
+            cached_end = max([c.end for c in self.partition_cache])
+        else:
+            cached_end = 0
+        # FIXME: ^^ compare which region each cached partition belongs to for sizing calculations.
+        newstart = max(2048, (cached_end + 1))
+        partdim.start = newstart
+        partdim.length = mypsize
+        partdim.device = self.disk
+        partdim.end = newstart + mypsize
+        partdim.partition_type = parted.PARTITION_NORMAL
+        self.partition_cache.append(partdim)
+        
 
     def add_partition(self,  size=0, units='MB'):
         """Add a partition to the disk.  
                units = partition units (MB, GB, MiB, GiB), (defaults to MiB)
 
         """
-        mypsize = parted.sizeToSectors(size, units, self.disk._device.sectorSize)
-        # first partition starts at sector 1
-        # find out where the free space is
-        myconst = parted.Constraint(device = self.disk._device)
-        for i in self.disk._disk.getFreeSpaceRegions():
-            if i > mypsize:  # partition fits here
-                if not self.partitions_number():
-                    start_marker = max(i.start, 2048)
-                else:
-                    start_marker = i.start
-                break
-        else:
-            raise RuntimeError("There is no free space for partition")
-
-        mygeom = parted.Geometry(device = self.disk._device,
-                                 start = start_marker,
-                                 length = mypsize)
-        myfs = parted.FileSystem(type='ext2', geometry=mygeom)
-        mypartition = parted.Partition(disk=self.disk._disk,
-                                       fs = myfs,
-                                       type=parted.PARTITION_NORMAL,
-                                       geometry = mygeom)
-        myconst = parted.Constraint(exactGeom = mygeom)
-        # Add the partition to the disk
-        self.disk._disk.addPartition(partition = mypartition,
-                                     constraint = myconst)
+        # FIXME: DEPRECATION WARNING:  Use append_partition method
+        return self.append_partition(size, units)
+    
+    def create_partition(self, partdim):
+        """Create a partition based on the provided partdim object"""
+        pconstraint = parted.Constraint(device = self.disk._device)
+        pgeometry = parted.Geometry(device = self.disk._device,
+            start = partdim.start,
+            length = partdim.length)
+        pfilesystem = parted.FileSystem(type="ext2", geometry=pgeometry)
+        ppartition = parted.Partition(disk = self.disk._disk,
+            fs=pfilesystem,
+            type = partdim.partition_type,
+            geometry=pgeometry)
+        pconstraint = parted.Constraint(exactGeom=pgeometry)
+        self.disk._disk.addPartition(partition = ppartition,
+            constraint = pconstraint)
+        
 
     def write_changes(self):
         """Finalize changes to the disk.  This needs to be called
         after creating partitions or deleting partitions to make sure
         the changes are actually written to the disk."""
+        
+        # Create the cached partitions and then write changes
+        for cached in self.partition_cache:
+            LOG.debug("Creating new partition starting at sector %s, ending in sector %s"%(
+                cached.start, cached.end))
+            self.create_partition(cached)
+        LOG.debug("Writing partition table changes to %s"%(self.disk))
         self.disk._disk.commit()
 
 
-class DiskPartitionerTestCase(unittest.TestCase):
+class DiskPartitionerTests(unittest.TestCase):
 
     def setUp(self):
         (fd, self.path) = tempfile.mkstemp(prefix="fake-device-")
         f = os.fdopen(fd)
-        f.seek(1400000)
+        f.seek(1024000000)
         os.write(fd, "0")
+        self.disk = media.Disk()
+        self.disk._device = parted.Device(self.path)
+        self.partitioner = DiskPartitioner(self.disk)
 
     def tearDown(self):
         os.unlink(self.path)
-
+    
     def test_create_partition_table(self):
-        device = parted.Device(self.path)
-        disk = media.Disk()
-        disk._device = device
-        p = DiskPartitioner(disk)
-        p.create_partition_table()
-        p.add_partition(size=0.1)
-        p.write_changes()
-        self.assertTrue(p.has_partition_table())
+        """Create partition table in new disk"""
+        self.partitioner.create_partition_table()
+        self.partitioner.add_partition(size=0.1)
+        self.partitioner.write_changes()
+        self.assertTrue(self.partitioner.has_partition_table())
 
     def test_add_partition(self):
-        device = parted.Device(self.path)
-        disk = media.Disk()
-        disk._device = device
-        p = DiskPartitioner(disk)
-        p.create_partition_table()
-        p.add_partition(size=0.1)
-        p.write_changes()
-        self.assertEqual(len(disk._disk.partitions), 1)
+        """Adding a single partition"""
+        self.partitioner.create_partition_table()
+        self.partitioner.append_partition(size=0.1)
+        self.partitioner.write_changes()
+        self.disk._disk.getPrimaryPartitions()
+        self.assertEqual(len(self.disk._disk.partitions), 1)
+    
+    def test_add_multiple_partitions(self):
+        """Add multiple partitions to a single disk"""
+        self.partitioner.create_partition_table()
+        self.partitioner.add_partition(size=3)
+        self.partitioner.add_partition(size=5)
+        self.partitioner.write_changes()
+        self.assertEqual(len(self.disk._disk.partitions), 2)
 
     def test_has_partition_table(self):
-        device = parted.Device(self.path)
-        disk = media.Disk()
-        disk._device = device
-        p = DiskPartitioner(disk)
-        self.assertFalse(p.has_partition_table())
-        p.create_partition_table()
-        p.add_partition(size=0.1)
-        p.write_changes()
-        self.assertTrue(p.has_partition_table())
+        """Check if partition table exists in disk"""
+        self.assertFalse(self.partitioner.has_partition_table())
+        self.partitioner.create_partition_table()
+        self.partitioner.add_partition(size=0.1)
+        self.partitioner.write_changes()
+        self.assertTrue(self.partitioner.has_partition_table())
     
     def test_delete_all_partitions(self):
-        device = parted.Device(self.path)
-        disk = media.Disk()
-        disk._device = device
-        p = DiskPartitioner(disk)
-        p.create_partition_table()
-        p.add_partition(size=0.05)
-        p.add_partition(size=0.05)
-        p.write_changes()
-        self.assertEqual(len(disk._disk.partitions), 2)
-        p.delete_all_partitions()
-        p.write_changes()
-        self.assertEqual(len(disk._disk.partitions), 0)
+        """Make sure all partitions are deleted and the cache is flushed"""
+        self.partitioner.create_partition_table()
+        for size in (1,2):
+            self.partitioner.add_partition(size)
+        self.partitioner.write_changes()
+        self.assertEqual(len(self.disk._disk.partitions), 2)
+        self.partitioner.delete_all_partitions()
+        self.assertEqual(len(self.partitioner.partition_cache), 0)
+        self.assertEqual(len(self.disk._disk.partitions), 0)
+    
+    def test_toom_many_primary_partitions(self):
+        """Raise an exception if we have too many primary partitions 
+        (cached + written to the disk)"""
+        self.partitioner.create_partition_table()
+        for size in (1,2,3):
+            self.partitioner.add_partition(size)
+        # This one should raise an exception
+        self.assertRaises(AssertionError, self.partitioner.add_partition, 1)
 
 
 if __name__ == "__main__":

vinstall/backend/utils.py

 import re
 import unittest
 import tempfile
+from vinstall.core import log
 
+LOG = log.get_logger(__file__)
 
 def get_mem_size():
     """Get the amount of RAM availablein the system in mB.
         command = 'mkswap %s'% path
     else:
         command = 'mkfs -t %s %s %s'% (filesystem, FSFLAGS[filesystem], path)
+    LOG.debug("Formating partiton %s with %s"%(path, filesystem))
     subprocess.call(command.split())
 
 
         command = 'swapon -a'
     else:
         command = 'swapon %s'% path
+    LOG.debug("Activating swap space on %s"% path)
     subprocess.call(command.split())
 
 
 
     """
     command = 'mount -t %s %s %s' % (filesystem, src, mountpoint)
+    LOG.debug("Mounting %s on %s"% (src, mountpoint))
     subprocess.call(command.split())
     return mountpoint
 
 
     """
     command = 'mount -o loop %s %s' % (src, mountpoint)
+    LOG.debug("Mounting ISO image %s on %s"% (src, mountpoint))
     subprocess.call(command.split())
     return mountpoint
 
 
     """
     command = 'umount %s' % (mounted)
+    LOG.debug("Un-mounting %s"% mounted)
     return subprocess.call(command.split())
 
 
 
     """
     command = 'chroot %s %s' % (root, command)
+    LOG.debug("Executing command '%s' in CHROOT %s"% (command, root))
     subprocess.call(command.split())
 
 
         """Save current root for restoring later
 
         """
+        LOG.debug("Entering chroot %s"% self.new_root)
         self.real_root = os.open("/", os.O_RDONLY)
         self.old_cwd = os.getcwd()
         os.chroot(self.new_root)
         """Go back to previous root
 
         """
+        LOG.debug("Exit chroot %s"% self.new_root)
         os.fchdir(self.real_root)
         os.chroot(".")
         os.close(self.real_root)