Subject: svn commit: r1574446 - in /hama/trunk/src/site/xdoc: hama_bsp_tutorial.xml hama_graph_tutorial.xml
Date: Wed, 05 Mar 2014 11:35:01 -0000
To: commits@hama.apache.org
From: edwardyoon@apache.org
Reply-To: dev@hama.apache.org
Message-Id: <20140305113501.E513623889BB@eris.apache.org>

Author: edwardyoon
Date: Wed Mar 5 11:35:01 2014
New Revision: 1574446

URL: http://svn.apache.org/r1574446
Log:
HAMA-884: Add Combiners and Aggregators API guide to website

Modified:
    hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml
    hama/trunk/src/site/xdoc/hama_graph_tutorial.xml

Modified: hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml
URL: http://svn.apache.org/viewvc/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml?rev=1574446&r1=1574445&r2=1574446&view=diff
==============================================================================
--- hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml (original)
+++ hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml Wed Mar 5 11:35:01 2014
@@ -149,7 +149,7 @@ xsi:schemaLocation="http://maven.apache.
       peer.sync();
     }
-
+

Synchronization

When all the processes have entered the barrier via the sync() method,
@@ -173,7 +173,44 @@ xsi:schemaLocation="http://maven.apache.
     }
   }
-
+

Combiners

+

Combiners perform message aggregation at the sender side to reduce communication overhead when messages can be summarized arithmetically (e.g., min, max, sum, or average). Suppose that every processor sends the integers from 0 to 999 to one specific processor, and that this processor sums all the integer messages it receives from all processors:

+
+    public void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer) throws IOException,
+        SyncException, InterruptedException {
+
+      for (int i = 0; i < 1000; i++) {
+        peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i));
+      }
+      peer.sync();
+
+      if (peer.getPeerName().equals(masterTask)) {
+        IntegerMessage received;
+        while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) {
+          sum += received.getData();
+        }
+      }
+    }
+

In the previous example, each BSP processor sends a bundle of a thousand Integer messages to the masterTask. Instead, you can use a Combiner in your BSP program to sum the Integer messages at the sender side, so that only the combined result is sent. This reduces the message traffic and also makes the program more concise and maintainable:

+
+  public static class SumCombiner extends Combiner {
+
+    @Override
+    public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
+      BSPMessageBundle bundle = new BSPMessageBundle();
+      int sum = 0;
+
+      Iterator<BSPMessage> it = messages.iterator();
+      while (it.hasNext()) {
+        sum += ((IntegerMessage) it.next()).getData();
+      }
+
+      bundle.addMessage(new IntegerMessage("Sum", sum));
+      return bundle;
+    }
+
+  }
+
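To make the saving concrete, the following is a minimal sketch in plain Java that does not use any Hama classes. The class `CombinerSketch` and the method `combineSum` are hypothetical names chosen for illustration; they only mimic what a sum combiner does to one peer's outgoing queue.

```java
import java.util.ArrayList;
import java.util.List;

public class CombinerSketch {

  // Hypothetical stand-in for a sum combiner (not Hama's API):
  // folds all outgoing integers of one peer into a single value.
  public static int combineSum(Iterable<Integer> outgoing) {
    int sum = 0;
    for (int value : outgoing) {
      sum += value;
    }
    return sum;
  }

  public static void main(String[] args) {
    // The messages one peer would queue for the master task: 0, 1, ..., 999.
    List<Integer> outgoing = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      outgoing.add(i);
    }

    int combined = combineSum(outgoing);

    System.out.println("messages queued without a combiner: " + outgoing.size());
    System.out.println("messages sent with a sum combiner: 1, carrying " + combined);
  }
}
```

Under these assumptions, each peer transmits one message instead of a thousand, and the master task still arrives at the same total.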

Hama provides several commands for BSP job administration:

Modified: hama/trunk/src/site/xdoc/hama_graph_tutorial.xml
URL: http://svn.apache.org/viewvc/hama/trunk/src/site/xdoc/hama_graph_tutorial.xml?rev=1574446&r1=1574445&r2=1574446&view=diff
==============================================================================
--- hama/trunk/src/site/xdoc/hama_graph_tutorial.xml (original)
+++ hama/trunk/src/site/xdoc/hama_graph_tutorial.xml Wed Mar 5 11:35:01 2014
@@ -27,7 +27,7 @@ xsi:schemaLocation="http://maven.apache.

Hama includes the Graph package for vertex-centric graph computations. Hama's Graph package allows you to program Google Pregel-style applications with a simple programming interface.

- +

Writing a Hama graph application involves subclassing the predefined Vertex class. Its template arguments define three value types, associated with vertices, edges, and messages.

@@ -41,7 +41,7 @@ xsi:schemaLocation="http://maven.apache.
 
    

The user overrides the Compute() method, which will be executed at each active vertex in every superstep. Predefined Vertex methods allow Compute() to query information about the current vertex and its edges, and to send messages to other vertices. Compute() can inspect the value associated with its vertex via GetValue().

- +

You can create your own VertexReader for your data format by extending the org.apache.hama.graph.VertexInputReader class. For example, a sequence file that contains a linked list of Vertex can be parsed as follows:
@@ -63,7 +63,40 @@ xsi:schemaLocation="http://maven.apache.
     }
   }

+ +

Sending a message to another vertex that exists on a different machine has some overhead. However, if the algorithm doesn't need each message individually but only a function of the messages (for example, their sum), then combiners can be used.

+

Write your own Combiner

+

To write your own combiner, you have to extend the Combiner class and implement the #combine(Iterable<M> messages) method.
+ For more, please see the implementation of MinIntCombiner in the org.apache.hama.example.SSSP example.
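The shape of such a combiner can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `MessageCombiner` is a hypothetical interface that mirrors the combine(Iterable<M>) signature mentioned above, and `MinIntSketchCombiner` is not the MinIntCombiner shipped with Hama.

```java
import java.util.Arrays;

public class MinCombinerSketch {

  // Hypothetical interface for illustration; it is NOT Hama's Combiner class.
  interface MessageCombiner<M> {
    M combine(Iterable<M> messages);
  }

  // Keeps only the smallest candidate, which is all a shortest-path
  // style algorithm needs from the messages addressed to one vertex.
  static class MinIntSketchCombiner implements MessageCombiner<Integer> {
    @Override
    public Integer combine(Iterable<Integer> messages) {
      int min = Integer.MAX_VALUE;
      for (int message : messages) {
        min = Math.min(min, message);
      }
      return min;
    }
  }

  // Convenience wrapper so the combiner can be tried out directly.
  public static int combineMin(Iterable<Integer> messages) {
    return new MinIntSketchCombiner().combine(messages);
  }

  public static void main(String[] args) {
    // Three candidate distances for the same vertex collapse into one message.
    System.out.println(combineMin(Arrays.asList(7, 3, 9)));
  }
}
```

The choice of min here is an assumption that fits algorithms where only the best candidate matters; a sum or max combiner would follow the same pattern.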

+ +

Aggregators are a mechanism for global communication, monitoring, and data. Each vertex can provide a value to an aggregator in superstep S, the system combines those values using a reduction operator, and the resulting value is made available to all vertices in superstep S + 1. +

+

Registering aggregators

+

To start using aggregators, you must declare them in your GraphJob:

+
+  HamaConfiguration conf = new HamaConfiguration(new Configuration());
+  GraphJob graphJob = new GraphJob(conf, MyClass.class);
+
+  // To add an average aggregator
+  graphJob.setAggregatorClass(AverageAggregator.class);
+
+  // To add a sum aggregator
+  graphJob.setAggregatorClass(SumAggregator.class);
+

Several aggregators are already implemented, and you can also write your own. The existing aggregators can be found in the org.apache.hama.graph package.

+

Start working with aggregators

+

In order to aggregate values from your vertices, use:

+
+  this.aggregate(index,value);
+ +

This method is called from inside a vertex, although vertices are not required to call it. The index parameter is the position of the aggregator in registration order: the first registered aggregator has index 0, the second has index 1, and so on.

+

Get results

+

Inside your vertex, you can get the results of each aggregator by using the method:

+
+  this.getAggregatedValue(index);
+ +
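The interplay of registration order, aggregate(index, value) and getAggregatedValue(index) can be sketched with a small plain-Java model. Everything in it is an assumption made for illustration: `AggregatorSketch`, `register` and `endSuperstep` are hypothetical names, and the model only imitates the rule that values supplied in superstep S become readable in superstep S + 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoubleBinaryOperator;

public class AggregatorSketch {

  private final List<DoubleBinaryOperator> reducers = new ArrayList<>();
  // Values being accumulated during the current superstep S.
  private final List<Double> current = new ArrayList<>();
  // Results of superstep S - 1, the only values vertices can read.
  private final List<Double> visible = new ArrayList<>();

  // Registers a reduction operator and returns its index (0, 1, ...).
  public int register(DoubleBinaryOperator reducer) {
    reducers.add(reducer);
    current.add(null);
    visible.add(null);
    return reducers.size() - 1;
  }

  // Called by a vertex to contribute a value to the aggregator at index.
  public void aggregate(int index, double value) {
    Double soFar = current.get(index);
    current.set(index,
        soFar == null ? value : reducers.get(index).applyAsDouble(soFar, value));
  }

  // Returns the result of the previous superstep, or null if there is none.
  public Double getAggregatedValue(int index) {
    return visible.get(index);
  }

  // Models the barrier: publish this superstep's results and reset.
  public void endSuperstep() {
    for (int i = 0; i < current.size(); i++) {
      visible.set(i, current.get(i));
      current.set(i, null);
    }
  }

  public static void main(String[] args) {
    AggregatorSketch aggregators = new AggregatorSketch();
    int sum = aggregators.register(Double::sum); // index 0
    int max = aggregators.register(Math::max);   // index 1

    aggregators.aggregate(sum, 1.5);
    aggregators.aggregate(sum, 2.5);
    aggregators.aggregate(max, 1.5);
    aggregators.aggregate(max, 2.5);

    System.out.println(aggregators.getAggregatedValue(sum)); // null, same superstep
    aggregators.endSuperstep();
    System.out.println(aggregators.getAggregatedValue(sum)); // 4.0
    System.out.println(aggregators.getAggregatedValue(max)); // 2.5
  }
}
```

Returning null before the first barrier is a modelling choice in this sketch; it is the reason a vertex would check the aggregated value for null before using it.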

Write your own aggregators

+

To write your own aggregator, you have to extend the AbstractAggregator class and implement the #aggregate(M value) and #getValue() methods. For more, please see the default implementations of aggregators in the org.apache.hama.graph package.
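As a rough sketch of that pattern, the plain-Java example below defines a maximum aggregator. The base class `SketchAggregator` is hypothetical and only mirrors the two methods named above; it is not Hama's AbstractAggregator, whose exact signatures should be checked in the org.apache.hama.graph package.

```java
public class CustomAggregatorSketch {

  // Hypothetical base class for illustration; it is NOT Hama's AbstractAggregator.
  abstract static class SketchAggregator<M> {
    abstract void aggregate(M value);

    abstract M getValue();
  }

  // Keeps the largest value that any vertex has supplied so far.
  static class MaxAggregator extends SketchAggregator<Double> {
    private double max = Double.NEGATIVE_INFINITY;

    @Override
    void aggregate(Double value) {
      max = Math.max(max, value);
    }

    @Override
    Double getValue() {
      return max;
    }
  }

  // Feeds the given values through a fresh MaxAggregator.
  public static double maxOf(double... values) {
    MaxAggregator aggregator = new MaxAggregator();
    for (double value : values) {
      aggregator.aggregate(value);
    }
    return aggregator.getValue();
  }

  public static void main(String[] args) {
    // Values as three vertices might report them in one superstep.
    System.out.println(maxOf(0.2, 0.7, 0.4));
  }
}
```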

To solve the PageRank problem using Hama Graph, you can extend the Vertex class to create a PageRankVertex class. In this example, the algorithm described in Google's Pregel paper is used. The value of a vertex represents the tentative page rank of the vertex. The graph is initialized with each vertex value equal to 1/numOfVertices. In each of the first 30 supersteps, each vertex sends its tentative page rank along all of its outgoing edges.
@@ -77,25 +110,29 @@ From Superstep 1 to 30, each vertex sums
 
     @Override
     public void compute(Iterator<DoubleWritable> messages) throws IOException {
+      // initialize this vertex to 1 / count of global vertices in this graph
       if (this.getSuperstepCount() == 0) {
-        this.setValue(new DoubleWritable(1.0 / (double) this.getNumVertices()));
-      }
-
-      if (this.getSuperstepCount() >= 1) {
+        setValue(new DoubleWritable(1.0 / this.getNumVertices()));
+      } else if (this.getSuperstepCount() >= 1) {
         double sum = 0;
-        while (messages.hasNext()) {
-          DoubleWritable msg = messages.next();
+        for (DoubleWritable msg : messages) {
           sum += msg.get();
         }
-
-        double ALPHA = (1 - 0.85) / (double) this.getNumVertices();
-        this.setValue(new DoubleWritable(ALPHA + (0.85 * sum)));
+        double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+        setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
+        aggregate(0, this.getValue());
       }
 
-      if (this.getSuperstepCount() < this.getMaxIteration()) {
-        int numEdges = this.getOutEdges().size();
+      // if we have not reached our global error yet, then proceed.
+      DoubleWritable globalError = getAggregatedValue(0);
+
+      if (globalError != null && this.getSuperstepCount() > 2
+          && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
+        voteToHalt();
+      } else {
+        // in each superstep we are going to send a new rank to our neighbours
         sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
-            / numEdges));
+            / this.getEdges().size()));
       }
     }
   }
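
The update rule used in compute() can also be followed outside of Hama. The sketch below runs the same arithmetic sequentially in plain Java: every vertex starts at 1/numOfVertices, and each step sets the rank to (1 - d)/numOfVertices plus d times the sum of the incoming shares. The class `PageRankSketch`, the adjacency-array graph representation and the fixed damping factor of 0.85 are assumptions made for illustration, and the sketch assumes that every vertex has at least one outgoing edge.

```java
import java.util.Arrays;

public class PageRankSketch {

  static final double DAMPING_FACTOR = 0.85;

  // outEdges[v] lists the vertices that v links to.
  // Assumption: every vertex has at least one outgoing edge.
  public static double[] pageRank(int[][] outEdges, int supersteps) {
    int n = outEdges.length;
    double[] rank = new double[n];
    Arrays.fill(rank, 1.0 / n); // superstep 0: initialize to 1 / numOfVertices

    for (int step = 1; step <= supersteps; step++) {
      // "Send" rank / numEdges along every outgoing edge.
      double[] incoming = new double[n];
      for (int v = 0; v < n; v++) {
        double share = rank[v] / outEdges[v].length;
        for (int target : outEdges[v]) {
          incoming[target] += share;
        }
      }
      // Each vertex sums what it received and applies the damping factor.
      double alpha = (1.0 - DAMPING_FACTOR) / n;
      for (int v = 0; v < n; v++) {
        rank[v] = alpha + DAMPING_FACTOR * incoming[v];
      }
    }
    return rank;
  }

  public static void main(String[] args) {
    // Tiny example graph: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0.
    int[][] outEdges = { {1, 2}, {2}, {0} };
    System.out.println(Arrays.toString(pageRank(outEdges, 30)));
  }
}
```

Because no vertex in this sketch is a dead end, the ranks keep summing to 1 after every step, which is a quick sanity check when experimenting with other small graphs.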