incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1137857 - /incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
Date Tue, 21 Jun 2011 04:24:05 GMT
Author: edwardyoon
Date: Tue Jun 21 04:24:05 2011
New Revision: 1137857

URL: http://svn.apache.org/viewvc?rev=1137857&view=rev
Log:
Committing temporary solution of HAMA-387

Modified:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1137857&r1=1137856&r2=1137857&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Tue Jun 21 04:24:05 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.Constants;
+import org.apache.hama.util.Bytes;
 import org.apache.hama.zookeeper.QuorumPeer;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -175,6 +176,7 @@ public class BSPPeer implements Watcher,
   @Override
   public void sync() throws IOException, KeeperException, InterruptedException {
     enterBarrier();
+    long startTime = System.currentTimeMillis();
     Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
         .entrySet().iterator();
 
@@ -194,15 +196,17 @@ public class BSPPeer implements Watcher,
       peer.put(bundle);
     }
 
-    waitForSync();
-    Thread.sleep(100);
+    if ((System.currentTimeMillis() - startTime) < 200) {
+      Thread.sleep(200);
+    }
+
+    leaveBarrier();
+    currentTaskStatus.incrementSuperstepCount();
 
+    startTime = System.currentTimeMillis();
     // Clear outgoing queues.
     clearOutgoingQueues();
 
-    currentTaskStatus.incrementSuperstepCount();
-    leaveBarrier();
-
     // Add non-processed messages from this iteration for the next's queue.
     while (!localQueue.isEmpty()) {
       BSPMessage message = localQueue.poll();
@@ -211,18 +215,20 @@ public class BSPPeer implements Watcher,
     // Switch local queues.
     localQueue = localQueueForNextIteration;
     localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+
+    // TODO: This is a rather tricky workaround for HAMA-387.
+    // If the zk.getChildren() response takes longer than 200 milliseconds,
+    // the BSP system will hang.
+    if ((System.currentTimeMillis() - startTime) < 200) {
+      Thread.sleep(200); // at least wait
+    }
   }
 
   protected boolean enterBarrier() throws KeeperException, InterruptedException {
-    LOG.debug("[" + getPeerName() + "] enter the enterbarrier");
-    try {
-      zk.create(bspRoot + "/" + getPeerName(), new byte[0],
-          Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-    } catch (KeeperException e) {
-      LOG.error("Exception while entering barrier!", e);
-    } catch (InterruptedException e) {
-      LOG.error("Exception while entering barrier!", e);
-    }
+    LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
+        + this.getSuperstepCount());
+    zk.create(bspRoot + "/" + getPeerName(), Bytes.toBytes(this
+        .getSuperstepCount()), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
 
     while (true) {
       synchronized (mutex) {
@@ -237,39 +243,15 @@ public class BSPPeer implements Watcher,
     }
   }
 
-  protected boolean waitForSync() throws KeeperException, InterruptedException {
-    try {
-      zk.create(bspRoot + "/" + getPeerName() + "-data", new byte[0],
-          Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-    } catch (KeeperException e) {
-      LOG.error("Exception while waiting for barrier sync!", e);
-    } catch (InterruptedException e) {
-      LOG.error("Exception while waiting for barrier sync!", e);
-    }
-
-    while (true) {
-      synchronized (mutex) {
-        List<String> list = zk.getChildren(bspRoot, true);
-        if (list.size() < (jobConf.getNumBspTask() * 2)) {
-          mutex.wait();
-        } else {
-          return true;
-        }
-      }
-    }
-  }
-
   protected boolean leaveBarrier() throws KeeperException, InterruptedException {
     zk.delete(bspRoot + "/" + getPeerName(), 0);
-    zk.delete(bspRoot + "/" + getPeerName() + "-data", 0);
-
     while (true) {
       synchronized (mutex) {
         List<String> list = zk.getChildren(bspRoot, true);
+
         if (list.size() > 0) {
           mutex.wait();
         } else {
-          LOG.debug("[" + getPeerName() + "] leave from the leaveBarrier");
           return true;
         }
       }



Mime
View raw message