Source

vinstall / vinstall / backend / utils.py

#!/bin/env python

"""Collection of small utilities."""


__version__ = "0.0.1a"
__date__ = "08/20/09"
__author__ = "rbistolfi"
__credits__ = "M0E-lnx, Uelsk8s, Kidd"


import os
import subprocess
import re
import unittest
import tempfile
from vinstall.core import log

LOG = log.get_logger(__file__)

def get_mem_size():
    """Get the amount of RAM availablein the system in mB.

    """
    meminfo = "/proc/meminfo"
    fo = open(meminfo)
    line = fo.readline()
    fo.close()
    label, size, unit = line.split()
    return int(size) / 1024.0


def format_partition(path, filesystem):
    """Create a filesystem in a partition using mkfs.

    """
    if 'swap' not in filesystem:
        assert filesystem in supported_filesystems(), 'Unsupported filesystem'
    FSFLAGS = {'reiserfs':'-q -f -f',
        'jfs':'-q',
        'xfs':'-q',
        'ext2':'-F',
        'ext4':'-F',
        'ext3':'-F',
        }
    if 'swap' in filesystem:
        command = 'mkswap %s'% path
    else:
        command = 'mkfs -t %s %s %s'% (filesystem, FSFLAGS[filesystem], path)
    LOG.debug("Formating partiton %s with %s"%(path, filesystem))
    subprocess.call(command.split())


def activate_swap(path=None):
    if path is None:
        #Activate all swap space
        command = 'swapon -a'
    else:
        command = 'swapon %s'% path
    LOG.debug("Activating swap space on %s"% path)
    subprocess.call(command.split())


def mount(src, mountpoint, filesystem='auto'):
    """Mount a filesystem.

    Arguments:

        src: the filesystem or device to be mounted.
        filesystem: filesystem type passed to the mount command, for example:
                    "ext3". Default is auto.

    """
    command = 'mount -t %s %s %s' % (filesystem, src, mountpoint)
    LOG.debug("Mounting %s on %s"% (src, mountpoint))
    subprocess.call(command.split())
    return mountpoint


def mountiso(src, mountpoint):
    """Mount a filesystem.

    Arguments:

        src: the filesystem or device to be mounted.
        extra: extra options to be passed to the mount command, for example:
               "-o loop". Default is None.
        filesystem: filesystem type passed to the mount command, for example:
                    "ext3". Default is auto.

    """
    command = 'mount -o loop %s %s' % (src, mountpoint)
    LOG.debug("Mounting ISO image %s on %s"% (src, mountpoint))
    subprocess.call(command.split())
    return mountpoint


def umount(mounted):
    """Unmount a filesystem.

    Arguments:

        A mounted filesystem or device.

    >>> umount('/mnt/loop')
    0

    """
    command = 'umount %s' % (mounted)
    LOG.debug("Un-mounting %s"% mounted)
    return subprocess.call(command.split())


def exec_chroot(root, command):
    """Change the root dir for current shell  and run the given command.

    Arguments:

        dir: The directory to for chrooting to.
        command: The command to be executed under the new root.

    >>> exec_chroot('/', 'pwd')
    0

    """
    command = 'chroot %s %s' % (root, command)
    LOG.debug("Executing command '%s' in CHROOT %s"% (command, root))
    subprocess.call(command.split())


def is_mounted(device):
    """Returns true if device is mounted.

    """
    with open("/etc/mtab") as mounted:
        if device in mounted.read():
            return True
        else:
            return False


def get_mounted(mountpoint):
    """Return device for a mountpoint
    
    """
    for line in open("/etc/mtab"):
        device, mpoint, _ = line.split(" ", 2)
        if mountpoint == mpoint:
            return device


def supported_filesystems():
    """Returns a list of filesystems supported by the system."""

    fo = open('/proc/filesystems')
    data = fo.read()
    fo.close()
    kernel_fs = re.findall(r'^\s+(\w+)', data, re.M)
    path = os.environ['PATH'].split(':')
    for p in path:
        for k in kernel_fs:
            if os.path.isfile(os.path.join(p, 'mkfs.' + k)):
                yield k


class Chroot(object):
    """Execute block in a chrooted context

        >>> with Chroot("/mnt"):
        ...     dostuff()

    If chdir kwarg is True, os.chdir to "/" before entering

    """
    def __init__(self, new_root, chdir=True):
        self.new_root = new_root
        self.chdir = chdir

    def __enter__(self):
        """Save current root for restoring later

        """
        LOG.debug("Entering chroot %s"% self.new_root)
        self.real_root = os.open("/", os.O_RDONLY)
        self.old_cwd = os.getcwd()
        os.chroot(self.new_root)
        if self.chdir:
            os.chdir("/")

    def __exit__(self, *_):
        """Go back to previous root

        """
        LOG.debug("Exit chroot %s"% self.new_root)
        os.fchdir(self.real_root)
        os.chroot(".")
        os.close(self.real_root)
        os.chdir(self.old_cwd)


class FakeDeviceTestCase(unittest.TestCase):

    def setUp(self):
        (fd, self.path) = tempfile.mkstemp(prefix="fake-device-")
        f = os.fdopen(fd)
        f.seek(36000000)
        os.write(fd, "0")
        self.img = self.path

    def tearDown(self):
        os.unlink(self.path)

class FormattingTests(FakeDeviceTestCase):

    def test_ext2(self):
        format_partition(self.img, 'ext2')
        # Read the partitioning
        rformat = subprocess.check_output(['file', self.img])
        self.assertEqual(rformat.strip().split()[4], 'ext2')
    
    def test_ext3(self):
        format_partition(self.img, 'ext3')
        rformat = subprocess.check_output(['file', self.img])
        print rformat
        self.assertEqual(rformat.strip().split()[4], 'ext3')
    
    def test_ext4(self):
        format_partition(self.img, 'ext4')
        rformat = subprocess.check_output(['file', self.img])
        self.assertEqual(rformat.strip().split()[4], 'ext4')
    
    def test_xfs(self):
        format_partition(self.img, 'xfs')
        rformat = subprocess.check_output(['file', self.img])
        self.assertEqual(rformat.strip().split()[2], 'XFS')
    
    def test_jfs(self):
        format_partition(self.img, 'jfs')
        rformat = subprocess.check_output(['file', self.img])
        self.assertEqual(rformat.strip().split()[1], 'JFS2')
    
    def test_reiserfs(self):
        format_partition(self.img, 'reiserfs')
        rformat = subprocess.check_output(['file', self.img])
        self.assertEqual(rformat.strip().split()[1], 'ReiserFS')
    
    def test_format_with_unsupported_fs(self):
        self.assertRaises(AssertionError, format_partition,
            self.img, 'foofs')
    
    def test_swap(self):
        format_partition(self.img, 'swap')
        rformat = subprocess.check_output(['file', self.img])
        rformat = rformat.strip()
        self.assertTrue('swap' in rformat.split()[2])

            

if __name__ == "__main__":
    unittest.main()