Commits

Jason Baldridge committed 8b63a95

Refined actor impl of PageRank to not use a graph actor -- now everything is controlled through the main method.

Comments (0)

Files changed (1)

src/main/scala/fogbow/graph/PageRank.scala

  */
 object PageRank {
 
-  case object NextStep
-  case object Stop
+  // The messages to and between vertices
   case class SetNeighbors(neighbors: List[ActorRef])
   case object SpreadMass
   case class TakeMass(mass: Double)
   case class Update(jumpTimesUniform: Double, oneMinusJumpFactor: Double, mass: Double)
-  case class Advance(worker: ActorRef)
-  case object GetPageRank
-
-  def main (args: Array[String]) {
-    val inputGraphFile = args(0)
-    val jumpFactor = if (args.length > 1) args(1).toDouble else .15
-    val graph = io.Source.fromFile(inputGraphFile).getLines.map {
-      line => line.split("\\t").toList
-    } map {
-      vertices => (vertices.head, vertices.tail)
-    } toList
-    
-    apply(graph, jumpFactor, 5)
-  }
 
   /**
    * Entry point into actor-based PageRank: read the graph, create the vertex
    * actors, and drive the iterations directly from here.
    */
-  def apply (graph: List[(String, List[String])], jumpFactor: Double, maxIters: Int) {
-    val clock = actorOf(new Clock(maxIters)).start
-    val prGraph = actorOf(new PageRankGraph(clock, graph, jumpFactor)).start
-    prGraph ! NextStep
-  }
+  def main (args: Array[String]) {
+    val inputGraphFile = args(0)
+    val jumpFactor = if (args.length > 1) args(1).toDouble else .15
+    val maxIters = 50
+    val diffTolerance = 1E-6
 
-  /**
-   * An actor that controls the entire graph. Stores all the vertex actors
-   * and dispatches work for them to do, and communicates with the Clock to
-   * keep each iteration of work separate.
-   */
-  class PageRankGraph (clock: ActorRef, graph: List[(String, List[String])], jumpFactor: Double) 
-  extends Actor {
-
+    // Construct the graph from the source file
+    val graph: List[(String, List[String])] = 
+      io.Source.fromFile(inputGraphFile).getLines.map {
+        line => line.split("\\t").toList
+      } map {
+        vertices => (vertices.head, vertices.tail)
+      } toList
+    
+    // Precompute some values that will be used often for the updates.
     val numVertices = graph.length
     val uniformProbability = 1.0/numVertices
     val jumpTimesUniform = jumpFactor/numVertices
     val oneMinusJumpFactor = 1.0-jumpFactor
 
-    // Create and start the vertex actors
+    // Create and start the vertex actors, and put them in a map so we can
+    // get them by ID.
     val idsToActors = graph.map {
       case(vertexId, adjList) => 
-        (vertexId, actorOf(new Vertex(vertexId, adjList.length, uniformProbability)).start)
+        val vertex = actorOf(new Vertex(vertexId, adjList.length, uniformProbability)).start
+        (vertexId, vertex)
     } toMap
 
     // Have each vertex set its actor neighbors
-    graph foreach {
-      case(vertexId, adjacencyList) => {
+    graph.foreach {
+      case(vertexId, adjacencyList) =>
         val adjacentActors = adjacencyList.map(neighborId => idsToActors(neighborId))
         idsToActors(vertexId) ! SetNeighbors(adjacentActors)
-      }
     }
 
-    val vertices = idsToActors.values.toIndexedSeq
+    // The list of vertex actors, used for dispatching messages to all.
+    val vertices = idsToActors.values.toList
 
-    def receive = {
+    var done = false
+    var currentIteration = 1
 
-      // Broadcast to all the vertices that they should push their
-      // labels to their neighbors
-      case NextStep => {
+    while (!done) {
 
-        val totalMissingMass =
-          Future.traverse(vertices)(v => (v ? SpreadMass).mapTo[Double]).map(_.sum).get
+      // Tell all vertices to spread their mass.
+      val totalMissingMass = Future.traverse(vertices)(
+        vertex => (vertex ? SpreadMass).mapTo[Double]).map(_.sum).get
 
-        val perVertexRedistributedMass = totalMissingMass / numVertices
+      val perVertexRedistributedMass = totalMissingMass / numVertices
 
-        val updates = Future.traverse(vertices)(
-          v => (v ? Update(jumpTimesUniform, 
-                           oneMinusJumpFactor, 
-                           perVertexRedistributedMass)).mapTo[(Double, Double)]).get
-        
-        
-        val (pageranks, diffs) = updates.unzip
-        println("Average diff: " + diffs.sum/numVertices)
-        println("Total PageRank mass (should == 1.0): " + pageranks.sum)
-        clock ! Advance(self)
-      }
+      // Tell all vertices to update their pagerank values.
+      val updates = Future.traverse(vertices)(
+        v => (v ? Update(jumpTimesUniform, 
+                         oneMinusJumpFactor, 
+                         perVertexRedistributedMass)).mapTo[((String, Double), Double)]).get
 
-      // Output ten highest ranked vertices and then tell all the
-      // vertices to stop and then stop itself.
-      case Stop => 
-        val pageranks = Future.traverse(vertices)(
-          vertex => (vertex ? GetPageRank).mapTo[(String, Double)]).get
+      val (pageranks, diffs) = updates.unzip
+      val averageDiff = diffs.sum/numVertices
 
+      println("Iteration " + currentIteration + ": average diff == " + averageDiff)
+
+      currentIteration += 1
+
+      if (currentIteration > maxIters || averageDiff < diffTolerance) {
+        done = true
+
+        // Output the ten highest-ranked vertices
         pageranks.sortBy(_._2).reverse.take(10).foreach {
           case(id, pagerank) => println(pagerank + "\t" + id)
         }
-        
-        vertices.foreach(vertex => vertex ! PoisonPill)
-        self.stop()
+      }
+
     }
+
+    // Tell all the vertices to stop.
+    vertices.foreach(vertex => vertex ! PoisonPill)
   }
 
   /**
         val diff = math.abs(pagerank-updatedPagerank)
         pagerank = updatedPagerank
         receivedMass = 0.0
-        self reply ((pagerank, diff))
-
-      // Return one's id and current pagerank
-      case GetPageRank =>
-        self reply ((id, pagerank))
+        self reply (((id, pagerank), diff))
 
     }
   }
 
-  /**
-   * A time keeper that makes sure that each iteration is distinct from the next, by
-   * ensuring that the graph controller (MadGraph actor) doesn't broadcast the next
-   * label pushing event until all vertices are done updating.
-   */
-  class Clock (maxSteps: Int) extends Actor {
-    var currentStep = 0
-
-    def receive = {
-      case Advance(worker) =>
-        currentStep += 1
-        println("Step: " + currentStep)
-        if (currentStep == maxSteps) {
-          worker ! Stop
-          self.stop()
-        } else {
-          worker ! NextStep
-        }
-    }
-
-  }
-
 }