Commits

Ronny Pfannschmidt committed f535b63

refactor Procs

Comments (0)

Files changed (4)

glas_process/baseproc.py

+import gevent
+from gevent.queue import Queue
+
+
+class Proc(object):
+    def save_with_batch(self, doc):
+        self.dir.save_with_batch(doc)
+
+    def __init__(self, procdir):
+        self.dir = procdir
+        self.queue = Queue()
+        self.greenlets = []
+        self._control = None
+        self.step = None
+
+    def spawn(self, func, *k, **kw):
+        res = gevent.spawn(func, *k,  **kw)
+        self.greenlets.append(res)
+        return res
+
+    def _store(self):
+        for i, doc in enumerate(self.queue):
+            if not doc._id:
+                doc._id = '%s:%s' %(self.step._id, i)
+            self.save_with_batch(doc)
+            returncode = getattr(doc, 'returncode', None)
+            if returncode is not None and self.step is not None:
+                status = 'complete' if returncode == 0 else 'failed'
+                self.step.status = status
+                self.save_with_batch(self.step)
+
+    def start(self):
+        if self._control is None:
+            self.create()
+            self._control = self.spawn(self._store)
+
+    def wait(self):
+        self.start()
+        self._control.join()
+        gevent.joinall(self.greenlets)
+
+    def create(self):
+        raise NotImplementedError
+
+    def run(self):
+        self.start()
+        return self.wait()
+
+    def kill(self):
+        pass
+

glas_process/procdir.py

-from .task import SubProcessProc
 from collections import Counter
+import glas_process.subprocess
 
 class ProcDir(object):
     scminfo = None
         )
 
     def run_python(self, script, belongs_to=None, _id=None):
-        return SubProcessProc(self, self.get_id(_id), 'python',
+        return glas_process.subprocess.SubProcessProc(self, self.get_id(_id), 'python',
                               ['python', '-'], script)
 
     def call(self, args, _id=None):
         return self._call(args, None, 'popen', self.get_id(_id))
 
     def _call(self, args, stdin, steper, _id):
-        return SubProcessProc(self, _id, 'popen', args, stdin)
+        return glas_process.subprocess.SubProcessProc(self, _id, 'popen', args, stdin)

glas_process/subprocess.py

-from __future__ import print_function
-from __future__ import absolute_import
+from __future__ import print_function, absolute_import
 
 import os
 import fcntl
 from gevent import socket
 
 from .model import Event
+from glas_process.model import Step
+from glas_process.baseproc import Proc
+
+
+class SubProcessProc(Proc):
+    def __init__(self, proc, _id, steper, args, stdin):
+        Proc.__init__(self, proc)
+        self.step = Step(
+            _id=_id,
+            status='prepared',
+            steper=steper,
+            inputs={
+                'args': [str(x) for x in args],
+                'cwd': str(self.dir.path),
+                'stdin': stdin,
+            },
+            belongs_to=self.dir.belongs_to,
+        )
+        self.save_with_batch(self.step)
+
+    def create(self):
+        self.step.status = 'running'
+        self.save_with_batch(self.step)
+        self.popen = start_subprocess(self)
 
 def stream_line_iter(fp):
     fcntl.fcntl(fp, fcntl.F_SETFL, os.O_NONBLOCK)  # make the file nonblocking

glas_process/task.py

-import gevent
-from gevent.queue import Queue
-from .model import Step
-from .subprocess import start_subprocess
-
-
-class Proc(object):
-    def save_with_batch(self, doc):
-        self.dir.save_with_batch(doc)
-
-    def __init__(self, procdir):
-        self.dir = procdir
-        self.queue = Queue()
-        self.greenlets = []
-        self._control = None
-        self.step = None
-
-    def spawn(self, func, *k, **kw):
-        res = gevent.spawn(func, *k,  **kw)
-        self.greenlets.append(res)
-        return res
-
-    def _store(self):
-        for i, doc in enumerate(self.queue):
-            if not doc._id:
-                doc._id = '%s:%s' %(self.step._id, i)
-            self.save_with_batch(doc)
-            returncode = getattr(doc, 'returncode', None)
-            if returncode is not None and self.step is not None:
-                status = 'complete' if returncode == 0 else 'failed'
-                self.step.status = status
-                self.save_with_batch(self.step)
-
-    def start(self):
-        if self._control is None:
-            self.create()
-            self._control = self.spawn(self._store)
-
-    def wait(self):
-        self.start()
-        self._control.join()
-        gevent.joinall(self.greenlets)
-
-    def create(self):
-        raise NotImplementedError
-
-    def run(self):
-        self.start()
-        return self.wait()
-
-    def kill(self):
-        pass
-
-class SubProcessProc(Proc):
-    def __init__(self, proc, _id, steper, args, stdin):
-        Proc.__init__(self, proc)
-        self.step = Step(
-            _id=_id,
-            status='prepared',
-            steper=steper,
-            inputs={
-                'args': [str(x) for x in args],
-                'cwd': str(self.dir.path),
-                'stdin': stdin,
-            },
-            belongs_to=self.dir.belongs_to,
-        )
-        self.save_with_batch(self.step)
-
-    def create(self):
-        self.step.status = 'running'
-        self.save_with_batch(self.step)
-        self.popen = start_subprocess(self)
-
-