incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1022855 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/
Date Fri, 15 Oct 2010 08:17:26 GMT
Author: edwardyoon
Date: Fri Oct 15 08:17:25 2010
New Revision: 1022855

URL: http://svn.apache.org/viewvc?rev=1022855&view=rev
Log:
Add serialize printing to ExampleDriver

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1022855&r1=1022854&r2=1022855&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Fri Oct 15 08:17:25 2010
@@ -50,6 +50,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
     
+    HAMA-312: Add serialize printing to ExampleDriver (edwardyoon)
     HAMA-309: Add unit tests for Bytes utilities (edwardyoon)
     HAMA-299: Remove unused field 'task' from GroomServer (edwardyoon)
     HAMA-297: Add missing Apache License header to source files. (edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java?rev=1022855&r1=1022854&r2=1022855&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java Fri Oct
15 08:17:25 2010
@@ -27,6 +27,7 @@ public class ExampleDriver {
     ProgramDriver pgd = new ProgramDriver();
     try {
       pgd.addClass("pi", PiEstimator.class, "Pi Estimator");
+      pgd.addClass("test", SerializePrinting.class, "Serialize Printing Test");
       
       pgd.driver(args);
     } catch (Throwable e) {

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=1022855&r1=1022854&r2=1022855&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Fri
Oct 15 08:17:25 2010
@@ -18,7 +18,10 @@
 package org.apache.hama.examples;
 
 import java.io.IOException;
+import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
@@ -30,6 +33,7 @@ import org.apache.zookeeper.KeeperExcept
 public class SerializePrinting {
   
   public static class HelloBSP extends BSP {
+    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
     private Configuration conf;
 
     @Override
@@ -37,16 +41,17 @@ public class SerializePrinting {
         InterruptedException {
       int num = Integer.parseInt(conf.get("bsp.peers.num"));
 
-      for (int i = 0; i < num; i++) {
-        if (bspPeer.getId() == i) {
-          System.out.println("Hello BSP from " + i + " of " + num + ": "
-              + bspPeer.getServerName());
+      int i = 0;
+      for(Map.Entry<String, String> e : bspPeer.getAllPeers().entrySet()) {
+        if(bspPeer.getHostName().equals(e.getValue())) {
+          LOG.info("Hello BSP from " + i + " of " + num + ": "
+              + bspPeer.getHostName());
         }
-
-        Thread.sleep(100);
+        
+        Thread.sleep(200);
         bspPeer.sync();
+        i++;
       }
-
     }
 
     @Override

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1022855&r1=1022854&r2=1022855&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Fri Oct 15 08:17:25 2010
@@ -51,7 +51,7 @@ public class BSPPeer implements Watcher,
   protected ZooKeeper zk = null;
   protected volatile Integer mutex = 0;
 
-  protected final String serverName;
+  protected final String peerAddr;
   protected final String bindAddress;
   protected final int bindPort;
   protected final String bspRoot;
@@ -60,7 +60,6 @@ public class BSPPeer implements Watcher,
   protected final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress,
BSPPeerInterface>();
   protected final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues
= new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
   protected final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
-  protected int id;
   protected Map<String, String> allPeers = new HashMap<String, String>();
   protected InetSocketAddress peerAddress;
 
@@ -70,11 +69,10 @@ public class BSPPeer implements Watcher,
   public BSPPeer(Configuration conf) throws IOException {
     this.conf = conf;
 
-    serverName = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST)
+    peerAddr = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST)
         + ":" + conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
     bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
     bindPort = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
-    id = conf.getInt(Constants.PEER_ID, 0);
     bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
         Constants.DEFAULT_ZOOKEEPER_ROOT);
     zookeeperAddr = conf.get(Constants.ZOOKEEPER_QUORUM)
@@ -188,9 +186,9 @@ public class BSPPeer implements Watcher,
   }
 
   protected boolean enterBarrier() throws KeeperException, InterruptedException {
-    LOG.debug("[" + serverName + "] enter the enterbarrier");
+    LOG.debug("[" + peerAddr + "] enter the enterbarrier");
     try {
-      zk.create(bspRoot + "/" + serverName, new byte[0], Ids.OPEN_ACL_UNSAFE,
+      zk.create(bspRoot + "/" + peerAddr, new byte[0], Ids.OPEN_ACL_UNSAFE,
           CreateMode.EPHEMERAL);
     } catch (KeeperException e) {
       e.printStackTrace();
@@ -213,7 +211,7 @@ public class BSPPeer implements Watcher,
   }
 
   protected boolean leaveBarrier() throws KeeperException, InterruptedException {
-    zk.delete(bspRoot + "/" + serverName, 0);
+    zk.delete(bspRoot + "/" + peerAddr, 0);
 
     while (true) {
       synchronized (mutex) {
@@ -221,7 +219,7 @@ public class BSPPeer implements Watcher,
         if (list.size() > 0) {
           mutex.wait();
         } else {
-          LOG.debug("[" + serverName + "] leave from the leaveBarrier");
+          LOG.debug("[" + peerAddr + "] leave from the leaveBarrier");
           return true;
         }
       }
@@ -274,13 +272,11 @@ public class BSPPeer implements Watcher,
     return peer;
   }
 
-  @Override
-  public String getServerName() {
-    return this.serverName;
-  }
-
-  public int getId() {
-    return this.id;
+  /**
+   * @return the string as host:port of this Peer 
+   */
+  public String getHostName() {
+    return this.peerAddr;
   }
 
   /**

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=1022855&r1=1022854&r2=1022855&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java Fri Oct 15 08:17:25
2010
@@ -56,5 +56,7 @@ public interface BSPPeerInterface extend
   
   public boolean isRunning();
   
-  public String getServerName();
+  public InetSocketAddress getAddress();
+  
+  public String getHostName();
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1022855&r1=1022854&r2=1022855&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Fri Oct 15 08:17:25
2010
@@ -125,7 +125,7 @@ public class GroomServer implements Runn
     this.conf.set(Constants.PEER_HOST, localHostname);
     bspPeer = new BSPPeer(conf);
 
-    this.groomServerName = "groomd_" + bspPeer.getServerName().replace(':', '_');
+    this.groomServerName = "groomd_" + bspPeer.getHostName().replace(':', '_');
     LOG.info("Starting groom: " + this.groomServerName);
 
     DistributedCache.purgeCache(this.conf);
@@ -294,7 +294,7 @@ public class GroomServer implements Runn
     //
     if (status == null) {
       synchronized (this) {
-        status = new GroomServerStatus(groomServerName, bspPeer.getServerName(),
+        status = new GroomServerStatus(groomServerName, bspPeer.getHostName(),
             cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks);
       }
     } else {
@@ -526,6 +526,6 @@ public class GroomServer implements Runn
   }
 
   public String getServerName() {
-    return bspPeer.getServerName();
+    return bspPeer.getHostName();
   }
 }



Mime
View raw message