hama-commits mailing list archives

From edwardy...@apache.org
Subject svn commit: r1574446 - in /hama/trunk/src/site/xdoc: hama_bsp_tutorial.xml hama_graph_tutorial.xml
Date Wed, 05 Mar 2014 11:35:01 GMT
Author: edwardyoon
Date: Wed Mar  5 11:35:01 2014
New Revision: 1574446

URL: http://svn.apache.org/r1574446
Log:
HAMA-884: Add Combiners and Aggregators API guide to website

Modified:
    hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml
    hama/trunk/src/site/xdoc/hama_graph_tutorial.xml

Modified: hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml
URL: http://svn.apache.org/viewvc/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml?rev=1574446&r1=1574445&r2=1574446&view=diff
==============================================================================
--- hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml (original)
+++ hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml Wed Mar  5 11:35:01 2014
@@ -149,7 +149,7 @@ xsi:schemaLocation="http://maven.apache.
 
     peer.sync();
   }</pre>
-    
+  
     <h4>Synchronization</h4>
 
     <p>When all the processes have entered the barrier via the sync() method, 
@@ -173,7 +173,44 @@ xsi:schemaLocation="http://maven.apache.
     }
   }</pre>
   
-  
+    <h4>Combiners</h4>
+    <p>Combiners aggregate messages at the sender side to reduce communication
overhead when the messages can be summarized arithmetically, e.g., as a min, max, sum, or
average. Suppose that each processor sends the integers from 0 to 999 as messages to a specific
processor, which sums all the integer messages received from all processors:</p>
+    <pre>
+    public void bsp(BSPPeer&lt;NullWritable, NullWritable, NullWritable, NullWritable&gt;
peer) throws IOException,
+        SyncException, InterruptedException {
+
+      for (int i = 0; i &lt; 1000; i++) {
+        peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i));
+      }
+      peer.sync();
+
+      if (peer.getPeerName().equals(masterTask)) {
+        IntegerMessage received;
+        while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) {
+          sum += received.getData();
+        }
+      }
+    }</pre>
+    <p>In the previous example, each BSP processor sends a bundle of a
thousand Integer messages to the masterTask. Instead, you could use a Combiner in your BSP
program to sum the Integer messages at the sender side, which reduces the traffic and keeps
the code concise and maintainable, as below:</p>
+    <pre>
+  public static class SumCombiner extends Combiner {
+
+    @Override
+    public BSPMessageBundle combine(Iterable&lt;BSPMessage&gt; messages) {
+      BSPMessageBundle bundle = new BSPMessageBundle();
+      int sum = 0;
+
+      Iterator&lt;BSPMessage&gt; it = messages.iterator();
+      while (it.hasNext()) {
+        sum += ((IntegerMessage) it.next()).getData();
+      }
+
+      bundle.addMessage(new IntegerMessage("Sum", sum));
+      return bundle;
+    }
+
+  }</pre>
+    
     <subsection name="Shell Command Line Interfaces"></subsection>
     <p>Hama provides several commands for BSP job administration:</p>
     

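The effect of the SumCombiner added in the hunk above can be illustrated without a Hama cluster. The following is a minimal, self-contained sketch in plain Java, not Hama code: the class `CombinerSketch` and its methods are hypothetical names, plain `int` values stand in for `IntegerMessage`, and the peer count of 3 is an arbitrary assumption. It suggests that summing at the sender and summing at the receiver give the same total, while each peer sends one message instead of a thousand.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for sender-side combining; not part of the Hama API.
final class CombinerSketch {

  // Messages one peer queues for the master task: the integers 0..count-1.
  static List<Integer> outgoingMessages(int count) {
    List<Integer> messages = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      messages.add(i);
    }
    return messages;
  }

  // Sender-side combine step: collapse all queued messages into a single sum.
  static List<Integer> combine(List<Integer> messages) {
    int sum = 0;
    for (int m : messages) {
      sum += m;
    }
    List<Integer> combined = new ArrayList<>();
    combined.add(sum);
    return combined;
  }

  // Receiver side: add up whatever arrived from every peer.
  static int receiveAndSum(List<List<Integer>> perPeerMessages) {
    int sum = 0;
    for (List<Integer> fromPeer : perPeerMessages) {
      for (int m : fromPeer) {
        sum += m;
      }
    }
    return sum;
  }

  public static void main(String[] args) {
    int peers = 3; // arbitrary number of simulated peers
    List<List<Integer>> plain = new ArrayList<>();
    List<List<Integer>> combined = new ArrayList<>();
    for (int p = 0; p < peers; p++) {
      List<Integer> out = outgoingMessages(1000);
      plain.add(out);
      combined.add(combine(out));
    }
    System.out.println("without combiner: sum=" + receiveAndSum(plain));
    System.out.println("with combiner:    sum=" + receiveAndSum(combined));
  }
}
```

This shortcut is only valid because addition is associative and commutative; an operation such as an average would need to carry a count along with the partial sum.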
Modified: hama/trunk/src/site/xdoc/hama_graph_tutorial.xml
URL: http://svn.apache.org/viewvc/hama/trunk/src/site/xdoc/hama_graph_tutorial.xml?rev=1574446&r1=1574445&r2=1574446&view=diff
==============================================================================
--- hama/trunk/src/site/xdoc/hama_graph_tutorial.xml (original)
+++ hama/trunk/src/site/xdoc/hama_graph_tutorial.xml Wed Mar  5 11:35:01 2014
@@ -27,7 +27,7 @@ xsi:schemaLocation="http://maven.apache.
     <p>Hama includes the Graph package for vertex-centric graph computations.
     Hama's Graph package allows you to program Google's Pregel-style applications with a simple
programming interface.</p>
 
-    <subsection name="Vertex API"></subsection>    
+    <subsection name="Vertex"></subsection>    
 
     <p>Writing a Hama graph application involves subclassing the predefined Vertex
class. Its template arguments define three value types, associated with vertices, edges, and
messages.</p>
     <pre>
@@ -41,7 +41,7 @@ xsi:schemaLocation="http://maven.apache.
 
    <p>The user overrides the Compute() method, which will be executed at each active
vertex in every superstep. Predefined Vertex methods allow Compute() to query information
about the current vertex and its edges, and to send messages to other vertices. Compute()
can inspect the value associated with its vertex via GetValue().</p>
 
-   <subsection name="VertexReader API"></subsection>
+   <subsection name="VertexReader"></subsection>
    <p>You can create your own VertexReader for your data format by extending the org.apache.hama.graph.<b>VertexInputReader</b>
class.
 
    For example, a sequence file that contains a linked list of Vertex can be parsed as follows:
@@ -63,7 +63,40 @@ xsi:schemaLocation="http://maven.apache.
     }
   }
 </pre>
+   <subsection name="Combiners"></subsection>
+   <p>Sending a message to another vertex that resides on a different machine has some
overhead. However, if the algorithm doesn't need each message individually but only a function
of the messages (for example, their sum), then combiners can be used.</p>
+   <h4>Write your own Combiner</h4>
+   <p>To write your own combiner, you have to extend the Combiner class and implement its
#combine(Iterable&lt;M&gt; messages) method. 
+   For more, please see the implementation of MinIntCombiner in the org.apache.hama.example.SSSP
example.</p> 
 
+   <subsection name="Aggregators"></subsection>
+   <p>Aggregators are a mechanism for global communication, monitoring, and data. Each
vertex can provide a value to an aggregator in superstep S, the system combines those values
using a reduction operator, and the resulting value is made available to all vertices in superstep
S + 1.
+   </p>
+   <h4>Registering aggregators</h4>
+   <p>To start using aggregators, you must declare them in your GraphJob:</p>
+   <pre>
+  HamaConfiguration conf = new HamaConfiguration(new Configuration());
+  GraphJob graphJob = new GraphJob(conf, MyClass.class);
+
+  // To add an average aggregator
+  graphJob.setAggregatorClass(AverageAggregator.class);
+
+  // To add a sum aggregator
+  graphJob.setAggregatorClass(SumAggregator.class);</pre>
+   <p>Several aggregators are already implemented, and you can also write your own. You
can find the implemented aggregators in the org.apache.hama.graph package.</p>
+   <h4>Start working with aggregators</h4>
+   <p>In order to aggregate values from your vertices, use:</p> 
+<pre>
+  this.aggregate(index,value);</pre>
+  
+  <p>This method is called from inside each vertex, though it is not mandatory for all vertices
to use it. The index parameter is a number that corresponds
to the order in which the aggregators were registered. (The first registered aggregator has index 0, the second
has index 1, etc.) </p>
+  <h4>Get results</h4>
+  <p>Inside your vertex, you can get the results of each aggregator by using the method:</p>
+  <pre>
+  this.getAggregatedValue(index);</pre>
+  
+  <h4>Write your own aggregators</h4>
+  <p>To write your own aggregator, you have to extend the AbstractAggregator class and
implement the #aggregate(M value) and #getValue() methods. For more, please see the default
implementation of aggregators in the org.apache.hama.graph package.</p>
    <subsection name="Example: PageRankVertex"></subsection>
    <p>To solve the Page Rank problem using Hama Graph, you can extend the Vertex class
to create a PageRankVertex class.
 In this example, the algorithm described in Google's Pregel paper is used. The value of a vertex
represents the tentative page rank of the vertex. The graph is initialized with each vertex
value equal to 1/numOfVertices. In each of the first 30 supersteps, each vertex sends its
tentative page rank along all of its outgoing edges.
@@ -77,25 +110,29 @@ From Superstep 1 to 30, each vertex sums
 
     @Override
     public void compute(Iterator&lt;DoubleWritable&gt; messages) throws IOException
{
+      // initialize this vertex to 1 / count of global vertices in this graph
       if (this.getSuperstepCount() == 0) {
-        this.setValue(new DoubleWritable(1.0 / (double) this.getNumVertices()));
-      }
-
-      if (this.getSuperstepCount() >= 1) {
+        setValue(new DoubleWritable(1.0 / this.getNumVertices()));
+      } else if (this.getSuperstepCount() >= 1) {
         double sum = 0;
-        while (messages.hasNext()) {
-          DoubleWritable msg = messages.next();
+        for (DoubleWritable msg : messages) {
           sum += msg.get();
         }
-
-        double ALPHA = (1 - 0.85) / (double) this.getNumVertices();
-        this.setValue(new DoubleWritable(ALPHA + (0.85 * sum)));
+        double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+        setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
+        aggregate(0, this.getValue());
       }
 
-      if (this.getSuperstepCount() &lt; this.getMaxIteration()) {
-        int numEdges = this.getOutEdges().size();
+      // if we have not reached our global error yet, then proceed.
+      DoubleWritable globalError = getAggregatedValue(0);
+      
+      if (globalError != null &amp;&amp; this.getSuperstepCount() &gt; 2
+          &amp;&amp; MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
+        voteToHalt();
+      } else {
+        // in each superstep we are going to send a new rank to our neighbours
         sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
-            / numEdges));
+            / this.getEdges().size()));
       }
     }
   }</pre>
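The aggregator semantics described in this diff (index equals registration order, values contributed in superstep S become visible in superstep S + 1) can be sketched in plain Java. This is an illustrative model only, not the Hama implementation: `AggregatorSketch`, `register`, `endSuperstep`, and the nested `Sum` and `Average` classes are hypothetical names chosen for the example. Returning `null` before the first barrier is an assumption made to mirror the `globalError != null` check in the PageRank code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of indexed aggregators; not part of the Hama API.
final class AggregatorSketch {

  interface Aggregator {
    void aggregate(double value);
    double getValue();
  }

  static final class Sum implements Aggregator {
    private double sum;
    public void aggregate(double value) { sum += value; }
    public double getValue() { return sum; }
  }

  static final class Average implements Aggregator {
    private double sum;
    private long count;
    public void aggregate(double value) { sum += value; count++; }
    public double getValue() { return count == 0 ? 0.0 : sum / count; }
  }

  private final List<Supplier<Aggregator>> registered = new ArrayList<>();
  private final List<Aggregator> collecting = new ArrayList<>();
  private Double[] published = new Double[0];

  // Registration order defines the index: first call is 0, second is 1, ...
  void register(Supplier<Aggregator> factory) {
    registered.add(factory);
    collecting.add(factory.get());
  }

  // Called by a vertex during the current superstep.
  void aggregate(int index, double value) {
    collecting.get(index).aggregate(value);
  }

  // Result of the previous superstep, or null if no barrier has passed yet.
  Double getAggregatedValue(int index) {
    return index < published.length ? published[index] : null;
  }

  // Barrier: publish the collected values and start fresh aggregators.
  void endSuperstep() {
    published = new Double[collecting.size()];
    for (int i = 0; i < collecting.size(); i++) {
      published[i] = collecting.get(i).getValue();
      collecting.set(i, registered.get(i).get());
    }
  }

  public static void main(String[] args) {
    AggregatorSketch job = new AggregatorSketch();
    job.register(Average::new); // index 0
    job.register(Sum::new);     // index 1
    for (double v : new double[] {1.0, 2.0, 3.0}) {
      job.aggregate(0, v);
      job.aggregate(1, v);
    }
    System.out.println("before barrier: " + job.getAggregatedValue(0));
    job.endSuperstep();
    System.out.println("average: " + job.getAggregatedValue(0));
    System.out.println("sum:     " + job.getAggregatedValue(1));
  }
}
```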

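The rank update in the revised compute() method, alpha + sum * DAMPING_FACTOR with alpha = (1 - DAMPING_FACTOR) / numVertices, can be tried on a small graph in plain Java. The sketch below is a sequential approximation under stated assumptions, not the Hama vertex program: `PageRankSketch` and `run` are hypothetical names, the damping factor 0.85 is taken from the literal in the removed lines of the diff, every vertex is assumed to have at least one outgoing edge, and the convergence check via the aggregator is left out.

```java
import java.util.Arrays;

// Hypothetical sequential model of the per-superstep rank update.
final class PageRankSketch {

  // 0.85 is the literal used in the removed lines of the diff.
  static final double DAMPING_FACTOR = 0.85;

  // outEdges[v] lists the targets of vertex v; each vertex needs >= 1 edge.
  static double[] run(int[][] outEdges, int supersteps) {
    int n = outEdges.length;
    double[] rank = new double[n];
    Arrays.fill(rank, 1.0 / n); // superstep 0: initialize to 1 / numVertices

    for (int s = 1; s <= supersteps; s++) {
      // "Messages": each vertex sends rank / outDegree along every out edge.
      double[] incoming = new double[n];
      for (int v = 0; v < n; v++) {
        for (int target : outEdges[v]) {
          incoming[target] += rank[v] / outEdges[v].length;
        }
      }
      // Update rule from compute(): alpha + sum * DAMPING_FACTOR.
      double alpha = (1.0 - DAMPING_FACTOR) / n;
      for (int v = 0; v < n; v++) {
        rank[v] = alpha + incoming[v] * DAMPING_FACTOR;
      }
    }
    return rank;
  }

  public static void main(String[] args) {
    // Tiny example graph: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0
    int[][] graph = {{1, 2}, {2}, {0}};
    System.out.println(Arrays.toString(run(graph, 30)));
  }
}
```

Because no vertex in the example graph is a dead end, the ranks keep summing to 1 after every superstep, which is a convenient sanity check when experimenting with the update rule.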

