Source

vasm / src / vasm / backend / SERVICES.py

#!/usr/bin/env python

#    This file is part of VASM.
#
#    VASM is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License v3 as published by
#    the Free Software Foundation.
#
#    VASM is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with VASM.  If not, see <http://www.gnu.org/licenses/>.

__author__ = "Moises Henriquez"
__author_email__ = "moc.liamg@xnl.E0M"[::-1]

import os
import subprocess as sp
import logging
from utils import get_popen

log = logging.getLogger('vasm')

def get_service_desc(service):
    services = {
        "portmap": "Port mapping service.  Needed by NIS and NFS.",
        "inetd": "Internet server super service.",
        "fuse":"Allow non-privileged users to work with filesystems.",
        "cron": "Execute scheduled tasks.",
        "firewall": "Filter and masquerade TCP/IP networking.",
        "gpm": "Enable mouse cursor in text mode",
        "lm_sensors": "Hardware sensor data reporting.",
        "samba": "File/Printer sharing and interaction with a windows network.",
        "sshd": "Allow remote (text mode) access and port tunelling.",
        "bluetooth":"Enables interaction with bluetooth devices.",
        "cups": "Common Unix Printing System.  Necessary to print."
    }
    if service in services:
        return services[service]
    else:
        return service + " service."
    

class ServiceModel(object):
    """ Class for working with system services """
    def __init__(self, initrdpath = '/etc/rc.d/init.d/'):
        self.observers = []
        self.initdpath = initrdpath
    
    def notify(self):
        """ Trigger all observers to run """
        for func in self.observers:
            func()

    def add_observer(self, method):
        """ Add an observer to the list of methods watching this object"""
        self.observers.append(method)
    
    def get_service_runlevel_status(self, service, runlevel):
        """ Returns True if service is enabled on runlevel, False otherwise"""
        rldir = "/etc/rc.d/rc%s.d"%runlevel
        for link in os.listdir(rldir):
            if link.startswith("S") and link.endswith(service):
                return True
        return False

    def enable_service(self, service, runlevel):
        """ enable service for runlevel"""
        assert str(runlevel) not in ("0", "6"), "Invalid runlevel"
        assert service in self.list_all_services(), "Unknown Service"
        
        cmd = ["/sbin/service", "-s", service, str(runlevel)]
        proc = get_popen(cmd)
        output = proc.communicate()[1]
        rcode = proc.returncode
        if rcode > 0:
            msg = ("Enabling service %s for runlevel %s failed."% (service, str(runlevel)))
            log.error(msg)
        else:
            log.debug("Service %s was enabled for runlevel %s"% (service, str(runlevel)))        
        return rcode, output
    
    def disable_service(self, service, runlevel):
        """ Disable service from runlevel """
        assert str(runlevel) not in ("0", "6"), "Invalid runlevel"
        assert service in self.list_all_services(), "Unknown service."

        cmd = ["/sbin/service", "-r", service, str(runlevel)]
        proc = get_popen(cmd)
        output = proc.communicate()[1]
        rcode = proc.returncode
        if rcode > 0:
            msg = ("Disabling service %s for runlevel %s failed."% (service, str(runlevel)))
            log.error(msg)
        else:
            log.debug("Service %s was disabled for runlevel %s"% (service, str(runlevel))) 
        return rcode, output
    
    def list_services_with_status(self):
        ret = []
        svclist = self.list_all_services()
        for svc in svclist:
            if svc == "sample": continue
            entry = [svc,True] # The True value we want to keep because this tells
                                # the UI this service is changable
            for x in range(2, 6):
                entry.append(self.get_service_runlevel_status(svc, x))
            if len(entry) > 1:
                ret.append(entry)
        return ret
    
    def list_all_services(self):
        return os.listdir(self.initdpath)
    
    def hasService(self, svcname):
        """ Check if a service is available on the system. """
        lst = self.list_all_services()
        return svcname in lst
    
    def getServiceStatus(self, svcname, runlevel):
        """ Check if svcname is enabled for runlevel.  Return True if svc is 
        enabled for that runlevel, or False if it is disabled """
        assert self.hasService(svcname) is True, 'No such service %s'% svcname
        return self.get_service_runlevel_status(svcname, runlevel)