Commits

Jason Baldridge  committed 11a833d

Actor-based implementation of PageRank added.

  • Participants
  • Parent commits 10a5869

Comments (0)

Files changed (9)

     JARS_MANAGED=`find $FOGBOW_DIR/lib_managed -name '*.jar' -print | tr '\n' ':'`
 fi
 
-SCALA_LIB="$FOGBOW_DIR/project/boot/scala-2.9.0/lib/scala-library.jar"
+SCALA_LIB="$HOME/.sbt/boot/scala-2.9.1/lib/scala-library.jar"
 
 CP=$FOGBOW_DIR/target/classes:$JARS:$JARS_MANAGED:$SCALA_LIB:$CLASSPATH
 
 help()
 {
 cat <<EOF
-Fogbow 0.1.2 commands: 
+Fogbow 0.1.3 commands: 
 
   build         build Fogbow with SBT
   wordcount     do the standard word count example
     if test -f ~/.sbtconfig; then
 	. ~/.sbtconfig
     fi
-    java -Dfile.encoding=UTF8 -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256m ${SBT_OPTS} -jar $FOGBOW_DIR/bin/sbt-launch.jar "$@"
+    java -Dfile.encoding=UTF8 -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256m ${SBT_OPTS} -jar $FOGBOW_DIR/bin/sbt-launch-*.jar "$@"
 
 else 
 

File bin/sbt-launch-0.11.1.jar

Binary file added.

File bin/sbt-launch.jar

Binary file removed.
+import AssemblyKeys._
+
 name := "Fogbow"
 
-version := "0.1.2"
+version := "0.1.3"
 
 organization := "The University of Texas at Austin"
 
-scalaVersion := "2.9.0"
+scalaVersion := "2.9.1"
 
 crossPaths := false
 
 retrieveManaged := true
 
-libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "0.20.2"
+resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
 
-seq(sbtassembly.Plugin.assemblySettings: _*)
+libraryDependencies ++= Seq(
+  "org.apache.hadoop" % "hadoop-core" % "0.20.2",
+  "se.scalablesolutions.akka" % "akka-actor" % "1.2",
+  "net.sf.trove4j" % "trove4j" % "3.0.1"
+)
 
-jarName in Assembly := "fogbow-assembly.jar"
+seq(assemblySettings: _*)
 
+jarName in assembly := "fogbow-assembly.jar"

File lib/trove-3.0.0.jar

Binary file removed.

File project/build.properties

-sbt.version=0.10.1

File project/plugins.sbt

+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.7.2")

File project/plugins/build.sbt

-scalaVersion := "2.8.1"
-
-libraryDependencies <+= (sbtVersion) { sv => "com.eed3si9n" %% "sbt-assembly" % ("sbt" + sv + "_0.6") }
-
-

File src/main/scala/fogbow/graph/PageRank.scala

+package fogbow.graph
+
+import akka.actor.{Actor, ActorRef, PoisonPill}
+import akka.dispatch.Future
+import Actor._
+
+/**
+ * An implementation of PageRank using Akka actors. 
+ *
+ * @author Jason Baldridge
+ */
+object PageRank {
+
+  // Iteration control: the PageRankGraph sends Advance to the Clock, which
+  // answers with NextStep or Stop. NextStep is also the initial kick-off message.
+  case class Advance(worker: ActorRef)
+  case object NextStep
+  case object Stop
+
+  // Requests sent by the PageRankGraph to the Vertex actors.
+  case class SetNeighbors(neighbors: List[ActorRef])
+  case object SpreadMass
+  case class Update(jumpTimesUniform: Double, oneMinusJumpFactor: Double, mass: Double)
+  case object GetPageRank
+
+  // Sent by a Vertex to each of its neighbors while spreading mass.
+  case class TakeMass(mass: Double)
+
+  /**
+   * Command-line entry point.
+   *
+   * Reads a tab-separated adjacency list (one vertex per line: the vertex id
+   * followed by the ids of the vertices it links to) and runs PageRank on it.
+   *
+   * Arguments:
+   *   args(0) - path to the graph file (required)
+   *   args(1) - jump factor (optional, defaults to .15)
+   *   args(2) - maximum number of iterations (optional, defaults to 5)
+   */
+  def main (args: Array[String]) {
+    if (args.isEmpty) {
+      System.err.println("Usage: PageRank <graph-file> [jump-factor] [max-iterations]")
+      sys.exit(1)
+    }
+
+    val inputGraphFile = args(0)
+    val jumpFactor = if (args.length > 1) args(1).toDouble else .15
+    val maxIters = if (args.length > 2) args(2).toInt else 5
+
+    // Materialize the graph inside try/finally so the file handle is always
+    // released, even when reading or parsing fails.
+    val source = io.Source.fromFile(inputGraphFile)
+    val graph =
+      try {
+        source.getLines
+          .filter(line => line.trim.nonEmpty) // blank lines would become a vertex with an empty id
+          .map { line =>
+            val vertices = line.split("\\t").toList
+            (vertices.head, vertices.tail)
+          }
+          .toList
+      } finally {
+        source.close()
+      }
+
+    apply(graph, jumpFactor, maxIters)
+  }
+
+  /**
+   * Starts an actor-based PageRank computation over the given graph.
+   *
+   * Creates and starts a Clock limited to `maxIters` steps and a PageRankGraph
+   * controller for `graph`, then sends the controller its first NextStep message.
+   *
+   * @param graph      each vertex id paired with the ids of the vertices it links to
+   * @param jumpFactor value handed to the PageRankGraph as its jump factor
+   * @param maxIters   number of steps the Clock counts before it sends Stop
+   */
+  def apply (graph: List[(String, List[String])], jumpFactor: Double, maxIters: Int): Unit = {
+    val timekeeper = actorOf(new Clock(maxIters)).start()
+    val controller = actorOf(new PageRankGraph(timekeeper, graph, jumpFactor)).start()
+    controller ! NextStep
+  }
+
+  /**
+   * An actor that controls the entire graph. Stores all the vertex actors
+   * and dispatches work for them to do, and communicates with the Clock to
+   * keep each iteration of work separate.
+   *
+   * Vertex ids that occur only inside an adjacency list (i.e. have no entry of
+   * their own in `graph`) are added as vertices without outgoing edges, so that
+   * every edge has an actor to deliver its mass to.
+   *
+   * @param clock      actor that is sent Advance(self) at the end of every iteration
+   * @param graph      each vertex id paired with the ids of the vertices it links to
+   * @param jumpFactor random-jump weight; jumpFactor/numVertices is handed to every
+   *                   vertex on each update
+   */
+  class PageRankGraph (clock: ActorRef, graph: List[(String, List[String])], jumpFactor: Double) 
+  extends Actor {
+
+    // A neighbor id without its own entry would make the idsToActors lookup
+    // below fail with NoSuchElementException, so give it an empty adjacency list.
+    private val declaredIds = graph.map(_._1).toSet
+    private val undeclaredIds = graph.flatMap(_._2).distinct.filterNot(declaredIds)
+    private val completeGraph = graph ++ undeclaredIds.map(id => (id, List.empty[String]))
+
+    val numVertices = completeGraph.length
+    val uniformProbability = 1.0/numVertices
+    val jumpTimesUniform = jumpFactor/numVertices
+    val oneMinusJumpFactor = 1.0-jumpFactor
+
+    // Create and start the vertex actors
+    val idsToActors = completeGraph.map {
+      case(vertexId, adjList) => 
+        (vertexId, actorOf(new Vertex(vertexId, adjList.length, uniformProbability)).start)
+    }.toMap
+
+    // Have each vertex set its actor neighbors
+    completeGraph foreach {
+      case(vertexId, adjacencyList) => {
+        val adjacentActors = adjacencyList.map(neighborId => idsToActors(neighborId))
+        idsToActors(vertexId) ! SetNeighbors(adjacentActors)
+      }
+    }
+
+    val vertices = idsToActors.values.toIndexedSeq
+
+    def receive = {
+
+      // Run one PageRank iteration: every vertex spreads its mass to its
+      // neighbors, then every vertex computes its updated value.
+      case NextStep => {
+
+        // Each vertex replies with the mass it could not pass on (its whole
+        // pagerank when it has no outgoing edges, 0.0 otherwise). Blocks until
+        // all vertices have replied.
+        val totalMissingMass =
+          Future.traverse(vertices)(v => (v ? SpreadMass).mapTo[Double]).map(_.sum).get
+
+        // Share the mass of the dangling vertices evenly over all vertices.
+        val perVertexRedistributedMass = totalMissingMass / numVertices
+
+        // Each vertex replies with (new pagerank, absolute change).
+        val updates = Future.traverse(vertices)(
+          v => (v ? Update(jumpTimesUniform, 
+                           oneMinusJumpFactor, 
+                           perVertexRedistributedMass)).mapTo[(Double, Double)]).get
+
+        val (pageranks, diffs) = updates.unzip
+        println("Average diff: " + diffs.sum/numVertices)
+        println("Total PageRank mass (should == 1.0): " + pageranks.sum)
+        clock ! Advance(self)
+      }
+
+      // Output the ten highest ranked vertices, then tell all the
+      // vertices to stop and finally stop this actor itself.
+      case Stop => 
+        val pageranks = Future.traverse(vertices)(
+          vertex => (vertex ? GetPageRank).mapTo[(String, Double)]).get
+
+        pageranks.sortBy(_._2).reverse.take(10).foreach {
+          case(id, pagerank) => println(pagerank + "\t" + id)
+        }
+        
+        vertices.foreach(vertex => vertex ! PoisonPill)
+        self.stop()
+    }
+  }
+
+  /**
+   * An actor for a vertex in the graph.
+   *
+   * Messages handled:
+   *  - SetNeighbors: stores the actors this vertex links to
+   *  - SpreadMass: sends pagerank/outdegree to every neighbor and replies 0.0,
+   *    or replies with the whole pagerank when outdegree is 0
+   *  - TakeMass: adds the contribution to the mass received this iteration
+   *  - Update: computes the new pagerank, resets the received mass and replies
+   *    with (new pagerank, absolute change)
+   *  - GetPageRank: replies with (id, pagerank)
+   *
+   * @param id          identifier reported back in GetPageRank replies
+   * @param outdegree   number of outgoing edges, used to split the mass
+   * @param initialMass pagerank value before the first update
+   */
+  class Vertex (id: String, outdegree: Int, initialMass: Double) extends Actor {
+
+    // Starts out empty rather than null, so a SpreadMass that arrives before
+    // SetNeighbors cannot fail with a NullPointerException.
+    var neighbors: List[ActorRef] = Nil
+    var pagerank = initialMass
+    var receivedMass = 0.0
+
+    def receive = {
+
+      // Set the neighbors of this node
+      case SetNeighbors (neighborActorRefs) => 
+        neighbors = neighborActorRefs
+
+      // Send pagerank mass to neighbors
+      case SpreadMass =>
+        if (outdegree == 0) {
+          // Nowhere to send the mass: report it so the sender can redistribute it.
+          self reply pagerank
+        } else {
+          val amountPerNeighbor = pagerank / outdegree
+          neighbors.foreach(_ ! TakeMass(amountPerNeighbor))
+          self reply 0.0
+        }
+
+      // Accumulate mass received from neighbors
+      case TakeMass(contribution) =>
+        receivedMass += contribution
+
+      // Update pagerank value based on received mass, jump factor,
+      // and redistributed mass from dangling nodes.
+      case Update(jumpTimesUniform, oneMinusJumpFactor, redistributedMass) =>
+        val updatedPagerank = 
+          jumpTimesUniform + oneMinusJumpFactor*(redistributedMass + receivedMass)
+        val diff = math.abs(pagerank-updatedPagerank)
+        pagerank = updatedPagerank
+        receivedMass = 0.0 // start the next iteration with an empty accumulator
+        self reply ((pagerank, diff))
+
+      // Return one's id and current pagerank
+      case GetPageRank =>
+        self reply ((id, pagerank))
+
+    }
+  }
+
+  /**
+   * A time keeper that makes sure that each iteration is distinct from the next, by
+   * ensuring that the graph controller (PageRankGraph actor) doesn't start the next
+   * mass-spreading iteration until it has reported the current one as finished.
+   *
+   * On every Advance(worker) the step counter is incremented and printed; the
+   * worker is then sent NextStep, or Stop once `maxSteps` steps have been
+   * counted, after which the clock stops itself.
+   *
+   * @param maxSteps number of Advance messages after which Stop is sent; a value
+   *                 of 1 or less results in Stop on the first Advance
+   */
+  class Clock (maxSteps: Int) extends Actor {
+    var currentStep = 0
+
+    def receive = {
+      case Advance(worker) =>
+        currentStep += 1
+        println("Step: " + currentStep)
+        // >= rather than == so that a non-positive maxSteps still terminates
+        // instead of iterating forever.
+        if (currentStep >= maxSteps) {
+          worker ! Stop
+          self.stop()
+        } else {
+          worker ! NextStep
+        }
+    }
+
+  }
+
+}
+
+