1. Davide Cittaro
  2. utils


utils / markvcf.py

#!/usr/bin/env python2.7

import bx.intervals
import argparse
import sys
import vcf

def terminate(msg=None, exit_status=1):
  sys.stderr.write("%s\n" % msg)

def sniff(s):
    return 'Integer'
  except ValueError:
      return 'Float'
    except ValueError:
      return 'String'

def cast(s, t):
  if t == 'Integer':
    return int(s)
  elif t == 'Float':
    return float(s)
  elif t == 'Flag':
    return None  
    return s  

def read_gene_list(fh):
  # Read gene list, which is a tab separated file with gene in the first column.
  # If one column is present, returns the genes, if multiple columns are present, returns the gene list 
  # along with the properties. An header should be present, column names will be the properties
  mark_names = []
  gene_list = {}
  # read first line
  header = fh.readline()
  mark_names = header.lstrip('#').strip().split()
  nf = len(mark_names)
  if nf > 1 and not header.startswith('#'):
    terminate("Multiple columns in gene list, but no header provided!")
  elif nf == 1:
    gene_list[mark_names[0]] = None
  for line in fh:
    fields = line.strip().split()
    if nf == 1:
      gene_list[fields[0]] = None
      gene_list[fields[0]] = dict(zip(mark_names[1:], fields[1:]))      
  return gene_list

def read_refgene(fh, gene_list, prefix="MARK", use_gene_symbol = True):
  # Read sql table (refGene) from UCSC. This should be quite standard, with 
  # at least cdsStart, cdsEnd, name2 and chrom in the proper positions...
  # bin	name	chrom	strand	txStart	txEnd	cdsStart	cdsEnd	exonCount	exonStarts	exonEnds	score	name2	cdsStartStat	cdsEndStat	exonFrames

  tree_dict = {}
  for line in (fh):
    if line.startswith('#'): continue
    fields = line.split()
    chrom = fields[2]
    cdsStart = int(fields[6])
    cdsEnd = int(fields[7])
    if use_gene_symbol:
      gene_name = fields[12]
      gene_name = fields[1]
    if not tree_dict.has_key(chrom):
      tree_dict[chrom] = bx.intervals.IntervalTree()
    if gene_list.has_key(gene_name):
      properties = gene_list[gene_name]
      if not properties:
        properties = {}
      tree_dict['__properties__'] = properties.keys()
      properties['gene'] = gene_name
      tree_dict[chrom].add(cdsStart, cdsEnd, properties)

  return tree_dict

def annotate_vcf(fh, gene_props, prefix):
  vcf_file = vcf.Reader(fh)
  properties = [prefix + x for x in gene_props['__properties__']]
  if len(properties) == 0:
    id = prefix
    type = 'Flag'
    num = 1
    desc = 'Value added by markvcf'
    vcf_file.infos[id] = vcf.parser._Info(id, num, type, desc)
    for id in properties:
      type = sniff(id)
      num = '.'
      desc = 'Value added by markvcf'
      vcf_file.infos[id] = vcf.parser._Info(id, num, type, desc)
  vcf_out = vcf.Writer(sys.stdout, vcf_file, lineterminator='\n')
  # we now traverse the tree. As soon as we get into an overlapping feature
  # we update the record entry and the INFO in the header
  for record in vcf_file:
    chrom = record.CHROM
    if not gene_props[chrom]:
      # no need to process, just write
      # get the intersection, it is an array of dictionaries
      matched_genes = gene_props[chrom].find(record.POS, record.POS)  
      if len(matched_genes) == 0:
        # ok, this is tough!
        # we don't want to add redundancies... I didn't removed from intervaltree
        # as, well, it is easier to manage here :-)
        added = set([]) # keeps track of redundacies for this SNP
        for match in matched_genes:
          gene = match['gene']
          if gene in added: continue # skip. In principle annotation file should contain one entry per gene...
          for (p,v) in match.items():
            # here p is the property name, v its value
            id = prefix + p # convert to match above...
              mark_value = cast(v, vcf_file.infos[id].type) # this converts back to the proper data type
                # since variants can overlap more than one gene (yes...) they should be handled as arrays
              except KeyError:
                record.INFO[id] = [mark_value]  
              # this happens when we only have one gene, no columns in the annotation file
              record.INFO[prefix] = True

def mark_main():

  # parse options
  option_parser = argparse.ArgumentParser(
  description="Mark VCF files for gene properties",
  epilog="For any question, write to cittaro.davide@hsr.it")
  option_parser.add_argument("--version", action="version", version="%(prog)s 0.1")
  option_parser.add_argument("-v", "--vcf", help="VCF file to annotate", action='store', type=file, required=True)
  option_parser.add_argument("-g","--genelist", help="File containing gene list", action='store', type=file, required=True)
  option_parser.add_argument("-r","--refgene", help="RefGene SQL table from UCSC", action='store', type=file, required=True)
  option_parser.add_argument("-n","--name", help="Annotation name prefix", action='store', default="MARK")
  options = option_parser.parse_args()
  # read the genelist and retur
  gene_list = read_gene_list(options.genelist)
  # read sql file and create the interval tree with gene properties
  gene_properties = read_refgene(options.refgene, gene_list)
  # read the vcf and, at the same time, output annotated entries
  # I really would like to use PyVCF interface but at time of writing it had a bug that prevents 
  # reading of my test vcf files...
  annotate_vcf(options.vcf, gene_properties, options.name)
  # that's it!

if __name__ == "__main__": mark_main()