Commits

Anonymous committed cb769b6

convert the FastaMerger class into a FastaTools class

- make more abstract
- add more methods and cool stuff

  • Participants
  • Parent commits 6affdef

Comments (0)

Files changed (2)

File FastaMerger.py

-# Merge Fasta files of individual genes into complex files.  Assumes all files
-# have the same lines with the same ids, though order shouldn't matter.
-# Probably don't want to use this on huge FASTA files as it's quick and dirty.
-#
-# @todo Figure out if this really needs to be a class or not.
-
-class FastaMerger:
-
-	def merge_files(self, files, merged_name = "merged"):
-		out = open('%s.fasta' % merged_name, 'w')
-		hash = {}
-		ids = []
-		for file in files:
-			id = None
-			in_file = open(file)
-			for line in in_file:
-				line = line.strip()
-				if line.startswith(">"):
-					id = line
-					if id not in ids:
-						ids.append(id)
-				else:
-					try:
-						hash[id] = "%s%s" % (hash[id], line)
-					except KeyError:
-						hash[id] = line
-			in_file.close()
-		for id in ids:
-			print >>out, id
-			try:
-				print >>out, hash[id]
-			except KeyError:
-				print >>out, ""
-
-		out.close()
-

File FastaTools.py

+# A collection of FASTA-related tools.  Many of these are fairly
+# quick-and-dirty, so be careful applying them to large FASTA files.
+
+class FastaTools:
+
+  def __init__(self):
+    self.ACGT_set = set("ACGT")
+
+  def read_file(self, file_name):
+    """
+      Read in a FASTA formatted file.  Returns a tuple containing a dictionary
+      of the sequences and a list of sequence ids showing the order the
+      sequences were read in.
+
+      @param string file_name The file to read in.
+
+      @returns tuple A tuple whose first element is a dictionary of the sequences
+        read in and whose second element is a list showing the order the
+        sequences were read in.
+    """
+
+    hash = {}
+    ids = []
+    id = None
+    in_file = open(file_name)
+    for line in in_file:
+      line = line.strip()
+      if line.startswith(">"):
+        id = line
+        if id not in ids:
+          ids.append(id)
+      else:
+        try:
+          hash[id] = "%s%s" % (hash[id], line)
+        except KeyError:
+          hash[id] = line
+    in_file.close()
+    return (hash, ids)
+
+  def append_all_to_dict(self, base_hash, sub_hash):
+    """
+      Takes all the elements in the second parameter and appends them to their
+      appropriate element in the first dictionary, or creates it if it does not
+      already exist.  Right now, will stringify all elements of the
+      dictionaries, so be warned!
+
+      @param dictionary base_hash The hash to have the elements added to.
+      @param dictionary sub_hash The hash whose elements are to be added.
+
+      @returns dictionary A copy of the base_hash with the elements from the
+      sub_hash appended on the end.
+    """
+
+    copy = dict(base_hash)
+    for (key, value) in sub_hash.items():
+      try:
+        copy[key] = "%s%s" % (base_hash[key], sub_hash[key])
+      except KeyError:
+        copy[key] = sub_hash[key]
+    return copy
+
+  def correct_for_codon_length(self, seqs, char_to_add = "-"):
+    """
+      Corrects a list of sequences to have lengths divisible by 3 by adding
+      characters to the end of the sequence.
+
+      @param list seqs The list of sequences to correct.
+      @param string char_to_add The character to be added to the end of the
+      sequence.  Defaults to "-", the typical gap character.
+
+      @returns list A copy of the incoming list with corrected sequences.
+
+      @raises ValueError When the char_to_add is not a single character string.
+    """
+
+    char_to_add = str(char_to_add)
+    if len(char_to_add) != 1:
+      message = " ".join(
+        [
+          'The char_to_add parameter should be a string of length 1.  Received "',
+          str(char_to_add),
+          '" which has length ',
+          len(char_to_add)
+        ]
+      )
+      raise ValueError(message)
+    copy = []
+    for seq in seqs:
+      while len(seq) % 3 != 0:
+        seq = "%s%s" % (seq, char_to_add)
+      copy.append(seq)
+    return copy
+
+  def correct_for_codon_length_dict(self, seqs, char_to_add = "-"):
+    """
+      Corrects a dictionary of sequences to have lengths divisible by 3 by adding
+      characters to the end of the sequence.
+
+      @param dictionary seqs The dictionary of sequences to correct.
+      @param string char_to_add The character to be added to the end of the
+      sequence.  Defaults to "-", the typical gap character.
+
+      @returns dictionary A copy of the incoming dictionary with corrected sequences.
+
+      @raises ValueError When the char_to_add is not a single character string.
+    """
+
+    keys = seqs.keys()
+    temp_list = [seqs[x] for x in keys]
+    temp_list = self.correct_for_codon_length(temp_list, char_to_add)
+    copy = {}
+    for (key, seq) in zip(keys, temp_list):
+      copy[key] = seq
+    return copy
+
+
+
+  def merge_files(self, files, merged_name = "merged", ensure_codon_length = False):
+    """
+      Merge FASTA files of individual genes into complex files.  Assumes all
+      files have the same lines with the same ids, though order shouldn't
+      matter.
+
+      @param list files A list of the files that need to be merged.
+      @param string merged_name The name of the output file, without the file
+        extension.  Defaults to "merged" (meaning the default output file will
+        actually be named "merged.fasta".
+      @param boolean ensure_codon_length Flag to say whether the sequences
+        should be corrected to have a length divisible by 3 or not.  Defaults to
+        false (i.e, sequences are merged as given without correction).
+    """
+    out = open('%s.fasta' % merged_name, 'w')
+    hash = {}
+    for file in files:
+      (seqs, ids) = self.read_file(file)
+      if ensure_codon_length:
+        seqs = self.correct_for_codon_length_dict(seqs)
+      hash = self.append_all_to_dict(hash, seqs)
+
+    for id in ids:
+      print >>out, id
+      try:
+        print >>out, hash[id]
+      except KeyError:
+        print >>out, ""
+
+    out.close()
+
+  def find_clean_seq(self, file):
+    """
+      Finds a sequence from a FASTA file that has no gaps or non-ACGT
+      characters, if there is one.  NOTE:  this may pick a seemingly random
+      sequence within the file, as it does not consider the order of sequences
+      within the file as it looks through them.
+
+      @param string file The name of the file to search.
+
+      @returns tuple A tuple of the form (seq_identifier, seq).
+
+      @raises EOFError When the file contains no clean sequences.
+    """
+
+    (seqs, ids) = self.read_file(file)
+    for (id, seq) in seqs.items():
+      bases_used = set(seq)
+      non_ACGT = bases_used.difference(self.ACGT_set)
+      if len(non_ACGT) == 0:
+        return (id, seq)
+
+    raise EOFError("The file contains no sequences without non-ACGT bases.")