MultisampleAnalysisTutorial
Introduction
This tutorial shows how to perform various operations on your BAM files using SparkSeq. We assume that the following components of your system are up and running:
- Apache Spark cluster (either in standalone mode or using a resource manager such as Mesos or YARN) or Apache Spark in local mode (More info here)
- SparkSeq compiled or downloaded from the unofficial Maven repository (please pay attention to HDFS version compatibility) and the appropriate entries added to the SPARK_CLASSPATH, ADD_JARS and SPARK_JAVA_OPTS environment variables (More info here)
- Apache Spark's REPL connected to a cluster or run locally (More info here)
- HDFS NameNode(s) and DataNode(s)
This tutorial uses an RNA sequencing dataset with alignments limited to chromosome Y, freely available at DERFinder's GitHub. Copy this dataset to your local Hadoop cluster to form the following directory structure:
spark@sparkseq:/sandbox$ hadoop fs -lsr /BAM/64MB/derfinder/chrom_Y | cut -c61-
/BAM/64MB/derfinder/chrom_Y/F
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF23_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF2_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF33_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF40_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF55_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF56_Y.bam
/BAM/64MB/derfinder/chrom_Y/M
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF11_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF1_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF32_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF3_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF42_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF43_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF47_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF53_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF58_Y.bam
One more directory should be created for storing the various BED files, which can be downloaded from the project's page:
spark@sparkseq:/sandbox$ hadoop fs -lsr /BAM/64MB/aux | cut -c61-
/BAM/64MB/aux/Homo_sapiens.GRCh37.74_exons_chr_merged_id_st.bed
/BAM/64MB/aux/Homo_sapiens.GRCh37.74_exons_chr_sort_uniq.bed
/BAM/64MB/aux/Homo_sapiens.GRCh37.74_genes_chr_merged_swap.bed
If you are not sure about SparkSeq's syntax, you can always find more information in the SparkSeq API Doc.
1. Creating SparkSeqAnalysis object
The most important SparkSeq class for performing multisample analyses is SparkSeqAnalysis. An object of this class stores all the information about the samples and references to them. It features a simple API which, together with Apache Spark's transformations, makes SparkSeq a powerful yet easy-to-use tool for RNA/DNA analyses.
So let's create a SparkSeqAnalysis object and attach all the BAM files from the /BAM/64MB/derfinder/chrom_Y/M/ directory. We first initialize the object using the first sample and then easily add the rest using the addBAM method. Let's run Apache Spark's REPL and set the maximum memory per worker machine to 2GB (as the default 512MB may be insufficient in some cases):
spark@sparkseq:~/spark-0.9.0-incubating/bin$ SPARK_MEM=2g ./spark-shell
and then create a SparkSeqAnalysis object:
import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis
import pl.elka.pw.sparkseq.conversions.SparkSeqConversions
import pl.elka.pw.sparkseq.util.SparkSeqRegType._
val seqAnalysis = new SparkSeqAnalysis(sc,"/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF1_Y.bam",1,1,1)
val caseId = Array(11, 32, 3, 42, 43, 47, 53, 58)
for(i<-caseId)
seqAnalysis.addBAM(sc,"/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF"+i.toString+"_Y.bam",i,1)
Then we can list all the samples that have been attached using the listSamples method:
scala> seqAnalysis.listSamples()
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF1_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF11_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF32_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF3_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF42_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF43_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF47_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF53_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF58_Y.bam
2. Counting and displaying
Let's start with some easy counting of samples and reads. We will use the getSamples method to obtain all samples, getReads to get all reads, and getSampleReads(11) to get all reads from sample '11':
scala> seqAnalysis.getSamples.count
res2: Long = 9
scala> seqAnalysis.getReads.count
res3: Long = 925231
scala> seqAnalysis.getSampleReads(11).count
res5: Long = 120493
To view read information in a way similar to samtools view (first 5 reads):
scala> seqAnalysis.viewSampleReads(11,5)
011211_fcA_:6:1102:10430:158969:0:1 16 Y 2655174 50 100M * 0 0 CACTTCGCTGCAGAGTACCGAAGCGGGATCTGCGGGAAGCAAACTGCAATTCTTCGGCAGCATCTTCGCCTTCCGACGAGGTCGATACTTATAATTCGGG b^ccccccccccccca^cccccccccccccccccccccccddddddddeeeeggiiiihiiihiiiihhiiiiiiiiiiiiiihgefgggggeeecebbb MD:Z:100 XG:i:0 NH:i:1 NM:i:0 XM:i:0 XN:i:0 XO:i:0 AS:i:0YT:Z:UU
011211_fcA_:6:1308:17172:193560:0:1 16 Y 2658568 50 100M * 0 0 CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT acb_bccddcddddddcbbccca`bec\dghdhfhhhiihghfgehiihhiihfihfchgbeZbfhihgihdhhiihfcdfb_hggfggfggeeecebb_ MD:Z:100 XG:i:0 NH:i:1 NM:i:0 XM:i:0 XN:i:0 XO:i:0 AS:i:0YT:Z:UU
011211_fcA_:6:2201:7222:62493:0:1 16 Y 2658568 50 100M * 0 0 CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT aacccccddcdddddddb`b_cbcceegggihiiiiiihgeihgdhiiiiiihiiiiiiihiiiiihfiiiihihhhiighhiihgg^fgggeeeeebbb MD:Z:100 XG:i:0 NH:i:1 NM:i:0 XM:i:0 XN:i:0 XO:i:0 AS:i:0YT:Z:UU
011211_fcA_:6:2202:14432:150553:0:1 16 Y 2658568 50 100M * 0 0 CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT cccccccddcdddededcccccccceegggiiiiiiiiihfihfeiiiiiiiiiiihiiihihiiiiiiiiiiiiiiihghgiiiiigggggeeeeebbb MD:Z:100 XG:i:0 NH:i:1 NM:i:0 XM:i:0 XN:i:0 XO:i:0 AS:i:0YT:Z:UU
011211_fcA_:6:1301:11508:181776:0:1 272 Y 2675692 3 100M * 0 0 GAATGCTTCCAGTTTTTGCCCATTCAGTATGATATTGGCTGTGGGTTTGTTATAAATAGCTCTTATTTTGAAATACATTCCATCAATACCTAGTTTATTG cccccccccddcceeeeegggggiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiigiiiiiiiiiihgeggggeeeeebbb CC:Z:= MD:Z:100 XG:i:0 NH:i:2 HI:i:0 NM:i:0 XM:i:0 XN:i:0XO:i:0 CP:i:14961462 AS:i:0 YT:Z:UU
To get the read sequences only, you may combine the methods in the following way:
scala> seqAnalysis.getSampleReads(11).take(5).map(r=> r._2.getReadString).foreach(println)
CACTTCGCTGCAGAGTACCGAAGCGGGATCTGCGGGAAGCAAACTGCAATTCTTCGGCAGCATCTTCGCCTTCCGACGAGGTCGATACTTATAATTCGGG
CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT
CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT
CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT
GAATGCTTCCAGTTTTTGCCCATTCAGTATGATATTGGCTGTGGGTTTGTTATAAATAGCTCTTATTTTGAAATACATTCCATCAATACCTAGTTTATTG
To get the average read length of all reads from all samples:
scala> seqAnalysis.getAvgReadLength
res1: Double = 100.45402715646146
3. Sorting
The following command can be used to sort all reads from all samples by sampleID and alignment start and then display the first 5 reads:
scala> seqAnalysis.sortReadsByAlign().take(5).foreach(println)
((1,(chrY,2675709)),300811_fcB_:1:1108:2837:104706:0:1 101b aligned read.)
((1,(chrY,2675709)),300811_fcB_:1:2101:14014:45631:0:1 101b aligned read.)
((1,(chrY,2675754)),300811_fcB_:1:1305:7608:74961:0:1 101b aligned read.)
((1,(chrY,2675923)),300811_fcB_:1:2304:9858:81704:0:1 101b aligned read.)
((1,(chrY,2676173)),300811_fcB_:1:2303:10751:139532:0:1 101b aligned read.)
To sort the reads from a specific sample by alignment start:
scala> seqAnalysis.sortSampleReadsByAlign(11).take(5).foreach(println)
((11,(chrY,2655174)),011211_fcA_:6:1102:10430:158969:0:1 100b aligned read.)
((11,(chrY,2658568)),011211_fcA_:6:1308:17172:193560:0:1 100b aligned read.)
((11,(chrY,2658568)),011211_fcA_:6:2201:7222:62493:0:1 100b aligned read.)
((11,(chrY,2658568)),011211_fcA_:6:2202:14432:150553:0:1 100b aligned read.)
((11,(chrY,2675692)),011211_fcA_:6:1301:11508:181776:0:1 100b aligned read.)
To implement custom sorting, e.g. to find the reads with the greatest number of CIGAR operators (i.e. to sort in descending order by CIGAR string length):
scala> seqAnalysis.getSampleReads(11)
.map(r=>(r._2.getCigarLength,(r._2.getCigarString,r._2.getReadName)))
.sortByKey(false)
.take(5)
.foreach(println)
(11,(73M1I6M2D2M1I6M1D4M1I6M,011211_fcA_:6:1201:20258:60417:0:1))
(11,(73M1I6M2D2M1I6M1D4M1I6M,011211_fcA_:6:1201:8410:133984:0:1))
(11,(73M1I6M2D2M1I6M1D4M1I6M,011211_fcA_:6:1208:14033:185503:0:1))
(11,(73M1I6M2D2M1I6M1D4M1I6M,011211_fcA_:6:2104:3708:73930:0:1))
(11,(73M1I6M2D2M1I6M1D4M1I6M,011211_fcA_:6:2206:9709:108114:0:1))
4. Filtering
4.1 Filter and undo
The SparkSeqAnalysis class provides a wide range of methods that facilitate the process of filtering out reads that do not meet our requirements. All filters follow the lazy evaluation principle: they are not executed until they are really needed, i.e. until one of Apache Spark's actions (such as count, reduce, collect, etc.) is called.
Filters can be combined by calling one method after another, similarly to combining conditions with the 'logical and' operator. If you need more sophisticated combinations of logical operators, you can use the generic filterReads method, which allows you to create arbitrary conditions on the sampleID and the reads.
If you wish to undo all the filtering, you can call the undoFilter method.
Currently up to 2 conditions per method are supported. In order to set e.g. 3 conditions on the alignment start, you can simply call the same method twice: first with 2 conditions and then with the remaining one. Please note that calling a filter method for the n-th time is equivalent to combining the conditions with the 'and' operator.
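The chaining semantics can be illustrated with plain Scala collections: applying filter twice is equivalent to a single filter whose conditions are joined with &&. A minimal sketch using an ordinary Seq of hypothetical alignment starts rather than SparkSeq's RDD-backed filters:

```scala
// Hypothetical alignment start positions
val starts = Seq(2655174, 2660000, 2685880, 2700500)

// Two filter calls, one after another...
val chained  = starts.filter(_ >= 2655174).filter(_ < 2685880)

// ...are equivalent to a single filter with 'logical and'
val combined = starts.filter(s => s >= 2655174 && s < 2685880)
```

Both expressions keep exactly the same elements, which is why repeated calls to a SparkSeq filter method behave like an 'and' of all the conditions.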
4.2 Quality filtering
4.2.1 Using base-pair qualities
To filter out all reads that have the mean base quality < 30:
import pl.elka.pw.sparkseq.statisticalTests.SparkSeqStats
seqAnalysis.filterBaseQualities(SparkSeqStats.mean(_)>=30 ).count
res7: Long = 925176
To filter out reads that have at least 2 bases with quality < 30:
scala> seqAnalysis.filterBaseQualities(_.sortBy(r=>(-r)).takeRight(2).head >= 30 ).count
res9: Long = 849744
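The two predicates above can be checked in plain Scala before applying them to a whole dataset. The helpers below are hypothetical stand-ins (SparkSeqStats.mean is replaced by a local mean computation) that reproduce the same logic on a single array of base qualities:

```scala
// Hypothetical per-base quality scores of one read
// Predicate used with filterBaseQualities: mean base quality >= 30
def meanOk(q: Array[Int]): Boolean = q.sum.toDouble / q.length >= 30

// Predicate keeping reads with at most 1 base below quality 30:
// after a descending sort, takeRight(2).head is the 2nd-lowest quality
def atMostOneLowBase(q: Array[Int]): Boolean =
  q.sortBy(r => -r).takeRight(2).head >= 30
```

For example, a read with qualities (40, 25, 35, 20) has mean 30, so it passes the first filter, but it has two bases below 30 and is dropped by the second.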
Please remember to undo all the filtering before running each example:
scala> seqAnalysis.undoFilter
4.2.2 Using the alignment quality
To keep only the reads with alignment quality greater than or equal to 30:
scala> seqAnalysis.filterMappingQuality(_ >=30).count
res22: Long = 669871
4.3 Filtering with the alignment and reference names
To get only the reads whose alignment start falls between 2655174 and 2685880 on chrY:
scala> seqAnalysis.filterReferenceName(_ == "chrY")
scala> seqAnalysis.filterAlignmentStart(_ >= 2655174 && _ <= 2685880).count
res1: Long = 934
To get only the reads that fit into the interval [2655174, 2700500] on chromosome Y:
scala> seqAnalysis.filterReferenceName(_ == "chrY")
scala> seqAnalysis.filterAlignmentStart(_ >= 2655174)
scala> seqAnalysis.filterAlignmentEnd(_ <= 2700500).count
res1: Long = 967
4.4 Filtering on read name and length
To get a read with a specific ID you can use the following method:
scala> seqAnalysis.filterReadName(_ == "011211_fcA_:6:1102:10430:158969:0:1")
scala> seqAnalysis.viewReads()
011211_fcA_:6:1102:10430:158969:0:1 16 Y 2655174 50 100M * 0 0 CACTTCGCTGCAGAGTACCGAAGCGGGATCTGCGGGAAGCAAACTGCAATTCTTCGGCAGCATCTTCGCCTTCCGACGAGGTCGATACTTATAATTCGGG b^ccccccccccccca^cccccccccccccccccccccccddddddddeeeeggiiiihiiihiiiihhiiiiiiiiiiiiiihgefgggggeeecebbb MD:Z:100 XG:i:0 NH:i:1 NM:i:0 XM:i:0 XN:i:0 XO:i:0 AS:i:0YT:Z:UU
4.5 Filtering on CIGAR Strings
To get only the reads with a CIGAR string equal to "100M" and then display 2 of them:
scala> seqAnalysis.filterCigarString( _ == "100M").count
res21: Long = 425718
scala> seqAnalysis.viewReads(2)
011211_fcA_:6:1102:10430:158969:0:1 16 Y 2655174 50 100M * 0 0 CACTTCGCTGCAGAGTACCGAAGCGGGATCTGCGGGAAGCAAACTGCAATTCTTCGGCAGCATCTTCGCCTTCCGACGAGGTCGATACTTATAATTCGGG b^ccccccccccccca^cccccccccccccccccccccccddddddddeeeeggiiiihiiihiiiihhiiiiiiiiiiiiiihgefgggggeeecebbb MD:Z:100 XG:i:0 NH:i:1 NM:i:0 XM:i:0 XN:i:0 XO:i:0 AS:i:0YT:Z:UU
011211_fcA_:6:1308:17172:193560:0:1 16 Y 2658568 50 100M * 0 0 CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT acb_bccddcddddddcbbccca`bec\dghdhfhhhiihghfgehiihhiihfihfchgbeZbfhihgihdhhiihfcdfb_hggfggfggeeecebb_ MD:Z:100 XG:i:0 NH:i:1 NM:i:0 XM:i:0 XN:i:0 XO:i:0 AS:i:0YT:Z:UU
To do some more sophisticated filtering you can also take advantage of regular expressions:
scala> val cigRegex = "^[0-9]+M[0-9]+D[0-9]+M$".r
scala> seqAnalysis.filterCigarString(cigRegex.pattern.matcher(_).matches)
scala> seqAnalysis.viewReads(2)
300811_fcB_:1:1302:12119:168462:0:1 16 Y 2712173 50 29M1D72M * 0 0 TCTTCCTCAGGAATAGACTCAAGTATGCGTGACTGGAGATGAGGTAAAGAAGATATGTATGCAACGTTTCATCAAAATTGATGGCAAGGTTCGAGTGGATG aa^Tcccccdcddcddbccdeed^c`bgihhghfgffiiihffihiiiiihfhiheiiiiiiiiiiiihihihihhdiiihiihfdhigggggeeeeebbb MD:Z:29^T72 XG:i:1 NH:i:1 NM:i:1 XM:i:0 XN:i:0 XO:i:1AS:i:-8 YT:Z:UU
300811_fcB_:1:2204:12824:4029:0:1 16 Y 2719503 50 27M4D74M * 0 0 CACCTCTAGCCTTACTTACCGTAGACTCAAACCTGGAATCTTAGCCCACTGCTTTGGTGCTTATGGACTATAGCTTCGTACTTTTCCACTTTAGATTACAG BBBBB^__bbbbb_bb^Za^]cbcb`g`]`_bdfbbZbhgb\Rgddgcc^]cebfdfgggdgbdbhhghgehhhge_gcbfhhgafd`aegegccacc__Z MD:Z:27^CAAA71G2 XG:i:4 NH:i:1 NM:i:5 XM:i:1 XN:i:0XO:i:1 AS:i:-23 YT:Z:UU
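The regular expression above matches CIGAR strings of the shape &lt;n&gt;M&lt;n&gt;D&lt;n&gt;M (a match block, a deletion, a match block). You can verify its behaviour in plain Scala before plugging it into filterCigarString; matchesPattern is a hypothetical helper wrapping the same call used above:

```scala
// Regex from the example: one match block, one deletion, one match block
val cigRegex = "^[0-9]+M[0-9]+D[0-9]+M$".r

def matchesPattern(cigar: String): Boolean =
  cigRegex.pattern.matcher(cigar).matches
```

The CIGAR strings of the two reads displayed above ("29M1D72M", "27M4D74M") match, while a plain "100M" or a multi-indel string does not.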
4.6 Filtering using the alignment (SAM) flags
To filter out duplicate reads:
scala> seqAnalysis.filterDuplicateReadFlag( _ == false ).count
res45: Long = 925231
To filter out unmapped reads:
scala> seqAnalysis.filterUnmappedFlag( _ == false).count
res1: Long = 925231
To filter reads using other SAM flags (or their combinations), you can use the filterFlags method and a SAM flags calculator to find the necessary integer representation, e.g. 'not primary alignment' = 256:
scala> seqAnalysis.filterFlags( _ != 256).count
res4: Long = 829036
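Note that a test on the exact value 256 only removes reads whose flag field is exactly 256; a read combining 'not primary alignment' with other bits (e.g. flag 272 = 256 + 16, as in the last read shown in section 2) would still pass. A bitwise test catches all such combinations; isPrimary is a hypothetical helper you could pass to filterFlags:

```scala
// SAM flag bit for 'not primary alignment' (0x100, see the SAM spec)
val NotPrimary = 256

// A read is primary iff the non-primary bit is unset in its flags
def isPrimary(flags: Int): Boolean = (flags & NotPrimary) == 0
```

With this predicate, seqAnalysis.filterFlags(isPrimary) would also drop reads with flag 272.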
4.7 Combining filters
Filter methods can be combined in the following ways: in a single method call using logical operators (currently up to 2 conditions are supported), by calling methods several times, or by mixing both approaches:
scala> seqAnalysis.filterAlignmentStart(_ >= 2655174 && _ < 2685880).count
res1: Long = 934
scala> seqAnalysis.undoFilter
scala> seqAnalysis.filterAlignmentStart(_ >= 2655174)
scala> seqAnalysis.filterAlignmentStart(_ < 2685880).count
res4: Long = 934
4.8 Generic filter
There is also a generic filterReads method that makes it possible to set conditions (currently up to 2) on any BAMRecord property as well as on the sampleID (_._1 refers to the sampleID whereas _._2 refers to the read object):
scala> seqAnalysis.filterReads(_._2.getReferenceName == "Y" && _._1==1).count
res16: Long = 105691
Using this generic method you can implement any of the above specialized filters.
4.9 Selecting samples
To filter the reads from a specific sample:
scala> seqAnalysis.filterSample(_ == 32).count
res1: Long = 99955
5. Computing the coverage function and counts of reads in genomic regions
The coverage function, defined on all the nucleotides of the reference genome, and read counts for genomic regions (e.g. exons or genes) are the typical summarizations in a sequencing experiment.
You can compute these for regions of interest, describing them in a BED-like, tab-delimited file format that contains the following columns:
chr regStart regEnd strand upperRegionID regionID
where regionID must be globally unique. A regionID must contain exactly 15 characters, start with letters, end with digits and be padded with zeroes.
For instance:
chr1 11869 12227 . ENSG00000223972 ENSE00002234944
chr1 11872 12227 . ENSG00000223972 ENSE00002234632
chr1 11874 12227 . ENSG00000223972 ENSE00002269724
Precomputed BED-like files for human genes and exons according to Ensembl can be downloaded from here.
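If you build such a file yourself, a small helper can sanity-check identifiers against the constraints stated above (validRegionId is a hypothetical helper, not part of the SparkSeq API):

```scala
// Letters followed by a (zero-padded) number, 15 characters in total
val regionIdPattern = "^[A-Za-z]+[0-9]+$".r

def validRegionId(id: String): Boolean =
  id.length == 15 && regionIdPattern.pattern.matcher(id).matches
```

The Ensembl identifiers in the example above (e.g. ENSE00002234944) satisfy these constraints.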
5.1 Nucleotide-level coverage function
The coverage matrix is a generalization of a "coverage vector" and GenomicArray, which is explained here. It can be computed for all positions in all samples or for particular regions of interest. The following listing shows how to calculate, view (a region of) and save (all of) the results:
scala> val baseCov = seqAnalysis.getCoverageBase()
scala> seqAnalysis.viewBaseCoverage("chrY",2675720,2675728)
Feature Sample_1 Sample_3 Sample_11 Sample_32 Sample_42 Sample_43 Sample_47 Sample_53 Sample_58
=====================================================================================================================
chrY,2675720 2 2 7 0 0 19 0 0 0
chrY,2675721 2 2 7 0 0 19 0 0 0
chrY,2675722 2 2 7 0 0 19 0 0 0
chrY,2675723 2 2 7 0 0 19 0 0 0
chrY,2675724 2 2 7 0 0 19 0 0 0
chrY,2675725 2 2 7 0 0 19 0 0 0
chrY,2675726 2 2 7 0 0 19 0 0 0
chrY,2675727 2 2 7 3 0 19 0 0 0
chrY,2675728 2 2 7 3 0 19 0 0 0
scala> seqAnalysis.saveBaseCoverageToFile("test_base.txt")
If you need to calculate the coverage only for a specific chromosome or region, you can run the following commands:
scala> seqAnalysis.getCoverageBaseRegion("chrY",2600000,2700000)
scala> seqAnalysis.viewBaseCoverage("chrY",2675720,2675728)
Feature Sample_1 Sample_3 Sample_11 Sample_32 Sample_42 Sample_43 Sample_47 Sample_53 Sample_58
=====================================================================================================================
chrY,2675720 2 2 7 0 0 19 0 0 0
chrY,2675721 2 2 7 0 0 19 0 0 0
chrY,2675722 2 2 7 0 0 19 0 0 0
chrY,2675723 2 2 7 0 0 19 0 0 0
chrY,2675724 2 2 7 0 0 19 0 0 0
chrY,2675725 2 2 7 0 0 19 0 0 0
chrY,2675726 2 2 7 0 0 19 0 0 0
chrY,2675727 2 2 7 3 0 19 0 0 0
chrY,2675728 2 2 7 3 0 19 0 0 0
5.2 Exon-level counts
To calculate counts for 4 sample exons, you can use the getCoverageRegion method. The unionMode parameter is analogous to the 'union' mode of htseq-count.
scala> val genExonsMapB = sc.broadcast(SparkSeqConversions.BEDFileToHashMap(sc, "/BAM/64MB/aux/Homo_sapiens.GRCh37.74_exons_chr_sort_uniq.bed"))
scala> seqAnalysis.getCoverageRegion(genExonsMapB, unionMode=true)
scala> val exonArray = Array("ENSE00000652498","ENSE00000652501","ENSE00000652502","ENSE00000652503")
scala> seqAnalysis.viewExonCoverage(exonArray)
Feature Sample_1 Sample_3 Sample_11 Sample_32 Sample_42 Sample_43 Sample_47 Sample_53 Sample_58
=====================================================================================================================
ENSE00000652498 186 251 135 168 131 140 100 198 149
ENSE00000652501 148 211 136 156 130 83 140 230 171
ENSE00000652502 207 383 173 198 83 129 188 176 169
ENSE00000652503 215 384 182 265 93 136 177 161 135
scala> seqAnalysis.saveFeatureCoverageToFile("test_exon.txt",Exon)
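Conceptually, region counting is interval overlap with ambiguity handling. The following is a much simplified, pure-Scala illustration (the real getCoverageRegion runs distributed over the broadcast exon map; here, in the spirit of htseq-count's 'union' mode, a read is counted only when it overlaps exactly one region, and ambiguous reads are dropped):

```scala
// (regionID, start, end), e.g. exons from the BED-like file
case class Region(id: String, start: Int, end: Int)

// A read overlaps a region if their intervals intersect
def overlaps(readStart: Int, readEnd: Int, r: Region): Boolean =
  readStart <= r.end && readEnd >= r.start

// Count reads per region; reads hitting several regions are ambiguous
def countReads(reads: Seq[(Int, Int)], regions: Seq[Region]): Map[String, Int] =
  reads
    .map { case (s, e) => regions.filter(r => overlaps(s, e, r)) }
    .collect { case Seq(single) => single.id } // keep unambiguous reads only
    .groupBy(identity)
    .map { case (id, hits) => (id, hits.size) }
```

This glosses over strand handling and the exact ambiguity rules of htseq-count, but shows the kind of per-region aggregation that produces the tables above.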
5.3 Gene-level counts
To view counts for 4 sample genes, you can also use the getCoverageRegion method. The unionMode parameter is analogous to the 'union' mode of htseq-count.
scala> val genesMapB = sc.broadcast(SparkSeqConversions.BEDFileToHashMap(sc, "/BAM/64MB/aux/Homo_sapiens.GRCh37.74_genes_chr_merged_swap.bed"))
scala> seqAnalysis.getCoverageRegion(genesMapB, unionMode=true)
scala> val geneArray = Array("ENSG00000012817","ENSG00000067048","ENSG00000067646","ENSG00000092377")
scala> seqAnalysis.viewGeneCoverage(geneArray)
Feature Sample_1 Sample_3 Sample_11 Sample_32 Sample_42 Sample_43 Sample_47 Sample_53 Sample_58
=====================================================================================================================
ENSG00000012817 8720 9103 8585 7375 5387 5835 7669 8175 7258
ENSG00000067048 6321 8647 5325 5598 3468 4140 4376 6827 4335
ENSG00000067646 1305 1419 1610 853 966 535 848 1043 920
ENSG00000092377 75 48 31 61 29 10 31 26 42
seqAnalysis.saveFeatureCoverageToFile("test_gene.txt",Gene)
6. Grouping
To get a coverage vector for a specific genome position across samples you can do the following:
scala> val seqBaseCov = seqAnalysis.getCoverageBaseRegion("chrY",2600000,2700000)
scala> seqBaseCov.map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) )
.groupByKey()
.map(r=>(SparkSeqConversions.idToCoordinates(r._1),r._2))
.take(10)
.foreach(println)
((chrY,2658591),ArrayBuffer(3))
((chrY,2677791),ArrayBuffer(10, 10, 6, 5, 4, 3, 5))
((chrY,2655261),ArrayBuffer(1))
((chrY,2694913),ArrayBuffer(10))
((chrY,2677365),ArrayBuffer(6, 21, 5, 16, 1))
((chrY,2676565),ArrayBuffer(6))
((chrY,2692535),ArrayBuffer(3))
((chrY,2695925),ArrayBuffer(2))
((chrY,2677473),ArrayBuffer(11, 7, 24, 7, 17, 2, 8))
((chrY,2686485),ArrayBuffer(1))
7. Joining
In this section we show how to perform joins in SparkSeq in order to define more complex analyses using multiple SparkSeqAnalysis objects. If you are not familiar with the difference between a full outer join and an inner join, please consult Wikipedia. In the previous examples we operated only on the male samples. To demonstrate how to operate on 2 SparkSeqAnalysis objects, we will create two new ones, storing the male and female samples separately:
import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis
import pl.elka.pw.sparkseq.conversions.SparkSeqConversions
import pl.elka.pw.sparkseq.util.SparkSeqRegType._
val seqAnalysisMale = new SparkSeqAnalysis(sc,"/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF1_Y.bam",1,1,1)
val maleId = Array(11, 32, 3, 42, 43, 47, 53, 58)
for(i<-maleId)
seqAnalysisMale.addBAM(sc,"/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF"+i.toString+"_Y.bam",i,1)
val seqAnalysisFemale = new SparkSeqAnalysis(sc,"/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF2_Y.bam",2,1,1)
val femaleId = Array(23, 33, 40, 55, 56)
for(i<-femaleId)
seqAnalysisFemale.addBAM(sc,"/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF"+i.toString+"_Y.bam",i,1)
And a quick check that all the samples are correctly attached:
scala> seqAnalysisMale.listSamples
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF1_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF11_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF32_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF3_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF42_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF43_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF47_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF53_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF58_Y.bam
scala> seqAnalysisFemale.listSamples
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF2_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF23_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF33_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF40_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF55_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF56_Y.bam
scala> seqAnalysisMale.getSamples.count
res7: Long = 9
scala> seqAnalysisFemale.getSamples.count
res8: Long = 6
Now we will compute the coverage function summarization, as described in the previous section, on those 2 SparkSeqAnalysis objects separately in order to get a coverage vector across all the samples of each group:
scala> val seqBaseCovMale= seqAnalysisMale.getCoverageBaseRegion("chrY",2708570,2708870).map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) ).groupByKey()
scala> val seqBaseCovFemale= seqAnalysisFemale.getCoverageBaseRegion("chrY",2708570,2708870).map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) ).groupByKey()
7.1 Full outer join
To join the coverage vectors of males and females by position, even if there is no coverage at that position in one of the groups, you can use a full outer join. For position (chrY,2708650) there are no reads mapped in any female sample, which is why we get an empty ArrayBuffer. The commands are as follows:
scala> val outerJoin = seqBaseCovMale.cogroup(seqBaseCovFemale)
scala> outerJoin.filter(r=>r._2._1(0).length>=3).sortByKey().map(r=>(SparkSeqConversions.idToCoordinates(r._1),r._2)).take(10).foreach(println)
((chrY,2708650),(ArrayBuffer(ArrayBuffer(2, 12, 1)),ArrayBuffer()))
((chrY,2708651),(ArrayBuffer(ArrayBuffer(2, 12, 1)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708652),(ArrayBuffer(ArrayBuffer(12, 1, 2)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708653),(ArrayBuffer(ArrayBuffer(1, 12, 2)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708654),(ArrayBuffer(ArrayBuffer(12, 10, 1)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708655),(ArrayBuffer(ArrayBuffer(10, 1, 12)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708656),(ArrayBuffer(ArrayBuffer(1, 10, 12)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708657),(ArrayBuffer(ArrayBuffer(10, 12, 1)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708658),(ArrayBuffer(ArrayBuffer(1, 12, 1, 10)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708659),(ArrayBuffer(ArrayBuffer(1, 1, 12, 4, 10)),ArrayBuffer(ArrayBuffer(2))))
7.2 Inner join
To join coverage vectors only when both groups have at least one element in the vector for a given genomic position, you can perform an inner join:
scala> val innerJoin = seqBaseCovMale.join(seqBaseCovFemale)
scala> innerJoin.filter(r=>r._2._2.length>=6).sortByKey().map(r=>(SparkSeqConversions.idToCoordinates(r._1),r._2)).take(10).foreach(println)
((chrY,2708700),(ArrayBuffer(8, 7, 10, 6, 27, 1, 1, 3),ArrayBuffer(1, 4, 4, 5, 3, 4)))
((chrY,2708701),(ArrayBuffer(8, 27, 7, 1, 3, 6, 12, 1),ArrayBuffer(5, 4, 4, 1, 4, 3)))
((chrY,2708702),(ArrayBuffer(7, 27, 8, 1, 1, 6, 3, 12),ArrayBuffer(4, 5, 4, 4, 1, 3)))
((chrY,2708703),(ArrayBuffer(12, 1, 3, 1, 27, 6, 7, 8),ArrayBuffer(4, 3, 6, 4, 4, 1)))
((chrY,2708704),(ArrayBuffer(7, 27, 1, 3, 6, 12, 8, 1),ArrayBuffer(4, 6, 3, 4, 1, 5)))
((chrY,2708705),(ArrayBuffer(3, 2, 27, 7, 13, 1, 6, 8),ArrayBuffer(1, 4, 4, 6, 4, 5)))
((chrY,2708706),(ArrayBuffer(1, 6, 7, 13, 3, 8, 27, 2),ArrayBuffer(5, 6, 6, 4, 5, 1)))
((chrY,2708707),(ArrayBuffer(27, 2, 6, 8, 13, 7, 3, 1),ArrayBuffer(4, 6, 5, 9, 5, 1)))
((chrY,2708708),(ArrayBuffer(8, 7, 27, 3, 2, 1, 13, 6),ArrayBuffer(5, 4, 5, 6, 9, 1)))
((chrY,2708709),(ArrayBuffer(27, 2, 8, 7, 6, 1, 13, 3),ArrayBuffer(4, 6, 9, 5, 1, 5)))
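The difference between the two joins can be reproduced with plain Scala maps: the cogroup-style outer join keeps a key even when one side has no values (yielding an empty collection, as for position 2708650 above), while the inner join drops it. A minimal sketch over hypothetical per-position coverage maps:

```scala
val male   = Map(2708650L -> Seq(2, 12, 1), 2708651L -> Seq(2, 12, 1))
val female = Map(2708651L -> Seq(2))

// Full-outer-join style: every key from either side, empty Seq when absent
val outer = (male.keySet ++ female.keySet).map { k =>
  k -> (male.getOrElse(k, Seq()), female.getOrElse(k, Seq()))
}.toMap

// Inner-join style: only the keys present on both sides
val inner = male.keySet.intersect(female.keySet).map { k =>
  k -> (male(k), female(k))
}.toMap
```

Here outer retains position 2708650 with an empty female side, whereas inner contains only position 2708651.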
8. Junction reads analysis
To analyse splice-junction reads across samples, wrap a SparkSeqAnalysis object in a SparkSeqJunctionAnalysis object, then compute and view the junction read counts:
scala> import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis
scala> import pl.elka.pw.sparkseq.conversions.SparkSeqConversions
scala> import pl.elka.pw.sparkseq.util.SparkSeqRegType._
scala> import pl.elka.pw.sparkseq.junctions.SparkSeqJunctionAnalysis
scala> val seqAnalysis = new SparkSeqAnalysis(sc,"/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF1_Y.bam",1,1,1)
scala> val caseId = Array(11, 32, 3, 42, 43, 47, 53, 58)
scala> for(i<-caseId)
seqAnalysis.addBAM(sc,"/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF"+i.toString+"_Y.bam",i,1)
scala> val juncAnalysis = new SparkSeqJunctionAnalysis(seqAnalysis)
scala> juncAnalysis.getJuncReadsCounts()
scala> juncAnalysis.viewJuncReadsCounts(20)
SampleID ChrName StartPos EndPos Count
======================================================================
3 chrY 2722788 2733104 186
11 chrY 2722714 2733030 169
3 chrY 2722713 2733029 157
11 chrY 2722789 2733105 140
32 chrY 2710283 2712117 120
58 chrY 2710277 2712111 119
3 chrY 2710282 2712116 119
1 chrY 2710282 2712116 116
1 chrY 2733258 2734805 116
1 chrY 2722713 2733029 114
3 chrY 2733208 2734755 112
11 chrY 2710278 2712112 106
53 chrY 2710226 2712060 105
1 chrY 2710277 2712111 104
11 chrY 2710283 2712117 103
3 chrY 2712235 2713623 102
1 chrY 2722788 2733104 100
3 chrY 2733258 2734805 94
3 chrY 2713704 2722560 94
32 chrY 2722714 2733030 94
9. Using cache
If we are going to query the results of computations several times, it may be a good idea to cache them in memory and/or on disk. Apache Spark provides a few caching strategies, described in the section Apache Spark RDD persistence. Here we demonstrate two of them and show the performance benefits. In each case we need to follow the same procedure:
- Mark the RDD as cached and optionally choose the cache storage level (using the cache method, or the persist method with a storage level as a parameter)
- Populate the cache by running the query for the first time or by running some other Apache Spark action, e.g. count
- Do the analyses
- Optionally purge the cache using the unpersist method
To cache results in memory:
scala> val seqBaseCovMale= seqAnalysisMale.getCoverageBase.map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) ).groupByKey()
scala> val posStart = SparkSeqConversions.chrToLong("chrY") + 2708701
scala> val posEnd = SparkSeqConversions.chrToLong("chrY") + 2708708
scala> seqBaseCovMale.filter(r=>r._1 >=posStart && r._1<=posEnd).collect.foreach(println)
14/03/28 11:10:34 INFO spark.SparkContext: Job finished: collect at <console>:26, took 7.4827817 s
(110002708701,ArrayBuffer(27, 8, 12, 3, 7, 1, 1, 6))
(110002708700,ArrayBuffer(1, 3, 1, 6, 27, 8, 10, 7))
(110002708703,ArrayBuffer(1, 3, 6, 8, 7, 27, 1, 12))
(110002708705,ArrayBuffer(13, 3, 6, 8, 1, 2, 27, 7))
(110002708702,ArrayBuffer(1, 8, 6, 27, 12, 3, 1, 7))
(110002708704,ArrayBuffer(1, 6, 12, 8, 27, 3, 1, 7))
(110002708707,ArrayBuffer(2, 27, 6, 1, 13, 7, 8, 3))
(110002708706,ArrayBuffer(2, 1, 13, 8, 27, 3, 6, 7))
(110002708708,ArrayBuffer(8, 7, 1, 3, 27, 6, 2, 13))
scala> val seqBaseCovMale= seqAnalysisMale.getCoverageBase.map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) ).groupByKey().cache
scala> seqBaseCovMale.count //populate memory cache
scala> seqBaseCovMale.filter(r=>r._1 >=posStart && r._1<=posEnd).collect.foreach(println)
14/03/28 11:11:37 INFO spark.SparkContext: Job finished: collect at <console>:26, took 0.2592477 s
(110002708701,ArrayBuffer(27, 8, 12, 3, 7, 1, 1, 6))
(110002708700,ArrayBuffer(1, 3, 1, 6, 27, 8, 10, 7))
(110002708703,ArrayBuffer(1, 3, 6, 8, 7, 27, 1, 12))
(110002708705,ArrayBuffer(13, 3, 6, 8, 1, 2, 27, 7))
(110002708702,ArrayBuffer(1, 8, 6, 27, 12, 3, 1, 7))
(110002708704,ArrayBuffer(1, 6, 12, 8, 27, 3, 1, 7))
(110002708707,ArrayBuffer(2, 27, 6, 1, 13, 7, 8, 3))
(110002708706,ArrayBuffer(2, 1, 13, 8, 27, 3, 6, 7))
(110002708708,ArrayBuffer(8, 7, 1, 3, 27, 6, 2, 13))
scala> seqBaseCovMale.filter(r=>r._1 >=posStart+5 && r._1<=posEnd+5).collect.foreach(println)
14/03/28 11:13:11 INFO spark.SparkContext: Job finished: collect at <console>:26, took 0.2585521 s
(110002708705,ArrayBuffer(13, 3, 6, 8, 1, 2, 27, 7))
(110002708713,ArrayBuffer(3, 6, 2, 8, 13, 27, 1, 7))
(110002708707,ArrayBuffer(2, 27, 6, 1, 13, 7, 8, 3))
(110002708712,ArrayBuffer(13, 7, 3, 27, 8, 2, 6, 1))
(110002708706,ArrayBuffer(2, 1, 13, 8, 27, 3, 6, 7))
(110002708709,ArrayBuffer(7, 1, 8, 27, 13, 3, 6, 2))
(110002708708,ArrayBuffer(8, 7, 1, 3, 27, 6, 2, 13))
(110002708711,ArrayBuffer(1, 2, 7, 3, 8, 13, 6, 27))
(110002708710,ArrayBuffer(2, 13, 1, 3, 7, 6, 27, 8))
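The Long keys printed above (e.g. 110002708701) are consistent with chrToLong packing a chromosome code into the high digits of a Long, leaving 10 decimal digits for the position: 110002708701 = 110000000000 + 2708701, so chrToLong("chrY") appears to be 110000000000. A sketch of such an encoding (the actual SparkSeqConversions implementation may differ; the helpers below are hypothetical):

```scala
// Chromosome code implied by the output above for chrToLong("chrY")
val ChrYCode = 110000000000L

// Pack a chromosome code and a position into one Long, and unpack it
def encode(chrCode: Long, pos: Long): Long = chrCode + pos
def decodePos(id: Long): Long = id % 10000000000L
```

This is the same arithmetic used above when posStart and posEnd are built by adding positions to chrToLong("chrY").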
To cache results on disk only:
scala> import org.apache.spark.storage.StorageLevel
scala> val seqBaseCovMale= seqAnalysisMale.getCoverageBase.map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) ).groupByKey().persist(StorageLevel.DISK_ONLY)
scala> seqBaseCovMale.count // populate the disk cache
scala> seqBaseCovMale.filter(r=>r._1 >=posStart+5 && r._1<=posEnd+5).collect.foreach(println)
14/03/28 12:09:38 INFO spark.SparkContext: Job finished: collect at <console>:27, took 0.602364 s
(110002708705,ArrayBuffer(13, 3, 6, 8, 1, 2, 27, 7))
(110002708713,ArrayBuffer(3, 6, 2, 8, 13, 27, 1, 7))
(110002708707,ArrayBuffer(2, 27, 6, 1, 13, 7, 8, 3))
(110002708712,ArrayBuffer(13, 7, 3, 27, 8, 2, 6, 1))
(110002708706,ArrayBuffer(2, 1, 13, 8, 27, 3, 6, 7))
(110002708709,ArrayBuffer(7, 1, 8, 27, 13, 3, 6, 2))
(110002708708,ArrayBuffer(8, 7, 1, 3, 27, 6, 2, 13))
(110002708711,ArrayBuffer(1, 2, 7, 3, 8, 13, 6, 27))
(110002708710,ArrayBuffer(2, 13, 1, 3, 7, 6, 27, 8))
seqBaseCovMale.unpersist
Reading cached data from disk is obviously slower than from memory (0.6s vs 0.26s in this case), but it is still much faster than recomputing everything from scratch (7.5s).
10. Saving results
To summarize the feature saving methods:
Base coverage:
scala> val baseCov = seqAnalysis.getCoverageBase()
scala> seqAnalysis.saveBaseCoverageToFile("test_base.txt")
Exon counts:
scala> val genExonsMapB = sc.broadcast(SparkSeqConversions.BEDFileToHashMap(sc, "/BAM/64MB/aux/Homo_sapiens.GRCh37.74_exons_chr_sort_uniq.bed"))
scala> seqAnalysis.getCoverageRegion(genExonsMapB, unionMode=true)
scala> seqAnalysis.saveFeatureCoverageToFile("test_exon.txt",Exon)
Gene counts:
scala> val genesMapB = sc.broadcast(SparkSeqConversions.BEDFileToHashMap(sc, "/BAM/64MB/aux/Homo_sapiens.GRCh37.74_genes_chr_merged_swap.bed"))
scala> seqAnalysis.getCoverageRegion(genesMapB, unionMode=true)
scala> seqAnalysis.saveFeatureCoverageToFile("test_gene.txt",Gene)
You can of course use Apache Spark's native way of saving results to a file by calling 'saveAsTextFile' or 'saveAsObjectFile' (details). Note that for either of these methods you need to specify a path located on your HDFS storage when running an Apache Spark cluster.
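As a sketch of the plain-Spark route (assuming the baseCov RDD from above, with Long position IDs as keys; the output paths below are hypothetical examples), each record can be formatted as a text line before saving:

```scala
// Format each (positionID, coverage) record as a tab-separated line
// and write the result as a set of part-files on HDFS.
scala> baseCov
         .map { case (id, cov) => s"$id\t$cov" }
         .saveAsTextFile("hdfs:///BAM/64MB/out/test_base_txt")

// saveAsObjectFile instead keeps the records as serialized objects,
// which is convenient when they will be read back by another Spark job
// via sc.objectFile:
scala> baseCov.saveAsObjectFile("hdfs:///BAM/64MB/out/test_base_obj")
```

Both calls write a directory of part-files rather than a single file, one part per RDD partition.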
11. Setting up complete data processing pipelines
Data processing pipelines may consist of a combination of
- alignment data filtering
- feature building - getting the information on counts, coverage function, exon junctions in given genomic regions.
- analysis of features - statistics, sanity checks, postprocessing
The building blocks from each group may be selected according to the needs of the particular analysis. Examples of the pipelines are:
- Typical pipeline for finding target genes with differential RNA expression or alternative splicing according to a fixed annotation.
Filtering: filtering out low-quality reads (e.g. keeping only reads with PHRED > 30) and large, biologically unjustified (artefactual) alignment gaps (gap in CIGAR > 5000bp).
Feature building: Getting the counts of reads for all the genes and exons in the annotation.
Analysis: DESeq or edgeR analysis (Anders et al., Nat Protoc 2013); finding genes with alternative splicing as those that have high variance of their exons' median coverage; finding genes with alternative splicing as those that have a high or significant splicing index (Gardina et al., Genome Biology, 2006).
- Pipeline for coverage based differential expression and splicing analysis with a special treatment of protein domains and repeatable regions for paired end RNA sequencing
Filtering: filtering out low-quality and large-gap alignments as above, plus filtering based on the distance between the alignments in a pair (e.g. gap between the R1 and R2 reads > 400bp) and filtering out reads with non-primary alignments based on bitwise SAM flags, in order to exclude repeatable regions from the analysis.
Feature building: calculating the coverage function on the reference genome for each sample.
Analysis: a statistical test for each nucleotide position to find local differential expression, and finding genes with alternative splicing as those that have high variance of their exons' median coverage.
- Analysis of splicing in RNA sequencing using exon junctions ("very simplified cufflinks")
Filtering: filtering out low quality and large gap alignments as above
Feature building: finding the exon junctions, counting the junctions by genomic location
Analysis: excluding the junctions with low counts (e.g. < 10); finding the genes with high variance of junction counts; finding the junctions not anchored at canonical splice start and stop sites.
- Finding the exon junctions with sufficient coverage of flanking exons ("cufflinks verification")
Filtering: filtering out low quality and large gap alignments as above
Feature building: finding the exon junctions, counting the junctions by genomic location; calculating the coverage function.
Analysis: analysis of junctions as above, finding the junctions with high or proportional coverage levels in the flanking areas.
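The first pipeline above can be sketched end-to-end using only the calls already shown in this tutorial. The quality and gap filtering is assumed to be configured when creating seqAnalysis, as described in the earlier sections; the output file names are hypothetical examples:

```scala
// 1. Feature building: counts per exon and per gene from a fixed annotation.
scala> val genExonsMapB = sc.broadcast(SparkSeqConversions.BEDFileToHashMap(sc,
         "/BAM/64MB/aux/Homo_sapiens.GRCh37.74_exons_chr_sort_uniq.bed"))
scala> val genesMapB = sc.broadcast(SparkSeqConversions.BEDFileToHashMap(sc,
         "/BAM/64MB/aux/Homo_sapiens.GRCh37.74_genes_chr_merged_swap.bed"))

scala> seqAnalysis.getCoverageRegion(genExonsMapB, unionMode = true)
scala> seqAnalysis.saveFeatureCoverageToFile("pipeline_exon.txt", Exon)

scala> seqAnalysis.getCoverageRegion(genesMapB, unionMode = true)
scala> seqAnalysis.saveFeatureCoverageToFile("pipeline_gene.txt", Gene)
```

The saved exon and gene count tables can then be fed into the analysis step, e.g. DESeq or edgeR in R, as described above.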
12. Optimization hints
- First of all, carefully read the Apache Spark Tuning Guide;
- Use numeric IDs. Convert to Strings or Tuples only when presenting results;
- Filter as much data as early as possible in your processing;
- When using position predicates, convert your value to a Long, not the other way round, e.g. filter(r=>r._1==position), not filter(r=>SparkSeqConversions.idToCoordinates(r._1)==("chrY",2708711)):
scala> val seqCovBase= seqAnalysis.getCoverageBase.map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) ).cache
scala> seqCovBase.count
res3: Long = 7156008
scala> val position=SparkSeqConversions.coordinatesToId(("chrY",2708711))
scala> seqCovBase.filter(r=>r._1==position).count
14/03/28 17:20:02 INFO spark.SparkContext: Job finished: count at <console>:24, took 0.5758662 s
res18: Long = 8
scala> seqCovBase.filter(r=>SparkSeqConversions.idToCoordinates(r._1)==("chrY",2708711)).count
14/03/28 17:20:21 INFO spark.SparkContext: Job finished: count at <console>:22, took 0.7433719 s
res20: Long = 8
With a small dataset the absolute difference in computation time is negligible, but it is still about 30% even in this simple case.