Thursday, April 23, 2015

Giraph: Single Source Shortest Path (SSSP)

Problem description:
Given a directed graph, find the shortest path from a given source vertex to every vertex reachable from it. All edge weights are assumed to be non-negative, the same assumption Dijkstra's algorithm makes.

Example Input:
[0,0,[[1,1],[3,3]]]
[1,0,[[0,1],[2,2],[3,1]]]
[2,0,[[1,2],[4,4]]]
[3,0,[[0,3],[1,1],[4,4]]]
[4,0,[[3,4],[2,4]]]
Each line above has the format [source_id, source_value, [[dest_id, edge_value], ...]]. This graph has 5 vertices and 12 directed edges.
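To make the line format concrete, here is a minimal plain-Java sketch that reads one such line into a vertex id, a vertex value, and a list of out-edges. It is only an illustration: AdjacencyLineParser, ParsedVertex and OutEdge are hypothetical names, not Giraph classes, and in a real job Giraph's own vertex input format does this parsing. The sketch assumes every number is a plain integer or decimal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of Giraph. */
public class AdjacencyLineParser {
  /** One out-edge: target vertex id and edge value. */
  public static final class OutEdge {
    public final long target;
    public final double weight;

    OutEdge(long target, double weight) {
      this.target = target;
      this.weight = weight;
    }
  }

  /** One parsed input line. */
  public static final class ParsedVertex {
    public final long id;
    public final double value;
    public final List<OutEdge> edges = new ArrayList<>();

    ParsedVertex(long id, double value) {
      this.id = id;
      this.value = value;
    }
  }

  // Matches plain integers and decimals; exponents are not handled.
  private static final Pattern NUMBER = Pattern.compile("-?\\d+(?:\\.\\d+)?");

  public static ParsedVertex parse(String line) {
    List<Double> numbers = new ArrayList<>();
    Matcher matcher = NUMBER.matcher(line);
    while (matcher.find()) {
      numbers.add(Double.parseDouble(matcher.group()));
    }
    // Expect id, value, then zero or more (target, weight) pairs.
    if (numbers.size() < 2 || numbers.size() % 2 != 0) {
      throw new IllegalArgumentException("Malformed line: " + line);
    }
    ParsedVertex vertex =
        new ParsedVertex(numbers.get(0).longValue(), numbers.get(1));
    for (int i = 2; i < numbers.size(); i += 2) {
      vertex.edges.add(
          new OutEdge(numbers.get(i).longValue(), numbers.get(i + 1)));
    }
    return vertex;
  }

  public static void main(String[] args) {
    String[] lines = {
      "[0,0,[[1,1],[3,3]]]",
      "[1,0,[[0,1],[2,2],[3,1]]]",
      "[2,0,[[1,2],[4,4]]]",
      "[3,0,[[0,3],[1,1],[4,4]]]",
      "[4,0,[[3,4],[2,4]]]"
    };
    int edgeCount = 0;
    for (String line : lines) {
      edgeCount += parse(line).edges.size();
    }
    System.out.println(lines.length + " vertices, " + edgeCount + " edges");
  }
}
```

Running main on the five example lines counts 2 + 3 + 2 + 3 + 2 = 12 out-edges, which matches the edge count stated above.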

Output (with the default source vertex, id 1):
0       1.0
2       2.0
1       0.0
3       1.0
4       5.0

Understand the problem:
In this problem, the value of each vertex is the shortest known distance from the source vertex to that vertex. Initially, every vertex value is set to Double.MAX_VALUE, which stands in for infinity.

In each superstep, each vertex first receives the messages sent to it in the previous superstep and computes the minimum candidate distance from the source vertex. If that minimum is less than the vertex's current value, the vertex updates its value and sends a new candidate distance (its value plus the edge weight) along each of its out-edges. The receiving neighbors in turn update their values and send messages, resulting in a wavefront of updates through the graph.

The algorithm terminates when no more updates occur, that is, when no messages are sent through the graph and every vertex has voted to halt and is inactive. At that point the value of each vertex is the length of the shortest path from the source vertex. Termination is guaranteed if all edge weights are non-negative, the same precondition Dijkstra's algorithm requires.
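The superstep loop described above can be sketched in plain Java without Giraph. This is only a single-threaded illustration of the idea, not how Giraph schedules work: SuperstepSketch and its inbox/outbox lists are hypothetical names, the example graph is hard-coded, and vertex ids are assumed to be 0..n-1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical single-threaded sketch of the superstep loop; not Giraph code. */
public class SuperstepSketch {
  // GRAPH[v] lists the out-edges of vertex v as {targetId, weight}.
  static final int[][][] GRAPH = {
    {{1, 1}, {3, 3}},
    {{0, 1}, {2, 2}, {3, 1}},
    {{1, 2}, {4, 4}},
    {{0, 3}, {1, 1}, {4, 4}},
    {{3, 4}, {2, 4}}
  };

  private static List<List<Double>> emptyBoxes(int n) {
    List<List<Double>> boxes = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      boxes.add(new ArrayList<>());
    }
    return boxes;
  }

  public static double[] run(int[][][] graph, int source) {
    int n = graph.length;
    double[] value = new double[n];
    Arrays.fill(value, Double.MAX_VALUE);  // "infinity" before superstep 0
    List<List<Double>> inbox = emptyBoxes(n);

    for (long superstep = 0; ; superstep++) {
      List<List<Double>> outbox = emptyBoxes(n);
      boolean anyMessageSent = false;
      for (int v = 0; v < n; v++) {
        // Every vertex runs in superstep 0; afterwards a halted vertex
        // runs again only if it received a message.
        if (superstep > 0 && inbox.get(v).isEmpty()) {
          continue;
        }
        double minDist = (v == source) ? 0d : Double.MAX_VALUE;
        for (double message : inbox.get(v)) {
          minDist = Math.min(minDist, message);
        }
        if (minDist < value[v]) {
          value[v] = minDist;
          for (int[] edge : graph[v]) {
            outbox.get(edge[0]).add(minDist + edge[1]);
            anyMessageSent = true;
          }
        }
        // Falling through here corresponds to voteToHalt().
      }
      if (!anyMessageSent) {
        return value;  // all vertices halted and no messages in flight
      }
      inbox = outbox;  // messages are delivered in the next superstep
    }
  }

  public static void main(String[] args) {
    double[] dist = run(GRAPH, 1);
    for (int v = 0; v < dist.length; v++) {
      System.out.println(v + "\t" + dist[v]);
    }
  }
}
```

With source vertex 1 this yields distances 1, 0, 2, 1 and 5 for vertices 0 to 4, the same values as in the Output section. A vertex that cannot be reached from the source would keep the value Double.MAX_VALUE.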

Code (Java):
package org.apache.giraph.examples;

import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.conf.LongConfOption;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;

import java.io.IOException;

/**
 * Demonstrates the basic Pregel shortest paths implementation.
 *
 * Type parameters of BasicComputation, in order:
 *   I = LongWritable    Vertex id
 *   V = DoubleWritable  Vertex data
 *   E = FloatWritable   Edge data
 *   M = DoubleWritable  Message data
 */
@Algorithm(
    name = "Shortest paths",
    description = "Finds all shortest paths from a selected vertex"
)
public class SimpleShortestPathsComputation extends BasicComputation<
    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
  /**
   * The shortest paths id (the id of the source vertex).
   * LongConfOption constructor arguments: key, defaultValue, description.
   */
  public static final LongConfOption SOURCE_ID =
      new LongConfOption("SimpleShortestPathsVertex.sourceId", 1,
          "The shortest paths id");

  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(SimpleShortestPathsComputation.class);

  /**
   * Is this vertex the source id?
   * The wildcards stand for the vertex data (V) and edge data (E) types,
   * which this check does not need.
   *
   * @param vertex Vertex
   * @return True if the source id
   */
  private boolean isSource(Vertex<LongWritable, ?, ?> vertex) {
    return vertex.getId().get() == SOURCE_ID.get(getConf());
  }

  /**
   * compute(Vertex<I,V,E> vertex, Iterable<M> messages)
   */
  @Override
  public void compute(
      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
      Iterable<DoubleWritable> messages) throws IOException {
    if (getSuperstep() == 0) {
      vertex.setValue(new DoubleWritable(Double.MAX_VALUE));
    }
    double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE;
    for (DoubleWritable message : messages) {
      minDist = Math.min(minDist, message.get());
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Vertex " + vertex.getId() + " got minDist = " + minDist +
          " vertex value = " + vertex.getValue());
    }
    if (minDist < vertex.getValue().get()) {
      vertex.setValue(new DoubleWritable(minDist));
      // Edge<I, E>: I = LongWritable (target vertex id),
      // E = FloatWritable (edge value)
      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
        double distance = minDist + edge.getValue().get();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Vertex " + vertex.getId() + " sent to " +
              edge.getTargetVertexId() + " = " + distance);
        }
        sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
      }
    }
    vertex.voteToHalt();
  }
}


Discussion:
We can trace the execution of the program on the example input, superstep by superstep.
superstep 0:
minDist is 0 at vertex 1 (the source) and MAX at every other vertex, so only the source vertex updates its value to 0 and sends distances to its neighbors.
Each vertex updates its value:
V   vertexVal           
0    INF                
1    INF -> 0            
2    INF                
3    INF                   
4    INF         

Each vertex sends messages to its neighbors:
V    (targetVertexId, distance)
0       -
1    <(0,1) (2,2) (3,1)>    
2       - 
3       - 
4       -

superstep 1:
Each vertex receives incoming messages from the last superstep:
V   message
0    1 (from Vertex 1)
1    -
2    2 (from Vertex 1)
3    1 (from Vertex 1)
4    -

Each vertex updates its value:
V   vertexVal           
0    INF -> 1            
1     0                  
2    INF -> 2               
3    INF -> 1                 
4    INF              

Each vertex sends messages to its neighbors: 
V    (targetVertexId, distance)
0    (1,2) (3,4)
1      -
2    (1,4) (4,6)  
3    (0,4) (1,2) (4,5) 
4       -

superstep 2: 
Each vertex receives incoming messages from the last superstep:
V    messages
0     4 (from vertex 3)
1     2 (from vertex 0), 4 (from vertex 2), 2 (from vertex 3)
2     -
3     4 (from vertex 0) 
4     6 (from vertex 2), 5 (from vertex 3)

Each vertex updates its value:
V    vertexVal
0      1 (discard 4)
1      0 (discard 2, 4, 2)
2      2  (no messages)
3      1  (discard 4)
4     INF -> 5

Each vertex sends messages to its neighbors: 
V    (targetVertexId, distance)
0    -
1    -
2    -
3    -
4    (2,9) (3,9)

superstep 3:
Each vertex receives incoming messages from the last superstep:
V    messages
0    -
1    -
2    9 (from vertex 4)
3    9 (from vertex 4)
4    -

Each vertex updates its value:
V    vertexVal
0    1
1    0
2    2 (discard 9)
3    1 (discard 9)
4    5

Each vertex sends messages to its neighbors:
V    (targetVertexId, distance)
0     -
1     -
2     -
3     -
4     -

After superstep 3:
No messages were sent in superstep 3 and every vertex has voted to halt, so all vertices are inactive and the computation terminates.
Final vertex values:
V    vertexVal
0      1
1      0
2      2
3      1
4      5
Note that the order of the lines in the output is not deterministic.
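As a cross-check on the final values, the same example graph can be run through a sequential Dijkstra. This is a separate, standard algorithm, not what the Giraph job executes. DijkstraCheck is a hypothetical class name, the graph is hard-coded, and vertex ids are assumed to be 0..n-1.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

/** Hypothetical sequential cross-check; not part of the Giraph job. */
public class DijkstraCheck {
  // GRAPH[v] lists the out-edges of vertex v as {targetId, weight}.
  static final int[][][] GRAPH = {
    {{1, 1}, {3, 3}},
    {{0, 1}, {2, 2}, {3, 1}},
    {{1, 2}, {4, 4}},
    {{0, 3}, {1, 1}, {4, 4}},
    {{3, 4}, {2, 4}}
  };

  public static double[] shortestPaths(int[][][] graph, int source) {
    double[] dist = new double[graph.length];
    Arrays.fill(dist, Double.POSITIVE_INFINITY);
    dist[source] = 0d;

    // Queue entries are {distance, vertexId}, ordered by distance.
    PriorityQueue<double[]> queue =
        new PriorityQueue<>((a, b) -> Double.compare(a[0], b[0]));
    queue.add(new double[] {0d, source});

    while (!queue.isEmpty()) {
      double[] top = queue.poll();
      int u = (int) top[1];
      if (top[0] > dist[u]) {
        continue;  // stale entry: a shorter distance was already found
      }
      for (int[] edge : graph[u]) {
        double candidate = dist[u] + edge[1];
        if (candidate < dist[edge[0]]) {
          dist[edge[0]] = candidate;
          queue.add(new double[] {candidate, edge[0]});
        }
      }
    }
    return dist;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(shortestPaths(GRAPH, 1)));
  }
}
```

For source vertex 1 the distances for vertices 0 to 4 come out as 1, 0, 2, 1 and 5, which agrees with the final vertex values in the trace.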

Summary:
1. Giraph is based on the BSP (Bulk Synchronous Parallel) model, so make sure you understand what happens in each superstep.
2. Some useful APIs:
Vertex class:
/**
 * Vertex class
 * @param I    Vertex Id
 * @param V    Vertex data
 * @param E    Edge data
 */
interface Vertex<I extends org.apache.hadoop.io.WritableComparable,
    V extends org.apache.hadoop.io.Writable,
    E extends org.apache.hadoop.io.Writable>


// Get a read-only view of the out-edges of this vertex.
Iterable<Edge<I,E>> getEdges();

/** 
 * Return the value of the first edge with the given target vertex id, 
 * or null if there is no such edge. 
 */ 
E getEdgeValue(I targetVertexId);

// Get the vertex id.
I getId();

// Get the vertex value (data stored with vertex)
V getValue();

// Get the number of outgoing edges on this vertex. 
int getNumEdges();

// Set the outgoing edges for this vertex.
void setEdges(Iterable<Edge<I,E>> edges);

// If an edge to the target vertex exists, set it to the given edge value.
void setEdgeValue(I targetVertexId, E edgeValue);

/** Set the vertex data (immediately visible in the computation) */
void setValue(V value);

/** After this is called, the compute() code will no longer be called 
  * for this vertex unless a message is sent to it. 
  */
void voteToHalt();

/** 
  * Is this vertex done? 
  */
boolean isHalted();

Edge class:
/**
 * Edge class
 * @param I Vertex id
 * @param E Edge value
 */
interface Edge<I extends org.apache.hadoop.io.WritableComparable,
    E extends org.apache.hadoop.io.Writable>

// Get the target vertex id of this edge
I getTargetVertexId();

// Get the edge value of the edge
E getValue();

BasicComputation class:
/**
 * BasicComputation Class
 * @param I Vertex id
 * @param V Vertex data
 * @param E Edge data
 * @param M Message data
 */
abstract class BasicComputation<I extends org.apache.hadoop.io.WritableComparable,
    V extends org.apache.hadoop.io.Writable,
    E extends org.apache.hadoop.io.Writable,
    M extends org.apache.hadoop.io.Writable>

// Must be defined by user to do computation on a single Vertex.
public abstract void compute(Vertex<I,V,E> vertex,
           Iterable<M> messages)
           throws IOException;

// Retrieve the current superstep, starting from 0
public long getSuperstep();
