RNA-seq pipeline / rnaseq_pipe.py

#! /usr/bin/env python
# Time-stamp: <2011-11-24 15:31:33 sunhf>
"""Rnaseq pipeline main script

Copyright (c) 2011 Hanfei Sun <hfsun.tju@gmail.com>

This code is free software; you can redistribute it and/or modify it
under the terms of the BSD License (see the file COPYING included with
the distribution).

@status:  experimental
@version: $Id$
@author:  Hanfei Sun
@contact: hfsun.tju@gmail.com
"""
import os
import sys
import string
import re
import subprocess
from multiprocessing import Pool  # used by gfold.main(); may also be re-exported by modulebase
from ConfigParser import SafeConfigParser

from modulebase import *


################
# custom alias #
################
space_join = lambda *args:" ".join(args)
List_append = list.append
parser = SafeConfigParser()
pjoin = os.path.join
get_output = run_cmd_pipe
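# e.g. space_join("tophat", "-o", "outdir") -> "tophat -o outdir";
# List_append(lst, x) is simply lst.append(x);
# get_output(cmd) is run_cmd_pipe from modulebase, expected to return the command's output as a string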
#############
# main part #
#############

class Tophat(ModuleBase):
    ''' Use Tophat to map the FASTQ reads back to the genome'''
    def __init__(self, conf):
        ModuleBase.__init__(self, "Tophat align", conf)

        self.ebwt_index = conf["%s.bowtie_genome_index_path" %conf['specie.version']]
        self.gtf_file = conf["%s.gtf" %conf['specie.version']]
        
        List_append (self.tool_path, conf["tophat.main"])
        List_append (self.lib_path, self.gtf_file)
        # register paths for tool, data, directory and static library validation

        for a_tag,a_fastq in conf['datalist'].items():
            a_dir = self.find_path(self.output_dir_path, a_tag, "Tophat")
            a_dir = self.get_dir(a_dir)

            if a_fastq != '@pair':     # for single-end data
                self.data_path[a_tag]=a_fastq
                List_append (self.cmd_list,
                             space_join(conf['tophat.main'], '-G', self.gtf_file, '-o', a_dir, 
                                        self.ebwt_index, a_fastq, conf['tophat.extend_option']))
            else:               # for paired-end data

                two_fastq = conf['paired_datalist'][a_tag]
                self.data_path[a_tag]=two_fastq
                List_append (self.cmd_list,
                             space_join(conf['tophat.main'], '-G', self.gtf_file, '-o', a_dir, "-p","5",
                                        "-r", conf['pairdefine.mate_inner_distance'], self.ebwt_index, 
                                        two_fastq[0], two_fastq[1], conf['tophat.extend_option']))

    def check_data(self):
        if not ModuleBase.check_data(self):
            return False
        if not os.path.isfile(self.ebwt_index+".1.ebwt"):
            error("bowtie genome index can't be found!")
            return False
        return True
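
# Example of the Tophat commands assembled above (a sketch; the binary name, index and
# file names are hypothetical placeholders for the configured values):
#   single-end: tophat -G genes.gtf -o <outdir>/heart/Tophat <bowtie_index> heart.fastq <tophat.extend_option>
#   paired-end: tophat -G genes.gtf -o <outdir>/heart/Tophat -p 5 -r <mate_inner_distance>
#               <bowtie_index> heart_1.fastq heart_2.fastq <tophat.extend_option>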
    
class FastqcRaw(ModuleBase):
    ''' Use FastQC to do quality control on the raw FASTQ files'''
    def __init__(self, conf):
        ModuleBase.__init__(self, name="quality control", conf=conf)
        self.conf=conf
        List_append (self.tool_path, conf["fastqc.main"])
        self.tag2fastqs = lambda tag: [self.conf['datalist'][tag]] if self.conf['datalist'][tag]!="@pair" else self.conf['paired_datalist'][tag]
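        # e.g. tag2fastqs('heart') -> ['heart.fastq'] for single-end data, or
        # ['heart_1.fastq', 'heart_2.fastq'] when datalist marks 'heart' as '@pair' (names hypothetical)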
        for a_tag in conf['datalist'].keys():
            a_dir = self.find_path(self.output_dir_path, a_tag, "FastqcRaw")
            a_dir = self.get_dir(a_dir)
            
            self.data_path[a_tag]=[]
            for a_fastq in self.tag2fastqs(a_tag):
                self.data_path[a_tag].append(a_fastq)
                List_append( self.cmd_list,
                             space_join(conf['fastqc.main'], '-q','-o', a_dir, a_fastq))
    def qc_result_summary(self):
        self.tag2realnames = lambda tag:map(self.parse_name,self.tag2fastqs(tag))
        self.realname2source = lambda tag,realname:self.find_path(self.conf['output.outputdir'], tag,"FastqcRaw",
                                                   realname+"_fastqc/fastqc_data.txt")
        for a_tag in self.conf['datalist']:
            for a_realname in self.tag2realnames(a_tag):
                print self.realname2source(a_tag,a_realname)
    def main(self):
        ModuleBase.main(self)
        self.qc_result_summary()
        
                
class FastqcMapped(ModuleBase):
    ''' Use FastQC to do quality control on the mapped BAM files'''
    def __init__(self, conf, source="Tophat"):
        ModuleBase.__init__(self, name="quality control mapped", conf=conf)
        List_append (self.tool_path, conf["fastqc.main"])
        self.tag2bam = lambda tag:self.find_path(conf['output.outputdir'], tag, source,"accepted_hits.bam")
        for a_tag in conf['datalist'].keys():
            self.data_path[a_tag]=self.tag2bam(a_tag)

        print self.data_path
        for a_tag,a_bam in self.data_path.items():
            a_dir = self.find_path(conf['output.outputdir'], a_tag,"FastqcMapped")
            a_dir = self.get_dir(a_dir)
            List_append( self.cmd_list, space_join(conf['fastqc.main'], '-q','-o', a_dir, a_bam))

            



class Bam2Bed(ModuleBase):
    '''
    Use Bam2Bed to convert BAM files to intervals (BED format)
    @attention: Only supports BAM input from Tophat for now
    @note: 'tag' means the name of a sample or a comparison; see the [data]
    and [comparison] sections in the config file
    '''
    def __init__(self, conf, source="Tophat"):
        ModuleBase.__init__(self, name="Convert from Bam to Bed", conf=conf)
        
        self.source = source
        # used to get the source, for example, the output bam file of Tophat
        self.tag2bam = lambda tag:self.find_path(conf['output.outputdir'], tag,
                                               self.source,"accepted_hits.bam")
        # generate the path of the input file from a tag, such as 'heart' or 'sample1'
        for a_tag in conf['datalist']:
            self.data_path[a_tag]=self.tag2bam(a_tag)
        List_append (self.tool_path, conf["tophat.bam2bed"])

        tag2bed = lambda tag:self.find_path(conf['output.outputdir'], tag,"Bam2bed","accepted_hits.bed")
        tag2dir = lambda tag:self.find_path(conf['output.outputdir'], tag,"Bam2bed")

        for a_tag,a_bam in self.data_path.items():
            a_dir = tag2dir(a_tag)
            a_dir = self.get_dir(a_dir)
            a_bed = tag2bed(a_tag)
            List_append (self.cmd_list, space_join(conf['tophat.bam2bed'], "-i", a_bam, ">", a_bed))
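
# Example of the conversion command Bam2Bed emits (a sketch; paths are hypothetical):
#   <tophat.bam2bed> -i <outdir>/heart/Tophat/accepted_hits.bam > <outdir>/heart/Bam2bed/accepted_hits.bed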

class Cufflinks(ModuleBase):
    '''
    Use Cufflinks to do transcript finding
    @attention: Only supports BAM input from Tophat for now
    '''    
    def __init__(self, conf, source="Tophat"):
        ModuleBase.__init__(self, name="Cufflinks transcripts finding", conf=conf)

        self.gtf_file = conf["%s.gtf" %conf['specie.version']]        
        self.tag2bam = lambda tag:self.find_path(conf['output.outputdir'], tag, source,"accepted_hits.bam")
        for a_tag in conf['datalist']:
            self.data_path[a_tag]=self.tag2bam(a_tag)
        List_append (self.tool_path, conf["cufflinks.main"])
        
        for a_tag,a_bam in self.data_path.items():
            a_dir = self.find_path(conf['output.outputdir'], a_tag,"Cufflinks")
            a_dir = self.get_dir(a_dir)
            List_append (self.cmd_list,
                         space_join(conf['cufflinks.main'],'-G', self.gtf_file,
                                    '--output-dir', a_dir, a_bam))
        
class gfold(ModuleBase):
    '''
    Use gfold to find differentially expressed genes
    @attention: Only supports BAM input from Tophat for now
    '''        
    def __init__(self, conf, source="Tophat"):
        ModuleBase.__init__(self, name="gfold differential genes finding", conf=conf)
        self.tag2bam = lambda tag:self.find_path(conf['output.outputdir'], tag,
                                               source,"accepted_hits.bam")        
        self.annotation_file = conf["%s.gpf" %conf['specie.version']]
        for a_compare,two_group in conf['comparelist'].items():
            self.data_path[a_compare]=[map(self.tag2bam, two_group[0]),
                                       map(self.tag2bam, two_group[1])]
        List_append (self.lib_path, self.annotation_file)            
        List_append (self.tool_path, conf["gfold.main"])
        List_append (self.tool_path, conf["gfold.samtools"])
        
        self.cmd_list_count=[]
        self.cmd_list_diff=[]
        comma_join = lambda x:",".join(x)
        tag2dir = lambda tag:self.find_path(conf['output.outputdir'], tag,"gfold") # to dir
        tag2prefix = lambda tag:tag2dir(tag)+"/"+tag 
        tag2cnt = lambda tag:tag2prefix(tag)+".cnt"
        tag2diff = lambda tag:tag2prefix(tag)+".diff"
        bam2tag = lambda bam:bam.split("/")[-3]
        # The directory structure looks like
        # /xxx/xxx/outputdir/tag_name/Tophat/accepted_hits.bam,
        # so take the third component from the right to recover the tag
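        # e.g. bam2tag("/path/to/outputdir/heart/Tophat/accepted_hits.bam") -> "heart"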

        tags=[]
        for a_compare,two_group in self.data_path.items():
            self.get_dir(tag2dir(a_compare))
            for i in range(0,2):
                for a_bam in two_group[i]:
                    a_tag = bam2tag(a_bam)
                    # check whether this tag (data) has already been counted
                    if a_tag in tags:
                        continue
                    else:
                        tags.append (a_tag)
                    self.get_dir(tag2dir(a_tag))
                    List_append(self.cmd_list_count,
                                space_join(conf["gfold.samtools"],"view", a_bam,
                                           "|", conf["gfold.main"], "count",
                                           "-ann", self.annotation_file,
                                           "-tag stdin","-o", tag2cnt(a_tag)))

            List_append (self.cmd_list_diff,
                         space_join(conf["gfold.main"],"diff",
                                    "-s1", comma_join(map(tag2prefix, map(bam2tag, two_group[0]))),
                                    "-s2", comma_join(map(tag2prefix, map(bam2tag, two_group[1]))),
                                    "-suf .cnt","-o", tag2diff(a_compare)))

    def main(self):
        if not self.multi_process_:
            map(run_cmd, self.cmd_list_count + self.cmd_list_diff)
        else:
            p = Pool(self.process_num)
            p.map(run_cmd, self.cmd_list_count)
            p.map(run_cmd, self.cmd_list_diff)
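
# Example of the two command stages gfold builds (a sketch; tags, annotation and paths are hypothetical):
#   count: samtools view <outdir>/heart/Tophat/accepted_hits.bam | gfold count -ann genes.gpf -tag stdin -o <outdir>/heart/gfold/heart.cnt
#   diff:  gfold diff -s1 <outdir>/heart/gfold/heart -s2 <outdir>/liver/gfold/liver -suf .cnt -o <outdir>/heartvsliver/gfold/heartvsliver.diff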

class Summary_tophat(ModuleBase):
    def __init__(self, conf, source="Tophat"):
        ModuleBase.__init__(self, name="Make the graph of evenness for quality control", conf=conf)
        # The evenness step does not support multiprocessing yet
        self.tag2bam = lambda tag:self.find_path(conf['output.outputdir'], tag, source,"accepted_hits.bam")
        self.tag2dir = lambda tag:self.find_path(conf['output.outputdir'], tag,"Summary_mapped")
        self.tag2even = lambda tag:self.tag2dir(tag)+"/"+tag
        self.tag2rscript = lambda tag:self.tag2even(tag)+".r"
        self.tag2mm = lambda tag:self.tag2dir(tag)+"/"+tag+".multimapped"
        # save the temporary result for calculating the unique mapping rate
        
        for a_tag in conf['datalist']:
            self.data_path[a_tag] = self.tag2bam(a_tag)
        self.gtf_file = conf["%s.gtf" %conf['specie.version']]

        List_append (self.tool_path, conf['gfold.samtools'])
        List_append (self.tool_path, pjoin(conf['scripts.rootdir'],"evenness_qclib.py"))
        List_append (self.tool_path, pjoin(conf['scripts.rootdir'],"Rlib/evenness_summary.r"))
        List_append (self.lib_path, self.gtf_file)

        self.for_summary=""
        self.summary_output_dir = self.find_path(conf['output.outputdir'], "Summary")
        self.summary_output_dir = self.get_dir(self.summary_output_dir)
        for a_tag,a_bam in self.data_path.items():
            a_dir = self.tag2dir(a_tag)
            a_dir = self.get_dir(a_dir)
            an_output = self.tag2even(a_tag)
            an_rscript = self.tag2rscript(a_tag)

            List_append (self.cmd_list,
                         space_join(conf['gfold.samtools'],'view', a_bam,"|",
                                    "python", pjoin(conf['scripts.rootdir'],"evenness_qclib.py"),
                                    self.gtf_file, an_output))
            List_append (self.cmd_list,
                         space_join(conf['gfold.samtools'],'view', a_bam,"|",
                                    "cut","-f1","|",
                                    "sort","|",
                                    "uniq","-c",">",self.tag2mm(a_tag)))
            
            self.for_summary += an_output+" "+a_tag+" "

        self.conf = conf
    def qc_for_mapped_cnt(self):
        '''Calculate how many reads are mapped or uniquely mapped'''
        self.tag2mappableQC = lambda tag:self.tag2dir(tag)+"/"+tag+".mappable_cnt"
        self.is_paired = lambda tag: True if self.conf['datalist'][tag]=="@pair" else False
        self.tag2fastq = lambda tag: self.conf['paired_datalist'][tag][0] if self.is_paired(tag) else self.conf['datalist'][tag]
        get_first_string = lambda cmd: get_output(cmd).split(" ")[0]

        self.cnt_list=[]
        for a_tag,a_bam in self.data_path.items():
            an_output = self.tag2mappableQC(a_tag)
            self.cnt_list.append(an_output)
            a_fastq = self.tag2fastq(a_tag)
            print a_fastq
            with open(an_output,"w") as mappable_file:
                qc_content=""
                mapped_cnt = int(get_first_string(space_join("cat", self.tag2mm(a_tag),"|",
                                                             "wc","-l")))
                raw_cnt = int(get_first_string(space_join("wc","-l", a_fastq)))/4

                # A unique read maps to a single location,
                # so it appears only once in the BAM file
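                # The .multimapped file holds "samtools view | cut -f1 | sort | uniq -c" output,
                # e.g. "      1 READ_A" (aligned once) or "      2 READ_B" (aligned twice; read IDs hypothetical),
                # so grepping for " 1 " (and " 2 " below for paired-end data) counts unique reads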
                uniq_cnt = int(get_first_string(space_join("cat", self.tag2mm(a_tag),"|",
                                                           "grep","\ 1\ ","|",
                                                           "wc","-l")))


                if self.is_paired(a_tag):
                    # For paired-end data only, a unique read may appear twice in the BAM file
                    uniq_pair_cnt = int(get_first_string(space_join("cat", self.tag2mm(a_tag),"|",
                                                                    "grep","\ 2\ ","|",
                                                                    "wc","-l")))
                    uniq_cnt += uniq_pair_cnt
                    qc_content += "QC for %s (BAM:%s FASTQ:%s) (pair-end data)\n"%(a_tag, a_bam, a_fastq)
                    qc_content += "%s mapped pair-end reads (count 1 time for multiple mapping)(mapped means at least 1 end of a paired-reads is mappable)\n"%mapped_cnt
                    qc_content += "%s unique pair-end reads(eliminate mutiple mapping)(uniq pair means at least 1 end of a paired-reads is uniq)\n"%uniq_cnt
                    qc_content += "%s total pair-end reads\n"%raw_cnt
                else:
                    qc_content += "QC for %s (BAM:%s FASTQ:%s)\n"%(a_tag, a_bam, a_fastq)
                    qc_content += "%s mapped reads (count 1 time for multiple mapping)\n"%mapped_cnt
                    qc_content += "%s unique reads (eliminate mutiple mapping)\n"%uniq_cnt
                    qc_content += "%s total reads\n"%raw_cnt
                qc_content += "%%%.2f percent reads are mappable\n"%((mapped_cnt/float(raw_cnt))*100)
                qc_content += "%%%.2f percent reads are uniq\n"%((uniq_cnt/float(raw_cnt))*100)
                qc_content += "Result file is at %s\n"%an_output+"-"*20
                print qc_content
                mappable_file.write(qc_content)

    def main(self):
        ModuleBase.main(self)
        self.qc_for_mapped_cnt()
        
        run_cmd(space_join("Rscript", pjoin(self.conf['scripts.rootdir'],"Rlib/evenness_summary.r"),
                           self.for_summary, self.summary_output_dir+"/evenness.pdf"))
        run_cmd(space_join("cat", space_join(*self.cnt_list), ">",
                           self.summary_output_dir+"/mappable.txt"))
            
class Summary_cufflinks(ModuleBase):
    def __init__(self, conf, source="Cufflinks"):
        ModuleBase.__init__(self, name="Correlation of the expression of genes", conf=conf)
        self.tag2exp = self.tag2bam = lambda tag:self.find_path(conf['output.outputdir'],
                                                              tag, source,"genes.fpkm_tracking")
        
        for a_tag in conf['datalist']:
            self.data_path[a_tag] = self.tag2exp(a_tag)
        self.outputdir = self.find_path(conf['output.outputdir'], "Summary")            
        self.get_dir(self.outputdir)

        cmd_pairs = space_join(*[an_exp+" "+a_tag for a_tag,an_exp in self.data_path.items()])
        List_append(self.cmd_list,
                    space_join("Rscript","Rlib/summary.r",
                               cmd_pairs,
                               self.outputdir+"/"+"rpkm_correlation"))
            

                          
alias_dict = dict(fastqc_raw=FastqcRaw, fastqc_mapped=FastqcMapped, tophat=Tophat, cufflinks=Cufflinks,
                  gfold=gfold, bam2bed= Bam2Bed, summary_mapped= Summary_tophat,
                  summary_rpkm=Summary_cufflinks)
def dependency_check(conf):
    dependency_dict = dict(fastqc_raw=[], tophat=[], fastqc_mapped=["tophat"], cufflinks=["tophat"],
                           gfold=["tophat"], bam2bed=["tophat"], summary_rpkm=["cufflinks"],
                           summary_naiveqc=["fastqc_raw"], summary_mapped=["tophat"])
    step_list_=conf['steps.steps'].strip().split(",")
    pre_step_list=[]
    for a_cursor in step_list_:
        deps = dependency_dict.get(a_cursor,[])
        for a_dep in deps:
            if a_dep not in pre_step_list:
                a_tip=("{0} need {1} as a dependency, please make sure you have run {1} previously.".format(a_cursor,a_dep))                
                if conf.get("strict.strict","False") == "False":
                    warn(a_tip)
                else:
                    error(a_tip)
                    sys.exit(1)

        if a_cursor not in pre_step_list:
            pre_step_list.append(a_cursor)
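
# e.g. with steps.steps = "cufflinks,tophat", "cufflinks" appears before its "tophat"
# dependency, so a warning is printed (or the run aborts when strict.strict is enabled)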

def read_config(configFile):
    """Read configuration file and parse it into a dictionary.

    In the dictionary, the key is the lower-cased section name plus the
    option name, joined by a dot, e.g. data.treatment_seq_file_path.
    
    @type configFile: str
    @param configFile: The path of config file    
    
    """
    configs = {}
    
    if len(parser.read(configFile)) == 0:
        raise IOError("%s not found!" % configFile)
    
    for sec in parser.sections():
        secName = string.lower(sec)
        for opt in parser.options(sec):
            optName = string.lower(opt)
            configs[secName + "." + optName] = string.strip(parser.get(sec, opt))
    dependency_check(configs)            
    parse_conf(configs)

    return configs
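
# A minimal config sketch matching the keys used above (section/option names and values are illustrative):
#   [data]
#   A = sampleA_1.fastq,sampleA_2.fastq
#   B = sampleB.fastq
#   [comparison]
#   AvsB = A:B
#   [steps]
#   steps = fastqc_raw,tophat,cufflinks
# read_config() flattens this into keys such as 'data.a' and 'comparison.avsb', and
# parse_conf() then builds 'datalist', 'paired_datalist' and 'comparelist' from them.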

def parse_conf(conf):
    '''Parse the config dictionary to get the 'datalist' and 'comparelist' terms

    @type conf:dict
    @param conf: The dictionary containing the configuration, i.e. the result of 'read_config'

    >>> steps_=",".join(alias_dict.keys())
    >>> cfg={'comparison.AvsB':'A:B','comparison.CvsDE':'C:D,E','data.A':'sampleA.fastq','data.B':'sampleB.fastq','steps.steps':steps_}
    >>> parse_conf(cfg)
    >>> cfg['comparelist']['avsb']
    [['a'], ['b']]
    >>> cfg['comparelist']['cvsde']
    [['c'], ['d', 'e']]
    >>> cfg['datalist']
    {'a': 'sampleA.fastq', 'b': 'sampleB.fastq'}
    >>> cfg={'comparison.AvsB':'A:B','data.A':'sampleA_1.fastq,sampleA_2.fastq','data.B':'sampleB.fastq','steps.steps':steps_}
    >>> parse_conf(cfg)
    >>> cfg['datalist']
    {'a': '@pair', 'b': 'sampleB.fastq'}
    >>> cfg['paired_datalist']
    {'a': ['sampleA_1.fastq', 'sampleA_2.fastq']}

    '''
    compare_pre='comparison.'
    new_compare_pre='comparelist'    
    conf[new_compare_pre]={}
    conf['datalist']={}
    conf['paired_datalist']={}
    
    comma_split = lambda x:x.split(",")
    trim = lambda x,prefix:x[len(prefix):].lower()
    
    for a_term in conf:
        if a_term.startswith(compare_pre):
            compare_groups = conf[a_term].split(':')
            if len(compare_groups)==2:
                conf[new_compare_pre][trim(a_term,compare_pre)] = map(comma_split,
                                                                      map(string.lower, compare_groups))
            else:
                error("Format wrong in the %s part of config file"%compare_pre)
                sys.exit(1)
        elif a_term.startswith('data.'):
            if "," in conf[a_term]:
                if conf[a_term].count(",")!=1:
                    error("Only one comma can exsist in the conf section ["+a_term+"] => "+conf[a_term])
                    sys.exit(1)
                conf['paired_datalist'][trim(a_term,"data.")]=conf[a_term].split(",")
                conf['datalist'][trim(a_term,"data.")]='@pair'
            else:
                conf['datalist'][trim(a_term,"data.")]=conf[a_term]

    conf['steplist']=[alias_dict[alias] for alias in conf['steps.steps'].split(",")]


###########
# running #
###########
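# Usage (a sketch): python rnaseq_pipe.py <config_file> [q]
#   passing "q" as the second argument restricts the run to the raw FastQC step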
if __name__ == '__main__':
    if len(sys.argv)<2:
        print "Please choose a conf file!"
        sys.exit(1)
    try:
        cfg = read_config(sys.argv[1])
        my_steps = []
        print "Tool and library validating\n"+"="*50
        if len(sys.argv)==3:
            if sys.argv[2]=='q':
                cfg['steplist']=[FastqcRaw] # allow running only the raw QC step from the command line
                
        for a_tool in cfg['steplist']:
            an_app = a_tool(cfg)
            if an_app.check_tool() and an_app.check_lib():
                my_steps.append(an_app)
            else:
                error("Validation failed at %s"%an_app.name)
                sys.exit(1)

        for a_step in my_steps:
            if a_step.check_data() and a_step.check_output_dir():
                a_step.main()
            else:
                sys.exit(1)
    except KeyboardInterrupt:
        sys.stderr.write("User interrupt me! ;-) See you!\n")
        sys.exit(0)