Sunday, December 14, 2014

System Design: Introduction to Giraph

1. What is Giraph?
Apache Giraph is an iterative graph processing system built for high scalability. For example, it is currently used at Facebook to analyze the social graph formed by users and their connections. Giraph originated as the open-source counterpart to Pregel, the graph processing architecture developed at Google and described in a 2010 paper. Both systems are inspired by the Bulk Synchronous Parallel model of distributed computation introduced by Leslie Valiant. Giraph adds several features beyond the basic Pregel model, including master computation, sharded aggregators, edge-oriented input, out-of-core computation, and more. With a steady development cycle and a growing community of users worldwide, Giraph is a natural choice for unleashing the potential of structured datasets at a massive scale. 

2. Existing Solutions:
  1. MapReduce in Hadoop
    1. Classic MapReduce overhead (job startup/shutdown, reloading data from HDFS, shuffling)
    2. Intermediate results are written to disk, which causes too many disk accesses.
    3. The MapReduce programming model is not a good fit for graph algorithms.
  2. Google's Pregel:
    1. Requires its own computing infrastructure
    2. Not open sourced
  3. MPI
    1. Not fault-tolerant
    2. Too generic
3. MPI vs. Giraph

  • Giraph's vertex-centric API is higher level (and narrower) than MPI
    • Per-vertex message queues vs. process-level messaging
    • Data distribution and checkpointing are handled by Giraph
    • Giraph aggregators are user-level counterparts of MPI_Allreduce reductions.
    • Jobs are scheduled on Hadoop clusters.

3. Bulk Synchronous Parallel Model (BSP):
  • Computation consists of a series of supersteps
    • A superstep is an atomic unit of computation in which operations can happen in parallel
    • During a superstep, components are assigned tasks and receive unordered messages from the previous superstep.

  • Components communicate through point-to-point messaging.
  • All (or a subset of) components can be synchronized through the superstep concept
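
To make the superstep idea concrete, here is a minimal single-process sketch of a BSP loop. It is not Giraph code: `run_bsp` and the "propagate the maximum value" task are hypothetical names chosen for illustration, and the barrier is simulated by swapping the outbox into the inbox at the end of each superstep.

```python
from collections import defaultdict


def run_bsp(graph, values, max_supersteps=50):
    """Propagate the maximum value through a graph using BSP-style supersteps.

    graph:  dict mapping vertex id -> list of neighbor ids
    values: dict mapping vertex id -> initial numeric value
    Returns (final values, number of supersteps executed).
    """
    values = dict(values)
    inbox = defaultdict(list)  # messages delivered at the start of a superstep
    active = set(graph)        # vertices that have not voted to halt
    superstep = 0
    while superstep < max_supersteps and (active or inbox):
        outbox = defaultdict(list)
        # A vertex runs if it is still active or if it received a message.
        for v in set(active) | set(inbox):
            messages = inbox.get(v, [])
            new_value = max([values[v]] + messages)
            changed = new_value != values[v]
            values[v] = new_value
            # Send only in the first superstep or when the value changed.
            if superstep == 0 or changed:
                for neighbor in graph[v]:
                    outbox[neighbor].append(new_value)
            active.discard(v)  # vote to halt; a new message reactivates it
        # Barrier: messages sent now become visible only in the next superstep.
        inbox = outbox
        superstep += 1
    return values, superstep


graph = {1: [2], 2: [1, 3], 3: [2]}
final_values, supersteps = run_bsp(graph, {1: 3, 2: 6, 3: 1})
print(final_values, supersteps)  # every vertex ends with the value 6
```

The loop stops once no vertex is active and no messages are in flight, which is the usual termination rule in this style of computation.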

4. Giraph Architecture Overview

5. Three applications:
  • Label propagation
  • PageRank
  • k-means clustering
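
As an illustration of the second application, here is a small sketch of PageRank written in a vertex-centric, superstep-by-superstep style. It is plain Python rather than Giraph code; the function name `pagerank`, the damping factor of 0.85, and the fixed number of supersteps are assumptions made for the example. It also assumes every vertex has at least one outgoing edge, so no rank is lost at dangling vertices.

```python
def pagerank(graph, supersteps=30, damping=0.85):
    """graph: dict mapping vertex id -> list of out-neighbor ids."""
    n = len(graph)
    rank = {v: 1.0 / n for v in graph}
    for _ in range(supersteps):
        # "Message passing": each vertex sends rank / out_degree to its neighbors.
        incoming = {v: 0.0 for v in graph}
        for v, out_edges in graph.items():
            if out_edges:
                share = rank[v] / len(out_edges)
                for target in out_edges:
                    incoming[target] += share
        # "Compute": each vertex updates its rank from the messages it received.
        rank = {v: (1 - damping) / n + damping * incoming[v] for v in graph}
    return rank


ranks = pagerank({"a": ["c"], "b": ["c"], "c": ["a"]})
print(ranks)
```

In this small graph, vertex "b" has no incoming edges, so it keeps only the baseline share of rank, while "c" collects rank from both "a" and "b".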
6. Hive IO

7. Components:
Overall: split input data => VertexSplit => VertexRange
  • Master
    • Only one active master at a time.
    • Runs VertexInputFormat.getSplits() to divide the input data into the appropriate number of VertexSplit objects for the application, then serializes them to ZooKeeper.
    • Coordinates the application
      • Synchronizes supersteps and the end of the application
      • Handles changes that occur within supersteps (e.g. vertex movement, a change in the number of workers, etc.)

  • Worker 
    • Reads vertices from one or more VertexSplit objects, further splitting them into smaller VertexRanges 
    • The main reason for splitting a VertexSplit into smaller VertexRange objects is to create a hierarchy of smaller units, which are easier to move from one worker to another
    • Executes the compute() method for every Vertex it is assigned, once per superstep
    • Buffers the incoming messages for every Vertex it is assigned, for use in the next superstep

  • ZooKeeper
    • Manages a server that is part of the ZooKeeper quorum (maintains global application state)
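
The "input data => VertexSplit => VertexRange" hierarchy can be sketched as two rounds of chunking. This is only an illustration: `chunk` and `plan_partitions` are hypothetical helpers, not Giraph functions, and the sketch assumes exactly one split per worker, although a worker may read more than one.

```python
def chunk(items, parts):
    """Divide items into `parts` contiguous chunks whose sizes differ by at most one."""
    size, extra = divmod(len(items), parts)
    chunks, start = [], 0
    for i in range(parts):
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def plan_partitions(vertex_ids, num_workers, ranges_per_split):
    """Return {worker index: list of vertex ranges}."""
    # Master side: divide the input into one split per worker (an assumption here).
    splits = chunk(sorted(vertex_ids), num_workers)
    # Worker side: subdivide each split into smaller ranges that are cheap to move.
    return {worker: chunk(split, ranges_per_split)
            for worker, split in enumerate(splits)}


plan = plan_partitions(range(10), num_workers=2, ranges_per_split=2)
print(plan)  # {0: [[0, 1, 2], [3, 4]], 1: [[5, 6, 7], [8, 9]]}
```

Because each range is small, rebalancing can move a single range between workers instead of a whole split.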
8. Fault Tolerance

  • No single point of failure from BSP threads
    • With multiple master threads, if the current master dies, a new one automatically takes over.
    • If a worker thread dies, the application is rolled back to a previously checkpointed superstep. The next superstep then begins with the new number of workers.
  • Hadoop's single points of failure still exist.
    • NameNode, JobTracker.
    • Restarting manually from a checkpoint is always possible.
  • Master thread fault tolerance
    • A queue of master threads
    • One active master, with spare masters taking over in the event of an active master failure
    • All active master state is stored in ZooKeeper so that a spare master can immediately step in when the active master fails.
    • The "active" master is implemented as a queue in ZooKeeper
  • Worker thread fault tolerance
    • A single worker failure causes the superstep to fail
    • To disrupt the application, a worker must be registered and chosen for the current superstep.
    • The application automatically reverts to the last committed superstep
      • The master detects a worker failure during any superstep through the loss of a ZooKeeper "health" znode.
      • The master chooses the last committed superstep and sends a command through ZooKeeper for all workers to restart from that superstep.
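
The rollback behaviour described above can be sketched with a toy simulation. Nothing here is Giraph's actual recovery code: `run_with_checkpoints`, its parameters, and the single injected failure are hypothetical, and the "application state" is just a value passed through a step function.

```python
import copy


def run_with_checkpoints(state, step_fn, total_supersteps, checkpoint_every, fail_at=None):
    """Run supersteps, checkpointing periodically and rolling back on one failure.

    fail_at: superstep at which a worker failure is injected once (None = no failure).
    Returns (final state, event log).
    """
    checkpoint = (0, copy.deepcopy(state))
    superstep = 0
    failed_once = False
    log = []
    while superstep < total_supersteps:
        if superstep % checkpoint_every == 0:
            # Commit a checkpoint before running this superstep.
            checkpoint = (superstep, copy.deepcopy(state))
        if superstep == fail_at and not failed_once:
            # A worker failed: the whole superstep fails and everyone restarts
            # from the last committed checkpoint.
            failed_once = True
            superstep, state = checkpoint[0], copy.deepcopy(checkpoint[1])
            log.append(("rollback", superstep))
            continue
        state = step_fn(state)
        log.append(("done", superstep))
        superstep += 1
    return state, log


final_state, events = run_with_checkpoints(
    0, lambda s: s + 1, total_supersteps=6, checkpoint_every=2, fail_at=3)
print(final_state, events)
```

In this run the failure at superstep 3 sends the computation back to the checkpoint taken at superstep 2, so superstep 2 is executed twice, and the final state matches a failure-free run.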

9. Optional Features
  • Combiner
    • Similar to MapReduce combiners

    • Background on the MapReduce Combiner: it is an optional pass that runs after the Mapper and before the Reducer, and it can be used to optimize the bandwidth usage of a MapReduce job. If this pass is suitable for a job, instances of the Combiner class are run on every node that has run map tasks. The Combiner receives as input all data emitted by the Mapper instances on a given node. The output from the Combiner is then sent to the Reducers, instead of the output from the Mappers. The Combiner is a "mini-reduce" process that operates only on data generated by one machine.

    • Users implement a combine() method that can reduce the number of messages sent and received
    • Runs on both the server and the client side
      • Client side saves memory and message traffic
      • Server side saves memory
  • Aggregator
    • Similar to MPI aggregation routines
    • Users write their own aggregators
    • Commutative and associative operations are performed globally
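
Both optional features boil down to applying a reduction function, which the following sketch tries to show. The helpers `combine_messages` and `aggregate` are hypothetical names for illustration and do not mirror Giraph's Java interfaces. The combiner merges messages bound for the same vertex before they are sent, and the aggregator folds per-worker partial values into one global value.

```python
from functools import reduce


def combine_messages(messages, combine):
    """Merge messages addressed to the same target vertex.

    messages: iterable of (target vertex id, value) pairs
    combine:  function taking two values and returning one
    """
    combined = {}
    for target, value in messages:
        if target in combined:
            combined[target] = combine(combined[target], value)
        else:
            combined[target] = value
    return combined


def aggregate(worker_partials, op):
    """Fold per-worker partial values into one global value.

    op should be commutative and associative so that the order in which
    workers report does not change the result.
    """
    return reduce(op, worker_partials)


outgoing = [("a", 1), ("b", 5), ("a", 2), ("a", 4)]
print(combine_messages(outgoing, lambda x, y: x + y))  # {'a': 7, 'b': 5}
print(aggregate([3, 9, 4], max))                       # 9
```

With the sum combiner, four outgoing messages shrink to two, which is where the savings in memory and message traffic come from.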
