Monday, April 27, 2015

Giraph: Page Rank

Problem Description:
Assign a weight to each node of a graph that represents its relative importance inside the graph. PageRank is usually applied to a set of webpages and tries to measure which ones are the most important compared with the rest of the set. A webpage's importance is derived from its incoming links, i.e. the references it receives from other webpages. A link from an important page counts for more than a link from an unimportant one, because each page splits its own importance among its outgoing links.
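In formula form, a common damped version of this idea is shown below. The notation is mine: N is the number of vertices, In(v) is the set of vertices linking to v, outdeg(u) is the number of outgoing links of u, and d is the damping factor. The second program below computes exactly this update with d = 0.85. The first program drops the damping term, which is the d = 1 case: a vertex's new value is just the sum of the shares it receives.

```latex
PR(v) = \frac{1 - d}{N} + d \sum_{u \in \mathrm{In}(v)} \frac{PR(u)}{\mathrm{outdeg}(u)}
```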
Here is a very good reference:
http://marsty5.com/2013/05/29/run-example-in-giraph-pagerank/

Code (Giraph):
package org.apache.giraph.examples;

import org.apache.giraph.Algorithm;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;

import java.io.IOException;

/**
 * My simplified Google page rank example (no damping factor).
 */
@Algorithm(
    name = "Page Rank",
    description = "My simplified page rank"
)
public class MyPageRankComputation extends BasicComputation<
    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {

  /** Number of supersteps in which rank is propagated */
  public static final int MAX_SUPERSTEPS = 2;

  @Override
  public void compute(Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
      Iterable<DoubleWritable> messages) throws IOException {

    // From superstep 1 on, the new rank is the sum of the rank shares
    // received from in-neighbors during the previous superstep.
    if (getSuperstep() >= 1) {
      double sum = 0;
      for (DoubleWritable message : messages) {
        sum += message.get();
      }
      vertex.setValue(new DoubleWritable(sum));
    }

    // Split the current rank evenly over the out-edges. Vertices without
    // out-edges send nothing, which avoids a division by zero.
    int numEdges = vertex.getNumEdges();
    if (getSuperstep() < MAX_SUPERSTEPS && numEdges > 0) {
      DoubleWritable message =
          new DoubleWritable(vertex.getValue().get() / numEdges);
      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
        sendMessage(edge.getTargetVertexId(), message);
      }
      // Equivalent shortcut: sendMessageToAllEdges(vertex, message);
    }
    // Halt; an incoming message reactivates the vertex in the next superstep.
    vertex.voteToHalt();
  }
}

Advanced features of Giraph: master compute and aggregators
A useful link:
http://giraph.apache.org/aggregators.html

According to Google's Pregel paper, aggregators are a mechanism for global communication, monitoring, and data. Each vertex can provide a value to an aggregator in superstep S, the system combines those values using a reduction operator, and the resulting value is made available to all vertices in superstep S + 1.


What are aggregators?

Aggregators enable global computation in your application. You can use them, for example, to check whether a global condition is satisfied or to keep some statistics.
During a superstep, vertices provide values to aggregators. The system aggregates these values, and the results become available to all vertices in the following superstep. To provide a value to an aggregator, call:
aggregate(aggregatorName, aggregatedValue)
To read the value aggregated during the previous superstep, call:
getAggregatedValue(aggregatorName)
Aggregator operations should be commutative and associative since there is no guaranteed ordering in which the aggregations will be performed.
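The timing rule and the need for an order-independent operation can be illustrated without a cluster. The following is a toy single-process model written for illustration. The class AggregatorTiming and its methods are hypothetical and are not Giraph's implementation. Values provided during one superstep become readable only after the superstep barrier, and a sum gives the same result regardless of the order in which the values are combined.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.LongBinaryOperator;

// Toy single-process model of one regular aggregator (hypothetical, not Giraph code).
public class AggregatorTiming {
  private final LongBinaryOperator op;  // the reduction operator
  private final long identity;          // initial value of the aggregator
  private long current;                 // being filled during this superstep
  private long previous;                // result of the previous superstep

  AggregatorTiming(LongBinaryOperator op, long identity) {
    this.op = op;
    this.identity = identity;
    this.current = identity;
    this.previous = identity;
  }

  // Called by vertices during superstep S.
  void aggregate(long value) {
    current = op.applyAsLong(current, value);
  }

  // Returns what was aggregated in the previous superstep.
  long getAggregatedValue() {
    return previous;
  }

  // Superstep barrier: publish the result and start again from the identity.
  void endSuperstep() {
    previous = current;
    current = identity;
  }

  // Reduces the values in a seeded random order, since no order is guaranteed.
  static long reduceInRandomOrder(List<Long> values, LongBinaryOperator op,
      long identity, long seed) {
    List<Long> shuffled = new ArrayList<>(values);
    Collections.shuffle(shuffled, new Random(seed));
    long acc = identity;
    for (long v : shuffled) {
      acc = op.applyAsLong(acc, v);
    }
    return acc;
  }

  public static void main(String[] args) {
    AggregatorTiming max = new AggregatorTiming(Math::max, Long.MIN_VALUE);
    for (long v : new long[] {3, 7, 5}) {
      max.aggregate(v);  // superstep S
    }
    System.out.println(max.getAggregatedValue());  // Long.MIN_VALUE: not visible yet
    max.endSuperstep();
    System.out.println(max.getAggregatedValue());  // 7: visible in superstep S + 1

    List<Long> values = List.of(4L, -2L, 9L, 1L);
    for (long seed = 0; seed < 3; seed++) {
      // Sum is commutative and associative, so every order gives 12.
      System.out.println(reduceInRandomOrder(values, Long::sum, 0L, seed));
    }
  }
}
```

A non-commutative or non-associative operator would make the published value depend on which worker or thread happened to combine its values first.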

Regular vs persistent aggregator

Aggregators come in two flavors: regular and persistent. The value of a regular aggregator is reset to its initial value in each superstep. The value of a persistent aggregator lives through the whole application.
As an example, consider a LongSumAggregator to which each vertex adds 1 during compute(). If it is a regular aggregator, reading it gives the number of vertices that computed in the previous superstep. If it is a persistent aggregator, it holds the sum of those counts over all previous supersteps.
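The LongSumAggregator example can be simulated in a few lines. This is a hypothetical plain-Java model, not Giraph code. The flag `persistent` switches between resetting the sum at the start of each superstep and keeping it.

```java
import java.util.Arrays;

// Hypothetical model of a LongSumAggregator to which every vertex adds 1.
public class RegularVsPersistent {
  // Returns, for each superstep s, the value the vertices read in superstep s + 1.
  static long[] run(int numVertices, int supersteps, boolean persistent) {
    long[] readNextSuperstep = new long[supersteps];
    long value = 0;  // initial value of a sum aggregator
    for (int s = 0; s < supersteps; s++) {
      if (!persistent) {
        value = 0;  // regular: reset to the initial value every superstep
      }
      for (int v = 0; v < numVertices; v++) {
        value += 1;  // each vertex aggregates 1 in compute()
      }
      readNextSuperstep[s] = value;
    }
    return readNextSuperstep;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(run(5, 3, false)));  // [5, 5, 5]
    System.out.println(Arrays.toString(run(5, 3, true)));   // [5, 10, 15]
  }
}
```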

Registering aggregators

Before using any aggregator, you MUST register it on the master. To do this, extend the MasterCompute class, set it as the job's master compute class, and call:
registerAggregator(aggregatorName, aggregatorClass)
or:
registerPersistentAggregator(aggregatorName, aggregatorClass)
depending on which kind of aggregator you want. You can register aggregators in MasterCompute.initialize(), in which case the aggregator is available throughout the whole application. Alternatively, you can register them in MasterCompute.compute(), in which case the aggregator is available in the current superstep and in each of the following ones.

Aggregators and MasterCompute

The first thing executed in a superstep is MasterCompute.compute(). In this method you can read the aggregated values from the previous superstep by calling:
getAggregatedValue(aggregatorName)
and you can change the value of an aggregator by calling:
setAggregatedValue(aggregatorName, aggregatedValue)
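A toy model of these master-side calls is shown below. ToyMasterCompute and deliverReducedValue are hypothetical stand-ins. Giraph's real methods operate on Writable values and live in MasterCompute. The averaging rule in compute() is an invented example of why a master might overwrite an aggregated value before the vertices read it. The sketch also shows the registration requirement: using an unregistered name fails.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the master's aggregator calls (hypothetical, not Giraph's API).
public class ToyMasterCompute {
  private final Map<String, Double> aggregated = new HashMap<>();

  // Aggregators must be registered before anything can use them.
  void registerAggregator(String name, double initialValue) {
    aggregated.put(name, initialValue);
  }

  double getAggregatedValue(String name) {
    Double value = aggregated.get(name);
    if (value == null) {
      throw new IllegalStateException("aggregator not registered: " + name);
    }
    return value;
  }

  void setAggregatedValue(String name, double value) {
    getAggregatedValue(name);  // same registration check
    aggregated.put(name, value);
  }

  // Stands in for the workers handing over last superstep's reduced value.
  void deliverReducedValue(String name, double value) {
    setAggregatedValue(name, value);
  }

  // Runs first in the superstep: replaces the total rank with the average
  // rank before the vertices read it (an invented example rule).
  void compute(long numVertices) {
    double total = getAggregatedValue("rankSum");
    setAggregatedValue("rankSum", total / numVertices);
  }

  public static void main(String[] args) {
    ToyMasterCompute master = new ToyMasterCompute();
    master.registerAggregator("rankSum", 0.0);  // as in initialize()
    master.deliverReducedValue("rankSum", 2.5);
    master.compute(5);
    System.out.println(master.getAggregatedValue("rankSum"));  // 0.5
  }
}
```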

Implementations

In the package org.apache.giraph.aggregators you can find many common aggregators already implemented.
If you need to create your own, you can extend BasicAggregator or implement Aggregator. Methods which you will need to implement are:
aggregate(value)
which describes how to add another value to the aggregator, and:
createInitialValue()
The initial value is applied to all aggregator objects when they are created. Aggregating the initial value must not change the aggregator's value. For a sum aggregator it should therefore be zero, for a min aggregator it should be the value corresponding to positive infinity, and so on.
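The contract of aggregate(value) and createInitialValue() can be sketched with a simplified interface. SimpleAggregator is hypothetical. Giraph's Aggregator and BasicAggregator work on Writable types, so a real implementation would, for example, extend BasicAggregator<DoubleWritable>. The point of the sketch is that the initial value is the identity of the operation.

```java
// Hypothetical simplified contract (Giraph's real one works on Writables).
interface SimpleAggregator {
  double createInitialValue();   // identity element of the operation
  void aggregate(double value);  // fold one more value in
  double getAggregatedValue();
}

// Sum: the identity is zero.
class DoubleSumSketch implements SimpleAggregator {
  private double value = createInitialValue();
  public double createInitialValue() { return 0.0; }
  public void aggregate(double v) { value += v; }
  public double getAggregatedValue() { return value; }
}

// Min: the identity is positive infinity.
class DoubleMinSketch implements SimpleAggregator {
  private double value = createInitialValue();
  public double createInitialValue() { return Double.POSITIVE_INFINITY; }
  public void aggregate(double v) { value = Math.min(value, v); }
  public double getAggregatedValue() { return value; }
}

public class CustomAggregatorSketch {
  public static void main(String[] args) {
    SimpleAggregator sum = new DoubleSumSketch();
    SimpleAggregator min = new DoubleMinSketch();
    for (double v : new double[] {2.5, -1.0, 4.0}) {
      sum.aggregate(v);
      min.aggregate(v);
    }
    // Aggregating the initial value leaves the result unchanged.
    sum.aggregate(sum.createInitialValue());
    min.aggregate(min.createInitialValue());
    System.out.println(sum.getAggregatedValue());  // 5.5
    System.out.println(min.getAggregatedValue());  // -1.0
  }
}
```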
If you need some parameters from the configuration in your aggregators, your aggregator class can implement ImmutableClassesGiraphConfigurable.

Advanced options

If you use multiple threads for computation (giraph.numComputeThreads), consider turning on the giraph.useThreadLocalAggregators option. With thread-local aggregators, every worker thread has its own local aggregator copy instead of sharing a single copy for the entire worker. The downside is higher memory use, since there are several copies of each aggregator, so this option can hurt if you have many aggregators or if the aggregated values are very large objects. Otherwise it will likely speed up your application, because aggregating values no longer requires synchronization.
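The trade-off can be seen in plain Java. This sketch is not Giraph's implementation. It compares two approaches: one shared sum protected by a lock, where every aggregate call synchronizes, and one private copy per thread that is merged once at the end, which needs no locking but costs one copy per thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java illustration of the trade-off (not Giraph's implementation).
public class ThreadLocalAggregation {
  // One shared aggregator copy: every aggregate call takes the lock.
  static long sharedCopy(int threads, int valuesPerThread) throws Exception {
    long[] sum = {0};
    Object lock = new Object();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<?>> tasks = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      tasks.add(pool.submit(() -> {
        for (int i = 1; i <= valuesPerThread; i++) {
          synchronized (lock) {
            sum[0] += i;
          }
        }
      }));
    }
    for (Future<?> task : tasks) {
      task.get();  // wait for every thread
    }
    pool.shutdown();
    synchronized (lock) {
      return sum[0];
    }
  }

  // One copy per thread: no locking while aggregating, one merge at the end.
  static long threadLocalCopies(int threads, int valuesPerThread) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Long>> partials = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      partials.add(pool.submit(() -> {
        long local = 0;  // this thread's private copy
        for (int i = 1; i <= valuesPerThread; i++) {
          local += i;
        }
        return local;
      }));
    }
    long total = 0;
    for (Future<Long> partial : partials) {
      total += partial.get();  // merge the per-thread copies
    }
    pool.shutdown();
    return total;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(sharedCopy(4, 1000));         // 2002000
    System.out.println(threadLocalCopies(4, 1000));  // 2002000
  }
}
```

Both approaches give the same total. The per-thread version trades extra memory (one partial value per thread) for uncontended updates.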

Implementation details

During a superstep, values provided to the aggregators are aggregated into worker-local aggregator objects. At the end of the superstep, all of these values need to be aggregated together and given to the master. After MasterCompute.compute() has run, the values are distributed back to all the workers. In applications that use many aggregators, doing all the aggregation on the master could cause a serious bottleneck, both in computation and in network communication, because the master would be receiving, processing and sending (number of workers * total size of aggregators) amount of data. This was the motivation for implementing sharded aggregators in Giraph.
At the end of the superstep, each aggregator is assigned to one of the workers, and that worker receives and aggregates the values for that aggregator from all other workers. Each worker then sends the aggregators it owns to the master, so the master does not have to perform any aggregation and receives only the final value of each aggregator. After MasterCompute.compute(), the master does not distribute all aggregators to all workers either. It sends each aggregator only to its owner, and each worker then distributes the aggregators it owns to all other workers.
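The owner-based reduction can be modeled in a single process. In this sketch, which is not Giraph's code, the owner of an aggregator is chosen by hashing its name modulo the number of workers. That rule is an assumption for illustration. Only the path from workers to owners to the master is modeled, not the distribution back to the workers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of sharded aggregation (hypothetical, not Giraph's code).
public class ShardedAggregation {
  // Assumed owner rule: hash of the aggregator name modulo the worker count.
  static int owner(String aggregatorName, int numWorkers) {
    return Math.floorMod(aggregatorName.hashCode(), numWorkers);
  }

  // partials.get(w) holds worker w's local sum for each aggregator.
  // Returns, per owner worker, the final values of the aggregators it owns.
  static Map<Integer, Map<String, Long>> reduceAtOwners(
      List<Map<String, Long>> partials) {
    int numWorkers = partials.size();
    Map<Integer, Map<String, Long>> owned = new HashMap<>();
    for (Map<String, Long> workerValues : partials) {
      for (Map.Entry<String, Long> e : workerValues.entrySet()) {
        int w = owner(e.getKey(), numWorkers);
        owned.computeIfAbsent(w, k -> new HashMap<>())
            .merge(e.getKey(), e.getValue(), Long::sum);
      }
    }
    return owned;
  }

  // The master only collects the already-reduced values from the owners.
  static Map<String, Long> collectAtMaster(Map<Integer, Map<String, Long>> owned) {
    Map<String, Long> result = new HashMap<>();
    owned.values().forEach(result::putAll);
    return result;
  }

  public static void main(String[] args) {
    List<Map<String, Long>> partials = List.of(
        Map.of("sum", 3L, "edges", 10L),
        Map.of("sum", 2L, "edges", 7L),
        Map.of("sum", 4L, "edges", 1L));
    Map<Integer, Map<String, Long>> owned = reduceAtOwners(partials);
    // Expected values: sum = 9, edges = 18 (map print order may vary).
    System.out.println(collectAtMaster(owned));
  }
}
```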

Code (Java):
package org.apache.giraph.examples;

import org.apache.giraph.aggregators.DoubleMaxAggregator;
import org.apache.giraph.aggregators.DoubleMinAggregator;
import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;

import java.io.IOException;

/**
 * PageRank with damping factor 0.85 that also tracks global statistics
 * through aggregators. The nested master compute class must be set for the
 * job (e.g. with GiraphRunner's -mc option); otherwise the aggregators are
 * never registered.
 */
public class MySimplePageRankComputationWithAggregator extends BasicComputation<LongWritable,
    DoubleWritable, FloatWritable, DoubleWritable> {
  /** Number of supersteps for this test */
  public static final int MAX_SUPERSTEPS = 30;
  /** Logger */
  private static final Logger LOG =
      Logger.getLogger(MySimplePageRankComputationWithAggregator.class);
  /** Sum aggregator name (counts the vertices that computed in a superstep) */
  private static final String SUM_AGG = "sum";
  /** Min aggregator name (smallest PageRank seen so far) */
  private static final String MIN_AGG = "min";
  /** Max aggregator name (largest PageRank seen so far) */
  private static final String MAX_AGG = "max";

  @Override
  public void compute(
      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
      Iterable<DoubleWritable> messages) throws IOException {
    if (getSuperstep() > 0) {
      double sum = 0;
      for (DoubleWritable message : messages) {
        sum += message.get();
      }
      // PageRank update: (1 - d) / N + d * sum, with d = 0.85
      DoubleWritable vertexValue =
          new DoubleWritable(0.15f / getTotalNumVertices() + 0.85f * sum);
      vertex.setValue(vertexValue);
      // Contribute to the aggregators; results are readable next superstep.
      aggregate(MAX_AGG, vertexValue);
      aggregate(MIN_AGG, vertexValue);
      aggregate(SUM_AGG, new LongWritable(1));
      // getAggregatedValue() returns the values from the previous superstep.
      LOG.info(vertex.getId() + ": PageRank=" + vertexValue +
          " max=" + getAggregatedValue(MAX_AGG) +
          " min=" + getAggregatedValue(MIN_AGG));
    }

    if (getSuperstep() < MAX_SUPERSTEPS) {
      // Split the rank evenly over the out-edges; skip vertices without any.
      long edges = vertex.getNumEdges();
      if (edges > 0) {
        sendMessageToAllEdges(vertex,
            new DoubleWritable(vertex.getValue().get() / edges));
      }
    } else {
      vertex.voteToHalt();
    }
  }

  /**
   * Master compute.
   * It registers the aggregators used by the computation above.
   */
  public static class MySimplePageRankMasterCompute extends
      DefaultMasterCompute {
    @Override
    public void initialize() throws InstantiationException,
        IllegalAccessException {
      // Regular: reset every superstep, so it counts vertices per superstep.
      registerAggregator(SUM_AGG, LongSumAggregator.class);
      // Persistent: keep the min/max over all supersteps.
      registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
      registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
    }
  }
}

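Input (one vertex per line: [vertex id, initial value, [[target id, edge weight], ...]], the JSON format read by Giraph's JsonLongDoubleFloatDoubleVertexInputFormat):
[0,1,[[1,1],[3,3]]]
[1,2,[[0,1],[2,2],[3,1]]]
[2,3,[[1,2],[4,4]]]
[3,4,[[0,3],[1,1],[4,4]]]
[4,5,[[3,4],[2,4]]]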

Expected output: 
0       0.18589980877086507
2       0.19005494651531296
1       0.2704106097936198
3       0.2703977512806641
4       0.19006780502826862
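To check the expected output without a cluster, the supersteps can be replayed in a single process. The sketch below is a plain-Java re-implementation written for that purpose. The class PageRankSimulation is hypothetical and is not part of Giraph. It hard-codes the out-edges of the input above, ignores edge weights as the Giraph code does, and keeps the same update expression, including the float literals.

```java
// Single-process replay of MySimplePageRankComputationWithAggregator's
// supersteps on the sample input (a hypothetical checking aid, not Giraph).
public class PageRankSimulation {
  // Out-edges of vertices 0..4 from the input; weights are ignored.
  static final int[][] OUT_EDGES = {
      {1, 3}, {0, 2, 3}, {1, 4}, {0, 1, 4}, {3, 2}};
  static final int MAX_SUPERSTEPS = 30;

  static double[] run(double[] initialValues) {
    int n = initialValues.length;
    long totalNumVertices = n;
    double[] value = initialValues.clone();
    double[] inbox = new double[n];  // sum of the messages sent to each vertex
    for (int superstep = 0; superstep <= MAX_SUPERSTEPS; superstep++) {
      double[] received = inbox;  // messages from the previous superstep
      inbox = new double[n];      // messages for the next superstep
      for (int v = 0; v < n; v++) {
        if (superstep > 0) {
          // Same expression as the Giraph code, float literals included.
          value[v] = 0.15f / totalNumVertices + 0.85f * received[v];
        }
        if (superstep < MAX_SUPERSTEPS && OUT_EDGES[v].length > 0) {
          double share = value[v] / OUT_EDGES[v].length;
          for (int target : OUT_EDGES[v]) {
            inbox[target] += share;
          }
        }
      }
    }
    return value;
  }

  public static void main(String[] args) {
    double[] ranks = run(new double[] {1, 2, 3, 4, 5});
    for (int id = 0; id < ranks.length; id++) {
      System.out.println(id + "\t" + ranks[id]);
    }
  }
}
```

Because every vertex has out-edges, each superstep maps the total rank S to 0.15 + 0.85 * S. Starting from 1 + 2 + 3 + 4 + 5 = 15, the total after 30 updates is 1 + 14 * 0.85^30, which is about 1.1068. The values only approach a total of 1 as the number of supersteps grows.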


1 comment:

  1. Hello I'm a beginner in giraph and I'm interested lately in how to define and use aggregators.
    I saw your program but I didn't understand "sum", "min" and "max". Have they been defined elsewhere or are they predefined in giraph?
