Wiki

Clone wiki

sparkseq / MultisampleAnalysisTutorial

Introduction

This tutorial presents how to perform various operations on your BAM files using SparkSeq. We assume that you have the following components of your system up and running:

  • Apache Spark cluster (either in standalone mode or using resource manager like Mesos or YARN) or Apache Spark local mode (More info here)
  • SparkSeq compiled or downloaded from the unofficial Maven repository (please pay attention to HDFS version compatibility) and appropriate variables added to SPARK_CLASSPATH, ADD_JARS and SPARK_JAVA_OPTS environment (More info here)
  • Apache Spark's REPL connected to a cluster or ran locally (More info here)
  • HDFS NameNode(s) and DataNode(s)

During this tutorial it is used an RNA sequencing dataset with alignments limited to the chromosome Y, freely available at DERFinder's github. It should be copied this dataset to our local Hadoop cluster and form the following directory structure:

spark@sparkseq:/sandbox$ hadoop fs -lsr /BAM/64MB/derfinder/chrom_Y | cut -c61-

/BAM/64MB/derfinder/chrom_Y/F
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF23_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF2_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF33_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF40_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF55_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF56_Y.bam
/BAM/64MB/derfinder/chrom_Y/M
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF11_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF1_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF32_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF3_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF42_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF43_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF47_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF53_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF58_Y.bam

Yet another directory should be created for storing various BED files that can be downloaded from the project's page:

spark@sparkseq:/sandbox$ hadoop fs -lsr /BAM/64MB/aux | cut -c61-

/BAM/64MB/aux/Homo_sapiens.GRCh37.74_exons_chr_merged_id_st.bed
/BAM/64MB/aux/Homo_sapiens.GRCh37.74_exons_chr_sort_uniq.bed
/BAM/64MB/aux/Homo_sapiens.GRCh37.74_genes_chr_merged_swap.bed

If you are not sure what the SparkSeq's syntax is, you can always find more information at SparkSeq API Doc

1. Creating SparkSeqAnalysis object

The most important SparkSeq's class for performing multisample analyses is SparkSeqAnalysis. An object of this class stores all the information about the samples and references them. It features a simple API and together with Apache Spark's transformations make SparkSeq a powerful yet easy tool for RNA/DNA analyses.

So let's create a SparkSeqAnalysis object and attach all the BAM files from /BAM/64MB/derfinder/chrom_Y/M/ directory. We first initialize an object using the first sample and the rest of them we can very easily add using addBAM method. Let's run Apache Spark's REPL and set the maximum memory per worker machine to 2GB (as the default 512MB may be insufficient in some cases):

spark@sparkseq:~/spark-0.9.0-incubating/bin$ SPARK_MEM=2g ./spark-shell

and then create a SparkSeqAnalysis object:

import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis
import pl.elka.pw.sparkseq.conversions.SparkSeqConversions
import pl.elka.pw.sparkseq.util.SparkSeqRegType._

val seqAnalysis = new SparkSeqAnalysis(sc,"/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF1_Y.bam",1,1,1)
val caseId = Array(11, 32, 3, 42, 43, 47, 53, 58)
for(i<-caseId)
  seqAnalysis.addBAM(sc,"/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF"+i.toString+"_Y.bam",i,1)

Then we can list all the samples has been attached using listSamples method:

scala> seqAnalysis.listSamples()
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF1_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF11_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF32_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF3_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF42_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF43_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF47_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF53_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF58_Y.bam

2. Counting and displaying

Let's start with some easy counting of samples and reads. We will use getSamples method to obtain all samples, getReads to get all reads and getSampleReads(11) to get all reads from the sample '11':

scala> seqAnalysis.getSamples.count
res2: Long = 9

scala> seqAnalysis.getReads.count
res3: Long = 925231

scala> seqAnalysis.getSampleReads(11).count
res5: Long = 120493

To get all reads info in a way similar to samtools view (first 5 reads):

scala> seqAnalysis.viewSampleReads(11,5)

011211_fcA_:6:1102:10430:158969:0:1     16      Y       2655174 50      100M    *       0       0       CACTTCGCTGCAGAGTACCGAAGCGGGATCTGCGGGAAGCAAACTGCAATTCTTCGGCAGCATCTTCGCCTTCCGACGAGGTCGATACTTATAATTCGGG  b^ccccccccccccca^cccccccccccccccccccccccddddddddeeeeggiiiihiiihiiiihhiiiiiiiiiiiiiihgefgggggeeecebbb    MD:Z:100        XG:i:0  NH:i:1  NM:i:0  XM:i:0  XN:i:0  XO:i:0  AS:i:0YT:Z:UU

011211_fcA_:6:1308:17172:193560:0:1     16      Y       2658568 50      100M    *       0       0       CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT  acb_bccddcddddddcbbccca`bec\dghdhfhhhiihghfgehiihhiihfihfchgbeZbfhihgihdhhiihfcdfb_hggfggfggeeecebb_    MD:Z:100        XG:i:0  NH:i:1  NM:i:0  XM:i:0  XN:i:0  XO:i:0  AS:i:0YT:Z:UU

011211_fcA_:6:2201:7222:62493:0:1       16      Y       2658568 50      100M    *       0       0       CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT  aacccccddcdddddddb`b_cbcceegggihiiiiiihgeihgdhiiiiiihiiiiiiihiiiiihfiiiihihhhiighhiihgg^fgggeeeeebbb    MD:Z:100        XG:i:0  NH:i:1  NM:i:0  XM:i:0  XN:i:0  XO:i:0  AS:i:0YT:Z:UU

011211_fcA_:6:2202:14432:150553:0:1     16      Y       2658568 50      100M    *       0       0       CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT  cccccccddcdddededcccccccceegggiiiiiiiiihfihfeiiiiiiiiiiihiiihihiiiiiiiiiiiiiiihghgiiiiigggggeeeeebbb    MD:Z:100        XG:i:0  NH:i:1  NM:i:0  XM:i:0  XN:i:0  XO:i:0  AS:i:0YT:Z:UU

011211_fcA_:6:1301:11508:181776:0:1     272     Y       2675692 3       100M    *       0       0       GAATGCTTCCAGTTTTTGCCCATTCAGTATGATATTGGCTGTGGGTTTGTTATAAATAGCTCTTATTTTGAAATACATTCCATCAATACCTAGTTTATTG  cccccccccddcceeeeegggggiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiigiiiiiiiiiihgeggggeeeeebbb    CC:Z:=  MD:Z:100        XG:i:0  NH:i:2  HI:i:0  NM:i:0  XM:i:0  XN:i:0XO:i:0  CP:i:14961462   AS:i:0  YT:Z:UU

To get the read sequence only you may combine the methods in a following way:

scala> seqAnalysis.getSampleReads(11).take(5).map(r=> r._2.getReadString).foreach(println)
CACTTCGCTGCAGAGTACCGAAGCGGGATCTGCGGGAAGCAAACTGCAATTCTTCGGCAGCATCTTCGCCTTCCGACGAGGTCGATACTTATAATTCGGG
CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT
CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT
CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT
GAATGCTTCCAGTTTTTGCCCATTCAGTATGATATTGGCTGTGGGTTTGTTATAAATAGCTCTTATTTTGAAATACATTCCATCAATACCTAGTTTATTG

To get the average read length of all reads from all samples:

scala> seqAnalysis.getAvgReadLength
res1: Double = 100.45402715646146

3. Sorting

The following command can be used to sort all reads from all samples by sampleID and alignment start and then display the first 5 reads:

scala> seqAnalysis.sortReadsByAlign().take(5).foreach(println)

((1,(chrY,2675709)),300811_fcB_:1:1108:2837:104706:0:1 101b aligned read.)
((1,(chrY,2675709)),300811_fcB_:1:2101:14014:45631:0:1 101b aligned read.)
((1,(chrY,2675754)),300811_fcB_:1:1305:7608:74961:0:1 101b aligned read.)
((1,(chrY,2675923)),300811_fcB_:1:2304:9858:81704:0:1 101b aligned read.)
((1,(chrY,2676173)),300811_fcB_:1:2303:10751:139532:0:1 101b aligned read.)

To sort the reads from a specific sample by alignment start:

scala> seqAnalysis.sortSampleReadsByAlign(11).take(5).foreach(println)

((11,(chrY,2655174)),011211_fcA_:6:1102:10430:158969:0:1 100b aligned read.)
((11,(chrY,2658568)),011211_fcA_:6:1308:17172:193560:0:1 100b aligned read.)
((11,(chrY,2658568)),011211_fcA_:6:2201:7222:62493:0:1 100b aligned read.)
((11,(chrY,2658568)),011211_fcA_:6:2202:14432:150553:0:1 100b aligned read.)
((11,(chrY,2675692)),011211_fcA_:6:1301:11508:181776:0:1 100b aligned read.)

To implement custom sorting, e.g. to find reads with the greatest number of CIGAR operators( i.e. to sort descending by CIGARString length):

scala> seqAnalysis.getSampleReads(11)
.map(r=>(r._2.getCigarLength,(r._2.getCigarString,r._2.getReadName)))
.sortByKey(false)
.take(5)
.foreach(println)

(11,(73M1I6M2D2M1I6M1D4M1I6M,011211_fcA_:6:1201:20258:60417:0:1))
(11,(73M1I6M2D2M1I6M1D4M1I6M,011211_fcA_:6:1201:8410:133984:0:1))
(11,(73M1I6M2D2M1I6M1D4M1I6M,011211_fcA_:6:1208:14033:185503:0:1))
(11,(73M1I6M2D2M1I6M1D4M1I6M,011211_fcA_:6:2104:3708:73930:0:1))
(11,(73M1I6M2D2M1I6M1D4M1I6M,011211_fcA_:6:2206:9709:108114:0:1))

4. Filtering

4.1 Filter and undo

SparkSeqAnalysis class provides a great range of methods that can facilitate process of filtering out reads that do not meet our requirements. All filters follow lazy evaluation principle - they are not executed until they are really needed (so until some of Apache Spark's actions such as count, reduce, collect, etc is not called).

Filters can be combined by calling one method after another, similarly as combining conditions using 'logical and' operator. If you need more sophisticated combinations of logical operators you can use a generic filterReads method that allows to create any conditions on sampleID and reads.

If you wish to undo all the filtering you can call undoFilter method.

Currently up to 2 conditions per method are supported. In order to set eg. 3 conditions on start of alignment you can just call the same method twice: 1. with 2 conditions and the 2. with the remaining condition. Please note that calling filter method for the n-th time is equivalent to combining conditions using 'and' operator.

4.2 Quality filtering

4.2.1 Using base-pair qualities

To filter out all reads that have the mean base quality < 30:

import pl.elka.pw.sparkseq.statisticalTests.SparkSeqStats
seqAnalysis.filterBaseQualities(SparkSeqStats.mean(_)>=30 ).count
res7: Long = 925176

To filter out reads that have at least 2 base with quality < 30:

scala> seqAnalysis.filterBaseQualities(_.sortBy(r=>(-r)).takeRight(2).head >= 30 ).count
res9: Long = 849744

Please remember to undo all the filtering before running each example:

scala> seqAnalysis.undoFilter

4.2.2 Using the alignment quality

To keep only reads with alignment quality equal or greater to 30:

scala> seqAnalysis.filterMappingQuality(_ >=30).count
res22: Long = 669871

4.3 Filtering with the alignment and reference names

To get only the reads that have start alignments between 2655174 and 2685880 in chrY:

scala> seqAnalysis.filterReferenceName(_ == "chrY")
scala> seqAnalysis.filterAlignmentStart(_ >= 2655174 && _ <= 2685880).count
res1: Long = 934

To get only the reads that are fitting into the interval [2655174,2700500] on chromosome Y:

scala> seqAnalysis.filterReferenceName(_ == "chrY")
scala> seqAnalysis.filterAlignmentStart(_ >= 2655174)
scala> seqAnalysis.filterAlignmentEnd(_ <= 2700500).count
res1: Long = 967

4.4 Filtering on read name and length

To get a read with a specific ID you can use the following method:

scala> seqAnalysis.filterReadName(_ == "011211_fcA_:6:1102:10430:158969:0:1")
scala> seqAnalysis.viewReads()
011211_fcA_:6:1102:10430:158969:0:1     16      Y       2655174 50      100M    *       0       0       CACTTCGCTGCAGAGTACCGAAGCGGGATCTGCGGGAAGCAAACTGCAATTCTTCGGCAGCATCTTCGCCTTCCGACGAGGTCGATACTTATAATTCGGG  b^ccccccccccccca^cccccccccccccccccccccccddddddddeeeeggiiiihiiihiiiihhiiiiiiiiiiiiiihgefgggggeeecebbb    MD:Z:100        XG:i:0  NH:i:1  NM:i:0  XM:i:0  XN:i:0  XO:i:0  AS:i:0YT:Z:UU

4.5 Filtering on CIGAR Strings

To get only the reads with CIGAR String equal to "100M" and then to display 2 of them:

scala> seqAnalysis.filterCigarString( _ == "100M").count
res21: Long = 425718

scala> seqAnalysis.viewReads(2)
011211_fcA_:6:1102:10430:158969:0:1     16      Y       2655174 50      100M    *       0       0       CACTTCGCTGCAGAGTACCGAAGCGGGATCTGCGGGAAGCAAACTGCAATTCTTCGGCAGCATCTTCGCCTTCCGACGAGGTCGATACTTATAATTCGGG  b^ccccccccccccca^cccccccccccccccccccccccddddddddeeeeggiiiihiiihiiiihhiiiiiiiiiiiiiihgefgggggeeecebbb    MD:Z:100        XG:i:0  NH:i:1  NM:i:0  XM:i:0  XN:i:0  XO:i:0  AS:i:0YT:Z:UU

011211_fcA_:6:1308:17172:193560:0:1     16      Y       2658568 50      100M    *       0       0       CTCCCAGATGCATATATTACAGGACGGGGTGTGTGAGGAGACTCACTTGGGGGTGGCAGTCAACAAAATTTAAATAAAGACACGAGATTTTATTATTTTT  acb_bccddcddddddcbbccca`bec\dghdhfhhhiihghfgehiihhiihfihfchgbeZbfhihgihdhhiihfcdfb_hggfggfggeeecebb_    MD:Z:100        XG:i:0  NH:i:1  NM:i:0  XM:i:0  XN:i:0  XO:i:0  AS:i:0YT:Z:UU

To do some more sophisticated filtering you can also take advantage of regular expressions:

scala> val cigRegex = "^[0-9]+M[0-9]+D[0-9]+M$".r
scala> seqAnalysis.filterCigarString(cigRegex.pattern.matcher(_).matches)
scala> seqAnalysis.viewReads(2)
300811_fcB_:1:1302:12119:168462:0:1     16      Y       2712173 50      29M1D72M        *       0       0       TCTTCCTCAGGAATAGACTCAAGTATGCGTGACTGGAGATGAGGTAAAGAAGATATGTATGCAACGTTTCATCAAAATTGATGGCAAGGTTCGAGTGGATG aa^Tcccccdcddcddbccdeed^c`bgihhghfgffiiihffihiiiiihfhiheiiiiiiiiiiiihihihihhdiiihiihfdhigggggeeeeebbb   MD:Z:29^T72     XG:i:1  NH:i:1  NM:i:1  XM:i:0  XN:i:0  XO:i:1AS:i:-8 YT:Z:UU

300811_fcB_:1:2204:12824:4029:0:1       16      Y       2719503 50      27M4D74M        *       0       0       CACCTCTAGCCTTACTTACCGTAGACTCAAACCTGGAATCTTAGCCCACTGCTTTGGTGCTTATGGACTATAGCTTCGTACTTTTCCACTTTAGATTACAG BBBBB^__bbbbb_bb^Za^]cbcb`g`]`_bdfbbZbhgb\Rgddgcc^]cebfdfgggdgbdbhhghgehhhge_gcbfhhgafd`aegegccacc__Z   MD:Z:27^CAAA71G2        XG:i:4  NH:i:1  NM:i:5  XM:i:1  XN:i:0XO:i:1  AS:i:-23        YT:Z:UU

4.6 Filtering using the alignment (SAM) flags

To filter out read duplicates (non-primary alignments) :

scala> seqAnalysis.filterDuplicateReadFlag( _ == false ).count
res45: Long = 925231

To filter out unmapped reads:

scala> seqAnalysis.filterUnmappedFlag( _ == false).count
res1: Long = 925231

To filter out reads using other SAM flags (or their combinations) you can use filterFlags method and SAM flags calculator to find the necessary integer representation, e.g. 'not primary alignment'= 256 :

scala> seqAnalysis.filterFlags( _ != 256).count
res4: Long = 829036

4.7 Combining filters

Filters methods can be combined in the the following ways: either in a single method using various logical operators (currently up to 2 conditions supported) or by calling methods several times or by mixing both approaches:

scala> seqAnalysis.filterAlignmentStart(_ >= 2655174 && _ < 2685880).count
res1: Long = 934

scala> seqAnalysis.undoFilter

scala> seqAnalysis.filterAlignmentStart(_ >= 2655174)
scala> seqAnalysis.filterAlignmentStart(_ < 2685880).count
res4: Long = 934

4.8 Generic filter

There is also a generic filter method that makes it possible to set conditions (currently up to 2) on any BAMRecord property as well as sampleID(_._1 refers to sampleID whereas _._2 to read object):

scala> seqAnalysis.filterReads(_._2.getReferenceName == "Y" && _._1==1).count
res16: Long = 105691

Using this generic method you can implement any of the above specialized filters.

4.9 Selecting samples

To filter the reads from a specific sample:

scala> seqAnalysis.filterSample(_ == 32).count
res1: Long = 99955

5. Computing the coverage function and counts of reads in genomic regions

The coverage with reads, as a function defined on all the nucleotides of the reference genome as well as read counts for genomic regions (eg. exons or genes) are the typical summarizations in a sequencing experiment.

You can do this for the regions of interest, describing them in a BED-like, tab-delimited file format that contains the following columns:

chr    regStart regEnd  strand  upperRegionID   regionID

where regionID must be globally unique. RegionID must contain exactly 15 character, start with letters, end with digits and padded with zeroes.

For instance:

chr1    11869   12227   .       ENSG00000223972 ENSE00002234944
chr1    11872   12227   .       ENSG00000223972 ENSE00002234632
chr1    11874   12227   .       ENSG00000223972 ENSE00002269724

Precomputed BED-like files for human genes and exons according to Ensembl can be downloaded from here.

5.1 Nucleotide-level coverage function

Coverage matrix is a generalization of a "coverage vector" and GenomicArray, which is explained here. It can be computed for all the positions in all samples or for some particular regions of interest. The following listings shows how to calculate, view (region) and save (all results):

scala> val baseCov = seqAnalysis.getCoverageBase()
scala> seqAnalysis.viewBaseCoverage("chrY",2675720,2675728)
Feature                  Sample_1  Sample_3  Sample_11 Sample_32 Sample_42 Sample_43 Sample_47 Sample_53 Sample_58
=====================================================================================================================
chrY,2675720             2         2         7         0         0         19        0         0         0
chrY,2675721             2         2         7         0         0         19        0         0         0
chrY,2675722             2         2         7         0         0         19        0         0         0
chrY,2675723             2         2         7         0         0         19        0         0         0
chrY,2675724             2         2         7         0         0         19        0         0         0
chrY,2675725             2         2         7         0         0         19        0         0         0
chrY,2675726             2         2         7         0         0         19        0         0         0
chrY,2675727             2         2         7         3         0         19        0         0         0
chrY,2675728             2         2         7         3         0         19        0         0         0

scala> seqAnalysis.saveBaseCoverageToFile("test_base.txt")

If need to calculate the coverage for a specific chromosome, or a region you can run the following commands:

scala> seqAnalysis.getCoverageBaseRegion("chrY",2600000,2700000)
scala> seqAnalysis.viewBaseCoverage("chrY",2675720,2675728)

Feature                  Sample_1  Sample_3  Sample_11 Sample_32 Sample_42 Sample_43 Sample_47 Sample_53 Sample_58
=====================================================================================================================
chrY,2675720             2         2         7         0         0         19        0         0         0
chrY,2675721             2         2         7         0         0         19        0         0         0
chrY,2675722             2         2         7         0         0         19        0         0         0
chrY,2675723             2         2         7         0         0         19        0         0         0
chrY,2675724             2         2         7         0         0         19        0         0         0
chrY,2675725             2         2         7         0         0         19        0         0         0
chrY,2675726             2         2         7         0         0         19        0         0         0
chrY,2675727             2         2         7         3         0         19        0         0         0
chrY,2675728             2         2         7         3         0         19        0         0         0

5.2 Exon-level counts

To calculate counts for 4 sample exons you can use getCoverageRegion method. The unionMode parameter is analogous to 'union' of htseq-count.

scala> val genExonsMapB = sc.broadcast(SparkSeqConversions.BEDFileToHashMap(sc, "/BAM/64MB/aux/Homo_sapiens.GRCh37.74_exons_chr_sort_uniq.bed"))
scala> seqAnalysis.getCoverageRegion(genExonsMapB, unionMode=true)
scala> val exonArray = Array("ENSE00000652498","ENSE00000652501","ENSE00000652502","ENSE00000652503")
scala> seqAnalysis.viewExonCoverage(exonArray)

Feature                  Sample_1  Sample_3  Sample_11 Sample_32 Sample_42 Sample_43 Sample_47 Sample_53 Sample_58
=====================================================================================================================
ENSE00000652498          186       251       135       168       131       140       100       198       149
ENSE00000652501          148       211       136       156       130       83        140       230       171
ENSE00000652502          207       383       173       198       83        129       188       176       169
ENSE00000652503          215       384       182       265       93        136       177       161       135

scala> seqAnalysis.saveFeatureCoverageToFile("test_exon.txt",Exon)

5.3 Gene-level counts

To view counts for 4 sample genes you can use also getCoverageRegion method. unionMode parameter is analogous to 'union' of htseq-count.

scala> val genesMapB = sc.broadcast(SparkSeqConversions.BEDFileToHashMap(sc, "/BAM/64MB/aux/Homo_sapiens.GRCh37.74_genes_chr_merged_swap.bed"))
scala> seqAnalysis.getCoverageRegion(genesMapB, unionMode=true)
scala> val geneArray = Array("ENSG00000012817","ENSG00000067048","ENSG00000067646","ENSG00000092377")
scala> seqAnalysis.viewGeneCoverage(geneArray)

Feature                  Sample_1  Sample_3  Sample_11 Sample_32 Sample_42 Sample_43 Sample_47 Sample_53 Sample_58
=====================================================================================================================
ENSG00000012817          8720      9103      8585      7375      5387      5835      7669      8175      7258
ENSG00000067048          6321      8647      5325      5598      3468      4140      4376      6827      4335
ENSG00000067646          1305      1419      1610      853       966       535       848       1043      920
ENSG00000092377          75        48        31        61        29        10        31        26        42

seqAnalysis.saveFeatureCoverageToFile("test_gene.txt",Gene)

6. Grouping

To get a coverage vector for a specific genome position across samples you can do the following:

scala> val seqBaseCov = seqAnalysis.getCoverageBaseRegion("chrY",2600000,2700000)
scala> seqBaseCov.map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) )
.groupByKey()
.map(r=>(SparkSeqConversions.idToCoordinates(r._1),r._2))
.take(10)
.foreach(println)

((chrY,2658591),ArrayBuffer(3))
((chrY,2677791),ArrayBuffer(10, 10, 6, 5, 4, 3, 5))
((chrY,2655261),ArrayBuffer(1))
((chrY,2694913),ArrayBuffer(10))
((chrY,2677365),ArrayBuffer(6, 21, 5, 16, 1))
((chrY,2676565),ArrayBuffer(6))
((chrY,2692535),ArrayBuffer(3))
((chrY,2695925),ArrayBuffer(2))
((chrY,2677473),ArrayBuffer(11, 7, 24, 7, 17, 2, 8))
((chrY,2686485),ArrayBuffer(1))

7. Joining

In this section we will show how to do joins in SparkSeq in order to define more complex analyses using multiple SparkSeqAnalysis objects. If you are not familiar with the difference between full outer join and inner join, please consult Wikipedia. In the previous examples we have operated only on the male samples. To demonstrate how to operate on 2 SparkSeqAnalysis objects we will create another two for storing male and female samples separately:

import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis
import pl.elka.pw.sparkseq.conversions.SparkSeqConversions
import pl.elka.pw.sparkseq.util.SparkSeqRegType._

val seqAnalysisMale = new SparkSeqAnalysis(sc,"/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF1_Y.bam",1,1,1)
val maleId = Array(11, 32, 3, 42, 43, 47, 53, 58)
for(i<-maleId)
    seqAnalysisMale.addBAM(sc,"/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF"+i.toString+"_Y.bam",i,1)

val seqAnalysisFemale = new SparkSeqAnalysis(sc,"/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF2_Y.bam",2,1,1)
val femaleId = Array(23, 33, 40, 55, 56)
for(i<-femaleId)
    seqAnalysisFemale.addBAM(sc,"/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF"+i.toString+"_Y.bam",i,1)

And a quick check if all the samples are correctly attached:

scala> seqAnalysisMale.listSamples
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF1_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF11_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF32_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF3_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF42_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF43_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF47_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF53_Y.bam
/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF58_Y.bam

scala> seqAnalysisFemale.listSamples
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF2_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF23_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF33_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF40_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF55_Y.bam
/BAM/64MB/derfinder/chrom_Y/F/orbFrontalF56_Y.bam

scala> seqAnalysisMale.getSamples.count
res7: Long = 9

scala> seqAnalysisFemale.getSamples.count
res8: Long = 6

Now we will do the coverage function summarization as described in the previous section on those 2 SparkSeqAnalysis objects separately in order to get a coverage vector across all the samples from the group:

scala> val seqBaseCovMale=  seqAnalysisMale.getCoverageBaseRegion("chrY",2708570,2708870).map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) ).groupByKey()

scala> val seqBaseCovFemale=  seqAnalysisFemale.getCoverageBaseRegion"chrY",2708570,2708870).map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) ).groupByKey()

7.1 Full outer join

To join coverage vectors of males and females by position even if there is no coverage for it in one of the groups you can use a full outer join. For position (chrY,2708650) there are no reads mapped to this position in any female sample and this is why we get empty ArrayBuffer. The commands are as follows:

scala> val outerJoin = seqBaseCovMale.cogroup(seqBaseCovFemale)
scala> outerJoin.filter(r=>r._2._1(0).length>=3).sortByKey().map(r=>(SparkSeqConversions.idToCoordinates(r._1),r._2)).take(10).foreach(println)

((chrY,2708650),(ArrayBuffer(ArrayBuffer(2, 12, 1)),ArrayBuffer()))
((chrY,2708651),(ArrayBuffer(ArrayBuffer(2, 12, 1)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708652),(ArrayBuffer(ArrayBuffer(12, 1, 2)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708653),(ArrayBuffer(ArrayBuffer(1, 12, 2)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708654),(ArrayBuffer(ArrayBuffer(12, 10, 1)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708655),(ArrayBuffer(ArrayBuffer(10, 1, 12)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708656),(ArrayBuffer(ArrayBuffer(1, 10, 12)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708657),(ArrayBuffer(ArrayBuffer(10, 12, 1)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708658),(ArrayBuffer(ArrayBuffer(1, 12, 1, 10)),ArrayBuffer(ArrayBuffer(2))))
((chrY,2708659),(ArrayBuffer(ArrayBuffer(1, 1, 12, 4, 10)),ArrayBuffer(ArrayBuffer(2))))

7.2 Inner join

To join coverage vectors if and only if the both groups have at least one element in a vector for a given genomic position you can do the inner join operation:

scala> val innerJoin = seqBaseCovMale.join(seqBaseCovFemale)
scala> innerJoin.filter(r=>r._2._2.length>=6).sortByKey().map(r=>(SparkSeqConversions.idToCoordinates(r._1),r._2)).take(10).foreach(println)

((chrY,2708700),(ArrayBuffer(8, 7, 10, 6, 27, 1, 1, 3),ArrayBuffer(1, 4, 4, 5, 3, 4)))
((chrY,2708701),(ArrayBuffer(8, 27, 7, 1, 3, 6, 12, 1),ArrayBuffer(5, 4, 4, 1, 4, 3)))
((chrY,2708702),(ArrayBuffer(7, 27, 8, 1, 1, 6, 3, 12),ArrayBuffer(4, 5, 4, 4, 1, 3)))
((chrY,2708703),(ArrayBuffer(12, 1, 3, 1, 27, 6, 7, 8),ArrayBuffer(4, 3, 6, 4, 4, 1)))
((chrY,2708704),(ArrayBuffer(7, 27, 1, 3, 6, 12, 8, 1),ArrayBuffer(4, 6, 3, 4, 1, 5)))
((chrY,2708705),(ArrayBuffer(3, 2, 27, 7, 13, 1, 6, 8),ArrayBuffer(1, 4, 4, 6, 4, 5)))
((chrY,2708706),(ArrayBuffer(1, 6, 7, 13, 3, 8, 27, 2),ArrayBuffer(5, 6, 6, 4, 5, 1)))
((chrY,2708707),(ArrayBuffer(27, 2, 6, 8, 13, 7, 3, 1),ArrayBuffer(4, 6, 5, 9, 5, 1)))
((chrY,2708708),(ArrayBuffer(8, 7, 27, 3, 2, 1, 13, 6),ArrayBuffer(5, 4, 5, 6, 9, 1)))
((chrY,2708709),(ArrayBuffer(27, 2, 8, 7, 6, 1, 13, 3),ArrayBuffer(4, 6, 9, 5, 1, 5)))

8. Junction reads analysis

scala> import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis
scala> import pl.elka.pw.sparkseq.conversions.SparkSeqConversions
scala> import pl.elka.pw.sparkseq.util.SparkSeqRegType._
scala> import pl.elka.pw.sparkseq.junctions.SparkSeqJunctionAnalysis

scala> val seqAnalysis = new SparkSeqAnalysis(sc,"/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF1_Y.bam",1,1,1)
scala> val caseId = Array(11, 32, 3, 42, 43, 47, 53, 58)
scala> for(i<-caseId)
  seqAnalysis.addBAM(sc,"/BAM/64MB/derfinder/chrom_Y/M/orbFrontalF"+i.toString+"_Y.bam",i,1)

scala> val juncAnalysis = new SparkSeqJunctionAnalysis(seqAnalysis)
scala> juncAnalysis.getJuncReadsCounts()
scala> juncAnalysis.viewJuncReadsCounts(20)
SampleID  ChrName   StartPos            EndPos              Count
======================================================================
3         chrY      2722788             2733104             186
11        chrY      2722714             2733030             169
3         chrY      2722713             2733029             157
11        chrY      2722789             2733105             140
32        chrY      2710283             2712117             120
58        chrY      2710277             2712111             119
3         chrY      2710282             2712116             119
1         chrY      2710282             2712116             116
1         chrY      2733258             2734805             116
1         chrY      2722713             2733029             114
3         chrY      2733208             2734755             112
11        chrY      2710278             2712112             106
53        chrY      2710226             2712060             105
1         chrY      2710277             2712111             104
11        chrY      2710283             2712117             103
3         chrY      2712235             2713623             102
1         chrY      2722788             2733104             100
3         chrY      2733258             2734805             94
3         chrY      2713704             2722560             94
32        chrY      2722714             2733030             94

9. Using cache

If we are going to query results of computations several times, it may be a good idea to cache them in memory or/and disk. Apache Spark provide a few caching strategies that are described in section Apache Spark RDD persistence. Here we demonstrate using 2 of them and show the performance benefits. In either case we need to follow the same procedure:

  1. Mark RDD as cached and optionally choose cache storage level (using cache or persist method with storage level as a parameter)
  2. Populate the cache by running the query for the first time or by running some other Apache Spark's action like e.g. count
  3. Do the analyses
  4. Optionally purge the cache using 'unpersist' method

To cache results in memory:

scala> val seqBaseCovMale=  seqAnalysisMale.getCoverageBase.map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) ).groupByKey()
scala> val posStart = SparkSeqConversions.chrToLong("chrY") + 2708701
scala> val posEnd = SparkSeqConversions.chrToLong("chrY") + 2708708
scala> seqBaseCovMale.filter(r=>r._1 >=posStart && r._1<=posEnd).collect.foreach(println)
14/03/28 11:10:34 INFO spark.SparkContext: Job finished: collect at <console>:26, took 7.4827817 s
(110002708701,ArrayBuffer(27, 8, 12, 3, 7, 1, 1, 6))
(110002708700,ArrayBuffer(1, 3, 1, 6, 27, 8, 10, 7))
(110002708703,ArrayBuffer(1, 3, 6, 8, 7, 27, 1, 12))
(110002708705,ArrayBuffer(13, 3, 6, 8, 1, 2, 27, 7))
(110002708702,ArrayBuffer(1, 8, 6, 27, 12, 3, 1, 7))
(110002708704,ArrayBuffer(1, 6, 12, 8, 27, 3, 1, 7))
(110002708707,ArrayBuffer(2, 27, 6, 1, 13, 7, 8, 3))
(110002708706,ArrayBuffer(2, 1, 13, 8, 27, 3, 6, 7))
(110002708708,ArrayBuffer(8, 7, 1, 3, 27, 6, 2, 13))


scala> val seqBaseCovMale=  seqAnalysisMale.getCoverageBase.map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) ).groupByKey().cache
scala> seqBaseCovMale.count //populate memory cache
scala> seqBaseCovMale.filter(r=>r._1 >=posStart && r._1<=posEnd).collect.foreach(println)
14/03/28 11:11:37 INFO spark.SparkContext: Job finished: collect at <console>:26, took 0.2592477 s
(110002708701,ArrayBuffer(27, 8, 12, 3, 7, 1, 1, 6))
(110002708700,ArrayBuffer(1, 3, 1, 6, 27, 8, 10, 7))
(110002708703,ArrayBuffer(1, 3, 6, 8, 7, 27, 1, 12))
(110002708705,ArrayBuffer(13, 3, 6, 8, 1, 2, 27, 7))
(110002708702,ArrayBuffer(1, 8, 6, 27, 12, 3, 1, 7))
(110002708704,ArrayBuffer(1, 6, 12, 8, 27, 3, 1, 7))
(110002708707,ArrayBuffer(2, 27, 6, 1, 13, 7, 8, 3))
(110002708706,ArrayBuffer(2, 1, 13, 8, 27, 3, 6, 7))
(110002708708,ArrayBuffer(8, 7, 1, 3, 27, 6, 2, 13))

scala> seqBaseCovMale.filter(r=>r._1 >=posStart+5 && r._1<=posEnd+5).collect.foreach(println)
14/03/28 11:13:11 INFO spark.SparkContext: Job finished: collect at <console>:26, took 0.2585521 s
(110002708705,ArrayBuffer(13, 3, 6, 8, 1, 2, 27, 7))
(110002708713,ArrayBuffer(3, 6, 2, 8, 13, 27, 1, 7))
(110002708707,ArrayBuffer(2, 27, 6, 1, 13, 7, 8, 3))
(110002708712,ArrayBuffer(13, 7, 3, 27, 8, 2, 6, 1))
(110002708706,ArrayBuffer(2, 1, 13, 8, 27, 3, 6, 7))
(110002708709,ArrayBuffer(7, 1, 8, 27, 13, 3, 6, 2))
(110002708708,ArrayBuffer(8, 7, 1, 3, 27, 6, 2, 13))
(110002708711,ArrayBuffer(1, 2, 7, 3, 8, 13, 6, 27))
(110002708710,ArrayBuffer(2, 13, 1, 3, 7, 6, 27, 8))

To cache results on disk only:

scala> import org.apache.spark.storage.StorageLevel
scala> val seqBaseCovMale=  seqAnalysisMale.getCoverageBase.map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) ).groupByKey().persist(StorageLevel.DISK_ONLY)
scala> seqBaseCovMale.count //populate memory cache
scala> seqBaseCovMale.filter(r=>r._1 >=posStart+5 && r._1<=posEnd+5).collect.foreach(println)
14/03/28 12:09:38 INFO spark.SparkContext: Job finished: collect at <console>:27, took 0.602364 s
(110002708705,ArrayBuffer(13, 3, 6, 8, 1, 2, 27, 7))
(110002708713,ArrayBuffer(3, 6, 2, 8, 13, 27, 1, 7))
(110002708707,ArrayBuffer(2, 27, 6, 1, 13, 7, 8, 3))
(110002708712,ArrayBuffer(13, 7, 3, 27, 8, 2, 6, 1))
(110002708706,ArrayBuffer(2, 1, 13, 8, 27, 3, 6, 7))
(110002708709,ArrayBuffer(7, 1, 8, 27, 13, 3, 6, 2))
(110002708708,ArrayBuffer(8, 7, 1, 3, 27, 6, 2, 13))
(110002708711,ArrayBuffer(1, 2, 7, 3, 8, 13, 6, 27))
(110002708710,ArrayBuffer(2, 13, 1, 3, 7, 6, 27, 8))

seqBaseCovMale.unpersist

Reading cached data from disk is obviously slower than from memory (0.6s vs 0.26s in this case) but it is still much faster then recompute everything from the scratch (7.5s)

10. Saving results

To summarize the feature saving methods:

Base coverage:

scala> val baseCov = seqAnalysis.getCoverageBase()
scala> seqAnalysis.saveBaseCoverageToFile("test_base.txt")

Exon counts:

scala> val genExonsMapB = sc.broadcast(SparkSeqConversions.BEDFileToHashMap(sc, "/BAM/64MB/aux/Homo_sapiens.GRCh37.74_exons_chr_sort_uniq.bed"))
scala> seqAnalysis.getCoverageRegion(genExonsMapB, unionMode=true)
scala> seqAnalysis.saveFeatureCoverageToFile("test_exon.txt",Exon)

Gene counts:

scala> val genesMapB = sc.broadcast(SparkSeqConversions.BEDFileToHashMap(sc, "/BAM/64MB/aux/Homo_sapiens.GRCh37.74_genes_chr_merged_swap.bed"))
scala> seqAnalysis.getCoverageRegion(genesMapB, unionMode=true)
seqAnalysis.saveFeatureCoverageToFile("test_gene.txt",Gene)

You can of course use the Apache Spark's way of saving results to file by calling 'saveAsTextFile' or 'saveAsObjectFile'(details). Note that that for using either of these methoda you need to specify a path to a file located on your HDFS storage while running an Apache Spark cluster.

11. Setting up complete data processing pipelines

Data processing pipelines may consist of a combination of

  • alignment data filtering
  • feature building - getting the information on counts, coverage function, exon junctions in given genomic regions.
  • analysis of features - statistics, sanity checks, postprocessing

The building blocks from each group may be selected according to the needs of the particular analysis. Examples of the pipelines are:

  • Typical pipeline for finding target genes with differential RNA expression or alternative splicing according to a fixed annotation.

Filtering: filtering out the reads with low quality (eg PHRED>30) and large, biologically unjustified (artefactual) alignment gaps (gap in CIGAR > 5000bp).

Feature building: Getting the counts of reads for all the genes and exons in the annotation.

Analysis: DESeq or edgeR according ( Anders et al, Nat Prot 2013), finding genes with splicing as those that have high variance of its exons median coverage, finding genes with splicing as those that have the high or significant splicing index (Gardina et al, Gen Biology, 2006)

  • Pipeline for coverage based differential expression and splicing analysis with a special treatment of protein domains and repeatable regions for paired end RNA sequencing

Filtering: filtering out low quality and large gap alignments as above, plus filtering based on the distance between the alignments in the pair (eg. gap between R1 and R2 read > 400) and filtering out reads having non-primary alignments based on bitwise SAM flags in order to exclude repeatable regions in the analysis.

Feature building: calculating the coverage function on the reference genome for each sample.

Analysis: statistical test for each nucleotide position to find the local differential expression and finding genes with splicing as those that have high variance of its exons median coverage.

  • Analysis of splicing in RNA sequencing using exon junctions ("very simplified cufflinks")

Filtering: filtering out low quality and large gap alignments as above

Feature building: finding the exon junctions, counting the junctions by genomic location

Analysis: excluding the junctions with low count (eg. 10) finding the genes with high variance of counts of junctions; finding the junctions not hooked on canonical splice start and stop sites.

  • Finding the exon junctions with sufficient coverage of flanking exons ("cufflinks verification")

Filtering: filtering out low quality and large gap alignments as above

Feature building: finding the exon junctions, counting the junctions by genomic location; calculating the coverage function.

Analysis: analysis of junctions as above, finding the junctions with high or proportional coverage levels in the flanking areas.

12. Optimization hints

  • First of all, read carefully the guide Apache Spark Tuning guide;
  • Use numeric IDs. Do conversions to Strings or Tuples only for the results presentation;
  • Filter as much and as early in your data processing as possible;
  • While using position predicates convert your value to Long, not the other way round, e.g.: filter(r=>r._1==position )) not filter(r=>SparkSeqConversions.idToCoordinates(r._1)==("chrY",2708711)) :
scala> val seqCovBase= seqAnalysis.getCoverageBase.map(r=>(SparkSeqConversions.stripSampleID(r._1),r._2) ).cache
scala> seqCovBase.count
res3: Long = 7156008

scala> val position=SparkSeqConversions.coordinatesToId(("chrY",2708711))
scala> seqCovBase.filter(r=>r._1==position).count
14/03/28 17:20:02 INFO spark.SparkContext: Job finished: count at <console>:24, took 0.5758662 s
res18: Long = 8

scala> seqCovBase.filter(r=>SparkSeqConversions.idToCoordinates(r._1)==("chrY",2708711)).count
14/03/28 17:20:21 INFO spark.SparkContext: Job finished: count at <console>:22, took 0.7433719 s
res20: Long = 8

With small dataset the computation time difference is negligible but it is still 30% in this simple case.

Updated