Commits

Alex Turcu committed ac42470

Script update (choice of java), flat emulation, updated fail count.

  • Participants
  • Parent commits 4ca85bc
  • Branches checkpoints

Comments (0)

Files changed (3)

 
 # Config section here
 DEBUG = False
-JAVA_CMD = ["java", "-server", "-XX:CompileThreshold=400", 
+JAVA_CMD = ["-server", "-XX:CompileThreshold=400", 
 		"-Xmx512m", "-Xms256m" ]
 JAVA_DBG = JAVA_CMD + ["-Xdebug", "-Xrunjdwp:transport=dt_socket,address=8001,server=y"] 
 
 	startup.add_argument("--time-out", type=int, default=200, help="Seconds after which the test is forcefully stopped.")
 	startup.add_argument("--presets", type=str, default="", help="Common configuration shorcuts. Choose from: %s" % PRESETS)
 	startup.add_argument("--jar", type=str, default="bench", choices=JAR_FILES.keys(), help="Which jar file (test) to run.")
+	startup.add_argument("--java", type=str, default="java", help="Which java executable to use.")
 	# Hyflow
 	hyflow = parser.add_argument_group("Hyflow")
 	hyflow .add_argument("--port", type=int, help="Base (coordinator node) port. Additional nodes increment from here.")
 	crt_core = 0
 	for node in range(args.offset, spawn_last):
 		print("Spawning node id=%d" % node)
+		cmd = [ args.java ]
 		if DEBUG and node == 1:
-			cmd = JAVA_DBG + ["-jar", find_jar() ] + make_args(args, node)
+			cmd = cmd + JAVA_DBG
 		else:
-			cmd = JAVA_CMD + ["-jar", find_jar() ] + make_args(args, node)
+			cmd = cmd + JAVA_CMD
+		cmd = cmd + ["-jar", find_jar() ] + make_args(args, node)
 		if not args.no_taskset:
 			cmd = ["taskset", "-c", "%d-%d" % (crt_core, crt_core + args.cores - 1)] + cmd
 			crt_core += args.cores
 
 	haistm {
 		checkpointProb = 30
+		emulateFlat = true
 	}
 	
 	logging {

hyflow-checkpoints/src/main/scala/scala/concurrent/stm/haistm/HaiInTxn.scala

 import org.hyflow.Hyflow
 import org.hyflow.core._
 import org.hyflow.api._
+import org.hyflow.core.util._
 import org.hyflow.core.directory.CpTracker
 import scala.concurrent.stm._
 import scala.concurrent.stm.motstm.MotStats
 import org.apache.commons.javaflow.Continuation
 import org.eintr.loglady.Logging
 import scala.collection.mutable
+import scala.util.Random
 import akka.actor._
 import akka.dispatch.{ Await, Future, ExecutionContext, Promise }
 import akka.pattern.ask
 
 	def restartAtInval[Z](exec: TxnExecutor, block: InTxn => Z): Tuple2[HaiTxnLevel, Continuation] = {
 		log.debug("Restarting transaction at invalidated checkpoint.")
-		Hyflow._topAborts += 1
 
 		// Let's rollback transaction
-		// Find out how far back we need to abort
 		var level = _currentLevel.root
-		while (level.status == Txn.Active && level.activeChild != null) {
-			log.trace("Passed through checkpoint: %s", level)
-			level = level.activeChild
+		
+		if (!HyflowConfig.cfg[Boolean]("hyflow.haistm.emulateFlat")) {
+			// Find out how far back we need to abort
+			while (level.status == Txn.Active && level.activeChild != null) {
+				log.trace("Passed through checkpoint: %s", level)
+				level = level.activeChild
+			}
+			log.trace("Stopped at checkpoint: %s | status = %s | activeChild = %s",
+				level, level.status, level.activeChild)
+		} else {
+			// If we emulate flat nesting, roll back to the first checkpoint
+			if (level.status == Txn.Active && level.activeChild != null) {
+				level = level.activeChild
+			}
 		}
-		log.trace("Stopped at checkpoint: %s | status = %s | activeChild = %s",
-			level, level.status, level.activeChild)
 
 		// Resume execution at that level
 		var cont = level.checkpoint
 
 		(level, cont)
 	}
+	
+	private def prepareBackoff(prevAttempts: Int) = {
+		// Random back-off policy
+		// TODO: Move this into contention management module
+		implicit val executor = Hyflow.system.dispatcher
+		val backoff = (20 * Math.pow(2, Math.min(prevAttempts, 5)) * (1 + Random.nextFloat)).toInt
+		log.debug("Random exponential back-off. | sleep = %d | prevAttempts = %d", backoff, prevAttempts)
+		val promise = Promise[Boolean]()
+		Hyflow.system.scheduler.scheduleOnce(backoff milliseconds) {
+			promise.complete(Right(true))
+		}
+		// End random exponential back-off.
+		promise
+	}
 
 	protected def startCheckpointedExec[Z](exec: TxnExecutor, block: InTxn => Z): Z = {
 		// Set up transaction
 			// Start execution
 			var cont = Continuation.startWith(new HaiRunCkpt(exec, block))
 			log.debug("Returned from first checkpoint.")
+			var prevFailures = 0
 
 			while (true) {
 				log.debug("Checkpoint iteration. | cont = %s | status = %s",
 						cont = Continuation.continueWith(cont)
 					case res: CheckpointFailure =>
 						log.debug("Checkpoint failure.")
+						prevFailures += 1
+						Await.ready(prepareBackoff(prevFailures), 30 seconds)
 						val restartRes = restartAtInval(exec, block)
 						level = restartRes._1
 						cont = restartRes._2
 						if (tryCommit()) {
 							_currentLevel = null
 							Hyflow._topCommits += 1
+							Hyflow._topAborts += prevFailures
 							return res.result.asInstanceOf[Z]
 						} else {
 							log.debug("Commit failed")
+							prevFailures += 1
+							Await.ready(prepareBackoff(prevFailures), 30 seconds)
 							val restartRes = restartAtInval(exec, block)
 							level = restartRes._1
 							cont = restartRes._2
 						// TODO: Decide to commit or abort
 						// Abort and rethrow
 						_currentLevel = null
+						//TODO: Hyflow._permFails += 1
+						Hyflow._topAborts += prevFailures
 						throw res.e
 					case _@ res =>
 						log.error("Continuation returned unexpected object. | obj = %s", res)