
Commits

Alex Turcu committed e6f97f1

Granola StableLog and ClusterManager

  • Parent commits 89e9b38
  • Branches granola


Files changed (14)

File etc/granola.conf

+hyflow.granola {
+    master = "127.0.0.1:40000"
+    cluster {
+        nodes = 2
+        repos = [0, 1]
+    }
+}
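
For reference, these keys are served by the new CfgObj helper (added in Config.scala below) once HyflowConfig.init() has parsed the file. A minimal sketch of the expected lookups, using the HyflowCfg and ClusterCfg objects defined elsewhere in this commit; the values shown are the ones configured above:

    HyflowConfig.init("etc/granola.conf")
    val master = HyflowCfg("master")     // "127.0.0.1:40000"
    val nodes  = ClusterCfg.i("nodes")   // 2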

File hyflow-granola/src/main/scala/org/hyflow/Hyflow.scala

 import org.hyflow.granola.server._
 
 import akka.actor._
+import akka.dispatch._
+import akka.pattern.ask
+import akka.util.duration._
+
 import com.typesafe.config.{ Config, ConfigFactory }
 
+object HyflowCfg extends CfgObj("hyflow.granola")
+
 object Hyflow extends GranolaServer with GranolaClient {
 
 	// Common types
 
 	// Initialize akka actor system
 	var system: ActorSystem = _
+
+	// Cluster interface
+	var clusterManager: ActorRef = _
+	var master_clusterMgr: ActorRef = _
 	
-	// Other actors
-	var clusterManager: ActorRef = _
+	var barrierManager: ActorRef = _ 
 
-	// Legacy
-	def barrier(barrierName: String) {}
-	def init() {
+	// Replicas
+	def getReplicas(repo: Long) = cluster(repo).replicas
+
+	// Barrier
+	// TODO: add enumeration to configure # of barrier waiters
+	def barrier(barrierName: String, threads: Boolean) {
+		import org.hyflow.benchmarks.BenchCfg
+		// TODO: actually, number of benchmark injectors, not nodes!
+		val nodes = HyflowCfg.i("cluster.nodes")
+		val totalThreads = BenchCfg.i("clientThreads") * nodes
+		val waitOn = if (threads) totalThreads else nodes 
+		val barr_fu = ask(cluster(0).entry.barrierManager, BarrierMsg(barrierName, waitOn))(30 seconds)
+		Await.ready(barr_fu, 30 seconds)
+	}
+	
+	// Initialize Hyflow
+	def init(repoId: Long) {
+		// Init config
+		HyflowConfig.init()
+		Hyflow.repoState.repoId = repoId
 		// Configuration
 		val akkaConfigFile = new java.io.File("etc/application.conf")
 		// Configure port
 		//val newPort = HyflowConfig.cfg[Int]("hyflow.basePort") + HyflowConfig.cfg[Int]("id")
 		//val hostname = HyflowConfig.cfg[String]("hyflow.hostname")
 		val confOverrides = ConfigFactory.parseString(
-			"akka.remote.netty.port = %d\n".format(40000) +
-				"akka.remote.netty.hostname = \"%s\"\n".format("127.0.0.1")
+			"akka.remote.netty.port = %d\n".format(40000 + repoId) +
+			"akka.remote.netty.hostname = \"%s\"\n".format("127.0.0.1")
 		)
 		system = ActorSystem("hyflow", confOverrides.withFallback(akkaConf))
-		clusterManager = system.actorOf(Props[ClusterManager])
+		clusterManager = system.actorOf(Props[ClusterManager], name = "clusterManager")
+		barrierManager = system.actorOf(Props[BarrierManager], name = "barrierManager")
 		
+		master_clusterMgr = system.actorFor("akka://hyflow@%s/user/clusterManager".format(HyflowCfg("master")))
+
 		init_server()
-		
-		// Init cluster configuration
+
+		// Inform cluster master about this node
+		master_clusterMgr ! ClusterHello(repoId, NodeEntryPoints())
+		// Await cluster formation
+		synchronized {
+			wait()
+		}
+		Hyflow.stableLog ! 'update_replicas
 	}
 }

File hyflow-granola/src/main/scala/org/hyflow/benchmarks/BenchApp.scala

 
 import org.eintr.loglady.Logging
 
-object BenchApp extends App with Logging {
+object BenchApp extends App {
+	
+	// Configure logging
 	
 	def benchmark() {
-		log.info("Starting Hyflow benchmark.")
+		if (args.length != 1) {
+			println("Incorrect number of arguments. Usage: hyflow-granola <repoId>")
+			return
+		}
+		
+		System.setProperty("logback.configurationFile", new java.io.File("etc/logback.xml").getAbsolutePath)
+		System.setProperty("hyflow_nodeid", args(0))
+		System.setProperty("hyflow_hostname", "host") // TODO
+		
+		println("Starting Hyflow benchmark.")
 		var bench: Benchmark = new GranolaBaseBenchmark()
 		
-		Hyflow.init()
+		Hyflow.init(args(0).toLong)
 		bench.runBenchmark()
 		Hyflow.system.shutdown()
 	}

File hyflow-granola/src/main/scala/org/hyflow/benchmarks/Benchmark.scala

 
 import org.eintr.loglady.Logging
 
-object BenchCfg extends util.ConfigObj("hyflow.benchmarks") {
-	val clientThreads = d("clientThreads", 3)
-	val roundTime = d("roundTime", 1)
-	val rounds = d("rounds", 20)
-	val readOnlyRatio = d("readOnlyRatio", 50.0)
-}
+object BenchCfg extends util.CfgObj("hyflow.benchmarks", Map(
+		"clientThreads" -> 3,
+		"roundTime" -> 1,
+		"rounds" -> 20,
+		"readOnlyRatio" -> 50.0))
 
 abstract class Benchmark extends Logging {
 
 
 		// Create client threads
 		val node = 0
-		val threads = for (i <- 0 until BenchCfg.clientThreads)
+		val threads = for (i <- 0 until BenchCfg.i("clientThreads"))
 			yield new ClientThread(node, i)
 
 		// Wait for all nodes to reach this point
-		Hyflow.barrier("init-done")
+		Hyflow.barrier("init-done", false)
 
 		// Start threads
 		threads.foreach(_.start())
 
 		def benchRound() = {
 			val startTime = System.currentTimeMillis
-			val estimatedEndTime = startTime + BenchCfg.roundTime * 1000
+			val estimatedEndTime = startTime + BenchCfg.i("roundTime") * 1000
 
 			var j = 0
 
 		}
 
 		override def run() {
-			for (i <- 1 to BenchCfg.rounds) {
+			for (i <- 1 to BenchCfg.i("rounds")) {
 				val elapsed = benchRound()
 				if (id == 0) {
-					//log.info("Round %d: %.2fk tps", i,  1.0 * ops / elapsed )
+					log.info("Round %d: time=%d ms", i, elapsed )
 					//ops = 0
 
 					val ktps_fu = ask(Hyflow.granolaExecutor, QueryThroughput())(1 second)
 
 					log.info("Round %d: %s ktps", i, ktps)
 				}
-				Hyflow.barrier("end_round")
+				Hyflow.barrier("end-round", true)
 			}
 		}
 	}

File hyflow-granola/src/main/scala/org/hyflow/benchmarks/granolabase/GranolaBaseBenchmark.scala

 	//var sTrie = Map[String,GranCtr]()
 	
 	
-	def benchInit() {
+	override def benchInit() {
 		// Register server-side transactions
 		Hyflow.registerAtomic("transfer") { args =>
 			val id1 = scala.util.Random.nextInt(BaseBenchCfg.NUM_COUNTERS)
 		if (true) {
 			Hyflow.invokeSingle(0, ("transfer", null), false)
 		} else {
-			Hyflow.invokeSingle(0, ("read-all", null), true)
+			Hyflow.invokeSingle(0, ("sum-all", null), true)
 		}
 	}
 	

File hyflow-granola/src/main/scala/org/hyflow/granola/client/GranolaClient.scala

 	import org.hyflow.Hyflow.{HyflowTxnReq, system}
 	
 	// TODO: May need to make this an atomic reference??? 
-	var cluster = Map[Long, RepoEntryPoints]()
+	var cluster = Map[Long, RepoInfo]()
 	
 	private val cclk = new AtomicLong()
 
 
 	def invokeSingle(rid: Int, txnReq: HyflowTxnReq, readOnly: Boolean): Any = {
 		val req = InvokeSingleReq(txnReq, readOnly, getTxnId(), cclk.get)
-		val req_fu = ask(cluster(rid).dispatcher, req)(5 seconds)
+		val req_fu = ask(cluster(rid).entry.dispatcher, req)(5 seconds)
 		val reply = Await.result(req_fu, 5 seconds).asInstanceOf[InvocationReply]
 		updateClock(reply.sclk)
 		reply.result
 		val reqs_fu =
 			for ((rid, txnReq) <- ridTxnReqs) yield {
 				val req = makeReq(txnReq, repoIds, txnId, cclk_)
-				ask(cluster(rid).dispatcher, req)(5 seconds)
+				ask(cluster(rid).entry.dispatcher, req)(5 seconds)
 			}
 
 		implicit val executor = system.dispatcher

File hyflow-granola/src/main/scala/org/hyflow/granola/common/MiscReqs.scala

 
 case class QueryThroughput()
 
-object RepoEntryPoints {
+object NodeEntryPoints {
 	def apply() = {
-		val res = new RepoEntryPoints(Hyflow.repoState.repoId) // TODO: config my id
+		// Returns entry point for local node
+		val res = new NodeEntryPoints(Hyflow.repoState.repoId)
 		res.dispatcher = Hyflow.granolaDispatcher
 		res.clusterManager = Hyflow.clusterManager
+		res.stableLog = Hyflow.stableLog
+		res.barrierManager = Hyflow.barrierManager
 		res
 	}
 }
 
-class RepoEntryPoints(rid: Long) {
-	// TODO: provide easy entry-points for remote messaging
+// Provides entry points to one node's actors, for remote messaging
+case class NodeEntryPoints(rid: Long) {
 	var dispatcher: ActorRef = _
 	var clusterManager: ActorRef = _
-}
+	var stableLog: ActorRef = _
+	var barrierManager: ActorRef = _
+}
+
+case class RepoInfo(rid: Long, entry: NodeEntryPoints, replicas: Set[NodeEntryPoints])

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaDispatcher.scala

 	def receive() = {
 		// Request from client
 		case req: InvocationRequest =>
-			log.trace("Received request. | req = %s", req)
+			log.debug("Received request. | req = %s", req)
 
 			// Step 1 of the protocol: select time-stamp 
 			// (this is only one example of how TS can be selected)
 
 		// A transaction has become ready
 		case key: Tuple2[Long, Long] =>
-			log.trace("Transaction is ready. | key = %s", key)
+			log.debug("Transaction is ready. | key = %s", key)
 			// Return worker to pool
 			workers ::= sender
 
 		// An independent transaction is ready with a different time-stamp
 		case m: Tuple2[Tuple2[Long,Long], Long] =>
 			val (key0, finalTS) = m
-			log.trace("Transaction is ready. | key0 = %s | finalTS = %d", key0, finalTS)
+			log.debug("Transaction is ready. | key0 = %s | finalTS = %d", key0, finalTS)
 			// Return worker to pool
 			workers ::= sender
 

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaProtocolWorker.scala

 import org.hyflow.granola.common._
 import org.eintr.loglady.Logging
 import akka.actor._
+import akka.routing._
+import akka.dispatch._
 import java.util.concurrent._
 
 object WorkerPool {
 		// Granola Protocol for handling single-repository/independent transactions
 		// granola-thesis.pdf, page 49, 51
 		case m: RequestRecord =>
-			log.trace("Received request. | m = %s | sender = %s", m, sender)
+			log.debug("Received request. | m = %s | sender = %s", m, sender)
 
 			record = m
 			val tid = record.req.txnId
 			Hyflow.stableLog ! record
 
 		case m: LogDone if (record.requestType == TxnType.Single) =>
-			log.trace("Stable log done for SingleNode transaction. | ts_tid = %s", ts_tid)
+			log.debug("Stable log done for SingleNode transaction. | ts_tid = %s", ts_tid)
 			// Step 3 of SingleNode: Ready to execute, send back to dispatcher
 			Hyflow.granolaDispatcher ! ts_tid
 			// Done, clear assignment
 			// Init
 			votes = 0
 			finalTS = record.ts
-			log.trace("Stable log done for Independent transaction. | ts_tid = %s", ts_tid)
+			log.debug("Stable log done for Independent transaction. | ts_tid = %s", ts_tid)
 			// Step 3 of Independent: Vote for final time-stamp
 			val req = record.req.asInstanceOf[InvokeIndepReq]
 			val myrid = Hyflow.repoState.repoId
 			// Send own vote
 			for (rid <- req.repoIds) {
 				if (rid != myrid) {
-					Hyflow.repos(rid) !
+					Hyflow.cluster(rid).entry.dispatcher !
 						IndepVote(record.req.txnId, myrid, record.ts, TxnStatus.Commit, false)
 				}
 			}
 		}
 	}
 }
+
+class VoteRouter extends Actor {
+	def receive() = {
+		case m: IndepVote =>
+			// TODO: not yet implemented
+	}
+}

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaStableLog.scala

 
 import scala.collection.mutable
 
+case object VSReplComplete
+case class VSReplRequest(batch: List[RequestRecord])
+
+// TODO: Implement viewstamp replication
 class GranolaStableLog extends Actor with Logging {
-	
-	val queue = mutable.ListBuffer[RequestRecord]()
-	
+	// MASTER SIDE
+	// Init with initial set of replicas for this repo
+	var replicas: Set[NodeEntryPoints] = _
+	var numReplicas = 0
+
+	// Holds batch of requests currently replicating
+	var crtLogBatch = List[Tuple2[RequestRecord, ActorRef]]()
+	// Holds a buffer for the next batch of requests to replicate
+	val nextLogBatch = mutable.ListBuffer[Tuple2[RequestRecord, ActorRef]]()
+
+	// Keep track of which replicas have acknowledged the current replication request
+	var replies = mutable.ListBuffer[ActorRef]()
+	// Count how many replies still to go
+	var expectingReplies = 0
+	// Next replication request is processed immediately
+	var immediate = true
+
+	def updateReplicas() = {
+		replicas = Hyflow.cluster(Hyflow.repoState.repoId).replicas
+		numReplicas = replicas.size
+	}
+
+	def replicate() {
+		// Notify transactions in current batch
+		for ((m, sndr) <- crtLogBatch) {
+			sndr ! LogDone(m.req.txnId)
+		}
+
+		// Update with new batch
+		crtLogBatch = nextLogBatch.toList
+		nextLogBatch.clear()
+
+		if (crtLogBatch.isEmpty) {
+			// Nothing queued; stay idle until the next request arrives
+			immediate = true
+		} else if (numReplicas != 0) {
+			// Send replication requests
+			val msg = VSReplRequest(crtLogBatch.map(_._1))
+			for (replica <- replicas) {
+				replica.stableLog ! msg
+			}
+			replies.clear()
+			expectingReplies = numReplicas
+			immediate = false
+		} else {
+			// Nothing to do, we are the only replica in this repo
+			sender ! LogDone(crtLogBatch.head._1.req.txnId)
+			crtLogBatch = Nil
+			immediate = true
+		}
+	}
+
+	// REPLICA SIDE
+	val pendingReplication = mutable.ListBuffer[List[RequestRecord]]()
+
 	def receive() = {
-		// Granola replication
+		case 'update_replicas =>
+			updateReplicas()
+		// Transaction requests stable log
 		case m: RequestRecord =>
-			log.trace("Recv stable log req. | m = %s | sender = %s", m, sender)
-			queue.append(m)
-			sender.tell(LogDone(m.req.txnId))
+			nextLogBatch.append((m, sender))
+
+			if (immediate) {
+				replicate()
+			}
+
+		// All replicas are finished
+		case VSReplComplete if (expectingReplies == 1) =>
+			replicate()
+
+		// One replica is finished, but not all
+		case VSReplComplete =>
+			replies += sender
+			expectingReplies -= 1
+
+		// Master has requested replication
+		case m: VSReplRequest =>
+			// TODO: implement log
+			pendingReplication += m.batch
+			// Acknowledge request
+			sender ! VSReplComplete
 	}
 }

File hyflow-granola/src/main/scala/org/hyflow/util/BarrierManager.scala

+package org.hyflow.util
+
+import akka.actor._
+import scala.collection.mutable
+
+case class BarrierMsg(id: String, target: Int) 
+case class BarrierRespMsg()
+
+// TODO: Barrier timeout?
+class BarrierManager extends Actor {
+	case class BarrierEntry(
+		var count: Int = 0,
+		val waiters: mutable.ListBuffer[ActorRef] = mutable.ListBuffer())
+
+	val entries = mutable.Map[String, BarrierEntry]()
+
+	def receive() = {
+		case m: BarrierMsg =>
+			val entry = entries.getOrElseUpdate(m.id, BarrierEntry())
+			entry.count += 1
+			entry.waiters += sender
+			if (entry.count >= m.target) {
+				entry.waiters.foreach(_ ! BarrierRespMsg())
+				entries.remove(m.id)
+			}
+	}
+}
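
The manager is driven with an ask plus a blocking Await on the caller side, exactly as Hyflow.barrier does above. A minimal sketch, assuming barrierManager resolves to the master node's BarrierManager actor and nodes is the configured cluster size:

    import akka.pattern.ask
    import akka.dispatch.Await
    import akka.util.duration._

    val barr_fu = ask(barrierManager, BarrierMsg("init-done", nodes))(30 seconds)
    Await.ready(barr_fu, 30 seconds) // returns once `nodes` callers have hit the barrier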

File hyflow-granola/src/main/scala/org/hyflow/util/ClusterManager.scala

 
 import akka.actor._
 
-case class ClusterHello(repoId: Long, entry: RepoEntryPoints)
-case class ClusterComplete(cluster: Map[Long, RepoEntryPoints])
+import org.eintr.loglady.Logging
 
-class ClusterManager extends Actor {
+import scala.collection.mutable
 
-	var numRepos = 1
-	var cluster = Map[Long, RepoEntryPoints]()
+
+case class ClusterHello(repoId: Long, entry: NodeEntryPoints)
+case class ClusterComplete(master: Boolean, cluster: Map[Long, RepoInfo])
+
+object ClusterCfg extends CfgObj("hyflow.granola.cluster")
+
+class ClusterManager extends Actor with Logging {
+
+	var numRepos = 0
+	var numNodes = 0
+	var cluster = Map[Long, RepoInfo]()
+	var notifyComplete = List[ActorRef]()
 
 	def receive() = {
 		case m: ClusterHello =>
+			log.debug("%s, remote=%s", m, m.entry.clusterManager)
+			numNodes += 1
+			notifyComplete ::= sender
 
-			cluster += ((m.repoId, m.entry))
-			numRepos += 1
+			val ri = cluster.get(m.repoId) match {
+				case Some(old) =>
+					// Subsequent nodes will be replicas
+					RepoInfo(old.rid, old.entry, old.replicas + m.entry)
+				case None =>
+					// First node to join for a repo will be master
+					numRepos += 1
+					RepoInfo(m.repoId, m.entry, Set())
+			}
+			cluster = cluster.updated(m.repoId, ri)
 
-			if (numRepos == 2) { // TODO: configurable
-				cluster += ((0, RepoEntryPoints()))
-				 for ( (rid, entry) <- cluster) {
-					 if (rid != Hyflow.repoState.repoId) {
-						 entry.clusterManager ! cluster
-					 }
-				 }
+			// Wait for configured number of nodes to join the cluster,
+			// then inform all about cluster composition
+			if (numNodes == ClusterCfg.i("nodes")) {
+				log.debug("cluster=%s", cluster)
+				for ((rid, repo) <- cluster) {
+					//if (rid != Hyflow.repoState.repoId) {
+						// Notify master
+						repo.entry.clusterManager ! ClusterComplete(true, cluster)
+						// Notify replicas
+						repo.replicas.foreach {
+							_.clusterManager ! ClusterComplete(false, cluster)
+						}
+					//}
+				}
 			}
+			
+			// Complete waiting futures, if any
+			notifyComplete.foreach(_ ! 'clusterComplete)
 		case m: ClusterComplete =>
-			Hyflow.cluster = m.cluster
-			// TODO: start benchmark?
+			Hyflow.synchronized {
+				Hyflow.cluster = m.cluster
+				Hyflow.notifyAll()
+			}
 	}
 }

File hyflow-granola/src/main/scala/org/hyflow/util/Config.scala

+package org.hyflow.util
+
+import scala.collection.mutable
+import com.typesafe.config._
+
+class CfgObj(cfgPath: String, defaults: Map[String,Any] = Map()) {
+	def cfg = HyflowConfig.cfg
+	
+	def apply(key: String): String = {
+		val path = cfgPath + "." + key
+		if (cfg.hasPath(path)) {
+			cfg.getString(path)
+		} else {
+			defaults.get(key) match {
+				case Some(x) =>
+					x.toString()
+				case None =>
+					throw new ConfigException.Missing(path)
+			}
+		}
+	}
+	
+	def i(key: String): Int = apply(key).toInt
+	def b(key: String): Boolean = apply(key).toBoolean
+	def f(key: String): Float = apply(key).toFloat
+}
+
+object HyflowConfig {
+	var cfg: Config = _
+
+	def init(fileName: String = "etc/granola.conf") {
+		val f = new java.io.File(fileName)
+		cfg = ConfigFactory.parseFile(f)
+	}
+}
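
CfgObj falls back to its defaults map when a key is missing from the parsed file, and throws ConfigException.Missing when there is no default either. A minimal sketch of the lookup behavior, using the BenchCfg object defined above and assuming the hyflow.benchmarks section is absent from etc/granola.conf (as in the file added by this commit):

    HyflowConfig.init()            // parses etc/granola.conf
    BenchCfg.i("rounds")           // 20, from the defaults map
    BenchCfg.f("readOnlyRatio")    // 50.0f, from the defaults map
    BenchCfg("no-such-key")        // throws ConfigException.Missing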

File hyflow-granola/src/main/scala/org/hyflow/util/ConfigObj.scala

-package org.hyflow.util
-
-import scala.collection.mutable
-class ConfigObj(cfgPath: String) {
-	
-	val values = mutable.Map[String,String]()
-	
-	def d(key: String, v: String): String = {
-		v
-	}
-	def d[T](key: String, v: T): T = { 
-		v 
-	}
-	
-	def apply(key: String): String = {
-		values(key)
-	}
-}