chapmanb / synbio (http://bcbio.wordpress.com/)

Python Synthetic Biology libraries

Clone this repository (size: 8.4 MB): HTTPS / SSH
$ hg clone http://bitbucket.org/chapmanb/synbio/
commit 2: 775d9e9a5c4d
parent 1: d62271144b61
branch: default
tags: tip
Fix setup to reflect actual modules. Add in SQLAlchemy database models.
cha...@sobchak.mgh.harvard.edu
10 months ago
synbio / SynBio / Sequencing / ServerProcess.py
r2:775d9e9a5c4d 170 loc 6.6 KB embed / history / annotate / raw /
"""Code to provide a sequencing server for sequence plate processing.
"""
import time
import datetime
import threading
import Queue
import sys

from SynBio.Corba import CodonCorba__POA
from SynBio.Corba.CodonCorba.Sequencing import BadParameters
from SynBio.Sequencing.ProcessRun import SequencingAnalysisRun

class _AbstractPoolThread:
    """Provide basic functionality to handle a thread pool and queue for servers.

    This manages a pool of threads and two queues:
    - a work queue where requests are submitted
    - a finish queue where threads indicate they are done

    Both queues are handed to every worker as its first two constructor
    arguments. This allows the top level class to keep track of requests in
    and requests out and avoid double queueing requests.
    """
    def __init__(self, thread_class, thread_args, num_threads):
        """Create the queues and start the initial set of worker threads.

        thread_class -- callable building a thread-like object (it must
            provide start, join and is_alive); it is called as
            thread_class(work_queue, finish_queue, *thread_args).
        thread_args -- extra positional arguments passed to every worker.
        num_threads -- number of workers to start right away.
        """
        self._work_queue = Queue.Queue(0)
        self._finish_queue = Queue.Queue(0)
        self._thread_args = [self._work_queue, self._finish_queue] \
                + list(thread_args)
        self._thread_class = thread_class
        self._threads = self._start_initial_threads(num_threads)

    def _spawn_thread(self):
        """Build one worker from the stored class and arguments, start it
        and return it.
        """
        thread = self._thread_class(*self._thread_args)
        thread.start()
        return thread

    def _start_initial_threads(self, num_threads):
        """Start num_threads workers and return them as a list.
        """
        return [self._spawn_thread() for _ in range(num_threads)]

    def clear_thread_pool(self):
        """Remove our thread pool and terminate the server.

        One None item is queued per pool slot; the workers in this module
        treat None as the signal to leave their loop. Every thread is then
        joined, so this call blocks until all of them have finished.
        """
        for _ in self._threads:
            # put end information on our queue to quit the threads
            self._work_queue.put(None)
        for thread in self._threads:
            thread.join()

    def _add_to_queue(self, queue_items):
        """Add a set of items to the queue, restarting any dead threads.
        """
        self._work_queue.put(queue_items)
        self._restart_dead_threads()

    def _restart_dead_threads(self):
        """Replace any dead threads in our pool, which died due to exceptions.

        XXX will this code create race conditions? Maybe there is a
        better way to keep threads going...
        """
        for index, thread in enumerate(self._threads):
            # is_alive is the spelling available from Python 2.6 on; the
            # older isAlive alias no longer exists in Python 3.9 and later.
            if thread.is_alive():
                continue
            # the thread is dead: join it and put a new live thread in its slot
            thread.join()
            self._threads[index] = self._spawn_thread()

class ProcessServer(CodonCorba__POA.Sequencing.SequencingProcess,
        _AbstractPoolThread):
    """Provide an implementation of a plate processing server.

    Requests come in through processPlate and are placed on the work queue
    of the inherited thread pool. Recently submitted plates are remembered
    so the same plate and quadrant passed again shortly afterwards is
    ignored instead of being queued twice.
    """
    def __init__(self, base_config, logger, num_threads = 4):
        """Set up the duplicate tracking and start the worker pool.

        base_config -- configuration handed to every worker thread.
        logger -- object with an info method; also handed to every worker.
        num_threads -- number of worker threads to start.
        """
        self._logger = logger
        # (plate_id, quadrant, submission time) of recently accepted plates
        self._recent_plates = []
        # a repeat of the same plate within this many seconds is refused
        self._second_delay = 40
        # remembered plates at least this many seconds old are forgotten
        self._stale_remove = 120

        thread_args = (base_config, logger)
        _AbstractPoolThread.__init__(self, _ProcessThread, thread_args,
                num_threads)

    def get_nameservice_name(self):
        """Return the name this server is registered under."""
        return "SequencingProcess"

    def processPlate(self, plate_folder, results_group, sequencer_type):
        """Queue the analysis of one plate folder.

        plate_folder -- folder name, parsed by _get_plate_id.
        results_group -- passed through unchanged to the analysis function.
        sequencer_type -- "internal" or "external"; selects the analysis
            function to run.

        Returns None without queueing anything when the same plate and
        quadrant was accepted no more than self._second_delay seconds ago.

        Raises BadParameters for an unparseable plate folder or an unknown
        sequencer type. A request rejected for its sequencer type is not
        remembered as a recent plate, so a corrected retry is accepted
        immediately.
        """
        plate_id, quadrant = self._get_plate_id(plate_folder)
        self._logger.info("+ Processing plate: %s %s %s" % (results_group,
            plate_id, quadrant))
        # check for plates being passed in multiple times concurrently
        stale_plates = []
        cur_time = datetime.datetime.now()
        for entry in self._recent_plates:
            old_plate, old_quadrant, old_time = entry
            elapsed = self._elapsed_seconds(cur_time - old_time)
            # an entry from the future means the clock was set backwards;
            # its age is unknown, so drop it rather than block on it
            if elapsed < 0:
                stale_plates.append(entry)
                continue
            if ((old_plate, old_quadrant) == (plate_id, quadrant)
                    and elapsed <= self._second_delay):
                self._logger.info("- Failed: passed again too soon")
                return
            if elapsed >= self._stale_remove:
                stale_plates.append(entry)

        # validate the request before remembering it, so a bad request does
        # not lock out the corrected one
        work_fn = self._select_work_fn(sequencer_type)

        for stale_plate in stale_plates:
            self._recent_plates.remove(stale_plate)
        self._recent_plates.append((plate_id, quadrant, cur_time))

        self._restart_dead_threads()

        # put the work on the queue
        self._work_queue.put([work_fn, plate_id, quadrant, results_group,
            sequencer_type])

    def _elapsed_seconds(self, time_change):
        """Return the whole number of seconds covered by a timedelta.

        timedelta.seconds alone leaves out the days part, which makes an
        entry that is one day and ten seconds old look ten seconds old.
        """
        return time_change.days * 86400 + time_change.seconds

    def _select_work_fn(self, sequencer_type):
        """Return the analysis function matching the sequencer type.

        Raises BadParameters when the type is neither "internal" nor
        "external".
        """
        analyzer = SequencingAnalysisRun(logger = self._logger)
        if sequencer_type == "internal":
            return analyzer.run_analysis
        # second case -- we may have multiple plates in a single directory
        elif sequencer_type == "external":
            return analyzer.run_external_analysis
        raise BadParameters("Unexpected sequencer type %s" % sequencer_type)

    def _get_plate_id(self, plate_folder):
        """Retrieve the plate_id and quadrant, given an output plate folder.

        This helps us handle 384 well plates, which are processed by quadrants.

        The folder name is split on underscores. One or three parts mean a
        96 well plate, reported with quadrant 'X'. Four parts carry the
        quadrant as the second part, for example S-DSJQ6_Q2_2007-12-14_1.

        Returns a (plate_id, quadrant) tuple.
        Raises BadParameters for any other number of parts, or when the
        quadrant part is not 'Q' followed by exactly one character.
        """
        parts = plate_folder.split('_')
        # old cases: only the plate id, or the plate folder without a
        # quadrant; both are 96 well plates
        if len(parts) in (1, 3):
            return parts[0], 'X'
        # new case, with the quadrant info in the folder
        # S-DSJQ6_Q2_2007-12-14_1
        elif len(parts) == 4:
            quad = parts[1]
            # this is input from the client, so report problems with the
            # same exception as other bad folder names instead of an assert
            if len(quad) != 2 or quad[0] != 'Q':
                raise BadParameters("Unexpected plate info: %s" % plate_folder)
            # the Ab1 machines dump Q2 as Q3 and vice versa, so we need to
            # switch these here to be consistent with the normal Q2/Q3
            # convention
            swapped = {'2': '3', '3': '2'}
            return parts[0], swapped.get(quad[1], quad[1])
        else:
            raise BadParameters("Unexpected plate info: %s" % plate_folder)

class _ProcessThread(threading.Thread):
    """Worker thread that runs queued plate analyses until told to stop.

    Every item taken from the work queue is either None, which ends the
    thread, or a five element sequence:
    (work_fn, plate_id, quadrant, results_group, sequencer_type).
    """
    def __init__(self, work_queue, finish_queue, config, logger):
        """Store the queues, the configuration and the logger.

        work_queue -- object whose get method returns the next work item.
        finish_queue -- stored on the thread; not used by run.
        config -- passed as first argument to every work function.
        logger -- object with an info method, used to report each result.
        """
        threading.Thread.__init__(self)
        self._work_queue = work_queue
        self._finish_queue = finish_queue
        self._config = config
        self._logger = logger

    def run(self):
        """Process work items one after another until None is received.
        """
        while True:
            job = self._work_queue.get()
            # end condition -- get a None object to work on
            if job is None:
                return
            self._run_job(job)

    def _run_job(self, job):
        """Call the work function of one item and log how it ended.

        A return value of None from the work function counts as success;
        anything else is logged as the description of a problem.
        """
        work_fn, plate_id, quadrant, results_group, sequencer_type = job
        problem = work_fn(self._config, plate_id, quadrant, results_group,
                sequencer_type, self.getName())
        if problem is not None:
            self._logger.info("- Processing problem: %s. %s" %
                    (plate_id, problem))
            return
        self._logger.info("- Processing finished. %s" % plate_id)