Commits

Jason Baldridge committed 6c24fb2

Added tutorial items for Actors.

Comments (0)

Files changed (5)

 libraryDependencies ++= Seq(
   "org.apache.hadoop" % "hadoop-core" % "0.20.2",
   "se.scalablesolutions.akka" % "akka-actor" % "1.2",
+  "se.scalablesolutions.akka" % "akka-stm" % "1.2",
   "net.sf.trove4j" % "trove4j" % "3.0.1"
 )
 

src/main/scala/fogbow/example/actor/BankStm.scala

+package fogbow.example.actor
+
+import akka.stm._
+import akka.actor._
+import akka.util.duration._
+import akka.event.EventHandler
+
+// From Akka documentation
+
+object BankStm {
+  type Account = Ref[Double]
+
+  case class Transfer(from: Account, to: Account, amount: Double)
+
+  class Transferer extends Actor {
+    implicit val txFactory = 
+      TransactionFactory(blockingAllowed = true, trackReads = true, timeout = 10 seconds)
+
+    def receive = {
+      case Transfer(from, to, amount) =>
+	atomic {
+          if (from.get < amount) {
+            EventHandler.info(this, "not enough money - retrying")
+            retry
+          }
+          EventHandler.info(this, "transferring")
+          from alter (_ - amount)
+          to alter (_ + amount)
+	}
+    }
+  }
+
+  def main (args: Array[String]) {
+
+    val account1 = Ref(100.0)
+    val account2 = Ref(100.0)
+
+    val transferer = Actor.actorOf(new Transferer).start
+
+    transferer ! Transfer(account1, account2, 500.0)
+    // INFO Transferer: not enough money - retrying
+
+    Thread.sleep(1000)
+
+    println("Account 1 gets paid!")
+    atomic { 
+      account1 alter (_ + 2000) 
+    }
+    // INFO Transferer: transferring
+
+    Thread.sleep(1000)
+
+    atomic { 
+      println("Account 1: " + account1.get)
+    }
+    // -> 1600.0
+
+    atomic { 
+      println("Account 2: " + account2.get)
+    }
+    // -> 600.0
+
+    println("Done.")
+    transferer.stop
+    System.exit(0)
+  }
+
+}

src/main/scala/fogbow/example/actor/Counter.scala

+package fogbow.example.actor
+
+import akka.actor.{Actor, ActorRef, PoisonPill}
+import Actor._
+
+object CountingDemo {
+  
+  case object Tick
+  case object Tock
+
+  class Counter extends Actor {
+    var counter = 0
+
+    def receive = {
+      case Tick => 
+	counter += 1
+	println(counter)
+
+      case Tock =>
+	counter -= 1
+	println(counter)
+
+    }
+  }
+
+  def main (args: Array[String]) {
+
+    println("Starting Counter 1.")
+    val counter1 = actorOf[Counter]
+    counter1.start
+
+    counter1 ! Tick
+
+    (1 to 10).foreach(num => counter1 ! Tick)
+
+
+    println("Starting Counter 2.")
+    val counter2 = actorOf[Counter].start
+    counter2 ! Tick
+
+    (1 to 10).foreach {
+      num => 
+    	counter1 ! Tock
+    	counter2 ! Tick
+    }
+ 
+    Thread.sleep(1000)
+    counter1.stop
+    counter2.stop
+  }
+
+}
+
+

src/main/scala/fogbow/example/actor/PingPong.scala

+package fogbow.example.actor
+
+import akka.actor.{Actor, ActorRef, PoisonPill}
+import Actor._
+
+import scala.util.Random
+
+object PingPong {
+  
+  val randomGen = new Random
+
+  case class Ball(partner: ActorRef, volley: Int)
+
+  case object EndGame
+
+  case class Player (name: String) extends Actor {
+
+    def receive = {
+
+      case Ball(partner, volley) =>
+	Thread.sleep(500)
+	
+	if (randomGen.nextInt(100) < 10) {
+
+	  println(name + ": ending game!")
+	  partner ! EndGame
+	  self.stop
+
+	} else {
+
+	  println (name + ": " + volley)
+	  partner ! Ball(self, volley+1)
+
+	}
+
+      case EndGame =>
+	self.stop
+    }
+  }
+
+  def main (args: Array[String]) {
+    val ping = actorOf(Player("ping"))
+    ping.start
+
+    val pong = actorOf(Player("pong"))
+    pong.start
+
+    pong ! Ball(ping, 1)
+  }
+
+}
+
+

src/main/scala/fogbow/example/actor/ShowTheFuture.scala

+package fogbow.example.actor
+
+import akka.actor.{Actor, ActorRef, PoisonPill}
+import Actor._
+import akka.dispatch.Future
+
+import scala.util.Random
+
+object ShowTheFuture {
+  
+  var verbose = false
+
+  val randomGen = new Random
+
+  case object Square
+  case object AddOne
+
+  class CalculatingActor (val number: Int) extends Actor {
+
+    def receive = {
+
+      case Square =>
+	//Thread.sleep(randomGen.nextInt(100))
+	Thread.sleep(number)
+	val squared = number*number
+        if (verbose) println("SQ: " + number + " -> " + squared)
+	self reply (squared)
+
+      case AddOne =>
+	Thread.sleep(randomGen.nextInt(10))
+	val added = number + 1
+	if (verbose) println("ADD: " + number + " -> " + added)
+	self reply (added)
+    }
+
+  }
+
+  def main (args: Array[String]) {
+
+    if (args.length > 0 && args(0) == "v") 
+      verbose = true
+  
+    // Actor based futures
+    val calculators: List[ActorRef] = 
+      (1 to 10).toList.map(number => actorOf(new CalculatingActor(number)).start)
+
+    val futureSquareSum: Future[Int] = 
+      Future.traverse(calculators)(calc => (calc ? Square).mapTo[Int]).map(_.sum)
+
+    // Retrieve right away: causes blocking
+    //val squareSum = futureSquareSum.get
+
+    val futureAddSum: Future[Int] = 
+      Future.traverse(calculators)(calc => (calc ? AddOne).mapTo[Int]).map(_.sum)
+
+    // Retrieve after add one messages have been sent: non-blocking
+    val squareSum = futureSquareSum.get
+
+    val addSum = futureAddSum.get
+    println("\nResult: " + addSum  + " / " + squareSum + " = " + addSum.toDouble/squareSum)
+
+    // Kill the calculators
+    calculators.foreach(_ ! PoisonPill)
+  }
+
+}
+
+object FutureDemo {
+
+  def main (args: Array[String]) = {
+
+    // Basic futures
+    val futureA = Future(42)
+    val futureB = Future(10.0)
+    println(futureA)
+    println("A/B = " + futureA.get / futureB.get)
+
+    // List of futures
+    val manyFutures: List[Future[Int]] = 
+      (1 to 100).toList.map(x => Future(x * 2 - 1))
+
+    val manyInts: List[Int] = 
+      manyFutures.map(futureInt => futureInt.get)
+
+    println("Sum: " + manyInts.sum)
+
+    // Another way that doesn't create Future[Int]'s, but directly
+    // constructs a Future[List[Int]]
+    val futureIntList: Future[List[Int]] = 
+      Future.traverse((1 to 100).toList)(x => Future(x * 2 - 1))
+
+    val traversalSum = futureIntList.map(_.sum).get
+
+    println("Traversal sum: "+ traversalSum)
+  }
+
+}