Commits

Alex Turcu committed 89e9b38

Progress on Indep transactions & utils.

Comments (0)

Files changed (14)

hyflow-granola/src/main/scala/org/hyflow/Hyflow.scala

 package org.hyflow
 
-import org.hyflow.granola._
+import org.hyflow.util._
+import org.hyflow.granola.common._
 import org.hyflow.granola.client._
 import org.hyflow.granola.server._
 
 
 	// Initialize akka actor system
 	var system: ActorSystem = _
+	
+	// Other actors
+	var clusterManager: ActorRef = _
 
 	// Legacy
 	def barrier(barrierName: String) {}
 				"akka.remote.netty.hostname = \"%s\"\n".format("127.0.0.1")
 		)
 		system = ActorSystem("hyflow", confOverrides.withFallback(akkaConf))
+		clusterManager = system.actorOf(Props[ClusterManager])
 		
 		init_server()
 		
-		repos(0) = granolaScheduler
+		// Init cluster configuration
 	}
 }

hyflow-granola/src/main/scala/org/hyflow/benchmarks/Benchmark.scala

 import org.eintr.loglady.Logging
 
 object BenchCfg extends util.ConfigObj("hyflow.benchmarks") {
-	val clientThreads = d("clientThreads", 4)
+	val clientThreads = d("clientThreads", 3)
 	val roundTime = d("roundTime", 1)
 	val rounds = d("rounds", 20)
 	val readOnlyRatio = d("readOnlyRatio", 50.0)

hyflow-granola/src/main/scala/org/hyflow/granola/client/GranolaClient.scala

 trait GranolaClient extends Logging {
 	import org.hyflow.Hyflow.{HyflowTxnReq, system}
 	
-	val repos = mutable.Map[Int, ActorRef]()
+	// TODO: May need to make this an atomic reference??? 
+	var cluster = Map[Long, RepoEntryPoints]()
 	
 	private val cclk = new AtomicLong()
 
 
 	def invokeSingle(rid: Int, txnReq: HyflowTxnReq, readOnly: Boolean): Any = {
 		val req = InvokeSingleReq(txnReq, readOnly, getTxnId(), cclk.get)
-		val req_fu = ask(repos(rid), req)(5 seconds)
+		val req_fu = ask(cluster(rid).dispatcher, req)(5 seconds)
 		val reply = Await.result(req_fu, 5 seconds).asInstanceOf[InvocationReply]
 		updateClock(reply.sclk)
 		reply.result
 		val reqs_fu =
 			for ((rid, txnReq) <- ridTxnReqs) yield {
 				val req = makeReq(txnReq, repoIds, txnId, cclk_)
-				ask(repos(rid), req)(5 seconds)
+				ask(cluster(rid).dispatcher, req)(5 seconds)
 			}
 
 		implicit val executor = system.dispatcher

hyflow-granola/src/main/scala/org/hyflow/granola/common/MiscReqs.scala

 package org.hyflow.granola.common
 
+import org.hyflow.Hyflow
+import akka.actor._
+
 case class QueryThroughput()
+
+object RepoEntryPoints {
+	def apply() = {
+		val res = new RepoEntryPoints(Hyflow.repoState.repoId) // TODO: config my id
+		res.dispatcher = Hyflow.granolaDispatcher
+		res.clusterManager = Hyflow.clusterManager
+		res
+	}
+}
+
+class RepoEntryPoints(rid: Long) {
+	// TODO: provide easy entry-points for remote messaging
+	var dispatcher: ActorRef = _
+	var clusterManager: ActorRef = _
+}

hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaDispatcher.scala

+package org.hyflow.granola.server
+
+import org.hyflow.Hyflow
+import org.hyflow.granola.common._
+
+import org.eintr.loglady.Logging
+import akka.actor._
+
+import scala.collection.SortedMap
+import scala.collection.mutable.ListBuffer
+
+
+// Dispatcher (Worker pool + transaction dispatcher)
+class GranolaDispatcher extends Actor with Logging {
+
+	var workers = List[ActorRef]()
+	var pending = SortedMap[Tuple2[Long, Long], RequestRecord]()
+	var ready = SortedMap[Tuple2[Long, Long], RequestRecord]()
+
+	// See if we can execute any transactions
+	def processReadyTransactions() {
+		// TODO: optimize
+		// TODO: test throughput by sending individual vs batches
+		var batch = ListBuffer[RequestRecord]()
+		if (pending.isEmpty) {
+			// If no transactions are pending, we can execute all ready transactions
+			batch.appendAll(ready.values)
+			ready = SortedMap()
+		} else {
+			// Pay attention not to call firstKey on an empty map
+			import scala.math.Ordering.Implicits._
+			while (!ready.isEmpty && (ready.firstKey < pending.firstKey)) {
+				val (key, value) = ready.first
+				batch.append(value)
+				ready -= key
+			}
+		}
+
+		// Send batch to executor
+		if (!batch.isEmpty) {
+			Hyflow.granolaExecutor ! batch.toList
+		}
+	}
+
+	// Manage a worker pool, forward requests to protocol worker actors
+	def forwardToWorker(reqRec: RequestRecord) {
+		if (workers.isEmpty) {
+			// Create a new actor to handle this request
+			val worker0 = context.actorOf(Props[GranolaProtocolWorker], name = "worker" + scala.util.Random.nextInt)
+			worker0.forward(reqRec)
+		} else {
+			// Take one from pool
+			workers.head.forward(reqRec)
+			workers = workers.tail
+		}
+	}
+
+	def receive() = {
+		// Request from client
+		case req: InvocationRequest =>
+			log.trace("Received request. | req = %s", req)
+
+			// Step 1 of the protocol: select time-stamp 
+			// (this is only one example of how TS can be selected)
+			// For Independent transactions, this is a proposedTS
+			val ts = scala.math.max(req.cclk, Hyflow.repoState.lastExecTS.get) + 1
+
+			// Create transaction record
+			val ts_tid = (ts, req.txnId)
+			val reqRec = RequestRecord(req, ts, sender)
+
+			// If it's a read-only, single-node transaction, it's ready right away
+			if (reqRec.requestType == TxnType.Single && req.asInstanceOf[InvokeSingleReq].readOnly) {
+				ready += ((ts_tid, reqRec))
+				processReadyTransactions()
+			} else {
+				// Otherwise record pending transaction
+				pending += ((ts_tid, reqRec))
+				forwardToWorker(reqRec)
+			}
+
+		// A transaction has become ready
+		case key: Tuple2[Long, Long] =>
+			log.trace("Transaction is ready. | key = %s", key)
+			// Return worker to pool
+			workers ::= sender
+
+			// Update state
+			val ready0 = pending(key)
+			pending -= key
+			ready += ((key, ready0)) // TODO: try to avoid this insertion if possible
+
+			processReadyTransactions()
+			
+		// An independent transaction is ready with a different time-stamp
+		case m: Tuple2[Tuple2[Long,Long], Long] =>
+			val (key0, finalTS) = m
+			log.trace("Transaction is ready. | key0 = %s | finalTS = %d", key0, finalTS)
+			// Return worker to pool
+			workers ::= sender
+
+			// Update state
+			val ready0 = pending(key0)
+			pending -= key0
+			// Use updated TS to insert into ready queue
+			val key1 = (finalTS, key0._2)
+			val ready1 = RequestRecord(ready0.req, finalTS, ready0.client)
+			ready += ((key1, ready1)) // TODO: try to avoid this insertion if possible
+
+			processReadyTransactions()
+		
+		case m: IndepVote =>
+			val worker = WorkerPool.assignments.get(m.txnId)
+			if (worker == null) {
+				var voteq = WorkerPool.missedVotes.get(m.txnId)
+				if (voteq == null) {
+					import java.util.concurrent._
+					voteq = WorkerPool.missedVotes.putIfAbsent(m.txnId, new ConcurrentLinkedQueue)
+				}
+				voteq.add(m)
+			} else {
+				// TODO: is it better to just add all messages to the queue, 
+				// and only forward when done?
+				worker.forward(m)
+			}
+	}
+}
+

hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaExecutor.scala

 	var roundOps = 0L
 	var totalOps = 0L
 	var startTime = System.currentTimeMillis
+	var lastRoundTime = startTime
 
 	def receive() = {
 
 
 		case m: QueryThroughput =>
 			val endTime = System.currentTimeMillis
-			val ktps = 1.0 * roundOps / (endTime - startTime)
-			startTime = endTime
+			val ktps = 1.0 * roundOps / (endTime - lastRoundTime)
 			totalOps += roundOps
+			val total_ktps = 1.0 * totalOps / (endTime - startTime)
+			lastRoundTime = endTime
 			roundOps = 0
-			sender.tell(ktps)
+			sender.tell( (ktps, total_ktps) )
 	}
 }

hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaProtocolWorker.scala

 import org.hyflow.granola.common._
 import org.eintr.loglady.Logging
 import akka.actor._
+import java.util.concurrent._
+
+object WorkerPool {
+	val assignments = new ConcurrentHashMap[Long, ActorRef]
+	val missedVotes = new ConcurrentHashMap[Long, ConcurrentLinkedQueue[IndepVote]]
+}
 
 // Workers
+// TODO: clean code up by using become for specific types of transactions
 class GranolaProtocolWorker extends Actor with Logging {
 
 	var record: RequestRecord = _
 	var reqType: Int = _
 	var ts_tid: Tuple2[Long, Long] = _
+	// Independent Transactions
+	var votes = 0
+	var finalTS = 0L
 
 	def receive() = {
-		// Granola Protocol for handling single-repository transactions
-		// granola-thesis.pdf, page 49
+		// Granola Protocol for handling single-repository/independent transactions
+		// granola-thesis.pdf, page 49, 51
 		case m: RequestRecord =>
 			log.trace("Received request. | m = %s | sender = %s", m, sender)
 
 			record = m
-			reqType = record.req match {
-				case m: InvokeSingleReq => 0
-				case m: InvokeIndepReq => 1
-				case m: InvokeCoordReq => 2
-			}
-			ts_tid = (record.ts, record.req.txnId)
+			val tid = record.req.txnId
+			val ts = record.ts
+
+			// TODO: we don't need this for single-node txns, maybe we can optimize it out
+			WorkerPool.assignments.put(tid, self)
+
+			ts_tid = (ts, tid)
 
 			// Step 2: stable log write
 			// TODO: send to all replicas of this repository
-			//Hyflow.stableLog ! record
-			context.actorFor("/user/stablelog") ! record
+			Hyflow.stableLog ! record
 
-		case m: LogDone =>
-			log.trace("Stable log done. | ts_tid = %s | sender = %s", ts_tid, sender)
-			// Step 3: Ready to execute, send back to scheduler
-			Hyflow.granolaScheduler ! ts_tid
+		case m: LogDone if (record.requestType == TxnType.Single) =>
+			log.trace("Stable log done for SingleNode transaction. | ts_tid = %s", ts_tid)
+			// Step 3 of SingleNode: Ready to execute, send back to dispatcher
+			Hyflow.granolaDispatcher ! ts_tid
+			// Done, clear assignment
+			WorkerPool.assignments.remove(ts_tid._2)
+
+		case m: LogDone if (record.requestType == TxnType.Indep) =>
+			// Init
+			votes = 0
+			finalTS = record.ts
+			log.trace("Stable log done for Independent transaction. | ts_tid = %s", ts_tid)
+			// Step 3 of Independent: Vote for final time-stamp
+			val req = record.req.asInstanceOf[InvokeIndepReq]
+			val myrid = Hyflow.repoState.repoId
+
+			// Send own vote
+			for (rid <- req.repoIds) {
+				if (rid != myrid) {
+					Hyflow.repos(rid) !
+						IndepVote(record.req.txnId, myrid, record.ts, TxnStatus.Commit, false)
+				}
+			}
+
+			// Process previously missed votes
+			val voteq = WorkerPool.missedVotes.get(ts_tid._2)
+			import scala.collection.JavaConversions._
+			for (vote <- voteq) {
+				processVote(vote, req)
+			}
+		case vote: IndepVote =>
+			processVote(vote, record.req.asInstanceOf[InvokeIndepReq])
+
+	}
+
+	def processVote(vote: IndepVote, req: InvokeIndepReq) {
+		votes += 1
+		if (vote.proposedTS > finalTS)
+			finalTS = vote.proposedTS
+		if (votes == req.repoIds.length - 1) {
+			// Received all votes
+			if (finalTS != record.ts) {
+				// Time-stamp has changed 
+				Hyflow.granolaDispatcher ! (ts_tid, finalTS)
+			} else {
+				// Same time-stamp as before
+				Hyflow.granolaDispatcher ! ts_tid
+			}
+			// Done, clear assignment
+			WorkerPool.assignments.remove(ts_tid._2)
+		}
 	}
 }

hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaRepository.scala

 	var repoId: Long = _
 	var mode = new AtomicInteger() // 0 for timestamp, 1 for locking
 	var lastExecTS = new AtomicLong()
-}
+}
+

hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaScheduler.scala

-package org.hyflow.granola.server
-
-import org.hyflow.Hyflow
-import org.hyflow.granola.common._
-
-import org.eintr.loglady.Logging
-import akka.actor._
-
-import scala.collection.SortedMap
-import scala.collection.mutable.ListBuffer
-
-// Messages
-case class RequestRecord(req: InvocationRequest, ts: Long, client: ActorRef)
-
-// Pool
-class GranolaScheduler extends Actor with Logging {
-
-	var workers = List[ActorRef]()
-	var pending = SortedMap[Tuple2[Long, Long], RequestRecord]()
-	var ready = SortedMap[Tuple2[Long, Long], RequestRecord]()
-
-	def receive() = {
-
-		case req: InvocationRequest =>
-			log.trace("Received request. | req = %s", req)
-			// Step 1 of the protocol: select time-stamp 
-			// (this is only one example of how TS can be selected)
-			val ts = scala.math.max(req.cclk, Hyflow.repoState.lastExecTS.get) + 1
-
-			// Record pending transaction
-			val ts_tid = (ts, req.txnId)
-			val reqRec = RequestRecord(req, ts, sender)
-			pending += ((ts_tid, reqRec))
-
-			// Manage a worker pool, forward requests to other actors
-			if (workers.isEmpty) {
-				// Create a new actor to handle this request
-				val worker0 = context.actorOf(Props[GranolaProtocolWorker], name = "worker"+scala.util.Random.nextInt)
-				worker0.forward(reqRec)
-			} else {
-				// Take one from pool
-				workers.head.forward(reqRec)
-				workers = workers.tail
-			}
-
-		// A transaction has become ready
-		case key: Tuple2[Long, Long] =>
-			log.trace("Transaction is ready. | key = %s", key)
-			// Return worker to pool
-			workers ::= sender
-			
-			// Update state
-			val ready0 = pending(key)
-			pending -= key
-			ready += ((key, ready0)) // TODO: try to avoid this insertion if possible
-
-			// See if we can execute any transactions
-			// TODO: optimize
-			// TODO: test throughput by sending individual vs batches
-			var batch = ListBuffer[RequestRecord]()
-			if (pending.isEmpty) {
-				// If no transactions are pending, we can execute all ready transactions
-				batch.appendAll(ready.values)
-				ready = SortedMap()
-			} else {
-				// Pay attention not to call firstKey on an empty map
-				import scala.math.Ordering.Implicits._
-				while (!ready.isEmpty && (ready.firstKey < pending.firstKey)) {
-					val (key, value) = ready.first
-					batch.append(value)
-					ready -= key
-				}
-			}
-			
-			// Send batch to executor
-			if (!batch.isEmpty) {
-				Hyflow.granolaExecutor ! batch.toList
-			}
-	}
-}
-

hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaServer.scala

 	
 	// Actors
 	var granolaExecutor: ActorRef = _
-	var granolaScheduler: ActorRef = _
+	var granolaDispatcher: ActorRef = _
 	var stableLog: ActorRef = _
 	
 	// Repo state
 	
 	def init_server() {
 		granolaExecutor = system.actorOf(Props[GranolaExecutor], name = "executor")
-		granolaScheduler = system.actorOf(Props[GranolaScheduler], name = "scheduler")
+		granolaDispatcher = system.actorOf(Props[GranolaDispatcher], name = "scheduler")
 		stableLog = system.actorOf(Props[GranolaStableLog], name = "stablelog")
 	}
 	

hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaStableLog.scala

 
 import scala.collection.mutable
 
-case class LogDone(txnId: Long)
-
 class GranolaStableLog extends Actor with Logging {
 	
 	val queue = mutable.ListBuffer[RequestRecord]()

hyflow-granola/src/main/scala/org/hyflow/granola/server/ServerMessages.scala

+package org.hyflow.granola.server
+
+import org.hyflow.granola.common._
+import akka.actor._
+
+// Transaction type enumeration
+object TxnType extends Enumeration {
+	type T = Value
+	val Single, Indep, Coord = Value
+}
+
+object TxnStatus extends Enumeration {
+	type T = Value
+	val Commit, Abort, Conflict = Value
+}
+
+// Messages
+case class RequestRecord(req: InvocationRequest, ts: Long, client: ActorRef) {
+	val requestType = req match {
+		case m: InvokeSingleReq => TxnType.Single
+		case m: InvokeIndepReq => TxnType.Indep
+		case m: InvokeCoordReq => TxnType.Coord
+	}
+}
+
+// stable log
+case class LogDone(txnId: Long)
+
+// Voting
+case class IndepVote(txnId: Long, rid: Long, proposedTS: Long, status: TxnStatus.T, retry: Boolean)
+

hyflow-granola/src/main/scala/org/hyflow/test/Actor_Test.scala

+package org.hyflow.test
+
+import akka.actor._
+import akka.routing._
+import akka.pattern.ask
+import akka.util.duration._
+import akka.dispatch.Await
+
+object Actor_Test {
+	val system = ActorSystem("test")
+	val router = system.actorOf(Props[Actor_Test])//.withRouter(
+		//RoundRobinRouter(nrOfInstances = 4)))
+	system.actorOf(Props[Actor_Test])
+	system.actorOf(Props[Actor_Test])
+	system.actorOf(Props[Actor_Test])
+		
+	val started = System.currentTimeMillis
+	for (i <- 0 to 5000000) {
+		router ! 7
+	}
+	val res_f = ask(router, 8)(30 seconds)
+	Await.ready(res_f, 30 seconds)
+	val elapsed = System.currentTimeMillis - started
+	val ktps = 5000002.0 / elapsed
+	println(ktps)
+	system.shutdown()
+}
+
+class Actor_Test extends Actor {
+	var cnt = 0L
+	def receive() = {
+		case m: Int =>
+			cnt += 1
+			sender ! true
+	}
+}

hyflow-granola/src/main/scala/org/hyflow/util/ClusterManager.scala

+package org.hyflow.util
+
+import org.hyflow.Hyflow
+import org.hyflow.granola.common._
+
+import akka.actor._
+
+case class ClusterHello(repoId: Long, entry: RepoEntryPoints)
+case class ClusterComplete(cluster: Map[Long, RepoEntryPoints])
+
+class ClusterManager extends Actor {
+
+	var numRepos = 1
+	var cluster = Map[Long, RepoEntryPoints]()
+
+	def receive() = {
+		case m: ClusterHello =>
+
+			cluster += ((m.repoId, m.entry))
+			numRepos += 1
+
+			if (numRepos == 2) { // TODO: configurable
+				cluster += ((0, RepoEntryPoints()))
+				 for ( (rid, entry) <- cluster) {
+					 if (rid != Hyflow.repoState.repoId) {
+						 entry.clusterManager ! cluster
+					 }
+				 }
+			}
+		case m: ClusterComplete =>
+			Hyflow.cluster = m.cluster
+			// TODO: start benchmark?
+	}
+}