incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Apache Wiki <wikidi...@apache.org>
Subject [Hama Wiki] Trivial Update of "SerializePrinting" by edwardyoon
Date Tue, 08 Feb 2011 13:13:09 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hama Wiki" for change notification.

The "SerializePrinting" page has been changed by edwardyoon.
http://wiki.apache.org/hama/SerializePrinting?action=diff&rev1=10&rev2=11

--------------------------------------------------

  This example will help you to understand the concepts of the BSP computing model.
  
   * Each task gets its own hostname (hostname:port pair) and a sorted list containing the hostnames of all the other peers.
-  * Each task prints the LOG string "Hello BSP" only when its turn comes at intervals of 5 seconds.
+  * Each task prints the string "Hello BSP" only when its turn comes at intervals of 5 seconds.
  
  == BSP implementation of Serialize Printing of "Hello BSP" ==
  
@@ -14, +14 @@

  
  {{{
  public class SerializePrinting {
-   
+   private static String TMP_OUTPUT = "/tmp/test-example/";
+ 
    public static class HelloBSP extends BSP {
      public static final Log LOG = LogFactory.getLog(HelloBSP.class);
      private Configuration conf;
      private final static int PRINT_INTERVAL = 5000;
  
-     @Override
-     public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+     public void bsp(BSPPeerProtocol bspPeer) throws IOException,
-         InterruptedException {
+         KeeperException, InterruptedException {
        int num = Integer.parseInt(conf.get("bsp.peers.num"));
+       FileSystem fileSys = FileSystem.get(conf);
  
        int i = 0;
        for (String otherPeer : bspPeer.getAllPeerNames()) {
          if (bspPeer.getPeerName().equals(otherPeer)) {
+ 
+           SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+               new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+               CompressionType.NONE);
+           writer.append(new LongWritable(System.currentTimeMillis()), new Text(
-           LOG.info("Hello BSP from " + i + " of " + num + ": "
+               "Hello BSP from " + (i + 1) + " of " + num + ": "
-               + bspPeer.getPeerName());
+                   + bspPeer.getPeerName()));
+           writer.close();
+ 
          }
-         
+ 
          Thread.sleep(PRINT_INTERVAL);
          bspPeer.sync();
          i++;
        }
      }
  
-     @Override
      public Configuration getConf() {
        return conf;
      }
  
-     @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }
@@ -54, +60 @@

        IOException {
      // BSP job configuration
      HamaConfiguration conf = new HamaConfiguration();
-     // Execute locally
-     // conf.set("bsp.master.address", "local");
  
      BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
      // Set the job name
      bsp.setJobName("serialize printing");
      bsp.setBspClass(HelloBSP.class);
  
+     // Set the task size as a number of GroomServer
+     BSPJobClient jobClient = new BSPJobClient(conf);
+     ClusterStatus cluster = jobClient.getClusterStatus(false);
+     bsp.setNumBspTask(cluster.getGroomServers());
+ 
+     FileSystem fileSys = FileSystem.get(conf);
+     if (fileSys.exists(new Path(TMP_OUTPUT))) {
+       fileSys.delete(new Path(TMP_OUTPUT), true);
+     }
      BSPJobClient.runJob(bsp);
+ 
+     System.out.println("Each task printed the \"Hello World\" as below:");
+     for (int i = 0; i < cluster.getGroomServers(); i++) {
+       SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
+           TMP_OUTPUT + i), conf);
+       LongWritable timestamp = new LongWritable();
+       Text message = new Text();
+       reader.next(timestamp, message);
+       System.out.println(new Date(timestamp.get()) + ": " + message);
+       reader.close();
+     }
    }
  }
  }}}

Mime
View raw message