incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1175640 - in /incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp: BSPMaster.java BSPPeer.java
Date Mon, 26 Sep 2011 01:06:13 GMT
Author: edwardyoon
Date: Mon Sep 26 01:06:12 2011
New Revision: 1175640

URL: http://svn.apache.org/viewvc?rev=1175640&view=rev
Log:
Refactor BSPPeer

Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1175640&r1=1175639&r2=1175640&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Mon Sep 26 01:06:12 2011
@@ -485,6 +485,9 @@ public class BSPMaster implements JobSub
       for (String node : zk.getChildren(bspRoot, this)) {
         for (String subnode : zk.getChildren(bspRoot + "/" + node, this)) {
           for (String subnode2 : zk.getChildren(bspRoot + "/" + node, this)) {
+            for (String subnode3 : zk.getChildren(bspRoot + "/" + node + "/" + subnode2, this)) {
+              zk.delete(bspRoot + "/" + node + "/" + subnode + "/" + subnode2 + "/" + subnode3, 0);
+            }
             zk.delete(bspRoot + "/" + node + "/" + subnode + "/" + subnode2, 0);
           }
           zk.delete(bspRoot + "/" + node + "/" + subnode, 0);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1175640&r1=1175639&r2=1175640&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Mon Sep 26 01:06:12 2011
@@ -227,8 +227,8 @@ public class BSPPeer implements Watcher,
     BSPMessageSerializer msgSerializer = null;
     if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
       msgSerializer = new BSPMessageSerializer(conf.getInt(
-          "bsp.checkpoint.port", Integer
-              .parseInt(CheckpointRunner.DEFAULT_PORT)));
+          "bsp.checkpoint.port",
+          Integer.parseInt(CheckpointRunner.DEFAULT_PORT)));
     }
     this.messageSerializer = msgSerializer;
   }
@@ -237,8 +237,8 @@ public class BSPPeer implements Watcher,
     try {
       if (LOG.isDebugEnabled())
         LOG.debug("reinitialize(): " + getPeerName());
-      this.server = RPC.getServer(this, peerAddress.getHostName(), peerAddress
-          .getPort(), conf);
+      this.server = RPC.getServer(this, peerAddress.getHostName(),
+          peerAddress.getPort(), conf);
       server.start();
       LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
           + peerAddress.getPort());
@@ -365,12 +365,40 @@ public class BSPPeer implements Watcher,
 
   private void createZnode(final String path, final CreateMode mode)
       throws KeeperException, InterruptedException {
-    Stat s = zk.exists(path, false);
-    if (null == s) {
-      try {
-        zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
-      } catch (KeeperException.NodeExistsException nee) {
-        LOG.warn("Ignore because znode may be already created at " + path, nee);
+    synchronized (zk) {
+      Stat s = zk.exists(path, false);
+      if (null == s) {
+        try {
+          zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
+        } catch (KeeperException.NodeExistsException nee) {
+          LOG.warn("Ignore because znode may be already created at " + path,
+              nee);
+        }
+      }
+    }
+  }
+
+  private class BarrierWatcher implements Watcher {
+    private boolean complete = false;
+
+    boolean isComplete() {
+      return this.complete;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      this.complete = true;
+      synchronized (mutex) {
+        LOG.info(">>>>>>>>>>>>>>> at superstep " + getSuperstepCount()
+            + " taskid:" + taskid.toString() + " is notified.");
+        /*
+         * try { Stat s = zk.exists(pathToSuperstepZnode+"/ready", false);
+         * if(null != s) { zk.delete(pathToSuperstepZnode+"/ready", 0); } }
+         * catch(KeeperException.NoNodeException nne) {
+         * LOG.warn("Ignore because znode may be deleted.", nne); }
+         * catch(Exception e) { throw new RuntimeException(e); }
+         */
+        mutex.notifyAll();
       }
     }
   }
@@ -381,44 +409,50 @@ public class BSPPeer implements Watcher,
           + this.getSuperstepCount());
     }
 
-    createZnode(bspRoot);
+    synchronized (zk) {
+      createZnode(bspRoot);
+      final String pathToJobIdZnode = bspRoot + "/"
+          + taskid.getJobID().toString();
+      createZnode(pathToJobIdZnode);
+      final String pathToSuperstepZnode = pathToJobIdZnode + "/"
+          + getSuperstepCount();
+      createZnode(pathToSuperstepZnode);
+      BarrierWatcher barrierWatcher = new BarrierWatcher();
+      Stat readyStat = zk.exists(pathToSuperstepZnode + "/ready",
+          barrierWatcher);
+      zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
 
-    final String pathToJobIdZnode = bspRoot + "/"
-        + taskid.getJobID().toString();
-    createZnode(pathToJobIdZnode);
-
-    final String pathToSuperstepZnode = pathToJobIdZnode + "/"
-        + getSuperstepCount();
-    createZnode(pathToSuperstepZnode);
-
-    zk.exists(pathToSuperstepZnode + "/ready", new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        synchronized (mutex) {
-          try {
-            Stat s = zk.exists(pathToSuperstepZnode + "/ready", false);
-            if (null != s) {
-              zk.delete(pathToSuperstepZnode + "/ready", 0);
-            }
-          } catch (KeeperException.NoNodeException nne) {
-            LOG.warn("Ignore because znode may be deleted.", nne);
-          } catch (Exception e) {
-            throw new RuntimeException(e);
-          }
-          mutex.notifyAll();
-        }
+      List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+      int size = znodes.size(); // may contains ready
+      boolean hasReady = znodes.contains("ready");
+      if (hasReady) {
+        size--;
       }
-    });
-    zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
 
-    synchronized (mutex) {
-      List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+      LOG.debug("===> at superstep :" + getSuperstepCount()
+          + " current znode size: " + znodes.size() + " current znodes:"
+          + znodes);
+
       if (LOG.isDebugEnabled())
         LOG.debug("enterBarrier() znode size within " + pathToSuperstepZnode
             + " is " + znodes.size() + ". Znodes include " + znodes);
-      if (znodes.size() < jobConf.getNumBspTask()) {
-        mutex.wait();
+
+      if (size < jobConf.getNumBspTask()) {
+        LOG.info("xxxx 1. At superstep: " + getSuperstepCount()
+            + " which task is waiting? " + taskid.toString()
+            + " stat is null? " + readyStat);
+        while (!barrierWatcher.isComplete()) {
+          if (!hasReady) {
+            synchronized (mutex) {
+              mutex.wait(1000);
+            }
+          }
+        }
+        LOG.debug("xxxx 2. at superstep: " + getSuperstepCount()
+            + " after waiting ..." + taskid.toString());
       } else {
+        LOG.debug("---> at superstep: " + getSuperstepCount()
+            + " task that is creating /ready znode:" + taskid.toString());
         createEphemeralZnode(pathToSuperstepZnode + "/ready");
       }
     }
@@ -429,45 +463,86 @@ public class BSPPeer implements Watcher,
     final String pathToSuperstepZnode = bspRoot + "/"
         + taskid.getJobID().toString() + "/" + getSuperstepCount();
     while (true) {
-      synchronized (mutex) {
-        final List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
-        final int size = znodes.size();
-        if (null == znodes || znodes.isEmpty())
-          return true;
-        if (1 == size) {
+      List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+      LOG.info("leaveBarrier() !!! checking znodes cotnains /ready node or not: at superstep:"
+          + getSuperstepCount() + " znode:" + znodes);
+      if (znodes.contains("ready")) {
+        znodes.remove("ready");
+      }
+      final int size = znodes.size();
+      LOG.info("leaveBarrier() at superstep:" + getSuperstepCount()
+          + " znode size: (" + size + ") znodes:" + znodes);
+      if (null == znodes || znodes.isEmpty())
+        return true;
+      if (1 == size) {
+        try {
           zk.delete(getNodeName(), 0);
-          return true;
+        } catch (KeeperException.NoNodeException nne) {
+          LOG.warn(
+              "+++ (znode size is 1). Ignore because znode may disconnect.",
+              nne);
         }
-        Collections.sort(znodes);
-        final String lowest = znodes.get(0);
-        final String highest = znodes.get(size - 1);
+        return true;
+      }
+      Collections.sort(znodes);
+
+      final String lowest = znodes.get(0);
+      final String highest = znodes.get(size - 1);
+
+      LOG.info("leaveBarrier() at superstep: " + getSuperstepCount()
+          + " taskid:" + taskid.toString() + " lowest: " + lowest + " highest:"
+          + highest);
+      synchronized (mutex) {
+
         if (getNodeName().equals(pathToSuperstepZnode + "/" + lowest)) {
           Stat s = zk.exists(pathToSuperstepZnode + "/" + highest,
               new Watcher() {
                 @Override
                 public void process(WatchedEvent event) {
                   synchronized (mutex) {
+                    LOG.debug("leaveBarrier() at superstep: "
+                        + getSuperstepCount() + " taskid:" + taskid.toString()
+                        + " highest notify lowest.");
                     mutex.notifyAll();
                   }
                 }
               });
-          if (null != s)
+
+          if (null != s) {
+            LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
+                + " taskid:" + taskid.toString() + " wait for higest notify.");
             mutex.wait();
+          }
         } else {
           Stat s1 = zk.exists(getNodeName(), false);
-          if (null != s1)
-            zk.delete(getNodeName(), 0);
+
+          if (null != s1) {
+            LOG.info("leaveBarrier() znode at superstep:" + getSuperstepCount()
+                + " taskid:" + taskid.toString() + " exists, so delete it.");
+            try {
+              zk.delete(getNodeName(), 0);
+            } catch (KeeperException.NoNodeException nne) {
+              LOG.warn("++++ Ignore because node may be dleted.", nne);
+            }
+          }
+
           Stat s2 = zk.exists(pathToSuperstepZnode + "/" + lowest,
               new Watcher() {
                 @Override
                 public void process(WatchedEvent event) {
                   synchronized (mutex) {
+                    LOG.debug("leaveBarrier() at superstep: "
+                        + getSuperstepCount() + " taskid:" + taskid.toString()
+                        + " lowest notify other nodes.");
                     mutex.notifyAll();
                   }
                 }
               });
-          if (null != s2)
+          if (null != s2) {
+            LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
+                + " taskid:" + taskid.toString() + " wait for lowest notify.");
             mutex.wait();
+          }
         }
       }
     }
@@ -555,8 +630,8 @@ public class BSPPeer implements Watcher,
           "Peername must consist of exactly ONE \":\"! Given peername was: "
               + peerName);
     }
-    return new InetSocketAddress(peerAddrParts[0], Integer
-        .parseInt(peerAddrParts[1]));
+    return new InetSocketAddress(peerAddrParts[0],
+        Integer.parseInt(peerAddrParts[1]));
   }
 
   @Override



Mime
View raw message