incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1173927 - in /incubator/hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/BSPMaster.java core/src/main/java/org/apache/hama/bsp/BSPPeer.java
Date Thu, 22 Sep 2011 01:43:56 GMT
Author: edwardyoon
Date: Thu Sep 22 01:43:56 2011
New Revision: 1173927

URL: http://svn.apache.org/viewvc?rev=1173927&view=rev
Log:
Fixed barrier problem.

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1173927&r1=1173926&r2=1173927&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Sep 22 01:43:56 2011
@@ -9,6 +9,7 @@ Release 0.4 - Unreleased
 
   BUG FIXES
 
+    HAMA-387: Fixed barrier synchronization problem (ChiaHung Lin via edwardyoon)
     HAMA-436: Web Interface does not update Superstep Count (Thomas Jungblut)
     HAMA-429: Groom statuses should be reported periodically (ChiaHung Lin via edwardyoon)
     HAMA-421: Maven build issues using proxy (Joe Crobak via edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1173927&r1=1173926&r2=1173927&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Thu Sep 22 01:43:56 2011
@@ -474,6 +474,9 @@ public class BSPMaster implements JobSub
   public void clearZKNodes() {
     try {
       for (String node : zk.getChildren(bspRoot, this)) {
+        for (String subnode : zk.getChildren(bspRoot + "/" + node, this)) {
+          zk.delete(bspRoot + "/" + node + "/" + subnode, 0);
+        }
         zk.delete(bspRoot + "/" + node, 0);
       }
     } catch (KeeperException e) {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1173927&r1=1173926&r2=1173927&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Thu Sep 22 01:43:56 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -49,6 +50,7 @@ import org.apache.hama.ipc.BSPPeerProtoc
 import org.apache.hama.util.Bytes;
 import org.apache.hama.zookeeper.QuorumPeer;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -59,7 +61,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
  * This class represents a BSP peer.
  */
 public class BSPPeer implements Watcher, BSPPeerInterface {
-
+  
   public static final Log LOG = LogFactory.getLog(BSPPeer.class);
 
   private final Configuration conf;
@@ -304,7 +306,6 @@ public class BSPPeer implements Watcher,
   @Override
   public void sync() throws IOException, KeeperException, InterruptedException {
     enterBarrier();
-    long startTime = System.currentTimeMillis();
     Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
         .entrySet().iterator();
 
@@ -331,15 +332,10 @@ public class BSPPeer implements Watcher,
       peer.put(bundle);
     }
 
-    if ((System.currentTimeMillis() - startTime) < 200) {
-      Thread.sleep(200);
-    }
-
     leaveBarrier();
     currentTaskStatus.incrementSuperstepCount();
     umbilical.incrementSuperstepCount(taskid);
 
-    startTime = System.currentTimeMillis();
     // Clear outgoing queues.
     clearOutgoingQueues();
 
@@ -351,53 +347,124 @@ public class BSPPeer implements Watcher,
     // Switch local queues.
     localQueue = localQueueForNextIteration;
     localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+  }
+
+  private void createZnode(final String path) throws KeeperException, 
+      InterruptedException {
+    createZnode(path, CreateMode.PERSISTENT);
+  }
+
+  private void createEphemeralZnode(final String path) throws KeeperException, 
+      InterruptedException {
+    createZnode(path, CreateMode.EPHEMERAL);
+  }
 
-    // TODO: This is a quite temporary solution of HAMA-387.
-    // If zk.getChildren() response is slower than 200 milliseconds,
-    // BSP system will be hanged.
-
-    // We have to consider another way to avoid this problem.
-    if ((System.currentTimeMillis() - startTime) < 200) {
-      Thread.sleep(200); // at least wait
+  private void createZnode(final String path, final CreateMode mode) throws KeeperException,
+      InterruptedException {
+    Stat s = zk.exists(path, false);
+    if(null == s) {
+      try {
+        zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
+      } catch(KeeperException.NodeExistsException nee) {
+        LOG.warn("Ignore because znode may be already created at "+path, nee);
+      }
     }
   }
 
   protected boolean enterBarrier() throws KeeperException, InterruptedException {
-    LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
-        + this.getSuperstepCount());
-    zk.create(getNodeName(), Bytes.toBytes(this.getSuperstepCount()),
-        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("[" + getPeerName() + "] enter the enterbarrier: " + 
+      this.getSuperstepCount());
+    }
 
-    while (true) {
-      synchronized (mutex) {
-        List<String> list = zk.getChildren(bspRoot, true);
+    createZnode(bspRoot); 
 
-        if (list.size() < jobConf.getNumBspTask()) {
-          mutex.wait();
-        } else {
-          return true;
+    final String pathToJobIdZnode = 
+      bspRoot + "/" + taskid.getJobID().toString();
+    createZnode(pathToJobIdZnode);
+
+    final String pathToSuperstepZnode = 
+      pathToJobIdZnode + "/" + getSuperstepCount();
+    createZnode(pathToSuperstepZnode); 
+    
+    zk.exists(pathToSuperstepZnode+"/ready", new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        synchronized(mutex) {
+          try {
+            Stat s = zk.exists(pathToSuperstepZnode+"/ready", false);
+            if(null != s) {
+              zk.delete(pathToSuperstepZnode+"/ready", 0);
+            }
+          } catch(KeeperException.NoNodeException nne) {
+            LOG.warn("Ignore because znode may be deleted.", nne);
+          } catch(Exception e) {
+            throw new RuntimeException(e);
+          }
+          mutex.notifyAll();
         }
       }
+    }); 
+    zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+    synchronized(mutex) {
+      List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+      if(LOG.isDebugEnabled()) 
+        LOG.debug("enterBarrier() znode size within "+pathToSuperstepZnode+" is "+
+        znodes.size()+". Znodes include " +znodes);
+      if (znodes.size() < jobConf.getNumBspTask()) {
+        mutex.wait();
+      } else {
+        createEphemeralZnode(pathToSuperstepZnode+"/ready");
+      }
     }
+    return true;
   }
 
   protected boolean leaveBarrier() throws KeeperException, InterruptedException {
-    zk.delete(getNodeName(), 0);
-    while (true) {
+    final String pathToSuperstepZnode = 
+      bspRoot + "/" + taskid.getJobID().toString() + "/" + getSuperstepCount();
+    while(true) {
       synchronized (mutex) {
-        List<String> list = zk.getChildren(bspRoot, true);
-
-        if (list.size() > 0) {
-          mutex.wait();
-        } else {
+        final List<String> znodes = zk.getChildren(pathToSuperstepZnode, false); 
+        final int size = znodes.size();
+        if(null == znodes || znodes.isEmpty()) return true;
+        if(1 == size) {
+          zk.delete(getNodeName(), 0); 
           return true;
         }
+        Collections.sort(znodes);
+        final String lowest = znodes.get(0);
+        final String highest = znodes.get(size-1);
+        if (getNodeName().equals(pathToSuperstepZnode+"/"+lowest)) { 
+          Stat s = zk.exists(pathToSuperstepZnode+"/"+highest, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+              synchronized(mutex) {
+                mutex.notifyAll();
+              }
+            }
+          });
+          if(null != s) mutex.wait();
+        }else{
+          Stat s1 = zk.exists(getNodeName(), false);
+          if(null != s1) zk.delete(getNodeName(), 0);
+          Stat s2 = zk.exists(pathToSuperstepZnode+"/"+lowest, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+              synchronized(mutex) {
+                mutex.notifyAll();
+              }
+            }
+          });
+          if(null != s2) mutex.wait();
+        }
       }
     }
   }
 
   private String getNodeName() {
-    return bspRoot + "/" + taskid.getJobID().toString() + "_" + getPeerName();
+    return bspRoot + "/" + taskid.getJobID().toString() + "/" + getSuperstepCount() + "/" + taskid.toString() ;
   }
 
   @Override



Mime
View raw message