"""Code to provide a sequencing server for sequencing analysis.
"""
import os
import glob
import StringIO
import threading
import tempfile
from datetime import datetime
from SynBio.Corba import CodonCorba__POA
from SynBio.Corba.CodonCorba.Sequencing import BadParameters, NotFinished
from SynBio.Sequencing.Database import SequencingDatabaseRetrieve
from SynBio.Sequencing.ConstructAnalysis import (TreatmentCloneAnalyze,
ErrorFinder, KeithBaseCaller)
from SynBio.Sequencing.PlateAnalysis import PhredAnalyzer
from SynBio.Sequencing.AnalysisEmail import DefaultSender
from SynBio.OligoDatabase import get_db_collection
class AnalysisGeneratorServer(
CodonCorba__POA.Sequencing.SequencingAnalysisGenerator):
"""Generator to get analysis servers with standard details.
"""
def __init__(self, base_config, logger):
self._config = base_config
self._logger = logger
def get_nameservice_name(self):
return "SequencingAnalysisGenerator"
def getAnalyzer(self, email, num_reads, phred):
self._logger.info("+ Setup analyzer: %s" % (email))
# if num_reads <= 0:
# raise BadParameters("Need at least one read")
# if phred < 20.0:
# raise BadParameters("Need phred score above 20.0")
# special case -- remove our testing server e-email, also used
# internally
if email == "nunit@codondevices.com":
email = None
if email is not None:
username, ext = email.split("@")
if username == "":
raise BadParameters("Improperly formatted email address")
new_server = AnalysisServer(self._config, self._logger, email)
return new_server._this()
class AnalysisRetriever(CodonCorba__POA.Sequencing.SequencingAnalysisRetriever):
"""Retrieve a set of analyses, send e-mails, and do other fancy things.
"""
def __init__(self, output_dir, analysis_id, email, logger):
self._output_dir = output_dir
self._analysis_id = analysis_id
self._email = email
self._logger = logger
def analysisId(self):
return self._analysis_id
def finished(self):
try:
self._get_outfile()
return True
except NotFinished:
return False
def _get_outfile(self):
files = glob.glob(os.path.join(self._output_dir,
"*%s*" % (self._analysis_id)))
if len(files) == 1:
size = os.path.getsize(files[0])
if size > 0:
return files[0]
else:
raise NotFinished
else:
raise ValueError("Multiple results for %s: %s" %
(self._analysis_id, files))
def emailReport(self):
if self._email is not None:
self._logger.info("= Sent report: %s" % self._email)
outfile = self._get_outfile()
info_handle = StringIO.StringIO()
info_handle.write(self.getReport())
email_sender = DefaultSender()
directory, base_outfile = os.path.split(outfile)
email_sender.send_email(None, self._email, info_handle, base_outfile)
info_handle.close()
else:
self._logger.info("= No report sent; no e-mail")
def getReport(self):
outfile = self._get_outfile()
handle = open(outfile)
report = handle.read()
handle.close()
return report
class _AnalysisRunner(threading.Thread):
"""Perform analysis on a set of constructs or clones.
"""
def __init__(self, db_col, retriever_fn, analyzer, analyzer_fn, item_ids,
outfile, logger, results_retriever, use_plates = False):
threading.Thread.__init__(self)
self._db_col = db_col
self._retriever_fn = retriever_fn
self._analyzer = analyzer
self._analyzer_fn = analyzer_fn
self._item_ids = item_ids
self._outfile = outfile
self._logger = logger
self._results_retriever = results_retriever
self._use_plates = use_plates
def run(self):
"""Peform the sequencing analysis and send an e-mail report.
"""
items, plates = self._retriever_fn(self._item_ids)
self._analyzer.reset()
# analyze either the plates or the items
if self._use_plates:
for plate in plates:
self._analyzer_fn(plate)
else:
for item in items:
self._analyzer_fn(item)
# write out the report to our output file
orders = [plate.get_order() for plate in plates]
final_orders = list(set(orders))
out_handle = open(self._outfile, "w")
self._analyzer.write(out_handle, final_orders, plates)
out_handle.close()
self._logger.info("- Finished: %s" % self._item_ids)
# send out an e-mail by default right now to mimic previous behavior
self._results_retriever.emailReport()
# close up our database connections
self._db_col.close()
class AnalysisServer(CodonCorba__POA.Sequencing.SequencingAnalysis):
"""Perform analysis on clones, e-mail results to users.
"""
def __init__(self, config, logger, email):
self._config = config
self._logger = logger
self._email = email
def _setup(self, use_plates = False):
db_col = get_db_collection(self._config.db_type,
self._config.ws_user)
retriever = SequencingDatabaseRetrieve(db_col)
base_caller = KeithBaseCaller()
error_finder = ErrorFinder(base_caller)
if use_plates:
analyzer = PhredAnalyzer(db_col)
else:
analyzer = TreatmentCloneAnalyze(db_col.track_db,
error_finder, self._config.muts_gap_thresh,
self._config.large_error_thresh)
return retriever, analyzer, db_col
def _unique_id_and_file(self, ext = ".txt"):
"""Generate a unique filename and output id for this analysis.
"""
cur_time = datetime.now()
time_id = cur_time.strftime("%H_%M_%S-%y%m%d_")
prefix = self._config.output_base + time_id
if not(os.path.exists(self._config.analysis_work_dir)):
os.makedirs(self._config.analysis_work_dir)
fd, outfile = tempfile.mkstemp(ext, prefix,
self._config.analysis_work_dir)
junk, final_file = os.path.split(outfile)
filename, ext = os.path.splitext(final_file)
final_id = filename.replace(self._config.output_base, "")
return outfile, final_id
def analyzeConstructs(self, construct_ids):
"""Provide analysis of the given set of construct IDs.
"""
self._logger.info("+ Analyze constructs: %s" % construct_ids)
retriever, analyzer, db_col = self._setup()
outfile, final_id = self._unique_id_and_file()
retrieve_fn = retriever.retrieve_by_construct_id
analyze_fn = analyzer.analyze
final_retriever = AnalysisRetriever(self._config.analysis_work_dir,
final_id, self._email, self._logger)
analysis_run = _AnalysisRunner(db_col, retrieve_fn, analyzer,
analyze_fn, construct_ids, outfile, self._logger,
final_retriever)
analysis_run.start()
return final_retriever._this()
def analyzePlatePhred(self, plate_ids):
"""Provide phred analysis for the given plates.
"""
self._logger.info("+ Phred plate analysis: %s" % plate_ids)
retriever, analyzer, db_col = self._setup(use_plates = True)
outfile, final_id = self._unique_id_and_file(".csv")
try:
plate_ids = retriever.verify_plates(plate_ids)
except KeyError, msg:
self._logger.info("- Bad plate passed")
raise BadParameters(str(msg))
retrieve_fn = retriever.retrieve_by_plates
analyze_fn = analyzer.analyze
final_retriever = AnalysisRetriever(self._config.analysis_work_dir,
final_id, self._email, self._logger)
analysis_run = _AnalysisRunner(db_col, retrieve_fn, analyzer,
analyze_fn, plate_ids, outfile, self._logger, final_retriever,
use_plates = True)
analysis_run.start()
return final_retriever._this()
def analyzeTClones(self, tclone_ids):
self._logger.info("+ Analyze treatment clones: %s" % tclone_ids)
retriever, analyzer, db_col = self._setup()
outfile, final_id = self._unique_id_and_file()
retrieve_fn = retriever.retrieve_by_treat_clone_id
analyze_fn = analyzer.analyze_treat_clone
final_retriever = AnalysisRetriever(self._config.analysis_work_dir,
final_id, self._email, self._logger)
analysis_run = _AnalysisRunner(db_col, retrieve_fn, analyzer,
analyze_fn, tclone_ids, outfile, self._logger,
final_retriever)
analysis_run.start()
return final_retriever._this()
def analyzePlates(self, plate_ids):
self._logger.info("+ Analyze plates: %s" % plate_ids)
retriever, analyzer, db_col = self._setup()
outfile, final_id = self._unique_id_and_file()
try:
plate_ids = retriever.verify_plates(plate_ids)
except KeyError, msg:
self._logger.info("- Bad plate passed")
raise BadParameters(str(msg))
retrieve_fn = retriever.retrieve_by_plates
analyze_fn = analyzer.analyze
final_retriever = AnalysisRetriever(self._config.analysis_work_dir,
final_id, self._email, self._logger)
analysis_run = _AnalysisRunner(db_col, retrieve_fn, analyzer,
analyze_fn, plate_ids, outfile, self._logger, final_retriever)
analysis_run.start()
return final_retriever._this()