Monday, April 27, 2015

Giraph: Page Rank

Problem Description:
Assign each node of a graph a weight that represents its relative importance inside the graph. PageRank is usually applied to a set of webpages, where it measures which pages are the most important compared with the rest of the set. A webpage's importance is derived from its incoming links, i.e. the references it receives from other webpages: the more pages link to it, and the more important those pages are, the higher its rank.
Here is a very good reference:
http://marsty5.com/2013/05/29/run-example-in-giraph-pagerank/
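For reference, here is the standard damped PageRank update. MySimplePageRankComputation (further down) appears to implement it with d = 0.85, through its line `0.15f / getTotalNumVertices() + 0.85f * sum`. N is the number of vertices, L(u) is the out-degree of u, and the sum runs over every vertex u with an edge to v. The edge weights in the input are ignored. The simplified MyPageRankComputation is the special case d = 1, where the teleport term vanishes.

```latex
PR_{t+1}(v) = \frac{1-d}{N} + d \sum_{u \,:\, u \to v} \frac{PR_t(u)}{L(u)}, \qquad d = 0.85
```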

Code (Giraph):
package org.apache.giraph.examples;

import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;

import java.io.IOException;

/**
 * My simplified Google PageRank example (no damping factor): in every
 * superstep after the first, each vertex replaces its value with the sum of
 * the incoming messages, then splits that value evenly among its out-edges.
 */
@Algorithm(
    name = "Page Rank",
    description = "My simplified page rank"
)
public class MyPageRankComputation extends BasicComputation<
    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {

  /** Last superstep in which values are updated; messages are sent before it */
  public static final int MAX_SUPERSTEPS = 2;

  @Override
  public void compute(Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
      Iterable<DoubleWritable> messages) throws IOException {

    // From superstep 1 on, the new rank is the sum of the shares received.
    if (getSuperstep() >= 1) {
      double sum = 0;
      for (DoubleWritable message : messages) {
        sum += message.get();
      }
      vertex.setValue(new DoubleWritable(sum));
    }

    // Send an equal share of the current rank along every out-edge
    // (skipped for vertices without out-edges to avoid dividing by zero).
    int numEdges = vertex.getNumEdges();
    if (getSuperstep() < MAX_SUPERSTEPS && numEdges > 0) {
      DoubleWritable message =
          new DoubleWritable(vertex.getValue().get() / numEdges);
      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
        sendMessage(edge.getTargetVertexId(), message);
      }
      // Equivalent shortcut: sendMessageToAllEdges(vertex, message);
    }

    // Halt; a message arriving in the next superstep reactivates the vertex.
    vertex.voteToHalt();
  }
}
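To see what this computation does without a cluster, here is a plain-Java simulation of its supersteps. It is not Giraph code. It assumes the adjacency of the five-vertex input used later in this post, with initial values 1 to 5, and ignores edge weights as compute() does. Messages sent in one superstep are only read in the next, and a halted vertex runs again only if it received mail.

```java
/**
 * Hypothetical single-machine simulation of MyPageRankComputation's
 * supersteps (not Giraph code). Edge weights are ignored, as in compute().
 */
final class SimplifiedPageRankSketch {
  /** Out-neighbours of vertices 0..4, taken from the five-vertex input. */
  static final int[][] OUT = {{1, 3}, {0, 2, 3}, {1, 4}, {0, 1, 4}, {3, 2}};

  static double[] run(double[] initial, int maxSupersteps) {
    int n = initial.length;
    double[] value = initial.clone();
    double[] inbox = new double[n];      // sum of messages delivered this superstep
    boolean[] hasMail = new boolean[n];  // did the vertex receive any message?
    for (int superstep = 0; superstep <= maxSupersteps; superstep++) {
      double[] nextInbox = new double[n];
      boolean[] nextHasMail = new boolean[n];
      for (int v = 0; v < n; v++) {
        // Every vertex votes to halt, so after superstep 0 only vertices
        // that received a message run compute() again.
        if (superstep > 0 && !hasMail[v]) {
          continue;
        }
        if (superstep >= 1) {
          value[v] = inbox[v];           // new rank = sum of incoming shares
        }
        if (superstep < maxSupersteps && OUT[v].length > 0) {
          double share = value[v] / OUT[v].length;
          for (int target : OUT[v]) {
            nextInbox[target] += share;  // delivered in superstep + 1
            nextHasMail[target] = true;
          }
        }
      }
      inbox = nextInbox;
      hasMail = nextHasMail;
    }
    return value;
  }
}
```

Without damping, no rank is created or lost, because every vertex here has out-edges. The values always sum to the initial total of 15. Only their distribution changes.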

Advanced features of Giraph: master compute and aggregators
A useful link:
http://giraph.apache.org/aggregators.html

According to Google's Pregel paper, aggregators are a mechanism for global communication, monitoring, and data. Each vertex can provide a value to an aggregator in superstep S, the system combines those values using a reduction operator, and the resulting value is made available to all vertices in superstep S + 1.


What are aggregators?

Aggregators enable global computation in your application. You can use them, for example, to check whether a global condition is satisfied or to keep some statistics.
During a superstep, vertices provide values to aggregators. These values get aggregated by the system and the results become available to all vertices in the following superstep. Providing a value to an aggregator is done by calling:
aggregate(aggregatorName, aggregatedValue)
and to get the value aggregated during the previous superstep, call:
getAggregatedValue(aggregatorName)
Aggregator operations should be commutative and associative since there is no guaranteed ordering in which the aggregations will be performed.
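Here is a small plain-Java illustration of why the operator must be commutative and associative. It is not Giraph code. The helper name `reduce` and the operators are illustrative. The same values are folded in two arrival orders: sum and max agree, but a pairwise average, which is commutative but not associative, gives different answers.

```java
import java.util.List;
import java.util.function.BinaryOperator;

/** Plain-Java illustration (not Giraph code) of why aggregation order must not matter. */
final class AggregationOrderSketch {
  /** Combines values left to right, in whatever order they happened to arrive. */
  static double reduce(List<Double> arrivals, BinaryOperator<Double> op) {
    double acc = arrivals.get(0);
    for (int i = 1; i < arrivals.size(); i++) {
      acc = op.apply(acc, arrivals.get(i));
    }
    return acc;
  }

  static final BinaryOperator<Double> SUM = Double::sum;
  static final BinaryOperator<Double> MAX = Math::max;
  /** Commutative but NOT associative: a bad choice for an aggregator. */
  static final BinaryOperator<Double> PAIRWISE_AVERAGE = (a, b) -> (a + b) / 2;
}
```

Folding [1, 2, 3] and [3, 2, 1] with PAIRWISE_AVERAGE yields 2.25 and 1.75. An aggregator built on it would return whichever result the message order happened to produce.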

Regular vs persistent aggregator

Aggregators come in two flavors: regular and persistent aggregators. The value of a regular aggregator is reset to its initial value in each superstep, whereas the value of a persistent aggregator lives through the whole application.
As an example, consider a LongSumAggregator to which each vertex adds 1 during compute(). If it is a regular aggregator, reading it gives the number of vertices from the previous superstep. If it is a persistent aggregator, it holds the sum of the vertex counts over all of the previous supersteps.
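The example above can be simulated in plain Java as follows. `SumAggregatorSketch` and `endSuperstep()` are hypothetical stand-ins, not Giraph's API, and mimic a LongSumAggregator. Five vertices each add 1 per superstep. Values aggregated during a superstep only become readable after the barrier.

```java
/**
 * Plain-Java stand-in (not Giraph's API) for a LongSumAggregator, showing
 * what vertices read back in the next superstep.
 */
final class SumAggregatorSketch {
  private final boolean persistent;
  private long current;     // being aggregated during this superstep
  private long published;   // what getAggregatedValue() returns

  SumAggregatorSketch(boolean persistent) {
    this.persistent = persistent;
  }

  void aggregate(long value) {
    current += value;
  }

  /** Value aggregated during the previous superstep(s). */
  long getAggregatedValue() {
    return published;
  }

  /** Called by the "framework" at the barrier between supersteps. */
  void endSuperstep() {
    published = current;
    if (!persistent) {
      current = 0;  // regular aggregators restart from the initial value (0 for a sum)
    }
  }
}
```

After three supersteps, the regular sketch reads 5 every time. The persistent one reads 5, 10 and 15.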

Registering aggregators

Before using any aggregator, you MUST register it on the master. You can do this by extending the MasterCompute class (and setting it as the job's master compute class), and calling:
registerAggregator(aggregatorName, aggregatorClass)
or:
registerPersistentAggregator(aggregatorName, aggregatorClass)
depending on which kind of aggregator you want. You can register aggregators either in MasterCompute.initialize(), in which case the aggregator is available throughout the whole application, or in MasterCompute.compute(), in which case it is available in the current superstep and every following one.

Aggregators and MasterCompute

The first thing that gets executed in a superstep is MasterCompute.compute(). In this method you can read the values aggregated during the previous superstep by calling:
getAggregatedValue(aggregatorName)
and you can change the value of an aggregator by calling:
setAggregatedValue(aggregatorName, aggregatedValue)

Implementations

In the package org.apache.giraph.aggregators you can find many common aggregators already implemented.
If you need to create your own, you can extend BasicAggregator or implement Aggregator. The methods you will need to implement are:
aggregate(value)
which describes how to add another value to the aggregator, and:
createInitialValue()
The initial value will be applied to all aggregator objects when they are created. It must be the identity of the operation: adding it to an aggregator must not change the aggregator's value. For example, for a sum aggregator this value should be zero, for a min aggregator it should be the value corresponding to positive infinity, etc.
If you need some parameters from the configuration in your aggregators, your aggregator class can implement ImmutableClassesGiraphConfigurable.
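Here is the shape of such a class, as a plain-Java sketch of a min aggregator built from the two methods above. It is a stand-in, not a subclass of Giraph's BasicAggregator. In real Giraph code you would extend BasicAggregator with a Writable type such as DoubleWritable. The class name and `reset()` are hypothetical.

```java
/**
 * Plain-Java sketch (not Giraph's BasicAggregator) of a min aggregator built
 * from the two methods described above.
 */
final class DoubleMinAggregatorSketch {
  private double value = createInitialValue();

  /** Identity for min: aggregating it never changes the current value. */
  double createInitialValue() {
    return Double.POSITIVE_INFINITY;
  }

  /** Commutative and associative, so arrival order does not matter. */
  void aggregate(double newValue) {
    value = Math.min(value, newValue);
  }

  double getAggregatedValue() {
    return value;
  }

  /** What happens to a regular aggregator between supersteps. */
  void reset() {
    value = createInitialValue();
  }
}
```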

Advanced options

If you are using multiple threads for computation (giraph.numComputeThreads), you should consider turning on the giraph.useThreadLocalAggregators option. Thread-local aggregators give every worker thread its own local copy of each aggregator, rather than a single copy for the entire worker. The downside is higher memory use, since there are several copies of each aggregator; so if you have a lot of aggregators, or the aggregated values are very large objects, this option may hurt. Otherwise it will likely speed up your application, because it removes the need to synchronize when aggregating values.
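The trade-off described above can be sketched in plain Java. This is not how Giraph implements giraph.useThreadLocalAggregators. Each thread sums its slice into a private local variable with no locking, and the per-thread partial results are merged once at the end. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Plain-Java sketch (not Giraph code): one private partial sum per thread. */
final class LocalCopyAggregationSketch {
  static long sum(long[] values, int numThreads) {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    try {
      List<Future<Long>> partials = new ArrayList<>();
      int chunk = (values.length + numThreads - 1) / numThreads;
      for (int t = 0; t < numThreads; t++) {
        final int from = t * chunk;
        final int to = Math.min(values.length, from + chunk);
        partials.add(pool.submit(() -> {
          long local = 0;  // this thread's private copy: no locking needed
          for (int i = from; i < to; i++) {
            local += values[i];
          }
          return local;
        }));
      }
      long total = 0;  // merge the per-thread copies once, at the end
      for (Future<Long> partial : partials) {
        total += partial.get();
      }
      return total;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}
```

The memory cost grows with the thread count, because there is one partial result per thread. That mirrors the "several copies of each aggregator" caveat.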

Implementation details

During a superstep, values provided to the aggregators are aggregated into worker-local aggregator objects. At the end of the superstep, all of these values need to be aggregated together, given to the master, and, after MasterCompute.compute() is performed, distributed back to all the workers. In applications that use a lot of aggregators, doing all the aggregation on the master could cause a serious bottleneck, both for computation and for network communication, because the master would be receiving, processing and sending (number of workers * total size of aggregators) amount of data. This was the motivation for implementing sharded aggregators in Giraph.
At the end of the superstep, each aggregator is assigned to one of the workers (its owner), and that worker receives and aggregates the values for that aggregator from all other workers. Each worker then sends the aggregators it owns to the master, so the master doesn't have to perform any aggregation and receives only the final value of each aggregator. After MasterCompute.compute(), the master doesn't distribute all aggregators to all workers either: it sends each aggregator only to its owner, and each worker then distributes the aggregators it owns to all other workers.
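The owner-based reduction can be simulated in plain Java. The owner rule here (hash of the aggregator name modulo the worker count) is an assumption for illustration; Giraph's real assignment may differ. Every worker's partial value for an aggregator goes to that aggregator's single owner, which folds the partials. The master then gets one final value per aggregator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation (not Giraph code) of sharded aggregation. */
final class ShardedAggregationSketch {
  /** Hypothetical owner rule; Giraph's real assignment may differ. */
  static int owner(String aggregatorName, int numWorkers) {
    return Math.floorMod(aggregatorName.hashCode(), numWorkers);
  }

  /**
   * partials.get(w) holds worker w's local sums, keyed by aggregator name.
   * Returns, per worker, the final values of the aggregators that worker owns;
   * the master would receive exactly these, one value per aggregator.
   */
  static List<Map<String, Long>> aggregateAtOwners(List<Map<String, Long>> partials) {
    int numWorkers = partials.size();
    List<Map<String, Long>> owned = new ArrayList<>();
    for (int w = 0; w < numWorkers; w++) {
      owned.add(new HashMap<>());
    }
    for (Map<String, Long> workerPartials : partials) {
      for (Map.Entry<String, Long> entry : workerPartials.entrySet()) {
        String name = entry.getKey();
        // Ship this partial to the aggregator's owner, which folds it in.
        owned.get(owner(name, numWorkers)).merge(name, entry.getValue(), Long::sum);
      }
    }
    return owned;
  }
}
```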

Code (Java):
package org.apache.giraph.examples;

import org.apache.giraph.aggregators.DoubleMaxAggregator;
import org.apache.giraph.aggregators.DoubleMinAggregator;
import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;

import java.io.IOException;

/**
 * PageRank with damping factor 0.85 that also aggregates the number of
 * vertices (sum) and the smallest and largest rank seen so far (min, max).
 */
public class MySimplePageRankComputationWithAggregator extends BasicComputation<LongWritable,
    DoubleWritable, FloatWritable, DoubleWritable> {
  /** Number of supersteps for this test */
  public static final int MAX_SUPERSTEPS = 30;
  /** Logger */
  private static final Logger LOG = 
      Logger.getLogger(MySimplePageRankComputationWithAggregator.class);
  /** Sum aggregator name */
  private static final String SUM_AGG = "sum";
  /** Min aggregator name */
  private static final String MIN_AGG = "min";
  /** Max aggregator name */
  private static final String MAX_AGG = "max";

  @Override
  public void compute(
      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex, 
      Iterable<DoubleWritable> messages) throws IOException {
    if (getSuperstep() > 0) {
      double sum = 0;
      for (DoubleWritable message : messages) {
        sum += message.get();
      }
      DoubleWritable vertexValue = 
          new DoubleWritable(0.15f / getTotalNumVertices() + 0.85f * sum);
      vertex.setValue(vertexValue);
      aggregate(MAX_AGG, vertexValue);
      aggregate(MIN_AGG, vertexValue);
      aggregate(SUM_AGG, new LongWritable(1));
      // getAggregatedValue() returns what was aggregated up to the previous superstep.
      LOG.info(vertex.getId() + ": PageRank=" + vertexValue +
          " max=" + getAggregatedValue(MAX_AGG) +
          " min=" + getAggregatedValue(MIN_AGG));
    }

    if (getSuperstep() < MAX_SUPERSTEPS) {
      long edges = vertex.getNumEdges();
      sendMessageToAllEdges(vertex, 
          new DoubleWritable(vertex.getValue().get() / edges));
    } else {
      vertex.voteToHalt();
    }
  }

  /**
   * Master compute: registers the required aggregators. It must be set as
   * the job's master compute class (e.g. with GiraphRunner's -mc option).
   */
  public static class MySimplePageRankMasterCompute extends 
      DefaultMasterCompute {
    @Override
    public void initialize() throws InstantiationException,
        IllegalAccessException {
      registerAggregator(SUM_AGG, LongSumAggregator.class);
      registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
      registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
    }
  }
}

Input:
[0,1,[[1,1],[3,3]]]
[1,2,[[0,1],[2,2],[3,1]]]
[2,3,[[1,2],[4,4]]]
[3,4,[[0,3],[1,1],[4,4]]]
[4,5,[[3,4],[2,4]]]

Expected output:
0       0.18589980877086507
2       0.19005494651531296
1       0.2704106097936198
3       0.2703977512806641
4       0.19006780502826862
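As a cross-check, here is a plain-Java simulation of MySimplePageRankComputation's superstep loop on the input above. It is not Giraph code. It assumes initial values 1 to 5, edge weights ignored, and all vertices active until superstep 30. It uses the doubles 0.15 and 0.85 where the Giraph code uses the float literals 0.15f and 0.85f, so the last digits can differ slightly.

```java
/**
 * Hypothetical single-machine simulation of MySimplePageRankComputation's
 * superstep loop (not Giraph code). Edge weights are ignored, as in compute().
 */
final class DampedPageRankSketch {
  /** Out-neighbours of vertices 0..4, taken from the input lines. */
  static final int[][] OUT = {{1, 3}, {0, 2, 3}, {1, 4}, {0, 1, 4}, {3, 2}};

  static double[] run(double[] initial, int maxSupersteps) {
    int n = initial.length;
    double[] value = initial.clone();
    double[] inbox = new double[n];  // sum of messages delivered this superstep
    for (int superstep = 0; superstep <= maxSupersteps; superstep++) {
      double[] nextInbox = new double[n];
      for (int v = 0; v < n; v++) {
        if (superstep > 0) {
          // Giraph's version uses float literals 0.15f and 0.85f here.
          value[v] = 0.15 / n + 0.85 * inbox[v];
        }
        if (superstep < maxSupersteps) {
          double share = value[v] / OUT[v].length;  // every vertex here has out-edges
          for (int target : OUT[v]) {
            nextInbox[target] += share;            // delivered in superstep + 1
          }
        }
      }
      inbox = nextInbox;
    }
    return value;
  }
}
```

Running `run(new double[] {1, 2, 3, 4, 5}, 30)` should agree with the expected output above to at least four decimal places. The ranks sum to about 1.107 (exactly 1 + 14 * 0.85^30 in exact arithmetic) rather than 1. The reason is that each superstep maps the total T to 0.15 + 0.85 * T, and the unnormalized initial total of 15 has not fully decayed after 30 supersteps. Starting from values that sum to 1 keeps the total at 1.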


Thursday, April 23, 2015

Giraph: Single Source Shortest Path (SSSP)

Problem description:
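Given a directed graph with non-negative edge weights, find the shortest distance from a given source vertex to every vertex reachable from it. Dijkstra's algorithm solves this sequentially; the Pregel/Giraph solution below instead relaxes edges in parallel, superstep by superstep, in the style of Bellman-Ford.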

Example Input:
[0,0,[[1,1],[3,3]]]
[1,0,[[0,1],[2,2],[3,1]]]
[2,0,[[1,2],[4,4]]]
[3,0,[[0,3],[1,1],[4,4]]]
[4,0,[[3,4],[2,4]]]
Each line above has the format [source_id,source_value,[[dest_id,edge_value],...]]. This graph has 5 vertices and 12 directed edges.

Output (source vertex 1, the default SOURCE_ID):
0       1.0
2       2.0
1       0.0
3       1.0
4       5.0

Understand the problem:
In this problem, each vertex value holds the shortest distance from the source vertex to that vertex found so far. In superstep 0 every vertex value is initialized to Double.MAX_VALUE, which plays the role of infinity.

In each superstep, each vertex first receives messages from its neighbors, each carrying a candidate distance from the source vertex, and takes the minimum. If this minimum is less than the current vertex value, the vertex updates its value and sends updated distances (its new value plus the edge weight) to its out-neighbors. These neighbors in turn update their values and send messages, resulting in a wavefront of updates through the graph.

The algorithm terminates when no vertex updates its value, so no more messages are sent. Since every vertex votes to halt at the end of each superstep, all vertices are then inactive. Finally, the value of each vertex is its shortest distance from the source vertex; vertices that cannot be reached keep Double.MAX_VALUE. Termination is guaranteed when all edge weights are non-negative.
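The process just described can be checked with a plain-Java simulation of the supersteps on the example input. It is not Giraph code; `ShortestPathsSketch` and its `Result` class are hypothetical names. As in SimpleShortestPathsComputation, a vertex relaxes its out-edges only when the minimum incoming distance improves its value. After superstep 0, only vertices that received mail run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical single-machine simulation of SimpleShortestPathsComputation's
 * supersteps (not Giraph code), using the example input of this post.
 */
final class ShortestPathsSketch {
  /** EDGES[v] = {{target, weight}, ...}, copied from the example input. */
  static final int[][][] EDGES = {
      {{1, 1}, {3, 3}},
      {{0, 1}, {2, 2}, {3, 1}},
      {{1, 2}, {4, 4}},
      {{0, 3}, {1, 1}, {4, 4}},
      {{3, 4}, {2, 4}}};

  /** Distances plus the number of supersteps that were executed. */
  static final class Result {
    final double[] distance;
    final int supersteps;

    Result(double[] distance, int supersteps) {
      this.distance = distance;
      this.supersteps = supersteps;
    }
  }

  static Result run(int source) {
    int n = EDGES.length;
    double[] value = new double[n];
    Arrays.fill(value, Double.MAX_VALUE);  // superstep 0: every value is "infinity"
    List<List<Double>> inbox = emptyInboxes(n);
    int superstep = 0;
    boolean sentAny = true;
    while (sentAny) {
      List<List<Double>> nextInbox = emptyInboxes(n);
      sentAny = false;
      for (int v = 0; v < n; v++) {
        // Vertices vote to halt, so after superstep 0 only vertices with mail run.
        if (superstep > 0 && inbox.get(v).isEmpty()) {
          continue;
        }
        double minDist = (v == source) ? 0d : Double.MAX_VALUE;
        for (double message : inbox.get(v)) {
          minDist = Math.min(minDist, message);
        }
        if (minDist < value[v]) {  // improvement: relax all out-edges
          value[v] = minDist;
          for (int[] edge : EDGES[v]) {
            nextInbox.get(edge[0]).add(minDist + edge[1]);
            sentAny = true;
          }
        }
      }
      inbox = nextInbox;
      superstep++;
    }
    return new Result(value, superstep);
  }

  private static List<List<Double>> emptyInboxes(int n) {
    List<List<Double>> boxes = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      boxes.add(new ArrayList<>());
    }
    return boxes;
  }
}
```

`run(1)` reproduces the output above (distances 1, 0, 2, 1, 5 for vertices 0 to 4) and executes supersteps 0 to 3. This matches the superstep-by-superstep trace in the discussion.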

Code (Java):
package org.apache.giraph.examples;

import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.conf.LongConfOption;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;

import java.io.IOException;

/**
 * Demonstrates the basic Pregel shortest paths implementation.
 */
@Algorithm(
    name = "Shortest paths",
    description = "Finds all shortest paths from a selected vertex"
)
/**
 *  @param LongWritable I  Vertex Id
 *  @param DoubleWritable V Vertex data
 *  @param FloatWritable E  Edge data
 *  @param DoubleWritable M Message data
 */
public class SimpleShortestPathsComputation extends BasicComputation<
    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
  /** The shortest paths id */
  /**
   * @param String  key 
   * @param long    defaultValue
   * @param String  description
   */
  public static final LongConfOption SOURCE_ID =
      new LongConfOption("SimpleShortestPathsVertex.sourceId", 1,
          "The shortest paths id");

  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(SimpleShortestPathsComputation.class);

  /**
   * Is this vertex the source id?
   *
   * @param vertex Vertex
   * @return True if the source id
   */
  /**
   * Vertex class
   * @param LongWritable I Vertex Id
   * @param ?   V          Vertex data
   * @param ?   E          Edge data
   */
  private boolean isSource(Vertex<LongWritable, ?, ?> vertex) {
    return vertex.getId().get() == SOURCE_ID.get(getConf());
  }

  /**
   * compute(Vertex<I,V,E> vertex, Iterable<M> messages)
   */
  @Override
  public void compute(
      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
      Iterable<DoubleWritable> messages) throws IOException {
    if (getSuperstep() == 0) {
      vertex.setValue(new DoubleWritable(Double.MAX_VALUE));
    }
    double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE;
    for (DoubleWritable message : messages) {
      minDist = Math.min(minDist, message.get());
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Vertex " + vertex.getId() + " got minDist = " + minDist +
          " vertex value = " + vertex.getValue());
    }
    if (minDist < vertex.getValue().get()) {
      vertex.setValue(new DoubleWritable(minDist));
      /**
       * Edge class
       * @param LongWritable I Target vertex id
       * @param FloatWritable E Edge value
       */
      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
        double distance = minDist + edge.getValue().get();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Vertex " + vertex.getId() + " sent to " +
              edge.getTargetVertexId() + " = " + distance);
        }
        sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
      }
    }
    vertex.voteToHalt();
  }
}


Discussion:
We can trace the execution of the program on the example input, superstep by superstep.
superstep 0:
Since minDist is 0 for vertex 1 (the source) and MAX for all the other vertices, only the source vertex updates its value (to 0) and sends distances to its neighbors.
Each vertex updates its value:
V   vertexVal           
0    INF                
1    INF -> 0            
2    INF                
3    INF                   
4    INF         

Each vertex sends messages to its neighbors:
V    (targetVertexId, distance)
0    -
1    (0,1) (2,2) (3,1)
2    -
3    -
4    -

superstep 1:
Each vertex receives incoming messages from the last superstep:
V   message
0    1 (from Vertex 1)
1    -
2    2 (from Vertex 1)
3    1 (from Vertex 1)
4    -

Each vertex updates its value:
V   vertexVal           
0    INF -> 1            
1     0                  
2    INF -> 2               
3    INF -> 1                 
4    INF              

Each vertex sends messages to its neighbors: 
V    (targetVertexId, distance)
0    (1,2) (3,4)
1      -
2    (1,4) (4,6)  
3    (0,4) (1,2) (4,5) 
4       -

superstep 2: 
Each vertex receives incoming messages from the last superstep:
V    messages
0     4 (from vertex 3)
1     2 (from vertex 0), 4 (from vertex 2), 2 (from vertex 3)
2     -
3     4 (from vertex 0) 
4     6 (from vertex 2), 5 (from vertex 3)

Each vertex updates its value:
V    vertexVal
0      1 (discard 4)
1      0 (discard 2, 4, 2)
2      2  (no messages)
3      1  (discard 4)
4     INF -> 5

Each vertex sends messages to its neighbors: 
V    (targetVertexId, distance)
0    -
1    -
2    -
3    -
4    (2,9) (3,9)

superstep 3:
Each vertex receives incoming messages from the last superstep:
V    messages
0    -
1    -
2    9 (from vertex 4)
3    9 (from vertex 4)
4    -

Each vertex updates its value:
V    vertexVal
0    1
1    0
2    2 (discard 9)
3    1 (discard 9)
4    5

Each vertex sends messages to its neighbors:
V    (targetVertexId, distance)
0     -
1     -
2     -
3     -
4     -

End of computation:
After superstep 3 no messages are pending and every vertex has voted to halt, so all vertices are inactive and the computation stops.
Final vertex values:
V    vertexVal
0      1
1      0
2      2
3      1
4      5
Note that the order of the output lines is not deterministic.

Summary:
1. Giraph is based on the BSP (Bulk Synchronous Parallel) model, so make sure you understand what happens in each superstep.
2. Some useful APIs:
Vertex interface:
/**
 * Vertex interface
 * @param I    Vertex Id
 * @param V    Vertex data
 * @param E    Edge data
 */
Interface Vertex<I extends org.apache.hadoop.io.WritableComparable,
    V extends org.apache.hadoop.io.Writable,
    E extends org.apache.hadoop.io.Writable>


// Get a read-only view of the out-edges of this vertex.
Iterable<Edge<I,E>> getEdges();

/** 
 * Return the value of the first edge with the given target vertex id, 
 * or null if there is no such edge. 
 */ 
E getEdgeValue(I targetVertexId);

// Get the vertex id.
I getId();

// Get the vertex value (data stored with vertex)
V getValue();

// Get the number of outgoing edges on this vertex. 
int getNumEdges();

// Set the outgoing edges for this vertex.
void setEdges(Iterable<Edge<I,E>> edges);

// If an edge to the target vertex exists, set it to the given edge value.
void setEdgeValue(I targetVertexId, E edgeValue);

/** Set the vertex data (immediately visible in the computation) */
void setValue(V value);

/** After this is called, the compute() code will no longer be called 
  * for this vertex unless a message is sent to it. 
  */
void voteToHalt();

/** 
  * Is this vertex done? 
  */
boolean isHalted();

Edge interface:
/**
 * Edge interface
 * @param I Target vertex id
 * @param E Edge value 
 */
Interface Edge<I extends org.apache.hadoop.io.WritableComparable,
    E extends org.apache.hadoop.io.Writable>

// Get the target vertex id of this edge
I getTargetVertexId();

// Get the edge value of the edge
E getValue();

BasicComputation class:
/**
 * BasicComputation Class
 * @param I Vertex id
 * @param V Vertex data
 * @param E Edge data
 * @param M Message data
 */
Class BasicComputation<I extends org.apache.hadoop.io.WritableComparable,
    V extends org.apache.hadoop.io.Writable,
    E extends org.apache.hadoop.io.Writable,
    M extends org.apache.hadoop.io.Writable>
    
// Must be defined by the user to do computation on a single vertex.
public abstract void compute(Vertex<I,V,E> vertex,
           Iterable<M> messages)
           throws IOException;

// Retrieve the current superstep, starting from 0
public long getSuperstep();

Thursday, April 2, 2015

Java: Chapter 4: Everything is an object

Chapter 4: Everything is an object
  1. Java is not "pass-by-reference": Java passes object references by value.
  2. Objects created with new are put on the heap.
  3. Primitive-type local variables are kept on the stack (a primitive field lives inside its object on the heap).
  4. Wrapper classes: let you create a non-primitive object on the heap that represents a value of a primitive type (e.g. Integer for int). 
    1. Why wrap primitives? Explained in a later chapter. 
  5. Autoboxing: automatic conversion from primitive types to their wrapper classes, and vice versa. 
  6. BigInteger, BigDecimal: arbitrary-precision integers and fixed-point numbers, used for more accurate calculations. 
  7. An array in Java is guaranteed to be initialized and cannot be accessed outside of its range. The range checking comes at the price of a small memory overhead on each array, as well as verifying the index at run time, but the assumption is that the safety and increased productivity are worth the expense. 
  8. Garbage collector looks at all the objects that were created with new and figures out which ones are not being referenced anymore. Then it releases the memory for those objects, so the memory can be used for new objects. 
  9. A Java class consists of fields and methods. Fields are sometimes called data members. Methods are sometimes called member functions. 
  10. Primitive fields (data members) are always initialized to a default value. Local variables, however, are not automatically initialized: using one before assigning it a value is a compile-time error. 
  11. Signature: the method name and the argument list together are called the signature of a method, because they uniquely identify that method. 
  12. static keyword in Java:
    1. There are two situations in which static is needed. 
      1. Static data members: when you want only a single piece of storage for a particular field, regardless of how many objects of that class are created, or even if no objects are created. 
      2. Static methods: when you need a method that isn't associated with any particular object of the class, that is, a method you can call even if no objects are created. 
    2. static field (data members): 
class StaticTest {
    static int i = 47;
}
// Access to the data members can be either of the two ways:
StaticTest st1 = new StaticTest();
st1.i++; // OR
StaticTest.i++; // The second way is preferred.
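Items 1 and 6 of the list above can be checked with a few lines of Java. The class and method names are illustrative only. Reassigning a parameter only changes the method's own copy of the reference, while mutating through it changes the caller's object. double cannot represent 0.1 + 0.2 exactly, but BigDecimal built from strings can.

```java
import java.math.BigDecimal;

/** Small checks for items 1 and 6 of the list above (illustrative names). */
final class EverythingIsAnObjectSketch {
  static final class Box {
    int value;
  }

  /** Reassigns only the method's own copy of the reference. */
  static void reassign(Box box) {
    box = new Box();
    box.value = 99;
  }

  /** Follows the copied reference to the caller's object and changes it. */
  static void mutate(Box box) {
    box.value = 42;
  }

  static double doubleSum() {
    return 0.1 + 0.2;  // binary floating point: 0.30000000000000004
  }

  static BigDecimal decimalSum() {
    return new BigDecimal("0.1").add(new BigDecimal("0.2"));  // exactly 0.3
  }
}
```

After `reassign(box)` the caller's box still holds 0. After `mutate(box)` it holds 42, because both the caller and the method refer to the same object.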