Commits

Alex Turcu committed acfd2f7

Progress towards Granola protocol.

  • Participants
  • Parent commit 44f7123
  • Branches granola

Comments (0)

Files changed (17)

File etc/application.conf

 		serializers {
 			java = "akka.serialization.JavaSerializer"
 			proto = "akka.serialization.ProtobufSerializer"
-			kryo = "akka.serialization.KryoSerializer"
+#			kryo = "akka.serialization.KryoSerializer"
 		}
 		serialization-bindings {
 			"java.lang.String" = java
 			"com.google.protobuf.Message" = proto
-			"org.hyflow.api.Message" = kryo
+#			"org.hyflow.api.Message" = java
 			"java.io.Serializable" = java
 		}
 		default-dispatcher {

File hyflow-core/src/main/scala/org/hyflow/Hyflow.scala

 
 	// Hyflow service list
 	val services = mutable.ListBuffer[Service]()
-
+	
 	// Hyflow message payloads
 	var payloads = List[PayloadHandler]()
 
 			services.append(s)
 		}
 	}
-
+	
 	/**
 	 * Register a payload handler
 	 * TODO: extract this into a trait

File hyflow-granola/src/main/scala/org/hyflow/Hyflow.scala

 package org.hyflow
 
+import org.hyflow.granola._
+import org.hyflow.granola.client._
+import org.hyflow.granola.server._
+
 import akka.actor._
+import com.typesafe.config.{ Config, ConfigFactory }
 
-import org.hyflow.granola._
+object Hyflow extends GranolaServer with GranolaClient {
 
-object Hyflow {
-	def barrier(barrierName: String) { }
-	def done() {}
-	def init() {}
-	val peers = List()//
-	val mainActor = None
-	
-	
-	val system = ActorSystem("hyflow")
-	val exec = system.actorOf(Props[TxnExecutor], name="executor")
+	// Common types
+	type HyflowTxnReq = Tuple2[String, Array[Any]]
+	type HyflowBlock = (Array[Any]) => Any
+
+	// Initialize akka actor system
+	var system: ActorSystem = _
+
+	// Legacy
+	def barrier(barrierName: String) {}
+	def init() {
+		// Configuration
+		val akkaConfigFile = new java.io.File("etc/application.conf")
+		// Configure port
+		val akkaConf = ConfigFactory.parseFile(akkaConfigFile)
+		//val newPort = HyflowConfig.cfg[Int]("hyflow.basePort") + HyflowConfig.cfg[Int]("id")
+		//val hostname = HyflowConfig.cfg[String]("hyflow.hostname")
+		val confOverrides = ConfigFactory.parseString(
+			"akka.remote.netty.port = %d\n".format(40000) +
+				"akka.remote.netty.hostname = \"%s\"\n".format("127.0.0.1")
+		)
+		system = ActorSystem("hyflow", confOverrides.withFallback(akkaConf))
+		
+		init_server()
+		
+		repos(0) = granolaScheduler
+	}
 }

File hyflow-granola/src/main/scala/org/hyflow/benchmarks/BenchApp.scala

 
 object BenchApp extends App with Logging {
 	
-	log.info("Starting Hyflow benchmark.")
-	var bench: Benchmark = new GranolaBaseBenchmark()
-	
-	Hyflow.init()
-	bench.runBenchmark()
-	Hyflow.system.shutdown()
+	def benchmark() {
+		log.info("Starting Hyflow benchmark.")
+		var bench: Benchmark = new GranolaBaseBenchmark()
+		
+		Hyflow.init()
+		bench.runBenchmark()
+		Hyflow.system.shutdown()
+	}
+
+	benchmark()
 }

File hyflow-granola/src/main/scala/org/hyflow/benchmarks/Benchmark.scala

 package org.hyflow.benchmarks
 
-
 import org.hyflow._
+import org.hyflow.granola.common._
 
 import scala.util.Random
 import scala.collection.mutable
 
 import org.eintr.loglady.Logging
 
-
-
 object BenchCfg extends util.ConfigObj("hyflow.benchmarks") {
-	val clientThreads = d("clientThreads", 1)
-	val roundTime = d("roundTime", 5)
-	val rounds = d("rounds", 10)
+	val clientThreads = d("clientThreads", 4)
+	val roundTime = d("roundTime", 1)
+	val rounds = d("rounds", 20)
 	val readOnlyRatio = d("readOnlyRatio", 50.0)
 }
 
 
 		// Create client threads
 		val node = 0
-		val threads = for (i <- 0 until BenchCfg.clientThreads) 
+		val threads = for (i <- 0 until BenchCfg.clientThreads)
 			yield new ClientThread(node, i)
 
 		// Wait for all nodes to reach this point
 	}
 
 	private class ClientThread(node: Int, id: Int) extends Thread("bench-n%d-t%d".format(node, id)) with Logging {
-		var opsPerformed = 0
+		var ops = 0
 		val rand: Random = new Random(##)
 
 		def benchRound() = {
 			val startTime = System.currentTimeMillis
-			val estimatedEndTime = startTime + BenchCfg.roundTime * 1000 
+			val estimatedEndTime = startTime + BenchCfg.roundTime * 1000
 
 			var j = 0
 
 			var stop = false
 			while (!stop) {
-				val iter_fu = ask(Hyflow.exec, Benchmark.this)(5 seconds)
-				Await.ready(iter_fu, 5 seconds)
-				
+				//val iter_fu = ask(Hyflow.exec, Benchmark.this)(5 seconds)
+				//Await.ready(iter_fu, 5 seconds)
+
+				benchIter()
+				ops += 1
+
 				// Check the time every once in a while
 				j += 1
 				if (j == 1000) {
 			for (i <- 1 to BenchCfg.rounds) {
 				val elapsed = benchRound()
 				if (id == 0) {
-					//log.info("Round %d: %.2fk tps", i,  1.0 * opsPerformed / elapsed )
-					val ktps_fu = ask(Hyflow.exec, true)(1 second)
+					//log.info("Round %d: %.2fk tps", i,  1.0 * ops / elapsed )
+					//ops = 0
+
+					val ktps_fu = ask(Hyflow.granolaExecutor, QueryThroughput())(1 second)
 					val ktps = Await.result(ktps_fu, 1 second)
-					
+
 					log.info("Round %d: %s ktps", i, ktps)
 				}
 				Hyflow.barrier("end_round")

File hyflow-granola/src/main/scala/org/hyflow/benchmarks/granolabase/GranolaBaseBenchmark.scala

 package org.hyflow.benchmarks.granolabase
 
+import org.hyflow.Hyflow
+
 import org.eintr.loglady.Logging
 import org.hyflow.benchmarks._
+
 import scala.collection.mutable
 
 object BaseBenchCfg {
-	val NUM_COUNTERS = 10000
+	val NUM_COUNTERS = 1000000
 }
 
 class GranCtr(val _id: String) {
 	//val sLinked = new mutable.LinkedHashMap[String,GranCtr]()
 	val jHash = new java.util.concurrent.ConcurrentHashMap[String, GranCtr]()
 	//val jSkip = new java.util.concurrent.ConcurrentSkipListMap[String, GranCtr]()
+	//var sTrie = Map[String,GranCtr]()
 	
 	
 	def benchInit() {
+		// Register server-side transactions
+		Hyflow.registerAtomic("transfer") { args =>
+			val id1 = scala.util.Random.nextInt(BaseBenchCfg.NUM_COUNTERS)
+			val id2 = scala.util.Random.nextInt(BaseBenchCfg.NUM_COUNTERS)
+			val ctr1 = jHash.get("ctr"+id1)
+			val ctr2 = jHash.get("ctr"+id2)
+			ctr1.ctr += 1
+			ctr2.ctr -= 1
+		}
+		
+		Hyflow.registerAtomic("sum-all") { args =>
+			var sum = 0
+			for (i <- 0 until BaseBenchCfg.NUM_COUNTERS) {
+				val ctr1 = jHash.get("ctr"+i)
+				sum += ctr1.ctr
+			}
+			sum
+		}
+		
 		// Create bank accounts
 		for (i <- 0 until BaseBenchCfg.NUM_COUNTERS) {
 			val o = new GranCtr("ctr"+i)
 			//sOpenH.put(o._id, o)
 			//sLinked.put(o._id, o)
 			jHash.put(o._id, o)
+			//sTrie = sTrie + (o._id -> o)
 			//jSkip.put(o._id, o)
 		}
 	}
 	
 	def benchIter() {
 		if (true) {
-			val id1 = scala.util.Random.nextInt(BaseBenchCfg.NUM_COUNTERS)
-			val id2 = scala.util.Random.nextInt(BaseBenchCfg.NUM_COUNTERS)
-			val ctr1 = jHash.get("ctr"+id1)
-			val ctr2 = jHash.get("ctr"+id2)
-			ctr1.ctr += 1
-			ctr2.ctr -= 1
+			Hyflow.invokeSingle(0, ("transfer", null), false)
 		} else {
-			var sum = 0
-			for (i <- 0 until BaseBenchCfg.NUM_COUNTERS) {
-				val ctr1 = jHash.get("ctr"+i)
-				sum += ctr1.ctr
-			}
+			Hyflow.invokeSingle(0, ("read-all", null), true)
 		}
 	}
 	

File hyflow-granola/src/main/scala/org/hyflow/granola/TxnExecutor.scala

-package org.hyflow.granola
-
-import org.hyflow.benchmarks._
-import akka.actor._
-
-class TxnExecutor extends Actor {
-	
-	var ops = 0L
-	var startTime = System.currentTimeMillis
-	
-	def receive() = {
-		
-		case m: Benchmark =>
-			m.benchIter()
-			sender.tell(true)
-			ops += 1
-			
-		case m: Boolean =>
-			val endTime = System.currentTimeMillis
-			val ktps = 1.0 * ops / (endTime - startTime)
-			startTime = endTime
-			ops = 0
-			sender.tell(ktps)
-	}
-}

File hyflow-granola/src/main/scala/org/hyflow/granola/client/GranolaClient.scala

+package org.hyflow.granola.client
+
+import org.hyflow.granola.common._
+
+import org.eintr.loglady.Logging
+
+import akka.actor._
+import akka.dispatch.{ Dispatchers, Future, Await }
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.util.duration._
+
+import scala.collection.mutable
+
+import java.util.concurrent.atomic._
+
+
+trait GranolaClient extends Logging {
+	import org.hyflow.Hyflow.{HyflowTxnReq, system}
+	
+	val repos = mutable.Map[Int, ActorRef]()
+	
+	private val cclk = new AtomicLong()
+
+	private def getTxnId(): Int = {
+		scala.util.Random.nextInt()
+		//throw new AbstractMethodError()
+	}
+	
+	private def updateClock(sclk: Long) {
+		while (true) {
+			val prev = cclk.get
+			if (sclk > prev) {
+				if (cclk.compareAndSet(prev, sclk))
+					return
+			} else 
+				return
+		}
+	}
+
+	def invokeSingle(rid: Int, txnReq: HyflowTxnReq, readOnly: Boolean): Any = {
+		val req = InvokeSingleReq(txnReq, readOnly, getTxnId(), cclk.get)
+		val req_fu = ask(repos(rid), req)(5 seconds)
+		val reply = Await.result(req_fu, 5 seconds).asInstanceOf[InvocationReply]
+		updateClock(reply.sclk)
+		reply.result
+	}
+
+	private def invoke_helper(
+		ridTxnReqs: Seq[Tuple2[Int, HyflowTxnReq]],
+		makeReq: (HyflowTxnReq, Seq[Int], Long, Long) => InvocationRequest): Seq[Any] = {
+
+		val repoIds = ridTxnReqs.map(_._1)
+		val txnId = getTxnId()
+		val cclk_ = cclk.get
+
+		val reqs_fu =
+			for ((rid, txnReq) <- ridTxnReqs) yield {
+				val req = makeReq(txnReq, repoIds, txnId, cclk_)
+				ask(repos(rid), req)(5 seconds)
+			}
+
+		implicit val executor = system.dispatcher
+		val seqRes_fu = Future.sequence(reqs_fu)
+		val replies = Await.result(seqRes_fu, 5 seconds).asInstanceOf[Seq[InvocationReply]]
+		replies.map(_.result)
+	}
+
+	def invokeIndep(ridTxnReqs: Seq[Tuple2[Int, HyflowTxnReq]], readOnly: Boolean): Seq[Any] = {
+		invoke_helper(ridTxnReqs, InvokeIndepReq(_, readOnly, _, _, _))
+	}
+
+	def invokeCoord(ridTxnReqs: Seq[Tuple2[Int, HyflowTxnReq]]): Seq[Any] = {
+		invoke_helper(ridTxnReqs, InvokeCoordReq(_, _, _, _))
+	}
+}

File hyflow-granola/src/main/scala/org/hyflow/granola/common/InvocationReqs.scala

+package org.hyflow.granola.common
+
+import org.hyflow.Hyflow.HyflowTxnReq
+
+sealed abstract class InvocationRequest {
+	def txnReq: HyflowTxnReq
+	def txnId: Long
+	def cclk: Long
+}
+
+case class InvokeSingleReq(
+		txnReq: HyflowTxnReq, 
+		readOnly: Boolean, 
+		txnId: Long,
+		cclk: Long
+	) extends InvocationRequest
+
+case class InvokeIndepReq(
+		txnReq: HyflowTxnReq, 
+		readOnly: Boolean, 
+		repoIds: Seq[Int],
+		txnId: Long, 
+		cclk: Long
+	) extends InvocationRequest 
+
+case class InvokeCoordReq(
+		txnReq: HyflowTxnReq, 
+		repoIds: Seq[Int],
+		txnId: Long, 
+		cclk: Long
+	) extends InvocationRequest
+
+case class InvocationReply(
+		txnId: Long, // unneeded, Akka tracks senders, present for completeness
+		result: Any,
+		sclk: Long)

File hyflow-granola/src/main/scala/org/hyflow/granola/common/MiscReqs.scala

+package org.hyflow.granola.common
+
+case class QueryThroughput()

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaExecutor.scala

+package org.hyflow.granola.server
+
+import org.hyflow.Hyflow
+import org.hyflow.granola.common._
+
+import org.eintr.loglady.Logging
+import akka.actor._
+
+class GranolaExecutor extends Actor with Logging {
+
+	var roundOps = 0L
+	var totalOps = 0L
+	var startTime = System.currentTimeMillis
+
+	def receive() = {
+
+		case batch: List[RequestRecord] =>
+			for (req <- batch) {
+				// Update repository timestamp
+				Hyflow.repoState.lastExecTS.set(req.ts)
+				// Locate
+				val block = Hyflow.atomicBlocks(req.req.txnReq._1)
+				// Execute
+				val res = block(req.req.txnReq._2)
+				// Reply
+				req.client.tell(InvocationReply(req.req.txnId, res, req.ts))
+				// Stats
+				roundOps += 1
+			}
+
+		case m: QueryThroughput =>
+			val endTime = System.currentTimeMillis
+			val ktps = 1.0 * roundOps / (endTime - startTime)
+			startTime = endTime
+			totalOps += roundOps
+			roundOps = 0
+			sender.tell(ktps)
+	}
+}

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaProtocolWorker.scala

+package org.hyflow.granola.server
+
+import org.hyflow.Hyflow
+import org.hyflow.granola.common._
+import org.eintr.loglady.Logging
+import akka.actor._
+
+// Workers
+class GranolaProtocolWorker extends Actor with Logging {
+
+	var record: RequestRecord = _
+	var reqType: Int = _
+	var ts_tid: Tuple2[Long, Long] = _
+
+	def receive() = {
+		// Granola Protocol for handling single-repository transactions
+		// granola-thesis.pdf, page 49
+		case m: RequestRecord =>
+			log.trace("Received request. | m = %s | sender = %s", m, sender)
+
+			record = m
+			reqType = record.req match {
+				case m: InvokeSingleReq => 0
+				case m: InvokeIndepReq => 1
+				case m: InvokeCoordReq => 2
+			}
+			ts_tid = (record.ts, record.req.txnId)
+
+			// Step 2: stable log write
+			// TODO: send to all replicas of this repository
+			//Hyflow.stableLog ! record
+			context.actorFor("/user/stablelog") ! record
+
+		case m: LogDone =>
+			log.trace("Stable log done. | ts_tid = %s | sender = %s", ts_tid, sender)
+			// Step 3: Ready to execute, send back to scheduler
+			Hyflow.granolaScheduler ! ts_tid
+	}
+}

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaRepository.scala

+package org.hyflow.granola.server
+
+import java.util.concurrent.atomic._
+
+class RepositoryState {
+
+	var repoId: Long = _
+	var mode = new AtomicInteger() // 0 for timestamp, 1 for locking
+	var lastExecTS = new AtomicLong()
+}

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaScheduler.scala

+package org.hyflow.granola.server
+
+import org.hyflow.Hyflow
+import org.hyflow.granola.common._
+
+import org.eintr.loglady.Logging
+import akka.actor._
+
+import scala.collection.SortedMap
+import scala.collection.mutable.ListBuffer
+
+// Messages
+case class RequestRecord(req: InvocationRequest, ts: Long, client: ActorRef)
+
+// Pool
+class GranolaScheduler extends Actor with Logging {
+
+	var workers = List[ActorRef]()
+	var pending = SortedMap[Tuple2[Long, Long], RequestRecord]()
+	var ready = SortedMap[Tuple2[Long, Long], RequestRecord]()
+
+	def receive() = {
+
+		case req: InvocationRequest =>
+			log.trace("Received request. | req = %s", req)
+			// Step 1 of the protocol: select time-stamp 
+			// (this is only one example of how TS can be selected)
+			val ts = scala.math.max(req.cclk, Hyflow.repoState.lastExecTS.get) + 1
+
+			// Record pending transaction
+			val ts_tid = (ts, req.txnId)
+			val reqRec = RequestRecord(req, ts, sender)
+			pending += ((ts_tid, reqRec))
+
+			// Manage a worker pool, forward requests to other actors
+			if (workers.isEmpty) {
+				// Create a new actor to handle this request
+				val worker0 = context.actorOf(Props[GranolaProtocolWorker], name = "worker"+scala.util.Random.nextInt)
+				worker0.forward(reqRec)
+			} else {
+				// Take one from pool
+				workers.head.forward(reqRec)
+				workers = workers.tail
+			}
+
+		// A transaction has become ready
+		case key: Tuple2[Long, Long] =>
+			log.trace("Transaction is ready. | key = %s", key)
+			// Return worker to pool
+			workers ::= sender
+			
+			// Update state
+			val ready0 = pending(key)
+			pending -= key
+			ready += ((key, ready0)) // TODO: try to avoid this insertion if possible
+
+			// See if we can execute any transactions
+			// TODO: optimize
+			// TODO: test throughput by sending individual vs batches
+			var batch = ListBuffer[RequestRecord]()
+			if (pending.isEmpty) {
+				// If no transactions are pending, we can execute all ready transactions
+				batch.appendAll(ready.values)
+				ready = SortedMap()
+			} else {
+				// Pay attention not to call firstKey on an empty map
+				import scala.math.Ordering.Implicits._
+				while (!ready.isEmpty && (ready.firstKey < pending.firstKey)) {
+					val (key, value) = ready.first
+					batch.append(value)
+					ready -= key
+				}
+			}
+			
+			// Send batch to executor
+			if (!batch.isEmpty) {
+				Hyflow.granolaExecutor ! batch.toList
+			}
+	}
+}
+

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaServer.scala

+package org.hyflow.granola.server
+
+import akka.actor._
+
+// FROM THE GRANOLA PAPER: SERVER INTERFACE
+/*
+* Independent Interface
+*/
+// executes transaction to completion
+// returns false if blocked by coord trans, true otherwise
+//boolean run(ByteBuffer op, ByteBuffer result);
+/*
+* Coordinated Interface
+*/
+// runs to commit point and acquires locks
+// returns COMMIT if voting commit, CONFLICT if aborting
+// due to a lock conflict, and ABORT if aborting due to
+// application logic
+// result is non-empty only if returning ABORT
+//Status prepare(ByteBuffer op, long tid, ByteBuffer result);
+// commits trans with given TID, releases locks
+//void commit(long tid, ByteBuffer result);
+// aborts trans with given TID, releases locks
+//void abort(long tid);
+/*
+* Recovery Interface
+*/
+// acquires any locks that could be required if preparing
+// the trans at any point in the serial order
+// acquires additional handle on locks if there's a conflict
+// returns true if no conflict, but acquires locks regardless
+//boolean forcePrepare(ByteBuffer request, long tid);
+
+
+trait GranolaServer {
+	import org.hyflow.Hyflow.{system, HyflowBlock}
+	
+	// Actors
+	var granolaExecutor: ActorRef = _
+	var granolaScheduler: ActorRef = _
+	var stableLog: ActorRef = _
+	
+	// Repo state
+	val repoState = new RepositoryState()
+	
+	// Hyflow atomic list
+	var atomicBlocks = Map[String, HyflowBlock]()
+	
+	def init_server() {
+		granolaExecutor = system.actorOf(Props[GranolaExecutor], name = "executor")
+		granolaScheduler = system.actorOf(Props[GranolaScheduler], name = "scheduler")
+		stableLog = system.actorOf(Props[GranolaStableLog], name = "stablelog")
+	}
+	
+	def registerAtomic(name: String)(block: HyflowBlock) {
+		synchronized {
+			atomicBlocks += ((name, block))
+		}
+	}
+	
+	
+	
+}

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaStableLog.scala

+package org.hyflow.granola.server
+
+import org.hyflow.Hyflow
+import org.hyflow.granola.common._
+
+import org.eintr.loglady.Logging
+import akka.actor._
+
+import scala.collection.mutable
+
+case class LogDone(txnId: Long)
+
+class GranolaStableLog extends Actor with Logging {
+	
+	val queue = mutable.ListBuffer[RequestRecord]()
+	
+	def receive() = {
+		// Granola replication
+		case m: RequestRecord =>
+			log.trace("Recv stable log req. | m = %s | sender = %s", m, sender)
+			queue.append(m)
+			sender.tell(LogDone(m.req.txnId))
+	}
+}

File hyflow-granola/src/main/scala/org/hyflow/test/CAS_Test.scala

+package org.hyflow.test
+
+import java.util.concurrent.atomic._
+
+class CAS_Thread(id: Int) extends Thread {
+
+	override def run() {
+		import CAS_Test._
+		if (id == 0)
+			ready0 = true
+		else if (id == 1)
+			ready1 = true
+		else if (id == 3)
+			ready3 = true
+		else
+			ready4 = true
+
+		println("entered thread", id)
+		while (!ready0 || !ready1 || !ready3 || !ready4) {}
+
+		println("started thread", id)
+
+		val started = System.currentTimeMillis
+		var j = 0
+		var stop = false
+
+		while (!stop) {
+			ctr.incrementAndGet()
+
+			j += 1
+			if (j == 10000) {
+				if (System.currentTimeMillis > (started + 1000)) {
+					stop = true
+					println("stopping thread", id)
+				}
+				j = 0
+			}
+		}
+
+	}
+}
+
+object CAS_Test {
+	@volatile var ready0 = false
+	@volatile var ready1 = false
+	@volatile var ready3 = false
+	@volatile var ready4 = false
+
+	val ctr = new AtomicLong()
+
+	val t0 = new CAS_Thread(0)
+	val t1 = new CAS_Thread(1)
+	val t3 = new CAS_Thread(3)
+	val t4 = new CAS_Thread(4)
+
+	println("starting threads")
+	t0.start()
+	t1.start()
+	t3.start()
+	t4.start()
+
+	println("joining threads")
+
+	t0.join()
+	t1.join()
+	t3.join()
+	t4.join()
+
+	println(ctr.get)
+}