incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1185644 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/
Date Tue, 18 Oct 2011 13:22:49 GMT
Author: edwardyoon
Date: Tue Oct 18 13:22:49 2011
New Revision: 1185644

URL: http://svn.apache.org/viewvc?rev=1185644&view=rev
Log:
Add getPeerName(int index) and getNumPeers()

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1185644&r1=1185643&r2=1185644&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Oct 18 13:22:49 2011
@@ -3,6 +3,8 @@ Hama Change Log
 Release 0.4 - Unreleased
 
   NEW FEATURES
+  
+   HAMA-456: Add getPeerName(int index) and getNumPeers() (edwardyoon)
    HAMA-431: MapReduce NG integration (tjungblut)
    HAMA-449: Add tasks num of Job to web UI (edwardyoon)
    HAMA-428: Create a separate maven module and add basic structure for the Graph (edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1185644&r1=1185643&r2=1185644&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Tue Oct 18 13:22:49 2011
@@ -495,9 +495,7 @@ public class BSPMaster implements JobSub
         zk.delete(bspRoot + "/" + node, 0);
       }
     } catch (KeeperException e) {
-      e.printStackTrace();
     } catch (InterruptedException e) {
-      e.printStackTrace();
     }
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1185644&r1=1185643&r2=1185644&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Tue Oct 18 13:22:49 2011
@@ -85,23 +85,34 @@ public interface BSPPeer extends HamaRPC
   public long getSuperstepCount();
 
   /**
-   * @return The name of this peer in the format "hostname:port".
+   * @return the name of this peer in the format "hostname:port".
    */
   public String getPeerName();
 
   /**
-   * @return The names of all the peers executing tasks from the same job
+   * @param index
+   * @return the name of n-th peer from sorted array by name.
+   */
+  public String getPeerName(int index);
+  
+  /**
+   * @return the names of all the peers executing tasks from the same job
    *         (including this peer).
    */
   public String[] getAllPeerNames();
 
   /**
+   * @return the number of peers
+   */
+  public int getNumPeers();
+  
+  /**
    * Clears all queues entries.
    */
   public void clear();
 
   /**
-   * @return The jobs configuration
+   * @return the jobs configuration
    */
   public Configuration getConfiguration();
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1185644&r1=1185643&r2=1185644&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Oct 18 13:22:49 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -82,11 +83,13 @@ public class BSPPeerImpl implements Watc
   private InetSocketAddress peerAddress;
   private TaskStatus currentTaskStatus;
 
-  private TaskAttemptID taskid;
+  private TaskAttemptID taskId;
   private BSPPeerProtocol umbilical;
 
   private final BSPMessageSerializer messageSerializer;
 
+  private String[] allPeers;
+
   public static final class BSPSerializableMessage implements Writable {
     final AtomicReference<String> path = new AtomicReference<String>();
     final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>();
@@ -206,12 +209,12 @@ public class BSPPeerImpl implements Watc
    * 
    * @param conf is the configuration file containing bsp peer host, port, etc.
    * @param umbilical is the bsp protocol used to contact its parent process.
-   * @param taskid is the id that current process holds.
+   * @param taskId is the id that current process holds.
    */
-  public BSPPeerImpl(Configuration conf, TaskAttemptID taskid,
+  public BSPPeerImpl(Configuration conf, TaskAttemptID taskId,
       BSPPeerProtocol umbilical) throws IOException {
     this.conf = conf;
-    this.taskid = taskid;
+    this.taskId = taskId;
     this.umbilical = umbilical;
 
     String bindAddress = conf.get(Constants.PEER_HOST,
@@ -221,13 +224,12 @@ public class BSPPeerImpl implements Watc
     bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
         Constants.DEFAULT_ZOOKEEPER_ROOT);
     quorumServers = QuorumPeer.getZKQuorumServersString(conf);
-    if (LOG.isDebugEnabled())
-      LOG.debug("Quorum  " + quorumServers);
+    LOG.debug("Quorum  " + quorumServers);
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
     BSPMessageSerializer msgSerializer = null;
     if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
-      msgSerializer = new BSPMessageSerializer(conf.getInt(
-          "bsp.checkpoint.port", Integer
+      msgSerializer = new BSPMessageSerializer(conf
+          .getInt("bsp.checkpoint.port", Integer
               .valueOf(CheckpointRunner.DEFAULT_PORT)));
     }
     this.messageSerializer = msgSerializer;
@@ -252,6 +254,17 @@ public class BSPPeerImpl implements Watc
     } catch (IOException e) {
       LOG.error("Fail while reinitializing zookeeeper!", e);
     }
+
+    try {
+      allPeers = zk.getChildren("/" + taskId.getJobID().toString(), this)
+          .toArray(new String[0]);
+    } catch (KeeperException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    Arrays.sort(allPeers);
   }
 
   @Override
@@ -292,7 +305,7 @@ public class BSPPeerImpl implements Watc
   private String checkpointedPath() {
     String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
     String ckptPath = backup + jobConf.getJobID().toString() + "/"
-        + getSuperstepCount() + "/" + this.taskid.toString();
+        + getSuperstepCount() + "/" + this.taskId.toString();
     if (LOG.isDebugEnabled())
       LOG.debug("Messages are to be saved to " + ckptPath);
     return ckptPath;
@@ -312,15 +325,7 @@ public class BSPPeerImpl implements Watc
       Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
           .next();
 
-      BSPPeer peer = peers.get(entry.getKey());
-      if (peer == null) {
-        try {
-          peer = getBSPPeerConnection(entry.getKey());
-        } catch (NullPointerException ne) {
-          umbilical.fatalError(taskid, entry.getKey().getHostName()
-              + " doesn't exists.");
-        }
-      }
+      BSPPeer peer = getBSPPeerConnection(entry.getKey());
       Iterable<BSPMessage> messages = entry.getValue();
       BSPMessageBundle bundle = new BSPMessageBundle();
       for (BSPMessage message : messages) {
@@ -338,7 +343,7 @@ public class BSPPeerImpl implements Watc
 
     leaveBarrier();
     currentTaskStatus.incrementSuperstepCount();
-    umbilical.statusUpdate(taskid, currentTaskStatus);
+    umbilical.statusUpdate(taskId, currentTaskStatus);
 
     // Clear outgoing queues.
     clearOutgoingQueues();
@@ -389,8 +394,6 @@ public class BSPPeerImpl implements Watc
     public void process(WatchedEvent event) {
       this.complete = true;
       synchronized (mutex) {
-        LOG.debug(">>>>>>>>>>>>>>> at superstep
" + getSuperstepCount()
-            + " taskid:" + taskid.toString() + " is notified.");
         mutex.notifyAll();
       }
     }
@@ -405,7 +408,7 @@ public class BSPPeerImpl implements Watc
     synchronized (zk) {
       createZnode(bspRoot);
       final String pathToJobIdZnode = bspRoot + "/"
-          + taskid.getJobID().toString();
+          + taskId.getJobID().toString();
       createZnode(pathToJobIdZnode);
       final String pathToSuperstepZnode = pathToJobIdZnode + "/"
           + getSuperstepCount();
@@ -431,8 +434,8 @@ public class BSPPeerImpl implements Watc
             + " is " + znodes.size() + ". Znodes include " + znodes);
 
       if (size < jobConf.getNumBspTask()) {
-        LOG.info("xxxx 1. At superstep: " + getSuperstepCount()
-            + " which task is waiting? " + taskid.toString()
+        LOG.info("1. At superstep: " + getSuperstepCount()
+            + " which task is waiting? " + taskId.toString()
             + " stat is null? " + readyStat);
         while (!barrierWatcher.isComplete()) {
           if (!hasReady) {
@@ -441,11 +444,11 @@ public class BSPPeerImpl implements Watc
             }
           }
         }
-        LOG.debug("xxxx 2. at superstep: " + getSuperstepCount()
-            + " after waiting ..." + taskid.toString());
+        LOG.debug("2. at superstep: " + getSuperstepCount()
+            + " after waiting ..." + taskId.toString());
       } else {
         LOG.debug("---> at superstep: " + getSuperstepCount()
-            + " task that is creating /ready znode:" + taskid.toString());
+            + " task that is creating /ready znode:" + taskId.toString());
         createEphemeralZnode(pathToSuperstepZnode + "/ready");
       }
     }
@@ -454,7 +457,7 @@ public class BSPPeerImpl implements Watc
 
   protected boolean leaveBarrier() throws KeeperException, InterruptedException {
     final String pathToSuperstepZnode = bspRoot + "/"
-        + taskid.getJobID().toString() + "/" + getSuperstepCount();
+        + taskId.getJobID().toString() + "/" + getSuperstepCount();
     while (true) {
       List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
       LOG
@@ -464,8 +467,10 @@ public class BSPPeerImpl implements Watc
         znodes.remove("ready");
       }
       final int size = znodes.size();
-      LOG.info("leaveBarrier() at superstep:" + getSuperstepCount()
+
+      LOG.debug("leaveBarrier() at superstep:" + getSuperstepCount()
           + " znode size: (" + size + ") znodes:" + znodes);
+
       if (null == znodes || znodes.isEmpty())
         return true;
       if (1 == size) {
@@ -484,7 +489,7 @@ public class BSPPeerImpl implements Watc
       final String highest = znodes.get(size - 1);
 
       LOG.info("leaveBarrier() at superstep: " + getSuperstepCount()
-          + " taskid:" + taskid.toString() + " lowest: " + lowest + " highest:"
+          + " taskid:" + taskId.toString() + " lowest: " + lowest + " highest:"
           + highest);
       synchronized (mutex) {
 
@@ -495,7 +500,7 @@ public class BSPPeerImpl implements Watc
                 public void process(WatchedEvent event) {
                   synchronized (mutex) {
                     LOG.debug("leaveBarrier() at superstep: "
-                        + getSuperstepCount() + " taskid:" + taskid.toString()
+                        + getSuperstepCount() + " taskid:" + taskId.toString()
                         + " highest notify lowest.");
                     mutex.notifyAll();
                   }
@@ -504,7 +509,7 @@ public class BSPPeerImpl implements Watc
 
           if (null != s) {
             LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
-                + " taskid:" + taskid.toString() + " wait for higest notify.");
+                + " taskid:" + taskId.toString() + " wait for higest notify.");
             mutex.wait();
           }
         } else {
@@ -512,7 +517,7 @@ public class BSPPeerImpl implements Watc
 
           if (null != s1) {
             LOG.info("leaveBarrier() znode at superstep:" + getSuperstepCount()
-                + " taskid:" + taskid.toString() + " exists, so delete it.");
+                + " taskid:" + taskId.toString() + " exists, so delete it.");
             try {
               zk.delete(getNodeName(), 0);
             } catch (KeeperException.NoNodeException nne) {
@@ -526,7 +531,7 @@ public class BSPPeerImpl implements Watc
                 public void process(WatchedEvent event) {
                   synchronized (mutex) {
                     LOG.debug("leaveBarrier() at superstep: "
-                        + getSuperstepCount() + " taskid:" + taskid.toString()
+                        + getSuperstepCount() + " taskid:" + taskId.toString()
                         + " lowest notify other nodes.");
                     mutex.notifyAll();
                   }
@@ -534,7 +539,7 @@ public class BSPPeerImpl implements Watc
               });
           if (null != s2) {
             LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
-                + " taskid:" + taskid.toString() + " wait for lowest notify.");
+                + " taskid:" + taskId.toString() + " wait for lowest notify.");
             mutex.wait();
           }
         }
@@ -543,8 +548,8 @@ public class BSPPeerImpl implements Watc
   }
 
   private String getNodeName() {
-    return bspRoot + "/" + taskid.getJobID().toString() + "/"
-        + getSuperstepCount() + "/" + taskid.toString();
+    return bspRoot + "/" + taskId.getJobID().toString() + "/"
+        + getSuperstepCount() + "/" + taskId.toString();
   }
 
   @Override
@@ -593,17 +598,14 @@ public class BSPPeerImpl implements Watc
   protected BSPPeer getBSPPeerConnection(InetSocketAddress addr)
       throws NullPointerException, IOException {
     BSPPeer peer;
-    synchronized (this.peers) {
-      peer = peers.get(addr);
-
-      if(peer == null) {
-       peer = (BSPPeer) RPC.getProxy(BSPPeer.class,
-            BSPPeer.versionID, addr, this.conf);
+    peer = peers.get(addr);
+    if (peer == null) {
+      synchronized (this.peers) {
+        peer = (BSPPeer) RPC.getProxy(BSPPeer.class, BSPPeer.versionID, addr,
+            this.conf);
+        this.peers.put(addr, peer);
       }
-
-      this.peers.put(addr, peer);
     }
-
     return peer;
   }
 
@@ -627,17 +629,17 @@ public class BSPPeerImpl implements Watc
 
   @Override
   public String[] getAllPeerNames() {
-    String[] result = null;
-    try {
-      result = zk.getChildren("/" + jobConf.getJobID().toString(), this)
-          .toArray(new String[0]);
-    } catch (KeeperException e) {
-      e.printStackTrace();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
+    return allPeers;
+  }
 
-    return result;
+  @Override
+  public String getPeerName(int index) {
+    return allPeers[index];
+  }
+
+  @Override
+  public int getNumPeers() {
+    return allPeers.length;
   }
 
   /**

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1185644&r1=1185643&r2=1185644&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Tue Oct 18 13:22:49 2011
@@ -153,23 +153,13 @@ public class GroomServer implements Runn
         assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
         int i = 0;
 
-        // add peers to BSPMaster.
+        // add peers to Zookeeper.
         // TODO find another way to manage all activate peers.
         for (GroomServerAction action : actions) {
           Task t = ((LaunchTaskAction) action).getTask();
 
           int peerPort = (Constants.DEFAULT_PEER_PORT + i);
-
-          try {
-            zk.create("/" + t.getJobID().toString() + "/" + groomHostName + ":"
-                + peerPort, new byte[0], Ids.OPEN_ACL_UNSAFE,
-                CreateMode.EPHEMERAL);
-          } catch (KeeperException e) {
-            e.printStackTrace();
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-
+          registerPeerAddress(t.getJobID(), peerPort);
           assignedPeerNames.put(t.getTaskID(), peerPort);
 
           i++;
@@ -198,6 +188,24 @@ public class GroomServer implements Runn
         }
       }
     }
+
+    /**
+     * Register peer address to share all addresses among tasks.
+     * 
+     * @param jobID
+     * @param peerPort
+     */
+    private void registerPeerAddress(BSPJobID jobID, int peerPort) {
+      try {
+        zk.create(
+            "/" + jobID.toString() + "/" + groomHostName + ":" + peerPort,
+            new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+      } catch (KeeperException e) {
+        e.printStackTrace();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
   }
 
   private class Instructor extends Thread {
@@ -841,8 +849,8 @@ public class GroomServer implements Runn
     }
 
     public void reportProgress(TaskStatus taskStatus) {
-//      LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + "% "
-//          + taskStatus.getStateString());
+      // LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + "% "
+      // + taskStatus.getStateString());
 
       if (this.done) {
         LOG.info(task.getTaskID()
@@ -948,11 +956,9 @@ public class GroomServer implements Runn
       bspPeer.reinitialize();
       bspPeer.setJobConf(job);
 
-      TaskStatus taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(),
-          0, TaskStatus.State.RUNNING, "running", host,
-          TaskStatus.Phase.STARTING);
-
-      bspPeer.setCurrentTaskStatus(taskStatus);
+      bspPeer.setCurrentTaskStatus(new TaskStatus(task.getJobID(), task
+          .getTaskID(), 0, TaskStatus.State.RUNNING, "running", host,
+          TaskStatus.Phase.STARTING));
 
       try {
         // use job-specified working directory

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1185644&r1=1185643&r2=1185644&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Tue Oct 18 13:22:49 2011
@@ -36,7 +36,6 @@ import org.apache.hama.bsp.DoubleMessage
 import org.apache.zookeeper.KeeperException;
 
 public class PiEstimator {
-  private static String MASTER_TASK = "master.task.";
   private static Path TMP_OUTPUT = new Path("/tmp/pi-example/output");
 
   public static class MyEstimator extends BSP {
@@ -46,7 +45,8 @@ public class PiEstimator {
 
     @Override
     public void setup(BSPPeer peer) {
-      this.masterTask = conf.get(MASTER_TASK);
+      // Choose one as a master
+      this.masterTask = peer.getPeerName(0);
     }
 
     @Override
@@ -131,12 +131,6 @@ public class PiEstimator {
       bsp.setNumBspTask(cluster.getMaxTasks());
     }
 
-    // Choose one as a master
-    for (String hostName : cluster.getActiveGroomNames().keySet()) {
-      conf.set(MASTER_TASK, hostName);
-      break;
-    }
-
     FileSystem fileSys = FileSystem.get(conf);
     initTempDir(fileSys);
 



Mime
View raw message