incubator-hama-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Hama Wiki] Update of "BSPModel" by thomasjungblut
Date Fri, 24 Feb 2012 16:16:40 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hama Wiki" for change notification.

The "BSPModel" page has been changed by thomasjungblut:
http://wiki.apache.org/hama/BSPModel?action=diff&rev1=16&rev2=17

  
  You can implement your own outputformat. It is similar to Hadoop MapReduce's output formats,
so you can use existing literature to get into it.
  
+ = Communication Model =
+ 
+ Within the bsp() function, you can use the communication functions provided by BSPPeer.
We tried to follow the conventions of standard BSP libraries as closely as possible. The
following table describes the functions you can use:
+ 
+ ||Function||Description||
+ ||send(String peerName, BSPMessage msg)||Send a message to another peer.||
+ ||getCurrentMessage()||Get a received message from the queue.||
+ ||getNumCurrentMessages()||Get the number of messages currently in the queue.||
+ ||sync()||Start the barrier synchronization.||
+ ||getPeerName()||Get the peer name of this task.||
+ ||getPeerName(int index)||Get the peer name at the given index.||
+ ||getNumPeers()||Get the number of peers.||
+ ||getAllPeerNames()||Get all peer names (including "this" task). (Hint: These are always
sorted in ascending order)||
+ 
+ send() and the other functions can be combined freely. For example, the following code
sends a message to every peer, including the sending peer itself:
+ 
+ {{{
+     @Override
+     public void bsp(
+         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+         throws IOException, SyncException, InterruptedException {
+ 
+       for (String peerName : peer.getAllPeerNames()) {
+         peer.send(peerName, 
+           new LongMessage("Hello from " + peer.getPeerName(), System.currentTimeMillis()));
+       }
+ 
+       peer.sync();
+     }
+ }}}
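
To make the delivery semantics concrete, here is a minimal plain-Java sketch of the BSP model's queue behaviour: a message sent during a superstep becomes readable only after the next barrier. `MessageQueueSketch` and its methods are hypothetical stand-ins that mirror the names in the table above; this is not Hama's implementation, and messages are modelled as plain strings.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of BSP message queues; NOT the Hama BSPPeer API.
public class MessageQueueSketch {
    // Messages sent in the current superstep, keyed by destination peer.
    private final Map<String, Deque<String>> outgoing = new HashMap<>();
    // Messages delivered by the last barrier, keyed by receiving peer.
    private final Map<String, Deque<String>> incoming = new HashMap<>();

    public void send(String peerName, String msg) {
        outgoing.computeIfAbsent(peerName, k -> new ArrayDeque<>()).add(msg);
    }

    // Models the barrier: everything sent so far becomes readable.
    public void sync() {
        for (Map.Entry<String, Deque<String>> e : outgoing.entrySet()) {
            incoming.computeIfAbsent(e.getKey(), k -> new ArrayDeque<>())
                    .addAll(e.getValue());
        }
        outgoing.clear();
    }

    // Returns the next delivered message, or null if the queue is empty.
    public String getCurrentMessage(String peerName) {
        Deque<String> queue = incoming.get(peerName);
        return queue == null ? null : queue.poll();
    }

    public int getNumCurrentMessages(String peerName) {
        Deque<String> queue = incoming.get(peerName);
        return queue == null ? 0 : queue.size();
    }
}
```

In this sketch, a message sent to a peer is not counted by getNumCurrentMessages() until sync() has been called once.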
+ 
+ 
+ = Synchronization =
+ 
+ When all the processes have entered the barrier via the sync() function, Hama proceeds
to the next superstep. In the previous example, the BSP job finishes after a single synchronization,
once each peer has sent a "Hello from ..." message to all peers.
+ 
+ Keep in mind, however, that calling sync() does not end the BSP job. As mentioned above,
the communication functions can be combined freely. For example, sync() can also be called
inside a for loop, which lets you program iterative methods as a sequence of supersteps:
+ 
+ {{{
+     @Override
+     public void bsp(
+         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+         throws IOException, SyncException, InterruptedException {
+ 
+       for (int i = 0; i < 100; i++) {
+         // send some messages
+         peer.sync();
+       }
+ 
+     }
+ }}}
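
The barrier behaviour in such a loop can be imitated on a single machine with threads. The following is only a sketch under that assumption: `SuperstepSketch` is a hypothetical class, each thread stands in for one peer, and `java.util.concurrent.CyclicBarrier` stands in for sync(). Hama's distributed barrier is implemented differently.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-JVM model of supersteps; NOT the Hama runtime.
public class SuperstepSketch {

    // Runs numPeers threads through numSupersteps barriers and returns
    // how many supersteps were completed by all peers together.
    public static int run(int numPeers, int numSupersteps) {
        AtomicInteger completed = new AtomicInteger();
        // The barrier action runs once each time all peers have arrived.
        CyclicBarrier barrier =
            new CyclicBarrier(numPeers, completed::incrementAndGet);

        Thread[] peers = new Thread[numPeers];
        for (int p = 0; p < numPeers; p++) {
            peers[p] = new Thread(() -> {
                try {
                    for (int i = 0; i < numSupersteps; i++) {
                        // local computation and sends would go here
                        barrier.await(); // stands in for peer.sync()
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            peers[p].start();
        }
        for (Thread t : peers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return completed.get();
    }
}
```

No peer can start superstep i + 1 before every peer has reached the barrier of superstep i, which is the property the loop above relies on.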
+ 
+ The BSP job finishes only when no process has any remaining entries in its local or outgoing
queues and all processes are done, or when the job is killed by the user.
+ 
+ = Counters =
+ 
+ Just like in Hadoop MapReduce, you can use Counters.
+ 
+ Counters are basically enums that you can only increment. You can use them to track meaningful
metrics in your code, e.g., how often a loop has been executed.
+ 
+ From your BSP code you can use counters like this:
+ 
+ {{{
+     // enum definition
+     enum LoopCounter{
+       LOOPS
+     }
+ 
+     @Override
+     public void bsp(
+         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
+         throws IOException, SyncException, InterruptedException {
+       for (int i = 0; i < iterations; i++) {
+         // details omitted
+         peer.getCounter(LoopCounter.LOOPS).increment(1L);
+       }
+       // rest omitted
+     }
+ }}}
+ 
+ In 0.4.0, counters cannot be used for flow control because they are not synchronized during
the sync phase. Watch [[https://issues.apache.org/jira/browse/HAMA-515|HAMA-515]] for details.
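
The idea of an enum-keyed, increment-only counter can be sketched in plain Java as follows. `CounterSketch` is a hypothetical illustration and not Hama's Counter class. It keeps its values purely local to one object, which also shows why a peer cannot see other peers' counts unless the framework synchronizes them.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical local counter store; NOT the Hama Counter implementation.
public class CounterSketch {
    enum LoopCounter { LOOPS }

    // One running total per enum constant.
    private final Map<LoopCounter, Long> counters =
        new EnumMap<>(LoopCounter.class);

    public void increment(LoopCounter key, long delta) {
        counters.merge(key, delta, Long::sum);
    }

    public long get(LoopCounter key) {
        return counters.getOrDefault(key, 0L);
    }

    // Counts how often a loop body runs, as in the example above.
    public static long countLoops(int iterations) {
        CounterSketch counter = new CounterSketch();
        for (int i = 0; i < iterations; i++) {
            counter.increment(LoopCounter.LOOPS, 1L);
        }
        return counter.get(LoopCounter.LOOPS);
    }
}
```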
+ 
+ = Setup and Cleanup =
+ 
+ 
+ Since 0.4.0, you can use setup and cleanup methods in your BSP code. 
+ Override them in your subclass of BSP like this:
+ 
+ {{{
+  public class MyEstimator extends
+       BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {
+ 
+     // Name of the peer chosen as master in setup()
+     private String masterTask;
+ 
+     @Override
+     public void setup(
+         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
+         throws IOException {
+       // Setup: choose the middle peer as the master
+       this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
+     }
+ 
+     @Override
+     public void cleanup(
+         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
+         throws IOException {
+       // your cleanup here
+     }
+ 
+     @Override
+     public void bsp(
+         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
+         throws IOException, SyncException, InterruptedException {
+       // your computation here
+     }
+   } 
+ }}}
+ 
+ setup() is called before the bsp() method, and cleanup() is executed after bsp() has finished.
+ You can do everything in setup and cleanup: sync, send, increment counters, write output,
or even read from the input.
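
The calling order described here can be sketched as a small template in plain Java. `LifecycleSketch`, `Task` and `run` are hypothetical names, and the sketch deliberately leaves out what happens when bsp() throws, since the text above does not say.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lifecycle model; NOT how the Hama runtime invokes a BSP task.
public class LifecycleSketch {

    // setup and cleanup are optional hooks, bsp is mandatory.
    abstract static class Task {
        void setup(List<String> log) { }
        abstract void bsp(List<String> log);
        void cleanup(List<String> log) { }
    }

    // Calls the three phases in the order described above.
    static List<String> run(Task task) {
        List<String> log = new ArrayList<>();
        task.setup(log);
        task.bsp(log);
        task.cleanup(log);
        return log;
    }

    // Records the order in which the phases were invoked.
    public static List<String> demo() {
        return run(new Task() {
            @Override void setup(List<String> log) { log.add("setup"); }
            @Override void bsp(List<String> log) { log.add("bsp"); }
            @Override void cleanup(List<String> log) { log.add("cleanup"); }
        });
    }
}
```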
+ 
+ = Combiners =
+ Combiners perform message aggregation at the sender side to reduce communication overhead
in cases where messages can be summarized arithmetically, e.g., min, max, sum, and average.
Suppose that every processor sends the integers from 0 to 999 as messages to one specific
processor, which then sums all the integer messages it receives from all processors.
+ 
+ {{{
+     public void bsp(
+         BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer)
+         throws IOException, SyncException, InterruptedException {
+ 
+       for (int i = 0; i < 1000; i++) {
+         peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i));
+       }
+       peer.sync();
+ 
+       if (peer.getPeerName().equals(masterTask)) {
+         IntegerMessage received;
+         while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) {
+           sum += received.getData();
+         }
+       }
+     }
+ }}}
+ 
+ In the previous example, each BSP processor sends a bundle of a thousand integer messages
to the masterTask. Instead, you can use a Combiner in your BSP program to sum the integer
messages at the sender side, so that only a single combined message has to be sent. This
also keeps the program concise and maintainable, as shown below:
+ 
+ {{{
+   public static class SumCombiner extends Combiner {
+ 
+     @Override
+     public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
+       BSPMessageBundle bundle = new BSPMessageBundle();
+       int sum = 0;
+ 
+       Iterator<BSPMessage> it = messages.iterator();
+       while (it.hasNext()) {
+         sum += ((IntegerMessage) it.next()).getData();
+       }
+ 
+       bundle.addMessage(new IntegerMessage("Sum", sum));
+       return bundle;
+     }
+ 
+   }
+ 
+ }}}
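
To show the saving in numbers, here is a self-contained plain-Java sketch that applies the same summing idea to the thousand messages from the earlier example. `SumCombinerSketch` is hypothetical and uses plain `Integer` values in place of the IntegerMessage and BSPMessageBundle classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sum combiner; NOT the Hama Combiner class.
public class SumCombinerSketch {

    // Collapses all outgoing integers into a bundle holding only their sum.
    public static List<Integer> combine(Iterable<Integer> messages) {
        int sum = 0;
        for (int message : messages) {
            sum += message;
        }
        List<Integer> bundle = new ArrayList<>();
        bundle.add(sum);
        return bundle;
    }

    // The messages one peer would send in the example: 0, 1, ..., 999.
    public static List<Integer> outgoing() {
        List<Integer> messages = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            messages.add(i);
        }
        return messages;
    }
}
```

Calling combine(outgoing()) turns the 1000 outgoing messages of one peer into a single message with the value 499500, so the master only has to add one number per peer.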
+ 
+ 
+ 
- == Implementation notes ==
+ = Implementation notes =
  
- === Internal implementation details ===
+ == Internal implementation details ==
  
  BSPJobClient
   
@@ -186, +351 @@

   1. Gets its split from the Groom
   2. Initializes everything in BSPPeerImpl
  
- = Communication Model =
- 
- Within the bsp() function, you can use the powerful communication functions for many purposes
using BSPPeer. We tried to follow the standard library of BSP world as much as possible. The
following table describes all the functions you can use:
- 
- ||Function||Description||
- ||send(String peerName, BSPMessage msg)||Send a message to another peer.||
- ||getCurrentMessage()||Get a received message from the queue.||
- ||getNumCurrentMessages()||Get the number of messages currently in the queue.||
- ||sync()||Starts the barrier synchronization.||
- ||getPeerName()||Get the peer name of this task.||
- ||getPeerName(int index)||Gets the n-th peer name.||
- ||getNumPeers()||Get the number of peers.||
- ||getAllPeerNames()||Get all peer names (including "this" task). (Hint: These are always
sorted in ascending order)||
- 
- The send() and all the other functions are very flexible. Here is an example that sends
a message to all peers:
- 
- {{{
-     @Override
-     public void bsp(
-         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
-         throws IOException, SyncException, InterruptedException {
- 
-       for (String peerName : peer.getAllPeerNames()) {
-         peer.send(peerName, 
-           new LongMessage("Hello from " + peer.getPeerName(), System.currentTimeMillis()));
-       }
- 
-       peer.sync();
-     }
- }}}
- 
- 
- = Synchronization =
- 
- When all the processes have entered the barrier via the sync() function, the Hama proceeds
to the next superstep. In the previous example, the BSP job will be finished by one synchronization
after sending a message "Hello from ..." to all peers.
- 
- But, keep in mind that the sync() function is not the end of the BSP job. As was previously
mentioned, all the communication functions are very flexible. For example, the sync() function
also can be called in a for loop so that you can use to program the iterative methods sequentially:
- 
- {{{
-     @Override
-     public void bsp(
-         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
-         throws IOException, SyncException, InterruptedException {
- 
-       for (int i = 0; i < 100; i++) {
-         // send some messages
-         peer.sync();
-       }
- 
-     }
- }}}
- 
- The BSP job will be finished only when all processes have no more local and outgoing queues
entries and all processes done or is killed by the user.
- 
- = Counters =
- 
- Just like in Hadoop MapReduce you can use Counters.
- 
- Counters are basically enums that you can only increment. You can use them to track meaningful
metrics in your code, e.G. how often a loop has been executed.
- 
- From your BSP code you can use counters like this:
- 
- {{{
-     // enum definition
-     enum LoopCounter{
-       LOOPS
-     }
- 
-     @Override
-     public void bsp(
-         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable>
peer)
-         throws IOException, SyncException, InterruptedException {
-       for (int i = 0; i < iterations; i++) {
-         // details ommitted
-         peer.getCounter(LoopCounter.LOOPS).increment(1L);
-       }
-       // rest ommitted
-     }
- }}}
- 
- Counters are in 0.4.0 not usable for flow controls, since they are not synced during sync
phase. Watch [[https://issues.apache.org/jira/browse/HAMA-515|HAMA-515]] for details.
- 
- = Setup and Cleanup =
- 
- 
- Since 0.4.0 you can use Setup and Cleanup methods in your BSP code. 
- They can be inherited from BSP class like this:
- 
- {{{
-  public class MyEstimator extends
-       BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {
- 
-     @Override
-     public void setup(
-         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable>
peer)
-         throws IOException {
-       //Setup: Choose one as a master
-       this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
-     }
- 
-     @Override
-     public void cleanup(
-         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable>
peer)
-         throws IOException {
-       // your cleanup here
-     }
- 
-     @Override
-     public void bsp(
-         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable>
peer)
-         throws IOException, SyncException, InterruptedException {
-       // your computation here
-     }
-   } 
- }}}
- 
- Setup is called before bsp method, and cleanup is executed at the end after bsp.
- You can do everything in setup and cleanup: sync, send, increment counters, write output
or even read from the input.
- 
- = Combiners =
- Combiners are used for performing message aggregation to reduce communication overhead in
cases when messages can be summarized arithmetically e.g., min, max, sum, and average at the
sender side. Suppose that you want to send the integer messages to a specific processor from
0 to 1000 and sum all received the integer messages from all processors.
- 
- {{{
-     public void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable>
peer) throws IOException,
-         SyncException, InterruptedException {
- 
-       for (int i = 0; i < 1000; i++) {
-     	peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i));
-       }
-       peer.sync();
- 
-       if (peer.getPeerName().equals(masterTask)) {
-         IntegerMessage received;
-         while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) {
-           sum += received.getData();
-         }
-       }
-     }
- }}}
- 
- If you follow the previous example, Each bsp processor will send a bundle of thousand Integer
messages to a masterTask. Instead, you could use a Combiners in your BSP program to perform
a sum Integer messages and to write more concise and maintainable as below, that is why you
use Combiners.
- 
- {{{
-   public static class SumCombiner extends Combiner {
- 
-     @Override
-     public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
-       BSPMessageBundle bundle = new BSPMessageBundle();
-       int sum = 0;
- 
-       Iterator<BSPMessage> it = messages.iterator();
-       while (it.hasNext()) {
-         sum += ((IntegerMessage) it.next()).getData();
-       }
- 
-       bundle.addMessage(new IntegerMessage("Sum", sum));
-       return bundle;
-     }
- 
-   }
- 
- }}}
- 
