Commits

Alex Turcu committed c765b7b

Updates to granola (serialization, benchmark engine). Increased indep-txn performance by about 20x. Left TODO: batching.

  • Parent commits 6e63556
  • Branches granola

Files changed (17)

File etc/application.conf

 	loglevel = "INFO"
 	#stdout-loglevel = "DEBUG"
 	#log-config-on-start = on
+	#extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]
 	actor {
 		provider = "akka.remote.RemoteActorRefProvider"
+		kryo  {  
+			type = "graph"
+			#idstrategy = "explicit"
+			serializer-pool-size = 16
+			buffer-size = 16384		#4096  
+			use-manifests = true
+			implicit-registration-logging = true 
+			kryo-trace = true
+			mappings {
+				"scala.collection.immutable.Map$Map2" = 20,
+				"scala.collection.immutable.Set$EmptySet$" = 21,
+				"scala.collection.immutable.$colon$colon" = 22,
+                "scala.Tuple2" = 23,
+                "scala.Enumeration$Val" = 24,
+                "scala.runtime.BoxedUnit" = 25,
+
+				"org.hyflow.util.ClusterHello" = 60,
+				"org.hyflow.granola.common.NodeEntryPoints" = 61,
+				"akka.actor.LocalActorRef" = 40,
+				"akka.remote.RemoteActorRef" = 41,
+
+				"org.hyflow.util.ClusterComplete" = 62,
+				"org.hyflow.granola.common.RepoInfo" = 63,
+				"org.hyflow.util.BarrierMsg" = 64,
+				"org.hyflow.util.BarrierRespMsg" = 65,
+                "org.hyflow.granola.common.TxnReq_Indep" = 66,
+                "org.hyflow.granola.server.IVoteMsg" = 67,
+                "org.hyflow.granola.server.TS_TID" = 68,
+                "org.hyflow.granola.common.TxnType$" = 69,
+                "org.hyflow.granola.common.TxnStatus$" = 70,
+                "org.hyflow.granola.common.TxnReply" = 71,
+
+
+			}
+		}
 		serialize-messages = off
 		serializers {
 			java = "akka.serialization.JavaSerializer"
 			proto = "akka.serialization.ProtobufSerializer"
-#			kryo = "akka.serialization.KryoSerializer"
+			#kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
+			kryo = "akka.serialization.KryoSerializer"
 		}
 		serialization-bindings {
-			"java.lang.String" = java
 			"com.google.protobuf.Message" = proto
-#			"org.hyflow.api.Message" = java
-			"java.io.Serializable" = java
+			"java.io.Serializable" = kryo
 		}
 		default-dispatcher {
 			type = "Dispatcher" #default

File etc/granola.conf

         repos = [0, 1]
     }
 }
+hyflow.benchmarks {
+	roundTime = 2
+	clientThreads = 128
+	rounds = 100
+	readOnlyRatio = 50
+}
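
This new block is what the benchmark engine reads through BenchCfg (defined in Benchmark.scala later in this changeset), overriding the defaults declared there. A minimal read-back sketch, assuming BenchCfg lives in org.hyflow.benchmarks as the file path suggests and that the Hyflow configuration has already been loaded:

	import org.hyflow.benchmarks.BenchCfg

	// Hypothetical probe, not part of the changeset; BenchCfg.i is the accessor
	// the benchmark code itself uses for these keys.
	object BenchCfgProbe extends App {
		val threads = BenchCfg.i("clientThreads") // 128 with the block above
		val rounds  = BenchCfg.i("rounds")        // 100
		val secs    = BenchCfg.i("roundTime")     // 2 seconds per round
		println("%d rounds x %d s each, %d client threads".format(rounds, secs, threads))
	}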

File hyflow-core/src/main/scala/akka/serialization/KryoSerializer.scala

 package akka.serialization
 
 import com.typesafe.config.ConfigFactory
-import com.esotericsoftware.kryo._
-import de.javakaffee.kryoserializers._
-import com.esotericsoftware.kryo.serialize.SimpleSerializer
 import java.nio.ByteBuffer
 import akka.actor._
 import akka.routing._
 import akka.remote._
-import org.hyflow.api._
-import scala.collection.mutable
+import scala.collection.{ mutable, immutable }
+import org.eintr.loglady.Logging
+
+/*
+// Old Kryo 1.05
+import com.esotericsoftware.kryo._
+import de.javakaffee.kryoserializers._
+import com.esotericsoftware.kryo.serialize.SimpleSerializer
 
 private class ActorRefSerializer(val kryo: Kryo, val system: ExtendedActorSystem) extends SimpleSerializer[ActorRef] {
 	override def read(buffer: ByteBuffer): ActorRef = {
 	}
 }
 
+private class EnumerationSerializer(val kryo: Kryo) extends SimpleSerializer[Enumeration#Value] {
+
+	// Caching of parent enum types for value types
+	var valueClass2enumClass = immutable.Map[Class[_], Class[_]]()
+	// Cache enumeration values for a given enumeration class
+	var enumClass2enumValues = immutable.Map[Class[_], mutable.ArrayBuffer[Enumeration#Value]]()
+
+	private def cacheEnumValue(obj: Enumeration#Value) = {
+
+		val enumClass = valueClass2enumClass.get(obj.getClass) getOrElse {
+			val parentEnum = obj.asInstanceOf[AnyRef].getClass.getSuperclass.getDeclaredFields.find(f => f.getName == "$outer").get
+			val parentEnumObj = parentEnum.get(obj)
+			val enumClass = parentEnumObj.getClass
+			valueClass2enumClass += obj.getClass -> enumClass
+			val enumValues = enumClass2enumValues.get(enumClass) getOrElse {
+				val size = parentEnumObj.asInstanceOf[Enumeration].maxId + 1
+				val values = new mutable.ArrayBuffer[Enumeration#Value](size)
+				0 until size foreach { e => values += null }
+				enumClass2enumValues += enumClass -> values
+				values
+			}
+			enumClass
+		}
+
+		val enumValues = enumClass2enumValues.get(enumClass).get
+
+		if (enumValues(obj.id) == null) {
+			enumValues.update(obj.id, obj)
+		}
+
+		enumClass
+	}
+
+	override def write(buffer: ByteBuffer, obj: Enumeration#Value) {
+		println("Writing enum %s", obj)
+		val enumClass = cacheEnumValue(obj)
+		kryo.writeClass(buffer, enumClass)
+		kryo.writeObject(buffer, obj.id)
+	}
+
+	override def read(buffer: ByteBuffer): Enumeration#Value = {
+		val clazz = kryo.readClass(buffer).getType
+		val id = kryo.readObject(buffer, classOf[Int]) 
+
+		val enumValues = enumClass2enumValues.get(clazz).getOrElse {
+			cacheEnumValue(kryo.newInstance(clazz).asInstanceOf[Enumeration](id))
+			enumClass2enumValues.get(clazz).get
+		}
+
+		val enumInstance = enumValues(id)
+		println("Reading enum %s", enumInstance)
+		enumInstance
+	}
+}
+
 class KryoSerializer(val system: ExtendedActorSystem) extends Serializer {
 	val buffer = new ThreadLocal[ObjectBuffer] {
 		override def initialValue() = {
 			kryo.setRegistrationOptional(true)
 			kryo.register(classOf[RoutedActorRef], new ActorRefSerializer(kryo, system))
 			kryo.register(classOf[RemoteActorRef], new ActorRefSerializer(kryo, system))
+			kryo.register(classOf[LocalActorRef], new ActorRefSerializer(kryo, system))
+			kryo.register(classOf[org.hyflow.granola.common.TxnType.T], new EnumerationSerializer(kryo))
 			new ObjectBuffer(kryo)
 		}
 	}
 		buffer.get.readClassAndObject(bytes)
 	}
 }
+
+*/
+
+// New Kryo 2.20
+
+import com.esotericsoftware.kryo
+import com.esotericsoftware.kryo._
+import com.esotericsoftware.kryo.io.{ Input, Output }
+import org.objenesis.strategy.StdInstantiatorStrategy
+
+private class ActorRefSerializer(val system: ExtendedActorSystem) extends kryo.Serializer[ActorRef] {
+
+	override def read(kryo: Kryo, input: Input, typ: Class[ActorRef]): ActorRef = {
+		val path = input.readString()
+		system.actorFor(path)
+	}
+
+	override def write(kryo: Kryo, output: Output, obj: ActorRef) = {
+		Serialization.currentTransportAddress.value match {
+			case null => output.writeString(obj.path.toString)
+			case addr => output.writeString(obj.path.toStringWithAddress(addr))
+		}
+	}
+}
+
+class ScalaEnumSerializer extends kryo.Serializer[Enumeration#Value] with Logging {
+
+	// Caching of parent enum types for value types
+	var valueClass2enumClass = immutable.Map[Class[_], Class[_]]()
+	// Cache enumeration values for a given enumeration class
+	var enumClass2enumValues = immutable.Map[Class[_], mutable.ArrayBuffer[Enumeration#Value]]()
+
+	private def cacheEnumValue(obj: Enumeration#Value) = {
+		log.warn("CacheEnumValue ( obj = %s, class = %s )", obj, obj.getClass())
+		val enumClass = valueClass2enumClass.get(obj.getClass) getOrElse {
+			val parentEnum = obj.asInstanceOf[AnyRef].getClass.getSuperclass.getDeclaredFields.find(f => f.getName == "$outer").get
+			val parentEnumObj = parentEnum.get(obj)
+			val enumClass = parentEnumObj.getClass
+			log.warn("OrElse, parentEnum = %s, parentEnumObj = %s", parentEnum, parentEnumObj)
+			valueClass2enumClass += obj.getClass -> enumClass
+			val enumValues = enumClass2enumValues.get(enumClass) getOrElse {
+				val size = parentEnumObj.asInstanceOf[Enumeration].maxId + 1
+				val values = new mutable.ArrayBuffer[Enumeration#Value](size)
+				0 until size foreach { e => values += null }
+				enumClass2enumValues += enumClass -> values
+				values
+			}
+			enumClass
+		}
+
+		val enumValues = enumClass2enumValues.get(enumClass).get
+		log.warn("enumValues = %s", enumValues)
+
+		if (enumValues(obj.id) == null) {
+			enumValues.update(obj.id, obj)
+			log.warn("null, updating, enumValues = %s", enumValues)
+		}
+
+		log.warn("res enumClass = %s", enumClass)
+		enumClass
+	}
+
+	override def write(kryo: Kryo, output: Output, obj: Enumeration#Value) = {
+		val enumClass = cacheEnumValue(obj)
+		kryo.writeClass(output, enumClass)
+		output.writeInt(obj.id)
+	}
+
+	override def read(kryo: Kryo, input: Input, typ: Class[Enumeration#Value]): Enumeration#Value = {
+		val clazz = kryo.readClass(input).getType
+		val id = input.readInt()
+
+		val enumValues = enumClass2enumValues.get(clazz).getOrElse {
+			cacheEnumValue(kryo.newInstance(clazz).asInstanceOf[Enumeration](id))
+			enumClass2enumValues.get(clazz).get
+		}
+
+		val enumInstance = enumValues(id)
+		enumInstance
+	}
+}
+
+class KryoSerializer(val system: ExtendedActorSystem) extends Serializer with Logging {
+
+	import com.romix.akka.serialization.kryo.{
+		EnumerationSerializer,
+		ScalaMapSerializer,
+		ScalaSetSerializer,
+		ScalaCollectionSerializer,
+		ObjectPool,
+		KryoBasedSerializer
+	}
+
+	// This is whether "fromBinary" requires a "clazz" or not
+	def includeManifest: Boolean = false
+
+	// A unique identifier for this Serializer
+	def identifier = 123454323
+
+	// Delegate to a real serializer
+	def toBinary(obj: AnyRef): Array[Byte] = {
+		val ser = getSerializer
+		val bin = ser.toBinary(obj)
+		releaseSerializer(ser)
+		bin
+	}
+
+	def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
+		try {
+			val ser = getSerializer
+			val obj = ser.fromBinary(bytes, clazz)
+			releaseSerializer(ser)
+			obj
+		} catch {
+			case e: KryoException =>
+				log.error(e.getCause(), "KryoException.getCause()")
+				throw e
+		}
+	}
+
+	//val serializerPool = new ObjectPool[Serializer](8, () => {
+	//	new KryoBasedSerializer(getKryo, 16 * 1024, 6, false)
+	//})
+	
+	val threadLocalSerializer = new ThreadLocal[Serializer] {
+		override def initialValue() = {
+			new KryoBasedSerializer(getKryo, 16 * 1024, 6, false)
+		}
+	}
+
+	private def getSerializer = threadLocalSerializer.get() //serializerPool.fetch
+	private def releaseSerializer(ser: Serializer) = () //serializerPool.release(ser)
+
+	private def getKryo(): Kryo = {
+		val kryo = new Kryo()
+		// Support deserialization of classes without no-arg constructors
+		kryo.setInstantiatorStrategy(new StdInstantiatorStrategy())
+
+		// Support serialization of Scala collections
+		//kryo.addDefaultSerializer(classOf[scala.Enumeration#Value], classOf[ScalaEnumSerializer])
+		kryo.addDefaultSerializer(classOf[scala.collection.Map[_, _]], classOf[ScalaMapSerializer])
+		kryo.addDefaultSerializer(classOf[scala.collection.Set[_]], classOf[ScalaSetSerializer])
+		kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaMapSerializer])
+		kryo.addDefaultSerializer(classOf[scala.collection.generic.SetFactory[scala.collection.Set]], classOf[ScalaSetSerializer])
+		kryo.addDefaultSerializer(classOf[scala.collection.Traversable[_]], classOf[ScalaCollectionSerializer])
+
+		// Akka Remote actors
+		kryo.addDefaultSerializer(classOf[ActorRef], new ActorRefSerializer(system))
+
+		kryo.setReferences(true)
+		kryo
+	}
+}
+

File hyflow-core/src/main/scala/scala/concurrent/stm/motstm/MotInTxn.scala

 		try {
 			runBlock(block)
 		} finally {
+			log.info("HOUDINI: txn=%s", block.getClass().toString()) // TODO // TODO // TODO // remove debug
 			rethrowFromStatus(openComplete())
 		}
 	}
 		val exec = _currentLevel.executor
 		//detach()
 		_currentLevel.stats.countCommit()
+		
+		///////// TODO // TODO // TODO: remove this
+		if (true) {
+			// Log readset
+			log.info("HOUDINI: rs=%s", _currentLevel.rsHandleSet.map(_._hy_id))
+			// Log writeset
+			log.info("HOUDINI: ws=%s", _currentLevel.wsGetHandles().map(_._hy_id) )
+		}
+		///////// END REMOVE
 
 		_currentLevel = _currentLevel.parLevel
 

File hyflow-granola/src/main/scala/akka/serialization/KryoSerializer.scala

+package akka.serialization
+
+import com.typesafe.config.ConfigFactory
+import java.nio.ByteBuffer
+import akka.actor._
+import akka.routing._
+import akka.remote._
+import scala.collection.{ mutable, immutable }
+import org.eintr.loglady.Logging
+
+/*
+// Old Kryo 1.05
+import com.esotericsoftware.kryo._
+import de.javakaffee.kryoserializers._
+import com.esotericsoftware.kryo.serialize.SimpleSerializer
+
+private class ActorRefSerializer(val kryo: Kryo, val system: ExtendedActorSystem) extends SimpleSerializer[ActorRef] {
+	override def read(buffer: ByteBuffer): ActorRef = {
+		val path = kryo.readObject(buffer, classOf[String])
+		system.actorFor(path)
+	}
+
+	override def write(buffer: ByteBuffer, ref: ActorRef) {
+		Serialization.currentTransportAddress.value match {
+			case null => kryo.writeObject(buffer, ref.path.toString)
+			case addr => kryo.writeObject(buffer, ref.path.toStringWithAddress(addr))
+		}
+	}
+}
+
+private class EnumerationSerializer(val kryo: Kryo) extends SimpleSerializer[Enumeration#Value] {
+
+	// Caching of parent enum types for value types
+	var valueClass2enumClass = immutable.Map[Class[_], Class[_]]()
+	// Cache enumeration values for a given enumeration class
+	var enumClass2enumValues = immutable.Map[Class[_], mutable.ArrayBuffer[Enumeration#Value]]()
+
+	private def cacheEnumValue(obj: Enumeration#Value) = {
+
+		val enumClass = valueClass2enumClass.get(obj.getClass) getOrElse {
+			val parentEnum = obj.asInstanceOf[AnyRef].getClass.getSuperclass.getDeclaredFields.find(f => f.getName == "$outer").get
+			val parentEnumObj = parentEnum.get(obj)
+			val enumClass = parentEnumObj.getClass
+			valueClass2enumClass += obj.getClass -> enumClass
+			val enumValues = enumClass2enumValues.get(enumClass) getOrElse {
+				val size = parentEnumObj.asInstanceOf[Enumeration].maxId + 1
+				val values = new mutable.ArrayBuffer[Enumeration#Value](size)
+				0 until size foreach { e => values += null }
+				enumClass2enumValues += enumClass -> values
+				values
+			}
+			enumClass
+		}
+
+		val enumValues = enumClass2enumValues.get(enumClass).get
+
+		if (enumValues(obj.id) == null) {
+			enumValues.update(obj.id, obj)
+		}
+
+		enumClass
+	}
+
+	override def write(buffer: ByteBuffer, obj: Enumeration#Value) {
+		println("Writing enum %s", obj)
+		val enumClass = cacheEnumValue(obj)
+		kryo.writeClass(buffer, enumClass)
+		kryo.writeObject(buffer, obj.id)
+	}
+
+	override def read(buffer: ByteBuffer): Enumeration#Value = {
+		val clazz = kryo.readClass(buffer).getType
+		val id = kryo.readObject(buffer, classOf[Int]) 
+
+		val enumValues = enumClass2enumValues.get(clazz).getOrElse {
+			cacheEnumValue(kryo.newInstance(clazz).asInstanceOf[Enumeration](id))
+			enumClass2enumValues.get(clazz).get
+		}
+
+		val enumInstance = enumValues(id)
+		println("Reading enum %s", enumInstance)
+		enumInstance
+	}
+}
+
+class KryoSerializer(val system: ExtendedActorSystem) extends Serializer {
+	val buffer = new ThreadLocal[ObjectBuffer] {
+		override def initialValue() = {
+			val kryo = new KryoReflectionFactorySupport
+			kryo.setRegistrationOptional(true)
+			kryo.register(classOf[RoutedActorRef], new ActorRefSerializer(kryo, system))
+			kryo.register(classOf[RemoteActorRef], new ActorRefSerializer(kryo, system))
+			kryo.register(classOf[LocalActorRef], new ActorRefSerializer(kryo, system))
+			kryo.register(classOf[org.hyflow.granola.common.TxnType.T], new EnumerationSerializer(kryo))
+			new ObjectBuffer(kryo)
+		}
+	}
+
+	// This is whether "fromBinary" requires a "clazz" or not
+	def includeManifest: Boolean = false
+
+	def identifier = 4747835
+
+	def toBinary(obj: AnyRef): Array[Byte] = {
+		buffer.get.writeClassAndObject(obj)
+	}
+
+	def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
+		buffer.get.readClassAndObject(bytes)
+	}
+}
+
+*/
+
+// New Kryo 2.20
+
+import com.esotericsoftware.kryo
+import com.esotericsoftware.kryo._
+import com.esotericsoftware.kryo.io.{ Input, Output }
+import org.objenesis.strategy.StdInstantiatorStrategy
+
+private class ActorRefSerializer(val system: ExtendedActorSystem) extends kryo.Serializer[ActorRef] {
+
+	override def read(kryo: Kryo, input: Input, typ: Class[ActorRef]): ActorRef = {
+		val path = input.readString()
+		system.actorFor(path)
+	}
+
+	override def write(kryo: Kryo, output: Output, obj: ActorRef) = {
+		Serialization.currentTransportAddress.value match {
+			case null => output.writeString(obj.path.toString)
+			case addr => output.writeString(obj.path.toStringWithAddress(addr))
+		}
+	}
+}
+
+class ScalaEnumSerializer extends kryo.Serializer[Enumeration#Value] with Logging {
+
+	// Caching of parent enum types for value types
+	var valueClass2enumClass = immutable.Map[Class[_], Class[_]]()
+	// Cache enumeration values for a given enumeration class
+	var enumClass2enumValues = immutable.Map[Class[_], mutable.ArrayBuffer[Enumeration#Value]]()
+
+	private def cacheEnumValue(obj: Enumeration#Value) = {
+		log.warn("CacheEnumValue ( obj = %s, class = %s )", obj, obj.getClass())
+		val enumClass = valueClass2enumClass.get(obj.getClass) getOrElse {
+			val parentEnum = obj.asInstanceOf[AnyRef].getClass.getSuperclass.getDeclaredFields.find(f => f.getName == "$outer").get
+			val parentEnumObj = parentEnum.get(obj)
+			val enumClass = parentEnumObj.getClass
+			log.warn("OrElse, parentEnum = %s, parentEnumObj = %s", parentEnum, parentEnumObj)
+			valueClass2enumClass += obj.getClass -> enumClass
+			val enumValues = enumClass2enumValues.get(enumClass) getOrElse {
+				val size = parentEnumObj.asInstanceOf[Enumeration].maxId + 1
+				val values = new mutable.ArrayBuffer[Enumeration#Value](size)
+				0 until size foreach { e => values += null }
+				enumClass2enumValues += enumClass -> values
+				values
+			}
+			enumClass
+		}
+
+		val enumValues = enumClass2enumValues.get(enumClass).get
+		log.warn("enumValues = %s", enumValues)
+
+		if (enumValues(obj.id) == null) {
+			enumValues.update(obj.id, obj)
+			log.warn("null, updating, enumValues = %s", enumValues)
+		}
+
+		log.warn("res enumClass = %s", enumClass)
+		enumClass
+	}
+
+	override def write(kryo: Kryo, output: Output, obj: Enumeration#Value) = {
+		val enumClass = cacheEnumValue(obj)
+		kryo.writeClass(output, enumClass)
+		output.writeInt(obj.id)
+	}
+
+	override def read(kryo: Kryo, input: Input, typ: Class[Enumeration#Value]): Enumeration#Value = {
+		val clazz = kryo.readClass(input).getType
+		val id = input.readInt()
+
+		val enumValues = enumClass2enumValues.get(clazz).getOrElse {
+			cacheEnumValue(kryo.newInstance(clazz).asInstanceOf[Enumeration](id))
+			enumClass2enumValues.get(clazz).get
+		}
+
+		val enumInstance = enumValues(id)
+		enumInstance
+	}
+}
+
+class KryoSerializer(val system: ExtendedActorSystem) extends Serializer with Logging {
+
+	import com.romix.akka.serialization.kryo.{
+		EnumerationSerializer,
+		ScalaMapSerializer,
+		ScalaSetSerializer,
+		ScalaCollectionSerializer,
+		ObjectPool,
+		KryoBasedSerializer
+	}
+
+	// This is whether "fromBinary" requires a "clazz" or not
+	def includeManifest: Boolean = false
+
+	// A unique identifier for this Serializer
+	def identifier = 123454323
+
+	// Delegate to a real serializer
+	def toBinary(obj: AnyRef): Array[Byte] = {
+		val ser = getSerializer
+		val bin = ser.toBinary(obj)
+		releaseSerializer(ser)
+		bin
+	}
+
+	def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
+		try {
+			val ser = getSerializer
+			val obj = ser.fromBinary(bytes, clazz)
+			releaseSerializer(ser)
+			obj
+		} catch {
+			case e: KryoException =>
+				log.error(e.getCause(), "KryoException.getCause()")
+				throw e
+		}
+	}
+
+	//val serializerPool = new ObjectPool[Serializer](8, () => {
+	//	new KryoBasedSerializer(getKryo, 16 * 1024, 6, false)
+	//})
+	
+	val threadLocalSerializer = new ThreadLocal[Serializer] {
+		override def initialValue() = {
+			new KryoBasedSerializer(getKryo, 16 * 1024, 6, false)
+		}
+	}
+
+	private def getSerializer = threadLocalSerializer.get() //serializerPool.fetch
+	private def releaseSerializer(ser: Serializer) = () //serializerPool.release(ser)
+
+	private def getKryo(): Kryo = {
+		val kryo = new Kryo()
+		// Support deserialization of classes without no-arg constructors
+		kryo.setInstantiatorStrategy(new StdInstantiatorStrategy())
+
+		// Support serialization of Scala collections
+		//kryo.addDefaultSerializer(classOf[scala.Enumeration#Value], classOf[ScalaEnumSerializer])
+		kryo.addDefaultSerializer(classOf[scala.collection.Map[_, _]], classOf[ScalaMapSerializer])
+		kryo.addDefaultSerializer(classOf[scala.collection.Set[_]], classOf[ScalaSetSerializer])
+		kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaMapSerializer])
+		kryo.addDefaultSerializer(classOf[scala.collection.generic.SetFactory[scala.collection.Set]], classOf[ScalaSetSerializer])
+		kryo.addDefaultSerializer(classOf[scala.collection.Traversable[_]], classOf[ScalaCollectionSerializer])
+
+		// Akka Remote actors
+		kryo.addDefaultSerializer(classOf[ActorRef], new ActorRefSerializer(system))
+
+		kryo.setReferences(true)
+		kryo
+	}
+}
+

File hyflow-granola/src/main/scala/org/hyflow/benchmarks/Benchmark.scala

 import scala.util.Random
 import scala.collection.mutable
 
+import akka.actor._
 import akka.dispatch.{ Dispatchers, Future, Await }
 import akka.pattern.ask
 import akka.util.Timeout
 import org.eintr.loglady.Logging
 
 object BenchCfg extends util.CfgObj("hyflow.benchmarks", Map(
-		"clientThreads" -> 3,
-		"roundTime" -> 1,
-		"rounds" -> 20,
-		"readOnlyRatio" -> 50.0))
+	"clientThreads" -> 16,
+	"roundTime" -> 10,
+	"rounds" -> 100,
+	"readOnlyRatio" -> 50.0))
 
 abstract class Benchmark extends Logging {
 
 	val name: String
 	protected def benchInit(): Unit
 	def benchIter(): Unit
+	def benchIterAsync(implicit sender: ActorRef): Tuple2[Long, Int]
 	def benchCheck(): Boolean
 
 	val LOCAL_CONFIG_OPTION = 1
 		// Perform initialization
 		benchInit()
 
+		//useThreads()
+		useActor()
+	}
+
+	def useThreads() {
 		// Create client threads
 		val node = 0
 		val threads = for (i <- 0 until BenchCfg.i("clientThreads"))
 		}
 	}
 
+	def useActor() {
+		val benchActor = Hyflow.system.actorOf(Props[BenchmarkActor], "bench")
+
+		for (i <- 1 to BenchCfg.i("rounds")) {
+			Hyflow.system.scheduler.scheduleOnce(
+					BenchCfg.i("roundTime") seconds,
+					benchActor,
+					"stop")
+			
+			val round_fu = ask(benchActor, this)(60 seconds)
+			Await.ready(round_fu, 60 seconds)
+
+			val ktps_fu = ask(Hyflow.granolaExecutor, QueryThroughput())(1 second)
+			val ktps = Await.result(ktps_fu, 1 second)
+			log.info("Round %d: %s ktps", i, ktps)
+
+			Hyflow.barrier("end-round", false)
+		}
+		println("done")
+	}
+
 	private class ClientThread(node: Int, id: Int) extends Thread("bench-n%d-t%d".format(node, id)) with Logging {
 		var ops = 0
 		val rand: Random = new Random(##)
 
 				// Check the time every once in a while
 				j += 1
-				if (j == 1000) {
+				if (j == 50) {
 					if (System.currentTimeMillis > estimatedEndTime) {
 						stop = true
 					}
 			for (i <- 1 to BenchCfg.i("rounds")) {
 				val elapsed = benchRound()
 				if (id == 0) {
-					log.info("Round %d: time=%d ms", i, elapsed )
+					log.info("Round %d: time=%d ms", i, elapsed)
 					//ops = 0
 
 					val ktps_fu = ask(Hyflow.granolaExecutor, QueryThroughput())(1 second)
 	}
 }
 
+class BenchmarkActor extends Actor {
+	var running = true
+	var pending = mutable.Map[Long, Int]()
+	var starter: ActorRef = _
+	var bench: Benchmark = _
+
+	def receive() = {
+		case b: Benchmark =>
+			running = true
+			starter = sender
+			bench = b
+			pending.clear()
+			for (i <- 0 until BenchCfg.i("clientThreads")) {
+				pending += bench.benchIterAsync 
+			}
+		case "stop" =>
+			running = false
+		case m: TxnReply =>
+			Hyflow.updateClock(m.sclk)
+			val entry = pending(m.txnId)
+			if (entry == 1) {
+				pending -= m.txnId
+				if (running) {
+					pending += bench.benchIterAsync
+				} else {
+					if (pending.size == 0) {
+						starter ! "done"
+					}
+				}
+			} else {
+				pending(m.txnId) = entry - 1
+			}
+	}
+}
+

File hyflow-granola/src/main/scala/org/hyflow/benchmarks/granolabase/GranolaBaseBenchmark.scala

 
 import scala.collection.mutable
 
+import akka.actor._
+
 object BaseBenchCfg {
 	val NUM_COUNTERS = 1000000
 }
 	}
 	
 	def benchIter() {
-		if (true) {
+		if (false) {
 			Hyflow.invokeSingle(0, ("transfer", null), false)
 		} else {
-			Hyflow.invokeSingle(0, ("sum-all", null), true)
+			Hyflow.invokeIndep(Seq( (0, ("transfer", null)), (1, ("transfer", null)) ), true)
+		}
+	}
+	
+	def benchIterAsync(implicit sender: ActorRef) = {
+		if (true) {
+			val tid = Hyflow.invokeAsyncSingle(0, ("transfer", null), false)
+			(tid, 1)
+		} else {
+			val tid = Hyflow.invokeAsyncIndep(Seq( (0, ("transfer", null)), (1, ("transfer", null)) ), true)
+			(tid, 2)
 		}
 	}
 	

File hyflow-granola/src/main/scala/org/hyflow/granola/client/GranolaClient.scala

 
 import java.util.concurrent.atomic._
 
+trait GranolaClient extends Logging {
+	import org.hyflow.Hyflow.{ HyflowTxnCmd, system }
 
-trait GranolaClient extends Logging {
-	import org.hyflow.Hyflow.{HyflowTxnCmd, system}
-	
 	// TODO: May need to make this an atomic reference??? 
 	var cluster = Map[Long, RepoInfo]()
-	
+
 	private val cclk = new AtomicLong()
 
 	private def getTxnId(): Int = {
 		scala.util.Random.nextInt()
 		//throw new AbstractMethodError()
 	}
-	
-	private def updateClock(sclk: Long) {
+
+	def updateClock(sclk: Long) {
 		while (true) {
 			val prev = cclk.get
 			if (sclk > prev) {
 				if (cclk.compareAndSet(prev, sclk))
 					return
-			} else 
+			} else
 				return
 		}
 	}
 
 	def invokeSingle(rid: Int, txnCmd: HyflowTxnCmd, readOnly: Boolean): Any = {
 		val req = TxnReq_Single(txnCmd, readOnly, getTxnId(), cclk.get)
-		val req_fu = ask(cluster(rid).entry.dispatcher, req)(5 seconds)
-		val reply = Await.result(req_fu, 5 seconds).asInstanceOf[TxnReply]
+		val req_fu = ask(cluster(rid).entry.dispatcher, req)(50 seconds)
+		val reply = Await.result(req_fu, 50 seconds).asInstanceOf[TxnReply]
 		updateClock(reply.sclk)
 		reply.result
 	}
 
+	def invokeAsyncSingle(rid: Int, txnCmd: HyflowTxnCmd, readOnly: Boolean)(implicit sender: ActorRef) = {
+		val txnid = getTxnId()
+		val req = TxnReq_Single(txnCmd, readOnly, txnid, cclk.get)
+		cluster(rid).entry.dispatcher ! req
+		txnid
+	}
+
 	private def invoke_helper(
 		ridTxnCmds: Seq[Tuple2[Int, HyflowTxnCmd]],
 		makeReq: (HyflowTxnCmd, Seq[Int], Long, Long) => TxnReq): Seq[Any] = {
 		val reqs_fu =
 			for ((rid, txnCmd) <- ridTxnCmds) yield {
 				val req = makeReq(txnCmd, repoIds, txnId, cclk_)
-				ask(cluster(rid).entry.dispatcher, req)(5 seconds)
+				ask(cluster(rid).entry.dispatcher, req)(50 seconds)
 			}
 
 		implicit val executor = system.dispatcher
 		val seqRes_fu = Future.sequence(reqs_fu)
-		val replies = Await.result(seqRes_fu, 5 seconds).asInstanceOf[Seq[TxnReply]]
+		val replies = Await.result(seqRes_fu, 50 seconds).asInstanceOf[Seq[TxnReply]]
+		updateClock(replies.map(_.sclk).max)
 		replies.map(_.result)
 	}
 
+	private def invokeAsync_helper(
+		ridTxnCmds: Seq[Tuple2[Int, HyflowTxnCmd]],
+		makeReq: (HyflowTxnCmd, Seq[Int], Long, Long) => TxnReq)
+		(implicit sender: ActorRef): Long = {
+		val repoIds = ridTxnCmds.map(_._1)
+		val txnId = getTxnId()
+		val cclk_ = cclk.get
+		for ((rid, txnCmd) <- ridTxnCmds) {
+			val req = makeReq(txnCmd, repoIds, txnId, cclk_)
+			cluster(rid).entry.dispatcher ! req
+		}
+		txnId
+	}
+
 	def invokeIndep(ridTxnCmds: Seq[Tuple2[Int, HyflowTxnCmd]], readOnly: Boolean): Seq[Any] = {
 		invoke_helper(ridTxnCmds, TxnReq_Indep(_, readOnly, _, _, _))
 	}
 	def invokeCoord(ridTxnCmds: Seq[Tuple2[Int, HyflowTxnCmd]]): Seq[Any] = {
 		invoke_helper(ridTxnCmds, TxnReq_Coord(_, _, _, _))
 	}
+	
+	def invokeAsyncIndep(ridTxnCmds: Seq[Tuple2[Int, HyflowTxnCmd]], readOnly: Boolean)(implicit sender: ActorRef): Long = {
+		invokeAsync_helper(ridTxnCmds, TxnReq_Indep(_, readOnly, _, _, _))
+	}
+
+	def invokeAsyncCoord(ridTxnCmds: Seq[Tuple2[Int, HyflowTxnCmd]])(implicit sender: ActorRef): Long = {
+		invokeAsync_helper(ridTxnCmds, TxnReq_Coord(_, _, _, _))
+	}
 }
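
The new invokeAsync* variants are fire-and-forget: they return only the transaction id, and each contacted repository later sends a TxnReply to the implicit sender, which therefore has to be an actor. A hypothetical driver (not part of this commit, mirroring what BenchmarkActor does in Benchmark.scala) could track completion like this:

	import akka.actor._
	import org.hyflow.Hyflow
	import org.hyflow.granola.common.TxnReply

	// Hypothetical driver: one independent transaction across repos 0 and 1,
	// counting down the two expected replies before the txn id is retired.
	class AsyncDriver extends Actor {
		var outstanding = Map.empty[Long, Int]

		def receive = {
			case "go" =>
				val tid = Hyflow.invokeAsyncIndep(
					Seq((0, ("transfer", null)), (1, ("transfer", null))), false)
				outstanding += tid -> 2 // one reply expected per repository
			case r: TxnReply =>
				Hyflow.updateClock(r.sclk) // keep the client clock in sync
				val left = outstanding(r.txnId) - 1
				if (left == 0) outstanding -= r.txnId
				else outstanding += r.txnId -> left
		}
	}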

File hyflow-granola/src/main/scala/org/hyflow/granola/common/RepoInfo.scala

 	var voteManager: ActorRef = _
 }
 
-case class RepoInfo(rid: Long, entry: NodeEntryPoints, replicas: Set[NodeEntryPoints])
+case class RepoInfo(rid: Long, entry: NodeEntryPoints, var replicas: Set[NodeEntryPoints])

File hyflow-granola/src/main/scala/org/hyflow/granola/common/TxnInfo.scala

 sealed abstract class TxnReq {
 	def txnCmd: HyflowTxnCmd
 	def txnId: Long
-	def txnType: TxnType.T
+	def txnType: Int
 	def cclk: Long
 }
 
 		txnId: Long,
 		cclk: Long
 	) extends TxnReq {
-	override val txnType = TxnType.Single
+	override val txnType = TxnType.Single.id
 }
 
 case class TxnReq_Indep(
 		txnId: Long, 
 		cclk: Long
 	) extends TxnReq {
-	override val txnType = TxnType.Indep
+	override val txnType = TxnType.Indep.id
 }
 
 case class TxnReq_Coord(
 		txnId: Long, 
 		cclk: Long
 	) extends TxnReq {
-	override val txnType = TxnType.Coord
+	override val txnType = TxnType.Coord.id
 }
 
 case class TxnReply(

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaDispatcher.scala

 
 import org.hyflow.Hyflow
 import org.hyflow.granola.common._
-
 import org.eintr.loglady.Logging
 import akka.actor._
-
 import scala.collection.SortedMap
 import scala.collection.mutable.ListBuffer
 
-case class TxnRecord(req: TxnReq, ts: Long, client: ActorRef)
-
 // Dispatcher (Worker pool + transaction dispatcher)
 class GranolaDispatcher extends Actor with Logging {
 
 	def receive() = {
 		// Request from client
 		case req: TxnReq =>
-			log.debug("Received request. | req = %s", req)
-
 			// Step 1 of the protocol: select time-stamp 
 			// (this is only one example of how TS can be selected)
 			// For Independent transactions, this is a proposedTS
 			val ts = scala.math.max(req.cclk, Hyflow.repoState.lastExecTS.get) + 1
 
 			// Create transaction record
-			val tsKey = TS_TID(ts, req.txnId)
+			val key = TS_TID(ts, req.txnId)
+			log.debug("Received request. | req = %s | key = %s", req, key)
 			val record = TxnRecord(req, ts, sender)
 
 			// If it's a read-only, single-node transaction, it's ready right away
-			if (req.txnType == TxnType.Single && req.asInstanceOf[TxnReq_Single].readOnly) {
-				ready += ((tsKey, record))
+			if (req.txnType == TxnType.Single.id && req.asInstanceOf[TxnReq_Single].readOnly) {
+				ready += ((key, record))
 				processReadyTransactions()
 			} else {
 				// Otherwise record pending transaction
-				pending += ((tsKey, record))
+				pending += ((key, record))
 
 				// Step 2: stable log write
-				Hyflow.stableLog ! LogReq(req, ts)
+				Hyflow.stableLog ! LogReq(key, record)
 			}
 
 		case m: LogAck =>
-			val key = m.tsKey
+			val key = m.key
 			val record = pending(key)
-			record.req.txnType match {
-				case TxnType.Single =>
-					log.debug("SingleNode transaction is ready. | key = %s", key)
+			val id = record.req.txnType
+			if (id == TxnType.Single.id) {
+				log.debug("SingleNode transaction is ready. | key = %s", key)
 
-					// Step 3 of SingleNode: Ready to execute
-					// Update state
-					val ready0 = pending(key)
-					pending -= key
-					ready += ((key, ready0)) // TODO: try to avoid this insertion if possible
-					// Send to executor
-					processReadyTransactions()
-				case TxnType.Indep =>
-					log.debug("Stable log done for Independent transaction. | key = %s", key)
+				// Step 3 of SingleNode: Ready to execute
+				// Update state
+				val ready0 = pending(key)
+				pending -= key
+				ready += ((key, ready0)) // TODO: try to avoid this insertion if possible
+				// Send to executor
+				processReadyTransactions()
+			} else if (id == TxnType.Indep.id) {
+				log.debug("Stable log done for Independent transaction. | key = %s", key)
 
-					// Step 3 of Independent: Vote for final time-stamp
-					val req = record.req.asInstanceOf[TxnReq_Indep]
-					Hyflow.voteMgr ! IVoteReq(key, req.repoIds)
+				// Step 3 of Independent: Vote for final time-stamp
+				val req = record.req.asInstanceOf[TxnReq_Indep]
+				Hyflow.voteMgr ! IVoteReq(key, req.repoIds)
+			} else {
+				log.error("Missed match case.")
+				log.error("Enum val (m, TxnType.Indep): %s, %s",
+					m,
+					TxnType.Indep)
+				log.error("hash: %s %s", System.identityHashCode(m), System.identityHashCode(TxnType.Indep))
+				log.error("toString: %s %s",
+					m.asInstanceOf[AnyRef].toString(),
+					TxnType.Indep.asInstanceOf[AnyRef].toString())
+				log.error("Class (m, TxnType.T, TxnType.Indep): %s, %s, %s",
+					m.getClass(),
+					classOf[TxnType.T],
+					TxnType.Indep.getClass())
+				log.error("Class loader (m, class, this): %s, %s, %s",
+					m.getClass().getClassLoader(),
+					classOf[TxnType.T].getClassLoader(),
+					this.getClass().getClassLoader())
+				log.error("test: %s, %s, %s",
+					m.equals(TxnType.Indep),
+					m.getClass() == TxnType.Indep.getClass(),
+					m == TxnType.Indep.id)
+				System.exit(0)
 			}
 
 		// Independent transaction is ready (TS voting process complete) 
 		case m: IVoteReply =>
+			log.debug("Received IVote reply. | m = %s", m)
 			val key0 = m.key
 			if (key0.ts == m.finalTS) {
 				// Transaction has the same timestamp

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaExecutor.scala

 
 	def receive() = {
 
-		case batch: List[RequestRecord] =>
-			for (req <- batch) {
+		case batch: List[TxnRecord] =>
+			for (record <- batch) {
 				// Update repository timestamp
-				Hyflow.repoState.lastExecTS.set(req.ts)
+				Hyflow.repoState.lastExecTS.set(record.ts)
 				// Locate
-				val block = Hyflow.atomicBlocks(req.req.txnReq._1)
+				val block = Hyflow.atomicBlocks(record.req.txnCmd._1)
 				// Execute
-				val res = block(req.req.txnReq._2)
+				val res = block(record.req.txnCmd._2)
 				// Reply
-				req.client.tell(InvocationReply(req.req.txnId, res, req.ts))
+				val reply = TxnReply(record.req.txnId, res, record.ts)
+				record.client ! reply
 				// Stats
 				roundOps += 1
 			}

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaReplica.scala

 package org.hyflow.granola.server
 
-
 import org.eintr.loglady.Logging
 
 import akka.actor._
 
+class GranolaReplica extends Actor with Logging {
 
-class GranolaReplica extends Actor with Logging {
-	
-	var pendingReplication: List[List[RequestRecord]] = Nil
-	
+	var pendingReplication: List[List[TxnRecord]] = Nil
+
 	def receive() = {
-	// Master has requested replication
+		// Master has requested replication
 		case m: ReplicationReq =>
 			// TODO: implement log
 			pendingReplication ::= m.batch
 			// Acknowledge request
 			sender ! ReplicationAck
 	}
-	
+
 }

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaStableLog.scala

 	def replicate() {
 		// Notify transactions in current batch
 		for ((m, sndr) <- crtLogBatch) {
-			sndr ! LogAck(m.req.txnId)
+			sndr ! LogAck(TS_TID(m.ts, m.req.txnId))
 		}
 
 		// Update with new batch
 			immediate = false
 		} else {
 			// Nothing to do, we are the only replica in this repo
-			sender ! LogAck(crtLogBatch.head._1.req.txnId)
+			val record = crtLogBatch.head._1
+			val key = TS_TID(record.ts, record.req.txnId)
+			log.debug("Log done. | key=%s", key)
+			sender ! LogAck(key)
 			crtLogBatch = Nil
 			immediate = true
 		}
 			updateReplicas()
 		
 		// Transaction requests stable log
-		case m: TxnRecord =>
-			nextLogBatch.append((m, sender))
+		case m: LogReq =>
+			log.debug("Received log request. | %s", m)
+			nextLogBatch.append((m.record, sender))
 
 			if (immediate) {
 				replicate()

File hyflow-granola/src/main/scala/org/hyflow/granola/server/GranolaVoteMgr.scala

 
 	def receive() = {
 		case req: IVoteReq =>
+			log.debug("Received vote request. | %s", req)
 			val key = req.key
-			val vote = IVoteMsg(key, myrid, TxnStatus.Commit, false)
+			val myrid = Hyflow.repoState.repoId
+			val vote = IVoteMsg(key, myrid, TxnStatus.Commit.id, false)
 			votes.get(key.tid) match {
 				case Some(record) =>
 					// Record exists, update
 					votes += (key.tid -> record)
 			}
 			// Send own vote to peers
-			val myrid = Hyflow.repoState.repoId
 			for (rid <- req.peers if rid != myrid) {
 				Hyflow.cluster(rid).entry.voteManager ! vote
 			}
 		
 		case vote: IVoteMsg =>
+			log.debug("Received vote. | %s", vote)
 			val key = vote.key
 			votes.get(key.tid) match {
 				case Some(record) =>
 	}
 
 	def processVote(vote: IVoteMsg, record: VoteRecord) {
-		val key = vote.key
+		val key = record.key
+		val proposedTS = vote.key.ts 
 		record.numVotes += 1
-		if (key.ts > record.maxTS)
-			record.maxTS = key.ts
+		if (proposedTS > record.maxTS)
+			record.maxTS = proposedTS
 		if (record.numVotes == record.expectedVotes) {
 			// Received all votes, inform dispatcher
 			Hyflow.granolaDispatcher ! IVoteReply(key, record.maxTS)

File hyflow-granola/src/main/scala/org/hyflow/granola/server/ServerMessages.scala

 	}
 }
 
+case class TxnRecord(req: TxnReq, ts: Long, client: ActorRef)
+
 // stable log
 // TODO: do we need to send the whole request or just the command?
-case class LogReq(tsKey: TS_TID, req: TxnReq) 
-case class LogAck(tsKey: TS_TID)
+case class LogReq(key: TS_TID, record: TxnRecord) 
+case class LogAck(key: TS_TID)
 
 // log replication
 case class ReplicationReq(batch: List[TxnRecord])
 // Voting
 case class IVoteReq(key: TS_TID, peers: Seq[Int])
 case class IVoteReply(key: TS_TID, finalTS: Long)
-case class IVoteMsg(key: TS_TID, rid: Long, status: TxnStatus.T, retry: Boolean)
+case class IVoteMsg(key: TS_TID, rid: Long, status: Int, retry: Boolean)
 
 // execution
 case class ExecReq()

File project/Build.scala

 // Define resolvers and dependencies
 object Resolvers {
   val typesafeRepo = "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases"
+  val typesafeSnapshot = "Typesafe Snapshots Repository" at "http://repo.typesafe.com/typesafe/snapshots/"
   val scalatoolsRepo = "Scala-tools.org Repository" at "http://scala-tools.org/repo-releases/"
   val mavenRepo = "Maven Repo" at "http://repo2.maven.org/maven2/"
   
-  val all = Seq(typesafeRepo, scalatoolsRepo, mavenRepo)
+  val all = Seq(typesafeRepo, scalatoolsRepo, mavenRepo, typesafeSnapshot)
 }
 
 
   val loglady = 	"org.eintr.loglady" %% 	"loglady" 		% "1.0.0"
   val scalatest = 	"org.scalatest" 	%% "scalatest" 		% "1.7.1" 	% "test"
 
-  val kryo = 		"com.googlecode" 	% "kryo"		% "1.04"
-  val kryoSer = 	"de.javakaffee"		% "kryo-serializers"	% "0.9"
+  //val kryo = 		"com.googlecode" 	% "kryo"		% "1.04"
+  val kryo = 		"com.esotericsoftware.kryo"	% "kryo"		% "2.20"
+  //val kryoSer = 	"de.javakaffee"		% "kryo-serializers"	% "0.9"
+  val kryoAkka = 		"com.romix.akka" 	% "akka-kryo-serialization" % "0.2-SNAPSHOT"
   val configrity = 	"org.streum" 		%%	"configrity-core" % "0.10.0"  
  
-  val all = Seq(akka, akkaRemote, akkaSlf4j, akkaTestkit, scalaStm, loglady, scalatest, configrity, kryo, kryoSer)
+  val all = Seq(akka, akkaRemote, akkaSlf4j, akkaTestkit, scalaStm, loglady, scalatest, configrity, kryo, kryoAkka)
 }    
 
 // Build settings