1. Alex Turcu
  2. reflow

Commits

Alex Turcu  committed 6e63556

Updated server architecture slightly.

  • Participants
  • Parent commits e6f97f1
  • Branches granola

Comments (0)

Files changed (13)

File hyflow-granola/src/main/scala/org/hyflow/Hyflow.scala

View file
 object Hyflow extends GranolaServer with GranolaClient {
 
 	// Common types
-	type HyflowTxnReq = Tuple2[String, Array[Any]]
+	type HyflowTxnCmd = Tuple2[String, Array[Any]]
 	type HyflowBlock = (Array[Any]) => Any
 
 	// Initialize akka actor system

File hyflow-granola/src/main/scala/org/hyflow/granola/client/GranolaClient.scala

View file
 
 
 trait GranolaClient extends Logging {
-	import org.hyflow.Hyflow.{HyflowTxnReq, system}
+	import org.hyflow.Hyflow.{HyflowTxnCmd, system}
 	
 	// TODO: May need to make this an atomic reference??? 
 	var cluster = Map[Long, RepoInfo]()
 		}
 	}
 
-	def invokeSingle(rid: Int, txnReq: HyflowTxnReq, readOnly: Boolean): Any = {
-		val req = InvokeSingleReq(txnReq, readOnly, getTxnId(), cclk.get)
+	def invokeSingle(rid: Int, txnCmd: HyflowTxnCmd, readOnly: Boolean): Any = {
+		val req = TxnReq_Single(txnCmd, readOnly, getTxnId(), cclk.get)
 		val req_fu = ask(cluster(rid).entry.dispatcher, req)(5 seconds)
-		val reply = Await.result(req_fu, 5 seconds).asInstanceOf[InvocationReply]
+		val reply = Await.result(req_fu, 5 seconds).asInstanceOf[TxnReply]
 		updateClock(reply.sclk)
 		reply.result
 	}
 
 	private def invoke_helper(
-		ridTxnReqs: Seq[Tuple2[Int, HyflowTxnReq]],
-		makeReq: (HyflowTxnReq, Seq[Int], Long, Long) => InvocationRequest): Seq[Any] = {
+		ridTxnCmds: Seq[Tuple2[Int, HyflowTxnCmd]],
+		makeReq: (HyflowTxnCmd, Seq[Int], Long, Long) => TxnReq): Seq[Any] = {
 
-		val repoIds = ridTxnReqs.map(_._1)
+		val repoIds = ridTxnCmds.map(_._1)
 		val txnId = getTxnId()
 		val cclk_ = cclk.get
 
 		val reqs_fu =
-			for ((rid, txnReq) <- ridTxnReqs) yield {
-				val req = makeReq(txnReq, repoIds, txnId, cclk_)
+			for ((rid, txnCmd) <- ridTxnCmds) yield {
+				val req = makeReq(txnCmd, repoIds, txnId, cclk_)
 				ask(cluster(rid).entry.dispatcher, req)(5 seconds)
 			}
 
 		implicit val executor = system.dispatcher
 		val seqRes_fu = Future.sequence(reqs_fu)
-		val replies = Await.result(seqRes_fu, 5 seconds).asInstanceOf[Seq[InvocationReply]]
+		val replies = Await.result(seqRes_fu, 5 seconds).asInstanceOf[Seq[TxnReply]]
 		replies.map(_.result)
 	}
 
-	def invokeIndep(ridTxnReqs: Seq[Tuple2[Int, HyflowTxnReq]], readOnly: Boolean): Seq[Any] = {
-		invoke_helper(ridTxnReqs, InvokeIndepReq(_, readOnly, _, _, _))
+	def invokeIndep(ridTxnCmds: Seq[Tuple2[Int, HyflowTxnCmd]], readOnly: Boolean): Seq[Any] = {
+		invoke_helper(ridTxnCmds, TxnReq_Indep(_, readOnly, _, _, _))
 	}
 
-	def invokeCoord(ridTxnReqs: Seq[Tuple2[Int, HyflowTxnReq]]): Seq[Any] = {
-		invoke_helper(ridTxnReqs, InvokeCoordReq(_, _, _, _))
+	def invokeCoord(ridTxnCmds: Seq[Tuple2[Int, HyflowTxnCmd]]): Seq[Any] = {
+		invoke_helper(ridTxnCmds, TxnReq_Coord(_, _, _, _))
 	}
 }

File hyflow-granola/src/main/scala/org/hyflow/granola/common/InvocationReqs.scala

-package org.hyflow.granola.common
-
-import org.hyflow.Hyflow.HyflowTxnReq
-
-sealed abstract class InvocationRequest {
-	def txnReq: HyflowTxnReq
-	def txnId: Long
-	def cclk: Long
-}
-
-case class InvokeSingleReq(
-		txnReq: HyflowTxnReq, 
-		readOnly: Boolean, 
-		txnId: Long,
-		cclk: Long
-	) extends InvocationRequest
-
-case class InvokeIndepReq(
-		txnReq: HyflowTxnReq, 
-		readOnly: Boolean, 
-		repoIds: Seq[Int],
-		txnId: Long, 
-		cclk: Long
-	) extends InvocationRequest 
-
-case class InvokeCoordReq(
-		txnReq: HyflowTxnReq, 
-		repoIds: Seq[Int],
-		txnId: Long, 
-		cclk: Long
-	) extends InvocationRequest
-
-case class InvocationReply(
-		txnId: Long, // unneeded, Akka tracks senders, present for completeness
-		result: Any,
-		sclk: Long)

File hyflow-granola/src/main/scala/org/hyflow/granola/common/MiscReqs.scala

-package org.hyflow.granola.common
-
-import org.hyflow.Hyflow
-import akka.actor._
-
-case class QueryThroughput()
-
-object NodeEntryPoints {
-	def apply() = {
-		// Returns entry point for local node
-		val res = new NodeEntryPoints(Hyflow.repoState.repoId)
-		res.dispatcher = Hyflow.granolaDispatcher
-		res.clusterManager = Hyflow.clusterManager
-		res.stableLog = Hyflow.stableLog
-		res.barrierManager = Hyflow.barrierManager
-		res
-	}
-}
-
-// provide entry-points for one node for remote messaging
-case class NodeEntryPoints(rid: Long) {
-	var dispatcher: ActorRef = _
-	var clusterManager: ActorRef = _
-	var stableLog: ActorRef = _
-	var barrierManager: ActorRef = _
-}
-
-case class RepoInfo(rid: Long, entry: NodeEntryPoints, replicas: Set[NodeEntryPoints])

File hyflow-granola/src/main/scala/org/hyflow/granola/common/RepoInfo.scala

View file
+package org.hyflow.granola.common
+
+import org.hyflow.Hyflow
+import akka.actor._
+
+case class QueryThroughput()
+
+object NodeEntryPoints {
+	def apply() = {
+		// Returns entry point for local node
+		val res = new NodeEntryPoints(Hyflow.repoState.repoId)
+		res.dispatcher = Hyflow.granolaDispatcher
+		res.clusterManager = Hyflow.clusterManager
+		res.stableLog = Hyflow.stableLog
+		res.barrierManager = Hyflow.barrierManager
+		res.voteManager = Hyflow.voteMgr
+		res
+	}
+}
+
+// provide entry-points for one node for remote messaging
+case class NodeEntryPoints(rid: Long) {
+	var dispatcher: ActorRef = _
+	var clusterManager: ActorRef = _
+	var stableLog: ActorRef = _
+	var barrierManager: ActorRef = _
+	var voteManager: ActorRef = _
+}
+
+case class RepoInfo(rid: Long, entry: NodeEntryPoints, replicas: Set[NodeEntryPoints])

File hyflow-granola/src/main/scala/org/hyflow/granola/common/TxnInfo.scala

View file
+package org.hyflow.granola.common
+
+import org.hyflow.Hyflow.HyflowTxnCmd
+
+// Transaction type enumeration
+object TxnType extends Enumeration {
+	type T = Value
+	val Single, Indep, Coord = Value
+}
+
+object TxnStatus extends Enumeration {
+	type T = Value
+	val Commit, Abort, Conflict = Value
+}
+
+
+sealed abstract class TxnReq {
+	def txnCmd: HyflowTxnCmd
+	def txnId: Long
+	def txnType: TxnType.T
+	def cclk: Long
+}
+
+// TODO: benchmark (de)ser encapsulation vs inheritance
+case class TxnReq_Single(
+		txnCmd: HyflowTxnCmd, 
+		readOnly: Boolean, 
+		txnId: Long,
+		cclk: Long
+	) extends TxnReq {
+	override val txnType = TxnType.Single
+}
+
+case class TxnReq_Indep(
+		txnCmd: HyflowTxnCmd, 
+		readOnly: Boolean, 
+		repoIds: Seq[Int],
+		txnId: Long, 
+		cclk: Long
+	) extends TxnReq {
+	override val txnType = TxnType.Indep
+}
+
+case class TxnReq_Coord(
+		txnCmd: HyflowTxnCmd, 
+		repoIds: Seq[Int],
+		txnId: Long, 
+		cclk: Long
+	) extends TxnReq {
+	override val txnType = TxnType.Coord
+}
+
+case class TxnReply(
+		txnId: Long, // unneeded, Akka tracks senders, present for completeness
+		result: Any,
+		sclk: Long)

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaDispatcher.scala

View file
 import scala.collection.SortedMap
 import scala.collection.mutable.ListBuffer
 
+case class TxnRecord(req: TxnReq, ts: Long, client: ActorRef)
 
 // Dispatcher (Worker pool + transaction dispatcher)
 class GranolaDispatcher extends Actor with Logging {
 
-	var workers = List[ActorRef]()
-	var pending = SortedMap[Tuple2[Long, Long], RequestRecord]()
-	var ready = SortedMap[Tuple2[Long, Long], RequestRecord]()
+	var pending = SortedMap[TS_TID, TxnRecord]()
+	var ready = SortedMap[TS_TID, TxnRecord]()
 
 	// See if we can execute any transactions
 	def processReadyTransactions() {
 		// TODO: optimize
 		// TODO: test throughput by sending individual vs batches
-		var batch = ListBuffer[RequestRecord]()
+		var batch = ListBuffer[TxnRecord]()
 		if (pending.isEmpty) {
 			// If no transactions are pending, we can execute all ready transactions
 			batch.appendAll(ready.values)
 		}
 	}
 
-	// Manage a worker pool, forward requests to protocol worker actors
-	def forwardToWorker(reqRec: RequestRecord) {
-		if (workers.isEmpty) {
-			// Create a new actor to handle this request
-			val worker0 = context.actorOf(Props[GranolaProtocolWorker], name = "worker" + scala.util.Random.nextInt)
-			worker0.forward(reqRec)
-		} else {
-			// Take one from pool
-			workers.head.forward(reqRec)
-			workers = workers.tail
-		}
-	}
-
 	def receive() = {
 		// Request from client
-		case req: InvocationRequest =>
+		case req: TxnReq =>
 			log.debug("Received request. | req = %s", req)
 
 			// Step 1 of the protocol: select time-stamp 
 			val ts = scala.math.max(req.cclk, Hyflow.repoState.lastExecTS.get) + 1
 
 			// Create transaction record
-			val ts_tid = (ts, req.txnId)
-			val reqRec = RequestRecord(req, ts, sender)
+			val tsKey = TS_TID(ts, req.txnId)
+			val record = TxnRecord(req, ts, sender)
 
 			// If it's a read-only, single-node transaction, it's ready right away
-			if (reqRec.requestType == TxnType.Single && req.asInstanceOf[InvokeSingleReq].readOnly) {
-				ready += ((ts_tid, reqRec))
+			if (req.txnType == TxnType.Single && req.asInstanceOf[TxnReq_Single].readOnly) {
+				ready += ((tsKey, record))
 				processReadyTransactions()
 			} else {
 				// Otherwise record pending transaction
-				pending += ((ts_tid, reqRec))
-				forwardToWorker(reqRec)
+				pending += ((tsKey, record))
+
+				// Step 2: stable log write
+				Hyflow.stableLog ! LogReq(req, ts)
 			}
 
-		// A transaction has become ready
-		case key: Tuple2[Long, Long] =>
-			log.debug("Transaction is ready. | key = %s", key)
-			// Return worker to pool
-			workers ::= sender
+		case m: LogAck =>
+			val key = m.tsKey
+			val record = pending(key)
+			record.req.txnType match {
+				case TxnType.Single =>
+					log.debug("SingleNode transaction is ready. | key = %s", key)
 
-			// Update state
-			val ready0 = pending(key)
-			pending -= key
-			ready += ((key, ready0)) // TODO: try to avoid this insertion if possible
+					// Step 3 of SingleNode: Ready to execute
+					// Update state
+					val ready0 = pending(key)
+					pending -= key
+					ready += ((key, ready0)) // TODO: try to avoid this insertion if possible
+					// Send to executor
+					processReadyTransactions()
+				case TxnType.Indep =>
+					log.debug("Stable log done for Independent transaction. | key = %s", key)
 
+					// Step 3 of Independent: Vote for final time-stamp
+					val req = record.req.asInstanceOf[TxnReq_Indep]
+					Hyflow.voteMgr ! IVoteReq(key, req.repoIds)
+			}
+
+		// Independent transaction is ready (TS voting process complete) 
+		case m: IVoteReply =>
+			val key0 = m.key
+			if (key0.ts == m.finalTS) {
+				// Transaction has the same timestamp
+				val ready0 = pending(key0)
+				pending -= key0
+				ready += ((key0, ready0)) // TODO: try to avoid this insertion if possible
+			} else {
+				// Transaction has different timestamp
+				val ready0 = pending(key0)
+				pending -= key0
+				// Use updated TS to insert into ready queue
+				val key1 = TS_TID(m.finalTS, key0.tid)
+				val ready1 = TxnRecord(ready0.req, m.finalTS, ready0.client) // TODO: clone or update?
+				ready += ((key1, ready1)) // TODO: try to avoid this insertion if possible
+			}
 			processReadyTransactions()
-			
-		// An independent transaction is ready with a different time-stamp
-		case m: Tuple2[Tuple2[Long,Long], Long] =>
-			val (key0, finalTS) = m
-			log.debug("Transaction is ready. | key0 = %s | finalTS = %d", key0, finalTS)
-			// Return worker to pool
-			workers ::= sender
-
-			// Update state
-			val ready0 = pending(key0)
-			pending -= key0
-			// Use updated TS to insert into ready queue
-			val key1 = (finalTS, key0._2)
-			val ready1 = RequestRecord(ready0.req, finalTS, ready0.client)
-			ready += ((key1, ready1)) // TODO: try to avoid this insertion if possible
-
-			processReadyTransactions()
-		
-		case m: IndepVote =>
-			val worker = WorkerPool.assignments.get(m.txnId)
-			if (worker == null) {
-				var voteq = WorkerPool.missedVotes.get(m.txnId)
-				if (voteq == null) {
-					import java.util.concurrent._
-					voteq = WorkerPool.missedVotes.putIfAbsent(m.txnId, new ConcurrentLinkedQueue)
-				}
-				voteq.add(m)
-			} else {
-				// TODO: is it better to just add all messages to the queue, 
-				// and only forward when done?
-				worker.forward(m)
-			}
 	}
 }
 

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaProtocolWorker.scala

-package org.hyflow.granola.server
-
-import org.hyflow.Hyflow
-import org.hyflow.granola.common._
-import org.eintr.loglady.Logging
-import akka.actor._
-import akka.routing._
-import akka.dispatch._
-import java.util.concurrent._
-
-object WorkerPool {
-	val assignments = new ConcurrentHashMap[Long, ActorRef]
-	val missedVotes = new ConcurrentHashMap[Long, ConcurrentLinkedQueue[IndepVote]]
-}
-
-// Workers
-// TODO: clean code up by using become for specific types of transactions
-class GranolaProtocolWorker extends Actor with Logging {
-
-	var record: RequestRecord = _
-	var reqType: Int = _
-	var ts_tid: Tuple2[Long, Long] = _
-	// Independent Transactions
-	var votes = 0
-	var finalTS = 0L
-
-	def receive() = {
-		// Granola Protocol for handling single-repository/independent transactions
-		// granola-thesis.pdf, page 49, 51
-		case m: RequestRecord =>
-			log.debug("Received request. | m = %s | sender = %s", m, sender)
-
-			record = m
-			val tid = record.req.txnId
-			val ts = record.ts
-
-			// TODO: we don't need this for single-node txns, maybe we can optimize it out
-			WorkerPool.assignments.put(tid, self)
-
-			ts_tid = (ts, tid)
-
-			// Step 2: stable log write
-			// TODO: send to all replicas of this repository
-			Hyflow.stableLog ! record
-
-		case m: LogDone if (record.requestType == TxnType.Single) =>
-			log.debug("Stable log done for SingleNode transaction. | ts_tid = %s", ts_tid)
-			// Step 3 of SingleNode: Ready to execute, send back to dispatcher
-			Hyflow.granolaDispatcher ! ts_tid
-			// Done, clear assignment
-			WorkerPool.assignments.remove(ts_tid._2)
-
-		case m: LogDone if (record.requestType == TxnType.Indep) =>
-			// Init
-			votes = 0
-			finalTS = record.ts
-			log.debug("Stable log done for Independent transaction. | ts_tid = %s", ts_tid)
-			// Step 3 of Independent: Vote for final time-stamp
-			val req = record.req.asInstanceOf[InvokeIndepReq]
-			val myrid = Hyflow.repoState.repoId
-
-			// Send own vote
-			for (rid <- req.repoIds) {
-				if (rid != myrid) {
-					Hyflow.cluster(rid).entry.dispatcher !
-						IndepVote(record.req.txnId, myrid, record.ts, TxnStatus.Commit, false)
-				}
-			}
-
-			// Process previously missed votes
-			val voteq = WorkerPool.missedVotes.get(ts_tid._2)
-			import scala.collection.JavaConversions._
-			for (vote <- voteq) {
-				processVote(vote, req)
-			}
-		case vote: IndepVote =>
-			processVote(vote, record.req.asInstanceOf[InvokeIndepReq])
-
-	}
-
-	def processVote(vote: IndepVote, req: InvokeIndepReq) {
-		votes += 1
-		if (vote.proposedTS > finalTS)
-			finalTS = vote.proposedTS
-		if (votes == req.repoIds.length - 1) {
-			// Received all votes
-			if (finalTS != record.ts) {
-				// Time-stamp has changed 
-				Hyflow.granolaDispatcher ! (ts_tid, finalTS)
-			} else {
-				// Same time-stamp as before
-				Hyflow.granolaDispatcher ! ts_tid
-			}
-			// Done, clear assignment
-			WorkerPool.assignments.remove(ts_tid._2)
-		}
-	}
-}
-
-class VoteRouter extends Actor {
-	def receive() = {
-		case m: IndepVote =>
-			
-	}
-}

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaReplica.scala

View file
+package org.hyflow.granola.server
+
+
+import org.eintr.loglady.Logging
+
+import akka.actor._
+
+
+class GranolaReplica extends Actor with Logging {
+	
+	var pendingReplication: List[List[RequestRecord]] = Nil
+	
+	def receive() = {
+	// Master has requested replication
+		case m: ReplicationReq =>
+			// TODO: implement log
+			pendingReplication ::= m.batch
+			// Acknowledge request
+			sender ! ReplicationAck
+	}
+	
+}

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaServer.scala

View file
 	var granolaExecutor: ActorRef = _
 	var granolaDispatcher: ActorRef = _
 	var stableLog: ActorRef = _
+	var voteMgr: ActorRef = _
 	
 	// Repo state
 	val repoState = new RepositoryState()
 	var atomicBlocks = Map[String, HyflowBlock]()
 	
 	def init_server() {
-		granolaExecutor = system.actorOf(Props[GranolaExecutor], name = "executor")
-		granolaDispatcher = system.actorOf(Props[GranolaDispatcher], name = "scheduler")
-		stableLog = system.actorOf(Props[GranolaStableLog], name = "stablelog")
+		granolaExecutor = system.actorOf(Props[GranolaExecutor], name = "exec")
+		granolaDispatcher = system.actorOf(Props[GranolaDispatcher], name = "sched")
+		stableLog = system.actorOf(Props[GranolaStableLog], name = "log")
+		voteMgr = system.actorOf(Props[GranolaVoteMgr], name = "voteMgr")
 	}
 	
 	def registerAtomic(name: String)(block: HyflowBlock) {

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaStableLog.scala

View file
 
 import scala.collection.mutable
 
-case class VSReplComplete
-case class VSReplRequest(batch: List[RequestRecord])
-
 // TODO: Implement viewstamp replication
 class GranolaStableLog extends Actor with Logging {
-	// MASTER SIDE
 	// Init with initial set of replicas for this repo
 	var replicas: Set[NodeEntryPoints] = _
 	var numReplicas = 0
 
 	// Holds batch of requests currently replicating
-	var crtLogBatch = List[Tuple2[RequestRecord, ActorRef]]()
+	var crtLogBatch = List[Tuple2[TxnRecord, ActorRef]]()
 	// Holds a buffer for the next batch of requests to replicate
-	val nextLogBatch = mutable.ListBuffer[Tuple2[RequestRecord, ActorRef]]()
+	val nextLogBatch = mutable.ListBuffer[Tuple2[TxnRecord, ActorRef]]()
 
 	// Keep track which replicas acknowledged current replication request 
 	var replies = mutable.ListBuffer[ActorRef]()
 	def replicate() {
 		// Notify transactions in current batch
 		for ((m, sndr) <- crtLogBatch) {
-			sndr ! LogDone(m.req.txnId)
+			sndr ! LogAck(m.req.txnId)
 		}
 
 		// Update with new batch
 
 		if (numReplicas != 0) {
 			// Send replication requests
-			val msg = VSReplRequest(crtLogBatch.map(_._1))
+			val msg = ReplicationReq(crtLogBatch.map(_._1))
 			for (replica <- replicas) {
 				replica.stableLog ! msg
 			}
 			immediate = false
 		} else {
 			// Nothing to do, we are the only replica in this repo
-			sender ! LogDone(crtLogBatch.head._1.req.txnId)
+			sender ! LogAck(crtLogBatch.head._1.req.txnId)
 			crtLogBatch = Nil
 			immediate = true
 		}
 	}
 
-	// REPLICA SIDE
-	val pendingReplication = mutable.ListBuffer[List[RequestRecord]]()
-
 	def receive() = {
 		case 'update_replicas =>
 			updateReplicas()
+		
 		// Transaction requests stable log
-		case m: RequestRecord =>
+		case m: TxnRecord =>
 			nextLogBatch.append((m, sender))
 
 			if (immediate) {
 			}
 
 		// All replicas are finished
-		case m: VSReplComplete if (expectingReplies == 1) =>
+		case m: ReplicationAck if (expectingReplies == 1) =>
 			replicate()
 
 		// One replica is finished, but not all
-		case m: VSReplComplete =>
+		case m: ReplicationAck =>
 			replies += sender
 			expectingReplies -= 1
-
-		// Master has requested replication
-		case m: VSReplRequest =>
-			// TODO: implement log
-			pendingReplication += m.batch
-			// Acknowledge request
-			sender ! VSReplComplete
 	}
 }

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaVoteMgr.scala

View file
+package org.hyflow.granola.server
+
+import org.hyflow.Hyflow
+import org.hyflow.granola.common._
+import org.eintr.loglady.Logging
+import akka.actor._
+
+// TODO: implement coordinated voting
+
+class GranolaVoteMgr extends Actor with Logging {
+	case class VoteRecord(var key: TS_TID, var maxTS: Long, var numVotes: Int, var expectedVotes: Int)
+
+	var votes = Map[Long, VoteRecord]()
+
+	def receive() = {
+		case req: IVoteReq =>
+			val key = req.key
+			val vote = IVoteMsg(key, myrid, TxnStatus.Commit, false)
+			votes.get(key.tid) match {
+				case Some(record) =>
+					// Record exists, update
+					record.expectedVotes = req.peers.length
+					record.key = key
+					processVote(vote, record)
+				case None =>
+					// This is first vote, create record
+					val record = VoteRecord(key, key.ts, 1, req.peers.length)
+					votes += (key.tid -> record)
+			}
+			// Send own vote to peers
+			val myrid = Hyflow.repoState.repoId
+			for (rid <- req.peers if rid != myrid) {
+				Hyflow.cluster(rid).entry.voteManager ! vote
+			}
+		
+		case vote: IVoteMsg =>
+			val key = vote.key
+			votes.get(key.tid) match {
+				case Some(record) =>
+					// Record exists, update
+					processVote(vote, record)
+				case None =>
+					// This is the first vote, create record
+					val record = VoteRecord(key, key.ts, 1, 0)
+					votes += (key.tid -> record)
+			}
+	}
+
+	def processVote(vote: IVoteMsg, record: VoteRecord) {
+		val key = vote.key
+		record.numVotes += 1
+		if (key.ts > record.maxTS)
+			record.maxTS = key.ts
+		if (record.numVotes == record.expectedVotes) {
+			// Received all votes, inform dispatcher
+			Hyflow.granolaDispatcher ! IVoteReply(key, record.maxTS)
+			// Remove record
+			votes -= key.tid
+		}
+	}
+}

File hyflow-granola/src/main/scala/org/hyflow/granola/server/ServerMessages.scala

View file
 import org.hyflow.granola.common._
 import akka.actor._
 
-// Transaction type enumeration
-object TxnType extends Enumeration {
-	type T = Value
-	val Single, Indep, Coord = Value
-}
 
-object TxnStatus extends Enumeration {
-	type T = Value
-	val Commit, Abort, Conflict = Value
-}
-
-// Messages
-case class RequestRecord(req: InvocationRequest, ts: Long, client: ActorRef) {
-	val requestType = req match {
-		case m: InvokeSingleReq => TxnType.Single
-		case m: InvokeIndepReq => TxnType.Indep
-		case m: InvokeCoordReq => TxnType.Coord
+// Classes
+case class TS_TID(ts: Long, tid: Long) extends Ordered[TS_TID] {
+	def compare(that: TS_TID) = {
+		ts.compare(that.ts) match {
+			case 0 => tid.compare(that.tid)
+			case i => i
+		}
 	}
 }
 
 // stable log
-case class LogDone(txnId: Long)
+// TODO: do we need to send the whole request or just the command?
+case class LogReq(tsKey: TS_TID, req: TxnReq) 
+case class LogAck(tsKey: TS_TID)
+
+// log replication
+case class ReplicationReq(batch: List[TxnRecord])
+case class ReplicationAck()
 
 // Voting
-case class IndepVote(txnId: Long, rid: Long, proposedTS: Long, status: TxnStatus.T, retry: Boolean)
+case class IVoteReq(key: TS_TID, peers: Seq[Int])
+case class IVoteReply(key: TS_TID, finalTS: Long)
+case class IVoteMsg(key: TS_TID, rid: Long, status: TxnStatus.T, retry: Boolean)
 
+// execution
+case class ExecReq()