chapmanb / synbio (http://bcbio.wordpress.com/)

Python Synthetic Biology libraries

Clone this repository (size: 8.4 MB): HTTPS / SSH
$ hg clone http://bitbucket.org/chapmanb/synbio/
commit 2: 775d9e9a5c4d
parent 1: d62271144b61
branch: default
tags: tip
Fix setup to reflect actual modules. Add in SQLAlchemy database models.
cha...@sobchak.mgh.harvard.edu
10 months ago
synbio / SynBio / Sequencing / ServerAnalysis.py
r2:775d9e9a5c4d 253 loc 9.7 KB embed / history / annotate / raw /
"""Code to provide a sequencing server for sequencing analysis.
"""
import os
import glob
import StringIO
import threading
import tempfile
from datetime import datetime

from SynBio.Corba import CodonCorba__POA
from SynBio.Corba.CodonCorba.Sequencing import BadParameters, NotFinished
from SynBio.Sequencing.Database import SequencingDatabaseRetrieve
from SynBio.Sequencing.ConstructAnalysis import (TreatmentCloneAnalyze,
        ErrorFinder, KeithBaseCaller)
from SynBio.Sequencing.PlateAnalysis import PhredAnalyzer
from SynBio.Sequencing.AnalysisEmail import DefaultSender
from SynBio.OligoDatabase import get_db_collection

class AnalysisGeneratorServer(
        CodonCorba__POA.Sequencing.SequencingAnalysisGenerator):
    """Generator to get analysis servers with standard details.

    Holds the shared configuration and logger and hands out a new
    AnalysisServer for every getAnalyzer request.
    """
    def __init__(self, base_config, logger):
        """Remember the configuration and logger passed on to each analyzer.
        """
        self._config = base_config
        self._logger = logger

    def get_nameservice_name(self):
        """Return the fixed name string identifying this generator.

        NOTE(review): presumably the name used to register this servant
        with the naming service -- confirm against the server start-up code.
        """
        return "SequencingAnalysisGenerator"

    def getAnalyzer(self, email, num_reads, phred):
        """Create an AnalysisServer reporting to email; return its reference.

        email -- address that reports are sent to. The internal testing
            address "nunit@codondevices.com" is treated as no address.
        num_reads, phred -- currently unused; their validation is disabled
            (see the commented-out checks below).

        Raises BadParameters when the address does not consist of a
        non-empty user name and a domain separated by exactly one "@".
        """
        self._logger.info("+ Setup analyzer: %s" % (email))
        # if num_reads <= 0:
        #     raise BadParameters("Need at least one read")
        # if phred < 20.0:
        #     raise BadParameters("Need phred score above 20.0")
        # special case -- remove our testing server e-mail, also used
        # internally
        if email == "nunit@codondevices.com":
            email = None
        if email is not None:
            # Split without tuple unpacking: an address with no "@" (or more
            # than one) must be reported as BadParameters, not escape as a
            # bare ValueError from a failed unpack.
            parts = email.split("@")
            if len(parts) != 2 or parts[0] == "":
                raise BadParameters("Improperly formatted email address")
        new_server = AnalysisServer(self._config, self._logger, email)
        return new_server._this()

class AnalysisRetriever(CodonCorba__POA.Sequencing.SequencingAnalysisRetriever):
    """Retrieve a set of analyses, send e-mails, and do other fancy things.

    The report for an analysis is the single file in the output directory
    whose name contains the analysis id; it counts as finished once that
    file is non-empty.
    """
    def __init__(self, output_dir, analysis_id, email, logger):
        """Store where to look for the report and who to send it to.

        output_dir -- directory searched for the report file.
        analysis_id -- id embedded in the report's file name.
        email -- address to send the report to, or None for no e-mail.
        logger -- object with an info() method used for status messages.
        """
        self._output_dir = output_dir
        self._analysis_id = analysis_id
        self._email = email
        self._logger = logger

    def analysisId(self):
        """Return the id of the analysis this retriever is tracking.
        """
        return self._analysis_id

    def finished(self):
        """Return True once the report file exists and has content.

        Only NotFinished is translated to False; a ValueError from a
        missing or ambiguous report file propagates to the caller.
        """
        try:
            self._get_outfile()
        except NotFinished:
            return False
        return True

    def _get_outfile(self):
        """Return the path of the finished report file.

        Raises NotFinished while the file is still empty, and ValueError
        when no file or more than one file matches the analysis id.
        """
        pattern = os.path.join(self._output_dir,
            "*%s*" % (self._analysis_id))
        files = glob.glob(pattern)
        if len(files) > 1:
            raise ValueError("Multiple results for %s: %s" %
                    (self._analysis_id, files))
        if not files:
            # Previously reported as "Multiple results ... []", which hid
            # the fact that the report file is missing altogether.
            raise ValueError("No results for %s" % self._analysis_id)
        if os.path.getsize(files[0]) > 0:
            return files[0]
        raise NotFinished

    def emailReport(self):
        """E-mail the finished report, if an address was supplied.

        Propagates NotFinished/ValueError from _get_outfile when the report
        is not available yet.
        """
        if self._email is None:
            self._logger.info("= No report sent; no e-mail")
            return
        outfile = self._get_outfile()
        info_handle = StringIO.StringIO()
        try:
            info_handle.write(self.getReport())
            # NOTE(review): the handle is left positioned at its end after
            # the write; confirm send_email does not rely on read() from
            # the current position.
            email_sender = DefaultSender()
            email_sender.send_email(None, self._email, info_handle,
                    os.path.basename(outfile))
        finally:
            info_handle.close()
        # log only after the send call returned without raising
        self._logger.info("= Sent report: %s" % self._email)

    def getReport(self):
        """Return the full text of the finished report file.
        """
        handle = open(self._get_outfile())
        try:
            return handle.read()
        finally:
            # make sure the file is released even when read() fails
            handle.close()

class _AnalysisRunner(threading.Thread):
    """Perform analysis on a set of constructs or clones.

    Runs as its own thread: retrieves the items and plates for the
    requested ids, feeds them to the analyzer, writes the report to the
    output file and finally asks the results retriever to e-mail it.
    """
    def __init__(self, db_col, retriever_fn, analyzer, analyzer_fn, item_ids,
            outfile, logger, results_retriever, use_plates = False):
        """Store everything the background run needs.

        db_col -- database collection; closed when the run ends.
        retriever_fn -- callable taking item_ids, returning (items, plates).
        analyzer -- object providing reset() and write(handle, orders,
            plates).
        analyzer_fn -- callable applied to each plate or item.
        item_ids -- ids handed to retriever_fn.
        outfile -- path the report is written to.
        logger -- object with an info() method.
        results_retriever -- object whose emailReport() is called at the end.
        use_plates -- analyze the retrieved plates instead of the items.
        """
        threading.Thread.__init__(self)
        self._db_col = db_col
        self._retriever_fn = retriever_fn
        self._analyzer = analyzer
        self._analyzer_fn = analyzer_fn
        self._item_ids = item_ids
        self._outfile = outfile
        self._logger = logger
        self._results_retriever = results_retriever
        self._use_plates = use_plates

    def run(self):
        """Perform the sequencing analysis and send an e-mail report.

        Any exception raised along the way still propagates, but the
        report file handle and the database collection are always closed.
        """
        try:
            items, plates = self._retriever_fn(self._item_ids)
            self._analyzer.reset()

            # analyze either the plates or the items
            if self._use_plates:
                targets = plates
            else:
                targets = items
            for target in targets:
                self._analyzer_fn(target)

            # write out the report to our output file; orders are
            # de-duplicated since several plates can share one order
            final_orders = list(set([plate.get_order() for plate in plates]))
            out_handle = open(self._outfile, "w")
            try:
                self._analyzer.write(out_handle, final_orders, plates)
            finally:
                out_handle.close()
            self._logger.info("- Finished: %s" % self._item_ids)

            # send out an e-mail by default right now to mimic previous
            # behavior
            self._results_retriever.emailReport()
        finally:
            # close up our database connections, even after a failure, so
            # a long-running server does not accumulate open connections
            self._db_col.close()

class AnalysisServer(CodonCorba__POA.Sequencing.SequencingAnalysis):
    """Perform analysis on clones, e-mail results to users.

    Every public analyze* method starts the work in a background
    _AnalysisRunner thread and immediately returns the _this() reference of
    an AnalysisRetriever that can be polled for the finished report.
    """
    def __init__(self, config, logger, email):
        """Store the configuration, logger and report e-mail (may be None).
        """
        self._config = config
        self._logger = logger
        self._email = email

    def _setup(self, use_plates = False):
        """Open a database collection and build the matching helpers.

        Returns (retriever, analyzer, db_col). The analyzer is a
        PhredAnalyzer when use_plates is true, otherwise a
        TreatmentCloneAnalyze. The caller owns db_col and must see to it
        that it gets closed.
        """
        db_col = get_db_collection(self._config.db_type,
                self._config.ws_user)
        retriever = SequencingDatabaseRetrieve(db_col)
        if use_plates:
            analyzer = PhredAnalyzer(db_col)
        else:
            error_finder = ErrorFinder(KeithBaseCaller())
            analyzer = TreatmentCloneAnalyze(db_col.track_db,
                    error_finder, self._config.muts_gap_thresh,
                    self._config.large_error_thresh)
        return retriever, analyzer, db_col

    def _unique_id_and_file(self, ext = ".txt"):
        """Generate a unique filename and output id for this analysis.

        Creates an empty file in the analysis work directory (creating the
        directory when needed) and returns (outfile, final_id): the full
        path of that file, and its base name without the extension and
        without the configured output_base prefix.
        """
        work_dir = self._config.analysis_work_dir
        base = self._config.output_base
        time_id = datetime.now().strftime("%H_%M_%S-%y%m%d_")
        if not os.path.exists(work_dir):
            os.makedirs(work_dir)
        fd, outfile = tempfile.mkstemp(ext, base + time_id, work_dir)
        # mkstemp returns an already-open OS-level descriptor; the report is
        # written later by path, so release it here instead of leaking one
        # descriptor per analysis request.
        os.close(fd)
        filename = os.path.splitext(os.path.basename(outfile))[0]
        if filename.startswith(base):
            # strip only the leading prefix: replacing every occurrence
            # could also eat part of the random suffix and produce an id
            # that no longer matches the file name
            final_id = filename[len(base):]
        else:
            final_id = filename.replace(base, "")
        return outfile, final_id

    def _verify_plates(self, retriever, db_col, plate_ids):
        """Return the verified plate ids, or raise BadParameters.

        On failure the database collection is closed, since no runner
        thread will be started to do so.
        """
        try:
            return retriever.verify_plates(plate_ids)
        except KeyError as msg:
            self._logger.info("- Bad plate passed")
            db_col.close()
            raise BadParameters(str(msg))

    def _start_analysis(self, db_col, retrieve_fn, analyzer, analyze_fn,
            item_ids, ext = ".txt", use_plates = False):
        """Create the output file, start the runner thread, return the
        retriever reference shared by all analyze* entry points.
        """
        outfile, final_id = self._unique_id_and_file(ext)
        final_retriever = AnalysisRetriever(self._config.analysis_work_dir,
                final_id, self._email, self._logger)
        analysis_run = _AnalysisRunner(db_col, retrieve_fn, analyzer,
                analyze_fn, item_ids, outfile, self._logger, final_retriever,
                use_plates = use_plates)
        analysis_run.start()
        return final_retriever._this()

    def analyzeConstructs(self, construct_ids):
        """Provide analysis of the given set of construct IDs.
        """
        self._logger.info("+ Analyze constructs: %s" % construct_ids)
        retriever, analyzer, db_col = self._setup()
        return self._start_analysis(db_col,
                retriever.retrieve_by_construct_id, analyzer,
                analyzer.analyze, construct_ids)

    def analyzePlatePhred(self, plate_ids):
        """Provide phred analysis for the given plates.

        Raises BadParameters when plate verification fails.
        """
        self._logger.info("+ Phred plate analysis: %s" % plate_ids)
        retriever, analyzer, db_col = self._setup(use_plates = True)
        # verify before the output file is created so that a bad request
        # does not leave an empty report file behind
        plate_ids = self._verify_plates(retriever, db_col, plate_ids)
        return self._start_analysis(db_col, retriever.retrieve_by_plates,
                analyzer, analyzer.analyze, plate_ids, ext = ".csv",
                use_plates = True)

    def analyzeTClones(self, tclone_ids):
        """Provide analysis of the given treatment clone IDs.
        """
        self._logger.info("+ Analyze treatment clones: %s" % tclone_ids)
        retriever, analyzer, db_col = self._setup()
        return self._start_analysis(db_col,
                retriever.retrieve_by_treat_clone_id, analyzer,
                analyzer.analyze_treat_clone, tclone_ids)

    def analyzePlates(self, plate_ids):
        """Provide clone analysis for everything on the given plates.

        Raises BadParameters when plate verification fails.
        """
        self._logger.info("+ Analyze plates: %s" % plate_ids)
        retriever, analyzer, db_col = self._setup()
        # verify before the output file is created so that a bad request
        # does not leave an empty report file behind
        plate_ids = self._verify_plates(retriever, db_col, plate_ids)
        return self._start_analysis(db_col, retriever.retrieve_by_plates,
                analyzer, analyzer.analyze, plate_ids)