# RNA-seq pipeline / modulebase.py
# Time-stamp: <2011-11-23 15:31:28 sunhf>
import os
import sys
import subprocess
import logging

from subprocess import call as subpcall
from multiprocessing import Pool

# Plain-text log file in the current directory; info() below mirrors every
# message into it.  Opened (and truncated) once, at import time.
logfhd = open("log", "w")
# Console logging: INFO and above go to stderr.  (No `filemode` here: it is
# only honoured together with `filename`, so with `stream` it was a no-op.)
logging.basicConfig(level=logging.INFO,
                    format='%(levelname)-5s[%(asctime)s]: %(message)s ',
                    datefmt='%a, %d %b %y %H:%M:%S',
                    stream=sys.stderr,
                    )
# Short aliases used throughout the pipeline.  Note: error() logs at
# CRITICAL level, not ERROR.
error = logging.critical
warn = logging.warning
def info(a):
    '''
    Log a message at INFO level and also append it to the "log" file.

    @type  a: str
    @param a: the message; it is written to the file followed by a newline
    '''
    logging.info(a)
    logfhd.write(a + "\n")
    # flush immediately so the log file is current even if the run dies
    logfhd.flush()

class KeyboardInterruptError(Exception):
    '''
    Raised by run_cmd() and run_cmd_pipe() in place of KeyboardInterrupt.

    Unlike KeyboardInterrupt this derives from Exception -- presumably so an
    interrupt inside a multiprocessing worker is reported back to the parent
    as an ordinary error (TODO confirm the original intent).
    '''

def run_cmd ( command ):
    """
    Run a shell command and record the command string in the log file.

    @type  command: str
    @param command: the command you want to run, for example, "ls -l"
    @rtype: int
    @return: the command's exit status (0 on success); a non-zero status is
             also reported with warn() so failures are not silently lost
    @raise KeyboardInterruptError: if the run is interrupted (Ctrl-C)
    """
    info ("[Run ->] %s" % command)
    try:
        retcode = subpcall (command, shell=True)
    except KeyboardInterrupt:
        # re-raised as an Exception subclass; presumably because a bare
        # KeyboardInterrupt does not propagate cleanly out of Pool workers
        raise KeyboardInterruptError()
    if retcode != 0:
        warn("[Failed (exit %s)] %s" % (retcode, command))
    return retcode
def run_cmd_pipe ( command ):
    """
    Run a shell command, record it in the log file and return its output.

    @type  command: str
    @param command: the command you want to run, for example, "ls -l"
    @return: everything the command wrote to stdout (stderr is not
             captured); a str on Python 2, bytes on Python 3
    @raise KeyboardInterruptError: if the run is interrupted (Ctrl-C)
    """
    info ("[Run <->] %s" % command)
    try:
        proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
        # communicate() reads stdout to EOF *and* waits for the process,
        # so the child is reaped instead of being left as a zombie
        output = proc.communicate()[0]
    except KeyboardInterrupt:
        raise KeyboardInterruptError()
    return output

def check_cmd( command ):
    """
    Check whether a command can be found and executed by the shell.

    The command is actually run (through the shell) with its standard
    output discarded.  Because a shell is used, a missing program does not
    raise OSError: the shell reports it with exit status 127 (126 when the
    file exists but is not executable), so those statuses mean failure.
    Any other exit status, including other non-zero ones, means the command
    exists.

    @type  command: str
    @param command: the command you want to run, for example, "ls -l"
    @rtype: bool
    @return: True if the command could be run, False otherwise
    """
    try:
        with open(os.devnull, "w") as devnull:
            # send stdout to /dev/null instead of an unread PIPE, which can
            # deadlock call() once the pipe buffer fills up
            retcode = subpcall(command, shell=True, stdout=devnull)
    except OSError as e:
        sys.stderr.write("Execution failed: %s\n" % e)
        return False
    return retcode not in (126, 127)

def list_or_tuple(x):
    '''Return True if x is a list or a tuple (subclasses included).'''
    return isinstance(x, list) or isinstance(x, tuple)
def flatten(sequence, to_expand=list_or_tuple):
    '''
    Lazily yield the leaves of an arbitrarily nested sequence, in order.

    @param sequence: the (possibly nested) iterable to walk
    @param to_expand: predicate deciding which items are descended into;
                      by default lists and tuples

    >>> list(flatten([1, 2, [3, [  ], 4, [5, 6], 7, [8,], ], 9]))
    [1, 2, 3, 4, 5, 6, 7, 8, 9]
    '''
    # explicit stack of iterators instead of recursive generators
    stack = [iter(sequence)]
    while stack:
        try:
            element = next(stack[-1])
        except StopIteration:
            stack.pop()
            continue
        if to_expand(element):
            stack.append(iter(element))
        else:
            yield element
    
class ModuleBase():
    '''
    Every step using a tool in the RNA-seq pipeline derives from ModuleBase.

    It contains the basic functions for file operations, data validation and
    running the step's shell commands (serially or with a process pool).
    >>> test = ModuleBase()
    >>> test.find_path("test","sampleA","Tophat")
    'test/sampleA/Tophat'
    >>> test.find_path("test","sampleA","Tophat","accept_hits.bam")
    'test/sampleA/Tophat/accept_hits.bam'
    >>> test.parse_name("test/sampleA/raw/sampleA.fastq")
    'sampleA'
    >>> test.parse_name("sampleB.fastq")
    'sampleB'

    '''
    def __init__(self, name="Base", conf=None):
        '''
        @type name: str
        @param name: Describes the instance of a step
        @type conf: dict
        @param conf: The dictionary containing the configuration for the
                     current step, which is the result of the 'read_config'
                     and 'parse_conf' functions. Defaults to a new empty dict
                     (a fresh one per instance, so instances never share it).
        '''
        if conf is None:
            conf = {}
        self.name = name
        self.help_str = "Write your help information here"
        self.conf = conf
        self.tool_path = []
        self.data_path = {}
        self.lib_path = []
        self.cmd_list = []

        # validate the multi-processing setting
        self.check_multi_process()

        self.output_dir_path = self.conf.get('output.outputdir', './')

    def check_multi_process(self):
        ''' Read 'performance.process_number' from the config.

        Sets self.multi_process_ (bool) and self.process_num (the pool size;
        1 whenever multi-processing is off).
        '''
        self.process_num = 1
        if "performance.process_number" not in self.conf:
            warn("No settings for multi-processing found, multi-processing mode off")
            self.multi_process_ = False
            return
        p_num = int(self.conf['performance.process_number'])
        if p_num > 1:
            info("multi-processing mode on, %s parallel process availble"%p_num)
            self.multi_process_ = True
            self.process_num = p_num
        elif p_num == 1:
            self.multi_process_ = False
        else:
            warn("the process number must be > = 1, multi-processing mode off")
            self.multi_process_ = False

    def _check_files(self, paths, kind, tag):
        ''' Return True if every path in `paths` is an existing file.

        Logs "(<tag>-valid)" for each file found; stops at the first missing
        one, logging "<kind> ... doesn't exists" and returning False.
        '''
        for a_path in paths:
            if not os.path.isfile(a_path):
                error("%s %s doesn't exists ---> %s"%(kind, a_path, self.name))
                return False
            info("(%s-valid) %s ---> %s"%(tag, a_path, self.name))
        return True

    def check_lib(self):
        ''' Check that every path in 'lib_path' is an existing file '''
        return self._check_files(self.lib_path, "Library", "lib")

    def check_tool(self):
        ''' Check that every path in 'tool_path' is an existing file '''
        return self._check_files(self.tool_path, "Tool", "tool")

    def check_data(self):
        ''' Check that every path in 'data_path' is an existing file.

        Each value of data_path is either a single path or a (possibly
        nested) list/tuple of paths, called a "data family".
        '''
        paths = []
        for a_data_family in self.data_path.values():
            # wrap anything that is not a list/tuple (str, Py2 unicode, ...)
            # so a single path is never iterated character by character
            if not isinstance(a_data_family, (list, tuple)):
                a_data_family = [a_data_family]
            paths.extend(flatten(a_data_family))
        return self._check_files(paths, "Data", "data")

    def check_output_dir(self):
        ''' Check whether 'output_dir_path' is an existing directory '''
        return self.exam_dir(self.output_dir_path)

    def exam_dir(self, a_path):
        ''' Return True if a_path is a directory, False if it doesn't exist.

        Exits the program (status 1) if a regular file has that name.
        '''
        a_dir = self._convert_path(a_path)
        if os.path.isdir(a_dir):
            return True
        elif os.path.isfile(a_dir):
            error("There is file named with the same name of the output dir %s"%a_dir)
            error("Please 'rm %s'"%a_dir)
            sys.exit(1)
        else:
            return False

    def help_info(self):
        ''' Show verbose help information '''
        print(self.help_str)

    def main(self, arguments=None):
        ''' Run every command in 'cmd_list'.

        Commands run one after another, or through a Pool of
        self.process_num worker processes when multi-processing is on.
        @param arguments: unused; kept for interface compatibility
        @return: False when there is nothing to run, otherwise None
        '''
        if not self.cmd_list:
            warn("No command input @ %s"%self.name)
            return False
        if not self.multi_process_:
            # explicit loop: map() is lazy on Python 3 and would run nothing
            for a_cmd in self.cmd_list:
                run_cmd(a_cmd)
            return
        pool = Pool(self.process_num)
        try:
            pool.map(run_cmd, self.cmd_list)
            pool.close()
        except KeyboardInterrupt:
            pool.terminate()
            print('pool is terminated')
        except Exception:
            # join() requires a closed or terminated pool; without this the
            # real error would be masked by join() failing in `finally`
            pool.terminate()
            raise
        finally:
            pool.join()

    def _convert_path(self, temp_name):
        ''' Convert a relative path to an absolute path '''
        if os.path.isabs(temp_name):
            return temp_name
        else:
            return os.path.join(os.getcwd(), temp_name)

    def _shell_quote(self, path):
        ''' Quote path so the shell treats it as a single word, even when it
        contains spaces or shell metacharacters. '''
        try:
            from shlex import quote   # Python 3.3+
        except ImportError:
            from pipes import quote   # Python 2
        return quote(path)

    def get_dir(self, temp_name="temp"):
        ''' Create a directory if it doesn't exist; return its absolute path '''
        temp_path = self._convert_path(temp_name)
        if self.exam_dir(temp_path) is False:
            # -p also creates missing parent directories
            run_cmd("mkdir -p %s"%self._shell_quote(temp_path))
        return temp_path

    def release_file(self, temp_name="temp"):
        ''' Delete a temporary file '''
        temp_path = self._convert_path(temp_name)
        if os.path.isfile(temp_path):
            run_cmd("rm %s"%self._shell_quote(temp_path))
        else:
            print("The file %s haven't be created yet, OR it's not a file"%temp_path)

    def release_dir(self, temp_name="temp_dir"):
        ''' Delete a temporary directory and everything in it '''
        temp_path = self._convert_path(temp_name)
        if os.path.isdir(temp_path):
            # quoting matters here: an unquoted path containing a space
            # would make `rm -r` delete two unrelated paths
            run_cmd("rm -r %s"%self._shell_quote(temp_path))
        else:
            print("The dir %s haven't be created yet, OR it's not a dir"%temp_path)

    def find_path(self, top_dir, data_name, tool_name="", file_name=""):
        '''
        Get the path in a multi-level folder
        @type top_dir: str
        @param top_dir: The 1st level of the output directory, maybe the 'output.outputdir' term in the 'conf' dictionary
        @type data_name: str
        @param data_name: The 2nd level of the output directory, maybe the terms in 'conf[datalist]' or 'conf[comparelist]'
        @type tool_name: str
        @param tool_name: The 3rd level of the output directory, maybe the name of a tool such as 'Tophat' or 'Degseq'
        @type file_name: str
        @param file_name: The final level, file name in the multi-level directory, leave it default if you want a directory
        '''
        if file_name=="":
            return os.path.join(top_dir, data_name, tool_name)
        else:
            return os.path.join(top_dir, data_name, tool_name, file_name)

    def parse_name(self, file_path):
        ''' Get the file name without its folder and last extension '''
        return os.path.splitext(file_path)[0].split("/")[-1]