// Source file: fogbow/src/main/scala/fogbow/graph/PageRank.scala

package fogbow.graph

import akka.actor.{Actor, ActorRef, PoisonPill}
import akka.dispatch.Future
import Actor._

/**
 * An implementation of PageRank using Akka actors. 
 *
 * @author Jason Baldridge
 */
object PageRank {

  // ---- Messages handled by PageRankGraph ----

  /** Run one PageRank iteration (sent by `apply` and by the Clock). */
  case object NextStep

  /** Print the top-ranked vertices and stop all actors (sent by the Clock). */
  case object Stop

  // ---- Messages handled by Vertex ----

  /** Tell a vertex which actors its out-links point to. */
  case class SetNeighbors(neighbors: List[ActorRef])

  /** Ask a vertex to spread its mass; it replies with the mass it could not send. */
  case object SpreadMass

  /** Deliver a share of mass from a vertex that links to the receiver. */
  case class TakeMass(mass: Double)

  /** Ask a vertex to recompute its value; it replies with (new PageRank, absolute change). */
  case class Update(jumpTimesUniform: Double, oneMinusJumpFactor: Double, mass: Double)

  /** Ask a vertex for its (id, PageRank) pair. */
  case object GetPageRank

  // ---- Messages handled by Clock ----

  /** Report that `worker` has finished an iteration. */
  case class Advance(worker: ActorRef)

  /**
   * Command-line entry point.
   *
   * Usage: PageRank <graphFile> [jumpFactor] [maxIters]
   *
   * Each non-blank line of the graph file holds a vertex id followed by the ids of
   * the vertices it links to, all separated by tabs. jumpFactor defaults to 0.15 and
   * maxIters (the number of iterations to run) defaults to 5.
   */
  def main (args: Array[String]): Unit = {
    if (args.isEmpty) {
      System.err.println("Usage: PageRank <graphFile> [jumpFactor] [maxIters]")
      System.exit(1)
    }

    val inputGraphFile = args(0)
    val jumpFactor = if (args.length > 1) args(1).toDouble else .15
    val maxIters = if (args.length > 2) args(2).toInt else 5

    // getLines is lazy, so the list must be fully built before the file is closed.
    val source = io.Source.fromFile(inputGraphFile)
    val graph =
      try {
        source.getLines
          // A blank line would otherwise become a vertex with the id "", and a
          // tabs-only line would split to nothing and fail on `head`.
          .filter(_.trim.nonEmpty)
          .map(_.split("\\t").toList)
          .map(vertices => (vertices.head, vertices.tail))
          .toList
      } finally {
        source.close()
      }

    apply(graph, jumpFactor, maxIters)
  }

  /**
   * Starts an actor-based PageRank run over `graph` and returns without waiting for it.
   *
   * A Clock actor counts finished iterations up to `maxIters`. A PageRankGraph actor
   * owns the per-vertex actors. Sending it NextStep begins the first iteration.
   * Progress and results are printed by the actors themselves.
   *
   * @param graph      (vertex id, ids of the vertices it links to) pairs
   * @param jumpFactor weight given to the uniform random jump in each update
   * @param maxIters   step count at which the Clock ends the run
   */
  def apply (graph: List[(String, List[String])], jumpFactor: Double, maxIters: Int): Unit = {
    val timekeeper = actorOf(new Clock(maxIters)).start
    val controller = actorOf(new PageRankGraph(timekeeper, graph, jumpFactor)).start
    controller ! NextStep
  }

  /**
   * An actor that controls the entire graph. Stores all the vertex actors
   * and dispatches work for them to do, and communicates with the Clock to
   * keep each iteration of work separate.
   *
   * @param clock      the Clock actor told (via Advance) when an iteration finishes
   * @param graph      (vertex id, ids of the vertices it links to) pairs
   * @param jumpFactor weight of the uniform random jump in each update
   */
  class PageRankGraph (clock: ActorRef, graph: List[(String, List[String])], jumpFactor: Double) 
  extends Actor {

    // Out-links for every vertex id mentioned anywhere in the input.
    // - An id that only appears as a link target gets an empty list (a dangling
    //   vertex), instead of making the neighbor lookup below throw
    //   NoSuchElementException.
    // - If an id has several lines, the last one wins, as it did when actors
    //   were keyed by id.
    private val adjacency: Map[String, List[String]] = {
      val declared = graph.toMap
      val targetOnly = graph.flatMap(_._2)
        .filterNot(id => declared.contains(id))
        .map(id => (id, List[String]()))
      targetOnly.toMap ++ declared
    }

    // Counted from the distinct vertices (one actor each), so the uniform initial
    // mass sums to 1 even when the input repeats or omits vertex lines.
    val numVertices = adjacency.size
    val uniformProbability = 1.0/numVertices
    val jumpTimesUniform = jumpFactor/numVertices
    val oneMinusJumpFactor = 1.0-jumpFactor

    // Create and start one vertex actor per distinct id
    val idsToActors = adjacency.map {
      case (vertexId, adjList) =>
        (vertexId, actorOf(new Vertex(vertexId, adjList.length, uniformProbability)).start)
    }

    // Have each vertex set its actor neighbors (every id now has an actor)
    adjacency.foreach {
      case (vertexId, adjList) =>
        idsToActors(vertexId) ! SetNeighbors(adjList.map(neighborId => idsToActors(neighborId)))
    }

    val vertices = idsToActors.values.toIndexedSeq

    def receive = {

      // One iteration: every vertex spreads its mass, then every vertex updates.
      // Both phases block this actor (`.get`) until all vertices have replied.
      case NextStep => {

        // Mass held by dangling vertices, which have no out-links to send it along.
        val totalMissingMass =
          Future.traverse(vertices)(v => (v ? SpreadMass).mapTo[Double]).map(_.sum).get

        val perVertexRedistributedMass = totalMissingMass / numVertices

        // NOTE(review): this relies on every TakeMass sent during SpreadMass already
        // being in the neighbor's mailbox before the Update below arrives. Confirm
        // that this ordering holds for the dispatcher in use.
        val updates = Future.traverse(vertices)(
          v => (v ? Update(jumpTimesUniform, 
                           oneMinusJumpFactor, 
                           perVertexRedistributedMass)).mapTo[(Double, Double)]).get

        val (pageranks, diffs) = updates.unzip
        println("Average diff: " + diffs.sum/numVertices)
        println("Total PageRank mass (should == 1.0): " + pageranks.sum)
        clock ! Advance(self)
      }

      // Output ten highest ranked vertices and then tell all the
      // vertices to stop and then stop itself.
      case Stop => 
        val pageranks = Future.traverse(vertices)(
          vertex => (vertex ? GetPageRank).mapTo[(String, Double)]).get

        pageranks.sortBy(_._2).reverse.take(10).foreach {
          case(id, pagerank) => println(pagerank + "\t" + id)
        }
        
        vertices.foreach(vertex => vertex ! PoisonPill)
        self.stop()
    }
  }

  /**
   * Actor holding the PageRank state of a single vertex.
   *
   * @param id          the vertex identifier, reported back on GetPageRank
   * @param outdegree   number of out-links. At 0 the vertex is dangling and hands its
   *                    whole value back in the SpreadMass reply instead of sending it
   * @param initialMass the PageRank value the vertex starts with
   */
  class Vertex (id: String, outdegree: Int, initialMass: Double) extends Actor {

    // Actor refs of the out-link targets; unset (null) until SetNeighbors arrives.
    var neighbors: List[ActorRef] = _
    // Current PageRank value.
    var pagerank = initialMass
    // Mass accumulated from TakeMass messages since the last Update.
    var receivedMass = 0.0

    def receive = {

      case SetNeighbors (targets) =>
        neighbors = targets

      // Split the current value evenly across the out-links and reply with whatever
      // could not be sent: everything for a dangling vertex, 0.0 otherwise.
      case SpreadMass =>
        val unsentMass =
          if (outdegree != 0) {
            val share = pagerank / outdegree
            for (target <- neighbors) target ! TakeMass(share)
            0.0
          } else {
            pagerank
          }
        self reply unsentMass

      case TakeMass(amount) =>
        receivedMass += amount

      // New value = jump share + (1 - jump factor) * (redistributed + received).
      // Reply with the new value and how far it moved, then reset the accumulator.
      case Update(jumpShare, keepFactor, redistributed) =>
        val before = pagerank
        pagerank = jumpShare + keepFactor*(redistributed + receivedMass)
        receivedMass = 0.0
        self reply ((pagerank, math.abs(before - pagerank)))

      case GetPageRank =>
        self reply ((id, pagerank))

    }
  }

  /**
   * A time keeper that keeps each iteration distinct from the next.
   *
   * The graph controller (the PageRankGraph actor) sends Advance when an iteration
   * finishes. The Clock then either sends NextStep for another round of mass
   * spreading or sends Stop to end the run.
   *
   * @param maxSteps number of iterations to run. Values below 1 behave like 1,
   *                 because the first iteration has already run when the first
   *                 Advance arrives.
   */
  class Clock (maxSteps: Int) extends Actor {
    var currentStep = 0

    def receive = {
      case Advance(worker) =>
        currentStep += 1
        println("Step: " + currentStep)
        // `>=` rather than `==`: a non-positive maxSteps would otherwise never
        // match and the run would iterate forever.
        if (currentStep >= maxSteps) {
          worker ! Stop
          self.stop()
        } else {
          worker ! NextStep
        }
    }

  }

}