"""Code to provide a sequencing server for sequence plate processing.
"""
import time
import datetime
import threading
import Queue
import sys
from SynBio.Corba import CodonCorba__POA
from SynBio.Corba.CodonCorba.Sequencing import BadParameters
from SynBio.Sequencing.ProcessRun import SequencingAnalysisRun
class _AbstractPoolThread:
    """Shared thread pool and queue handling for server classes.

    Two queues are created and handed to every worker thread:

    - a work queue, on which requests are submitted
    - a finish queue, intended for threads to indicate they are done

    This lets the top level class keep track of requests in and requests
    out and avoid double queueing requests.
    """

    def __init__(self, thread_class, thread_args, num_threads):
        """Create the queues and start the initial set of worker threads.

        thread_class -- class instantiated for each worker; instances must
                        provide start, join and isAlive.
        thread_args -- extra constructor arguments, placed after the work
                       queue and the finish queue.
        num_threads -- number of workers to start.
        """
        self._work_queue = Queue.Queue(0)
        self._finish_queue = Queue.Queue(0)
        # every worker is built as thread_class(work, finish, *thread_args)
        self._thread_args = [self._work_queue, self._finish_queue]
        self._thread_args.extend(thread_args)
        self._thread_class = thread_class
        self._threads = self._start_initial_threads(num_threads)

    def _launch_thread(self):
        """Build one worker from the stored class and arguments and start it.
        """
        worker = self._thread_class(*self._thread_args)
        worker.start()
        return worker

    def _start_initial_threads(self, num_threads):
        """Return a list of num_threads freshly started workers.
        """
        return [self._launch_thread() for _ in range(num_threads)]

    def clear_thread_pool(self):
        """Ask every worker to quit and wait for all of them to finish.
        """
        # one None per pooled thread; workers treat None as the signal to quit
        for _ in self._threads:
            self._work_queue.put(None)
        for worker in self._threads:
            worker.join()

    def _add_to_queue(self, queue_items):
        """Submit a request to the work queue, then revive any dead workers.
        """
        self._work_queue.put(queue_items)
        self._restart_dead_threads()

    def _restart_dead_threads(self):
        """Swap every dead worker in the pool for a newly started one.

        Workers die when an exception escapes them.

        XXX will this code create race conditions? Maybe there is a
        better way to keep threads going...
        """
        for slot, worker in enumerate(self._threads):
            if worker.isAlive():
                continue
            # dead worker: join it, then put a live replacement in its slot
            worker.join()
            self._threads[slot] = self._launch_thread()
class ProcessServer(CodonCorba__POA.Sequencing.SequencingProcess,
                    _AbstractPoolThread):
    """Provide an implementation of a plate processing server.

    Requests arrive through processPlate and are placed on the work queue
    for the _ProcessThread workers. Recently accepted plates are remembered
    so the same plate and quadrant is not queued twice in quick succession.
    """

    def __init__(self, base_config, logger, num_threads = 4):
        """Set up duplicate tracking and start the worker threads.

        base_config -- configuration handed to every worker thread.
        logger -- object with an info method, used for progress messages.
        num_threads -- number of worker threads to start.
        """
        self._logger = logger
        # (plate_id, quadrant, submit_time) for recently accepted requests
        self._recent_plates = []
        # the same plate passed again within this many seconds is rejected
        self._second_delay = 40
        # tracked entries at least this many seconds old are dropped
        self._stale_remove = 120
        thread_args = (base_config, logger)
        _AbstractPoolThread.__init__(self, _ProcessThread, thread_args,
                                     num_threads)

    def get_nameservice_name(self):
        """Return the name this server is published under.
        """
        return "SequencingProcess"

    def processPlate(self, plate_folder, results_group, sequencer_type):
        """Queue a plate for analysis unless it was just submitted.

        plate_folder -- output folder name, parsed by _get_plate_id.
        results_group -- passed through to the analysis function.
        sequencer_type -- "internal" or "external"; selects the analysis.

        Returns None in all cases; a repeat submission inside the delay
        window is logged and dropped. Raises BadParameters when the folder
        name or the sequencer type is not recognized.
        """
        plate_id, quadrant = self._get_plate_id(plate_folder)
        self._logger.info("+ Processing plate: %s %s %s" % (results_group,
                          plate_id, quadrant))
        # check for plates being passed in multiple times concurrently
        stale_plates = []
        cur_time = datetime.datetime.now()
        for old_plate, old_quadrant, old_time in self._recent_plates:
            elapsed = self._elapsed_seconds(old_time, cur_time)
            if ((old_plate, old_quadrant) == (plate_id, quadrant)
                    and elapsed <= self._second_delay):
                self._logger.info("- Failed: passed again too soon")
                return
            if elapsed >= self._stale_remove:
                stale_plates.append((old_plate, old_quadrant, old_time))
        for stale_plate in stale_plates:
            self._recent_plates.remove(stale_plate)
        # resolve the analysis function first: a request rejected for a bad
        # sequencer type must not be remembered as a recent submission
        work_fn = self._get_work_fn(sequencer_type)
        self._recent_plates.append((plate_id, quadrant, cur_time))
        self._restart_dead_threads()
        # put the work on the queue
        self._work_queue.put([work_fn, plate_id, quadrant, results_group,
                              sequencer_type])

    def _elapsed_seconds(self, start_time, end_time):
        """Return the whole seconds from start_time to end_time.

        timedelta.seconds on its own only holds the part of the difference
        below one day, so the days are folded in here.
        """
        delta = end_time - start_time
        return delta.days * 86400 + delta.seconds

    def _get_work_fn(self, sequencer_type):
        """Return the analysis function matching the sequencer type.

        Raises BadParameters for anything but "internal" or "external".
        """
        analyzer = SequencingAnalysisRun(logger = self._logger)
        if sequencer_type == "internal":
            return analyzer.run_analysis
        # second case -- we may have multiple plates in a single directory
        elif sequencer_type == "external":
            return analyzer.run_external_analysis
        raise BadParameters("Unexpected sequencer type %s" % sequencer_type)

    def _get_plate_id(self, plate_folder):
        """Retrieve the plate_id and quadrant, given an output plate folder.

        This helps us handle 384 well plates, which are processed by
        quadrants. Folders without quadrant information get quadrant 'X'.

        Raises BadParameters when the folder name does not split into 1, 3
        or 4 underscore separated parts, or when the quadrant part is not
        'Q' followed by a single character.
        """
        parts = plate_folder.split('_')
        # old cases, a 96 well plate: only the plate id (1 part), or the
        # plate folder without a quadrant (3 parts)
        if len(parts) in (1, 3):
            return parts[0], 'X'
        # new case, with the quadrant info in the folder
        # S-DSJQ6_Q2_2007-12-14_1
        elif len(parts) == 4:
            quad = parts[1]
            # reject bad input explicitly; an assert would vanish under -O
            if len(quad) != 2 or quad[0] != 'Q':
                raise BadParameters("Unexpected plate info: %s"
                                    % plate_folder)
            # the Ab1 machines dump Q2 as Q3 and vice versa, so we need to
            # switch these here to be consistent with the normal Q2/Q3
            # convention
            swapped = {'2': '3', '3': '2'}
            return parts[0], swapped.get(quad[1], quad[1])
        else:
            raise BadParameters("Unexpected plate info: %s" % plate_folder)
class _ProcessThread(threading.Thread):
    """Worker thread that runs plate analysis requests from the work queue.
    """

    def __init__(self, work_queue, finish_queue, config, logger):
        """Store the queues, configuration and logger for use in run.

        work_queue -- queue the requests are read from.
        finish_queue -- stored on the thread; not used by run.
        config -- passed as first argument to every work function.
        logger -- object with an info method, used to report results.
        """
        threading.Thread.__init__(self)
        self._work_queue = work_queue
        self._finish_queue = finish_queue
        self._config = config
        self._logger = logger

    def run(self):
        """Process requests until a None item is taken off the work queue.

        Each request is a sequence of (work_fn, plate_id, quadrant,
        results_group, sequencer_type). The work function returning None
        is logged as finished; any other return value is logged as a
        problem.
        """
        while True:
            request = self._work_queue.get()
            # end condition -- a None object means it is time to quit
            if request is None:
                return
            work_fn, plate_id, quadrant, results_group, sequencer_type = \
                request
            problem = work_fn(self._config, plate_id, quadrant,
                              results_group, sequencer_type, self.getName())
            if problem is not None:
                message = "- Processing problem: %s. %s" % (plate_id,
                                                            problem)
            else:
                message = "- Processing finished. %s" % plate_id
            self._logger.info(message)