Source

vasm / src / vasm / backend / utils.py

Full commit
#!/usr/bin/env python

#    This file is part of VASM.
#
#    VASM is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License v3 as published by
#    the Free Software Foundation.
#
#    VASM is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with VASM.  If not, see <http://www.gnu.org/licenses/>.

import threading
import multiprocessing as mp
import subprocess as sp
import os
import time
import psutil
import logging

# Module metadata.
__author__ = "Moises Henriquez"
# The address literal is written reversed; the [::-1] slice flips it back
# to its readable form when the module is imported.
__author_email__ = "moc.liamg@xnl.E0M"[::-1]
__vasm_version__ = '3.0a2'
# NOTE(review): presumably the directory holding VASM's shared data files;
# it is not referenced in this part of the file -- confirm against callers.
datapath = '/usr/share/vasm'

def _(txt):
    """ Return txt unchanged.

    NOTE(review): looks like a stand-in for a gettext-style translation
    hook -- confirm before relying on it for real translations.
    """
    return txt

# Logger shared by this module, registered under the name 'vasm'.
logger = logging.getLogger('vasm')

def get_mounted_list():
    """ Return the currently mounted partitions as reported by psutil.

    The result is whatever psutil.disk_partitions() yields: a list of
    entries exposing .device, .mountpoint and .fstype attributes.
    """
    mounted = psutil.disk_partitions()
    return mounted
    

def mount(partition, mntpoint):
    """ Mount the specified partition on mntpoint and return the mountpoint.

    The mntpoint directory is created first when it does not exist yet.

    Raises AssertionError when the mount command exits with a non-zero
    status, or when the partition still has no mountpoint afterwards.
    The exception is raised explicitly instead of through the assert
    statement so the checks are not stripped when Python runs with -O.
    """
    if not os.path.isdir(mntpoint):
        os.makedirs(mntpoint)
    proc = get_popen(["mount", partition, mntpoint])
    _out, error = proc.communicate()
    if proc.returncode != 0:
        raise AssertionError("Error while mounting partition: %s" % error)
    # Ask the system where the partition ended up rather than trusting
    # the exit status alone.
    mpoint = get_mountpoint(partition)
    if mpoint is None:
        raise AssertionError("Unable to mount partition")
    return mpoint

def activate_swap(partition):
    """ Activate the swap space on partition for immediate use.

    Runs swapon on the partition and waits for it to finish.

    Raises AssertionError when swapon exits with a non-zero status.  The
    exception is raised explicitly instead of through the assert
    statement so the check is not stripped when Python runs with -O.
    """
    proc = get_popen(['swapon', partition])
    proc.communicate()
    if proc.returncode != 0:
        raise AssertionError(
            "Unable to activate swap space on %s" % partition)
    

def umount(mntpoint):
    """ Unmount whatever is mounted on mntpoint.

    Returns 0 without running anything when nothing is mounted there;
    returns None after a successful forced unmount (umount -f).

    Raises AssertionError when the umount command exits with a non-zero
    status.  The exception is raised explicitly instead of through the
    assert statement so the check is not stripped when Python runs
    with -O.
    """
    if get_mounted(mntpoint) is None:
        return 0
    proc = get_popen(["umount", "-f", mntpoint])
    _out, err = proc.communicate()
    if proc.returncode != 0:
        raise AssertionError("Unable to un-mount partition %s." % err)
    

def get_mounted(mntpoint):
    """ Return the device currently mounted on mntpoint, or None.

    The first entry of psutil.disk_partitions() whose mountpoint equals
    mntpoint wins.
    """
    devices = (entry.device
               for entry in psutil.disk_partitions()
               if entry.mountpoint == mntpoint)
    return next(devices, None)
def get_mountpoint(partition):
    """ Return the mountpoint partition is mounted on, or None.

    The first entry of psutil.disk_partitions() whose device equals
    partition wins.
    """
    mountpoints = (entry.mountpoint
                   for entry in psutil.disk_partitions()
                   if entry.device == partition)
    return next(mountpoints, None)

class MonitoredExternalGui(threading.Thread):
    """ Thread that runs an external command and signals when it is done.

    command    -- the command handed to get_popen()
    quit_event -- object whose set() method is called once the command
                  has finished (e.g. a threading.Event)
    """
    def __init__(self, command, quit_event):
        threading.Thread.__init__(self)
        # Must match the attribute name read by run().
        self.quit_event = quit_event
        self.command = command

    def run(self):
        """ Launch the command, wait for it to exit, then set quit_event. """
        try:
            proc = get_popen(self.command)
            proc.communicate()
        finally:
            # Signal even when the launch itself fails, so whoever waits
            # on the event is not left blocked.
            self.quit_event.set()
        

def get_popen(cmd):
    """ Start cmd and return its subprocess.Popen object.

    Both stdout and stderr of the child are captured through pipes.
    """
    captured = {"stdout": sp.PIPE, "stderr": sp.PIPE}
    return sp.Popen(cmd, **captured)

def launch_external_gui(f):
    """ Decorator that runs f in a separate process to keep the main UI
    from freezing.

    Calling the returned wrapper starts a multiprocessing.Process whose
    target is f and returns None immediately, without waiting for the
    process to finish.  Positional and keyword arguments given to the
    wrapper are forwarded to f.  The wrapper keeps the name and
    docstring of f.
    """
    import functools

    @functools.wraps(f)
    def do_run(*args, **kwargs):
        worker = mp.Process(target=f, args=args, kwargs=kwargs)
        worker.start()
    return do_run


def test_su_helpers():
    """ Tell whether a su helper is installed.

    Returns True when /usr/bin/gksu or /usr/bin/kdesu exists, False
    otherwise.
    """
    helpers = ("/usr/bin/gksu", "/usr/bin/kdesu")
    return any(os.path.exists(helper) for helper in helpers)
    
def get_system_info():
    """ Collect information about the operating system into a dictionary.

    Keys: "version", "base" and "releasedate" come from
    get_vector_version(), get_slack_base() and get_release_date();
    "website" and "support" are fixed URLs.
    """
    return {
        "version": get_vector_version(),
        "base": get_slack_base(),
        "releasedate": get_release_date(),
        "website": "http://www.vectorlinux.com",
        "support": "http://www.vectorlinux.com/forum",
    }
    
def get_slack_base(path="/etc/slackware-version"):
    """ Return the slackware version used for this release.

    path -- file to read; defaults to /etc/slackware-version

    Only the first line of the file is read and it is returned with the
    surrounding whitespace stripped.  An empty file yields an empty
    string.  The file is closed even when reading fails.
    """
    with open(path, "r") as version_file:
        first_line = version_file.readline()
    return first_line.strip()

def get_release_date(path='/etc/vector-version'):
    """ Return the date this version was released, as stated in the
    vector-version file.

    path -- file to read; defaults to /etc/vector-version

    Only the first line is read.  The result is the text following the
    last "built on" marker, stripped of surrounding whitespace; when the
    marker is absent the whole stripped line is returned.  An empty file
    yields an empty string.  The file is closed even when reading fails.
    """
    with open(path, 'r') as version_file:
        first_line = version_file.readline()
    # rpartition leaves the whole line in the last slot when the marker
    # is missing.
    return first_line.rpartition("built on")[2].strip()
    
def get_vector_version(path='/etc/vector-version'):
    """ Return the full vector version string.

    path -- file to read; defaults to /etc/vector-version

    Only the first line is read.  The result is every word that comes
    before the first standalone word "built", joined by single spaces;
    when "built" does not occur all words of the line are kept.  An
    empty file yields an empty string.  The file is closed even when
    reading fails.
    """
    with open(path, 'r') as version_file:
        first_line = version_file.readline()
    words = first_line.split()
    if "built" in words:
        words = words[:words.index("built")]
    return ' '.join(words)