Commits

Walter Dörwald committed bd84b78

Use signal.alarm/os.fork/fcntl.flock instead of pid files.

A fcntl.flock() call on the script file is used to ensure that only one instance
of the job is running.

Use os.fork() for runtime monitoring. The child process is doing the work, the
parent process is doing the runtime monitoring.

Comments (0)

Files changed (2)

 __docformat__ = "reStructuredText"
 
 
+# get the current directory as early as possible to minimize the chance that someone has called ``os.chdir()``
+_curdir = os.getcwd()
+
+
 # fetch item, first, last, count and xmlescape
 try:
 	from ll._misc import *
 		self.python_executable = _string(sys.executable)
 		self.python_version = ("{}.{}.{}" if sys.version_info.micro else "{}.{}").format(*sys.version_info)
 		self.pid = os.getpid()
-		self.scriptname = _string(sys._getframe(-1).f_code.co_filename)
+		self.scriptname = _string(os.path.join(_curdir, sys.modules["__main__"].__file__))
 
 	def __getitem__(self, key):
 		if key in self._keys:
 		yield "\n"
 
 
+
 class JSMinUnterminatedComment(Exception):
 	pass
 

src/ll/sisyphus.py

 """
 
 
-import sys, os, socket, pwd, codecs, traceback, errno, pprint, datetime, re, contextlib, argparse
+import sys, os, signal, fcntl, codecs, traceback, errno, pprint, datetime, re, contextlib, argparse
 
 from ll import url, ul4c, misc
 
 encodingdeclaration = re.compile(r"coding[:=]\s*([-\w.]+)")
 
 
+class MaximumRuntimeExceeded(Exception):
+	def __init__(self, maxtime):
+		self.maxtime = maxtime
+
+	def __str__(self):
+		return "maximum runtime of {} seconds exceeded".format(self.maxtime)
+
+
 class Job(object):
 	"""
 	A Job object executes a task once.
 		Description for help message of the command line argument parser.
 
 	``maxtime`` : :option:`-m` or :option:`--maxtime`
-		Maximum allowed runtime for the job (as a :class:`datetime.timedelta`
-		instance or the number of seconds). If a job is started and the previous
-		invocation has been running for more than ``maxtime`` the previous job
-		will be killed.
+		Maximum allowed runtime for the job (as the number of seconds). If the job
+		runs longer than that it will kill itself.
+
+	``noisykills`` : :option:`--noisykills`
+		Should an exception be raised when the maximum runtime is exceeded?
 
 	``logfilename`` : :option:`--logfilename`
 		Path/name of the logfile for this job as an UL4 template. Variables
 		A link that points to the currently active logfile (as an UL4 template).
 		If this is :const:`None` no link will be created.
 
-	``pidfilename`` : :option:`--pidfilename`
-		The path/name of the file where the job stores its process id (as an UL4
-		template).
-
 	``log2file`` : :option:`-l` or :option:`--log2file`
 		Should a logfile be written at all?
 
 
 	maxtime = 5 * 60
 
-	logfilename = u"~/ll.sisyphus/log/<?print projectname?>/<?print jobname?>/<?print starttime.format('%Y-%m-%d-%H-%M-%S-%f')?>.sisyphuslog"
-	loglinkname = u"~/ll.sisyphus/log/<?print projectname?>/<?print jobname?>/current.sisyphuslog"
-	pidfilename = u"~/ll.sisyphus/run/<?print projectname?>/<?print jobname?>.pid"
+	noisykills = False
+
+	logfilename = u"~/ll.sisyphus/<?print projectname?>/<?print jobname?>/<?print starttime.format('%Y-%m-%d-%H-%M-%S-%f')?>.sisyphuslog"
+	loglinkname = u"~/ll.sisyphus/<?print projectname?>/<?print jobname?>/current.sisyphuslog"
 
 	log2file = True
 
 		p = argparse.ArgumentParser(description=self.argdescription)
 		p.add_argument("-p", "--projectname", dest="projectname", metavar="NAME", help="The name of the project this job belongs to", type=self._string, default=self.projectname)
 		p.add_argument("-j", "--jobname", dest="jobname", metavar="NAME", help="The name of the job", type=self._string, default=self.jobname if self.jobname is not None else self.__class__.__name__)
-		p.add_argument("-m", "--maxtime", dest="maxtime", metavar="SECONDS", help="Maximum number of seconds the job is allowed to run", type=int, default=self.info.maxtime.total_seconds())
+		p.add_argument("-m", "--maxtime", dest="maxtime", metavar="SECONDS", help="Maximum number of seconds the job is allowed to run", type=int, default=self.maxtime)
 		p.add_argument("-l", "--log2file", dest="log2file", metavar="FLAG", help="Should a logfile be written?", type=misc.flag, default=self.log2file)
 		p.add_argument(      "--keepfilelogs", dest="keepfilelogs", metavar="DAYS", help="Number of days log files are kept", type=float, default=self.keepfilelogs)
 		p.add_argument(      "--inputencoding", dest="inputencoding", metavar="ENCODING", help="Encoding for system data (i.e. crontab etc.)", default=self.inputencoding)
 		p.add_argument(      "--inputerrors", dest="inputerrors", metavar="METHOD", help="Error handling method for encoding errors in system data", default=self.inputerrors)
 		p.add_argument(      "--outputencoding", dest="outputencoding", metavar="ENCODING", help="Encoding for the log file", default=self.outputencoding)
 		p.add_argument(      "--outputerrors", dest="outputerrors", metavar="METHOD", help="Error handling method for encoding errors in log texts", default=self.outputerrors)
+		p.add_argument(      "--noisykills", dest="noisykills", metavar="FLAG", help="Should an exception be raised if the maximum runtime is exceeded?", type=misc.flag, default=self.noisykills)
 		return p
 
 	def parseargs(self, args=None):
 		"""
 		p = self.argparser()
 		args = p.parse_args(args)
-		self.info.projectname = args.projectname
-		self.info.jobname = args.jobname
-		self.info.maxtime = datetime.timedelta(seconds=args.maxtime)
+		self.projectname = args.projectname
+		self.jobname = args.jobname
+		self.maxtime = args.maxtime
+		self.noisykills = args.noisykills
 		self.log2file = args.log2file
 		self.keepfilelogs = datetime.timedelta(days=args.keepfilelogs)
 		self.inputencoding = args.inputencoding
 		self.info.sysinfo = misc.SysInfo(self.inputencoding, self.inputerrors)
 		self.info.projectname = self._string(self.projectname)
 		self.info.jobname = self._string(self.jobname)
-
-		maxtime = self.maxtime
-		if not isinstance(maxtime, datetime.timedelta):
-			maxtime = datetime.timedelta(seconds=maxtime)
-		self.info.maxtime = maxtime
+		self.info.maxtime = self.maxtime
 
 		# Get source code
 		try:
 		# Current line number
 		self.lineno = 1
 
-		self.pidfilewritten = False
-
 		self.log = Tag(self._log)
 
 	def _handleexecution(self):
 		"""
 		Handle executing the job including handling of duplicate or hanging jobs.
 		"""
-		self.info.starttime = datetime.datetime.now()
+		# Obtain a lock on the script file to make sure we're the only one running
+		with open(self.info.sysinfo.scriptname, "rb") as f:
+			try:
+				fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+			except IOError, exc:
+				if exc[0] not in (errno.EACCES, errno.EAGAIN): # some other error
+					raise
+				# The previous invocation of the job is still running
+				return # Return without calling :meth:`execute`
 
-		pidfilename = ul4c.compile(self.pidfilename).renders(**self.info)
-		pidfilename = os.path.expanduser(pidfilename)
+			# we were able to obtain the lock, so we are the only one running
+			self.info.starttime = datetime.datetime.now()
 
-		formatlogline = self.formatlogline.replace("\n", "").replace("\r", "") + u"\n"
-		self._formatlogline = ul4c.compile(formatlogline)
+			formatlogline = self.formatlogline.replace("\n", "").replace("\r", "") + u"\n"
+			self._formatlogline = ul4c.compile(formatlogline)
 
-		self._createlogfile()
+			self._createlog()
 
-		self.log.sisyphus.info(u"{0.sysinfo.scriptname} (pid {0.sysinfo.pid})".format(self.info))
-		try: # is there a pid file from a running job?
-			pidfile = open(pidfilename, "r")
-		except IOError, exc: # no pid file => the job has finished without problems
-			if exc[0] == errno.ENOENT: # file not found
-				self._writepid(pidfilename)
-				self.log.sisyphus.info(u"no previous job running; here we go!")
+			self.log.sisyphus.info(u"{0.sysinfo.scriptname} (maxtime {0.maxtime} seconds; parent pid {0.sysinfo.pid})".format(self.info))
+
+			# Fork the process; the child will do the work; the parent will monitor the maximum runtime
+			pid = os.fork()
+
+			if pid: # We are the parent process
+				# set a signal that kills the child process after the maximum runtime
+				def alarm(signum, frame):
+					os.kill(pid, signal.SIGTERM) # Kill our child
+					self.log.sisyphus.error(u"Terminated after {} seconds!".format(self.maxtime))
+					if self.noisykills:
+						raise MaximumRuntimeExceeded(self.maxtime)
+					sys.exit(1)
+				signal.signal(signal.SIGALRM, alarm)
+				signal.alarm(self.maxtime)
+				os.wait() # Wait for the child process to terminate
+				return # Exit normally
+			# From now on we are in the child process
+			self.log.sisyphus.info(u"no previous job running; here we go (child pid {})!".format(os.getpid()))
+
+			try:
+				with url.Context():
+					result = self.execute()
+				self._cleanupoldlogs() # Clean up old logfiles
+			except BaseException, exc:
+				# log the error to the logfile, because the job probably didn't have a chance to do it
+				self.log.sisyphus.exc(exc)
+				result = u"failed with {}".format(self._exc(exc))
+				self.log.sisyphus.result.error(result)
+				self.failed()
+				raise
 			else:
-				raise
-		else:
-			lastmodified = datetime.datetime.fromtimestamp(os.fstat(pidfile.fileno()).st_mtime)
-			try:
-				pid = int(pidfile.read())
-			except ValueError:
-				# file is empty or otherwise broken (disk may have been full)
-				pidfile.close()
-				self._writepid(pidfilename)
-				self.log.sisyphus.warning(u"ignoring bogus pid file {} (invalid content)".format(pidfilename))
-			else:
-				pidfile.close()
-				# Check if this process really exists, if not continue as if the pid file wasn't there
-				try:
-					os.kill(pid, 0)
-				except OSError, exc:
-					if exc[0] != errno.ESRCH:
-						raise
-					self._writepid(pidfilename)
-					msg = u"ignoring bogus pid file {} (process with pid {} doesn't exist)".format(pidfilename, pid)
-					self.log.sisyphus.warning(msg)
-				else:
-					if self.info.maxtime and self.info.starttime-lastmodified > self.info.maxtime: # the job is to old, so it probably hangs => kill it
-						try:
-							os.kill(pid, 9)
-						except OSError, exc:
-							if exc[0] != errno.ESRCH: # there was no process
-								raise
-						self._writepid(pidfilename)
-						msg = u"killed previous job running with pid {} (ran {} seconds; {} allowed); here we go!".format(pid, self.info.starttime-lastmodified, self.info.maxtime)
-						self.log.sisyphus.warning(msg)
-					else:
-						msg = u"Job still running (for {}; {} allowed; started on {}) with pid {} (according to {})".format(self.info.starttime-lastmodified, lastmodified, pid, pidfilename)
-						self.log.sisyphus.warning(msg)
-						return # Return without calling :meth:`execute`
-
-		self._createloglink() # We only want to create the link if it is clear that the job *will* run
-		try:
-			with url.Context():
-				result = self.execute()
-			self._cleanupoldlogs() # Clean up old logfiles
-		except BaseException, exc:
-			# log the error to the logfile, because the job probably didn't have a chance to do it
-			self.log.sisyphus.exc(exc)
-			result = u"failed with {}".format(self._exc(exc))
-			self.log.sisyphus.result.error(result)
-			self.failed()
-			self._killpid(pidfilename)
-			raise
-		else:
-			# log the result
-			self.log.sisyphus.result(self._string(result))
-		finally:
-			if self._logfile is not None:
-				self._logfile.close()
-		self._killpid(pidfilename) # finished => remove the pid file
+				# log the result
+				self.log.sisyphus.result(self._string(result))
+			finally:
+				if self._logfile is not None:
+					self._logfile.close()
+				fcntl.flock(f, fcntl.LOCK_UN | fcntl.LOCK_NB)
+			os._exit(0)
 
 	def _log(self, tags, *texts):
 		"""
 					self._logfile.flush()
 					self.lineno += 1
 
-	def _createlogfile(self):
+	def _createlog(self):
 		"""
-		Create the logfile (if requested).
+		Create the logfile and the link to the logfile (if requested).
 		"""
 		self._logfile = None
 		self._logfilename = None
+		self._loglinkname = None
 		if self.log2file:
+			# Create the log file
 			logfilename = ul4c.compile(self.logfilename).renders(**self.info)
 			lf = self._logfilename = url.File(logfilename).abs()
 			self._logfile = lf.openwrite()
-
-	def _createloglink(self):
-		"""
-		Create a link to the logfile (if requested).
-		"""
-		self._loglinkname = None
-		if self.log2file and self.loglinkname is not None:
-			loglinkname = ul4c.compile(self.loglinkname).renders(**self.info)
-			ll = self._loglinkname = url.File(loglinkname).abs()
-			lf = self._logfilename
-			try:
-				lf.symlink(ll)
-			except OSError, exc:
-				if exc[0] == errno.EEXIST:
-					ll.remove()
+			if self.loglinkname is not None:
+				# Create the log link
+				loglinkname = ul4c.compile(self.loglinkname).renders(**self.info)
+				ll = self._loglinkname = url.File(loglinkname).abs()
+				lf = self._logfilename
+				try:
 					lf.symlink(ll)
-				else:
-					raise
+				except OSError, exc:
+					if exc[0] == errno.EEXIST:
+						ll.remove()
+						lf.symlink(ll)
+					else:
+						raise
 
 	def _cleanupoldlogs(self):
 		"""
 					self.log.sisyphus.info("Removing logfile {}".format(fileurl.local()))
 					fileurl.remove()
 
-	def _writepid(self, pidfilename):
-		"""
-		Create the file containing the pid of the current process.
-		"""
-		if not self.pidfilewritten:
-			with contextlib.closing(url.File(pidfilename).openwrite()) as file:
-				file.write(str(self.info.sysinfo.pid))
-			self.pidfilewritten = True
-
-	def _killpid(self, pidfilename):
-		"""
-		Delete the pid file.
-		"""
-		if self.pidfilewritten:
-			url.File(pidfilename).remove()
-			self.pidfilewritten = False
-
 	def _string(self, s):
 		"""
 		Convert :var:`s` to unicode if it's a :class:`str`.
 	:var:`args` are the command line arguments (:const:`None` results in
 	``sys.argv`` being used)
 	"""
+	job.parseargs(args)
 	job._setup()
-	job.parseargs(args)
 	job._handleexecution()