Return-Path: X-Original-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EE32E9BD5 for ; Thu, 22 Sep 2011 01:44:26 +0000 (UTC) Received: (qmail 52489 invoked by uid 500); 22 Sep 2011 01:44:26 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 52444 invoked by uid 500); 22 Sep 2011 01:44:26 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@incubator.apache.org Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 52280 invoked by uid 99); 22 Sep 2011 01:44:18 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Sep 2011 01:44:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Sep 2011 01:44:17 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 30E77238890A; Thu, 22 Sep 2011 01:43:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1173927 - in /incubator/hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/BSPMaster.java core/src/main/java/org/apache/hama/bsp/BSPPeer.java Date: Thu, 22 Sep 2011 01:43:56 -0000 To: hama-commits@incubator.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20110922014357.30E77238890A@eris.apache.org> Author: edwardyoon Date: Thu Sep 22 01:43:56 2011 New Revision: 1173927 URL: http://svn.apache.org/viewvc?rev=1173927&view=rev Log: Fixed barrier problem. Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1173927&r1=1173926&r2=1173927&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Thu Sep 22 01:43:56 2011 @@ -9,6 +9,7 @@ Release 0.4 - Unreleased BUG FIXES + HAMA-387: Fixed barrier synchronization problem (ChiaHung Lin via edwardyoon) HAMA-436: Web Interface does not update Superstep Count (Thomas Jungblut) HAMA-429: Groom statuses should be reported periodically (ChiaHung Lin via edwardyoon) HAMA-421: Maven build issues using proxy (Joe Crobak via edwardyoon) Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1173927&r1=1173926&r2=1173927&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Thu Sep 22 01:43:56 2011 @@ -474,6 +474,9 @@ public class BSPMaster implements JobSub public void clearZKNodes() { try { for (String node : zk.getChildren(bspRoot, this)) { + for (String subnode : zk.getChildren(bspRoot + "/" + node, this)) { + zk.delete(bspRoot + "/" + node + "/" + subnode, 0); + } zk.delete(bspRoot + "/" + node, 0); } } catch (KeeperException e) { Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1173927&r1=1173926&r2=1173927&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Thu Sep 22 01:43:56 2011 @@ -26,6 +26,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.UnknownHostException; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -49,6 +50,7 @@ import org.apache.hama.ipc.BSPPeerProtoc import org.apache.hama.util.Bytes; import org.apache.hama.zookeeper.QuorumPeer; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -59,7 +61,7 @@ import org.apache.zookeeper.ZooDefs.Ids; * This class represents a BSP peer. */ public class BSPPeer implements Watcher, BSPPeerInterface { - + public static final Log LOG = LogFactory.getLog(BSPPeer.class); private final Configuration conf; @@ -304,7 +306,6 @@ public class BSPPeer implements Watcher, @Override public void sync() throws IOException, KeeperException, InterruptedException { enterBarrier(); - long startTime = System.currentTimeMillis(); Iterator>> it = this.outgoingQueues .entrySet().iterator(); @@ -331,15 +332,10 @@ public class BSPPeer implements Watcher, peer.put(bundle); } - if ((System.currentTimeMillis() - startTime) < 200) { - Thread.sleep(200); - } - leaveBarrier(); currentTaskStatus.incrementSuperstepCount(); umbilical.incrementSuperstepCount(taskid); - startTime = System.currentTimeMillis(); // Clear outgoing queues. clearOutgoingQueues(); @@ -351,53 +347,124 @@ public class BSPPeer implements Watcher, // Switch local queues. localQueue = localQueueForNextIteration; localQueueForNextIteration = new ConcurrentLinkedQueue(); + } + + private void createZnode(final String path) throws KeeperException, + InterruptedException { + createZnode(path, CreateMode.PERSISTENT); + } + + private void createEphemeralZnode(final String path) throws KeeperException, + InterruptedException { + createZnode(path, CreateMode.EPHEMERAL); + } - // TODO: This is a quite temporary solution of HAMA-387. - // If zk.getChildren() response is slower than 200 milliseconds, - // BSP system will be hanged. - - // We have to consider another way to avoid this problem. - if ((System.currentTimeMillis() - startTime) < 200) { - Thread.sleep(200); // at least wait + private void createZnode(final String path, final CreateMode mode) throws KeeperException, + InterruptedException { + Stat s = zk.exists(path, false); + if(null == s) { + try { + zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode); + } catch(KeeperException.NodeExistsException nee) { + LOG.warn("Ignore because znode may be already created at "+path, nee); + } } } protected boolean enterBarrier() throws KeeperException, InterruptedException { - LOG.debug("[" + getPeerName() + "] enter the enterbarrier: " - + this.getSuperstepCount()); - zk.create(getNodeName(), Bytes.toBytes(this.getSuperstepCount()), - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + if(LOG.isDebugEnabled()) { + LOG.debug("[" + getPeerName() + "] enter the enterbarrier: " + + this.getSuperstepCount()); + } - while (true) { - synchronized (mutex) { - List list = zk.getChildren(bspRoot, true); + createZnode(bspRoot); - if (list.size() < jobConf.getNumBspTask()) { - mutex.wait(); - } else { - return true; + final String pathToJobIdZnode = + bspRoot + "/" + taskid.getJobID().toString(); + createZnode(pathToJobIdZnode); + + final String pathToSuperstepZnode = + pathToJobIdZnode + "/" + getSuperstepCount(); + createZnode(pathToSuperstepZnode); + + zk.exists(pathToSuperstepZnode+"/ready", new Watcher() { + @Override + public void process(WatchedEvent event) { + synchronized(mutex) { + try { + Stat s = zk.exists(pathToSuperstepZnode+"/ready", false); + if(null != s) { + zk.delete(pathToSuperstepZnode+"/ready", 0); + } + } catch(KeeperException.NoNodeException nne) { + LOG.warn("Ignore because znode may be deleted.", nne); + } catch(Exception e) { + throw new RuntimeException(e); + } + mutex.notifyAll(); } } + }); + zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + + synchronized(mutex) { + List znodes = zk.getChildren(pathToSuperstepZnode, false); + if(LOG.isDebugEnabled()) + LOG.debug("enterBarrier() znode size within "+pathToSuperstepZnode+" is "+ + znodes.size()+". Znodes include " +znodes); + if (znodes.size() < jobConf.getNumBspTask()) { + mutex.wait(); + } else { + createEphemeralZnode(pathToSuperstepZnode+"/ready"); + } } + return true; } protected boolean leaveBarrier() throws KeeperException, InterruptedException { - zk.delete(getNodeName(), 0); - while (true) { + final String pathToSuperstepZnode = + bspRoot + "/" + taskid.getJobID().toString() + "/" + getSuperstepCount(); + while(true) { synchronized (mutex) { - List list = zk.getChildren(bspRoot, true); - - if (list.size() > 0) { - mutex.wait(); - } else { + final List znodes = zk.getChildren(pathToSuperstepZnode, false); + final int size = znodes.size(); + if(null == znodes || znodes.isEmpty()) return true; + if(1 == size) { + zk.delete(getNodeName(), 0); return true; } + Collections.sort(znodes); + final String lowest = znodes.get(0); + final String highest = znodes.get(size-1); + if (getNodeName().equals(pathToSuperstepZnode+"/"+lowest)) { + Stat s = zk.exists(pathToSuperstepZnode+"/"+highest, new Watcher() { + @Override + public void process(WatchedEvent event) { + synchronized(mutex) { + mutex.notifyAll(); + } + } + }); + if(null != s) mutex.wait(); + }else{ + Stat s1 = zk.exists(getNodeName(), false); + if(null != s1) zk.delete(getNodeName(), 0); + Stat s2 = zk.exists(pathToSuperstepZnode+"/"+lowest, new Watcher() { + @Override + public void process(WatchedEvent event) { + synchronized(mutex) { + mutex.notifyAll(); + } + } + }); + if(null != s2) mutex.wait(); + } } } } private String getNodeName() { - return bspRoot + "/" + taskid.getJobID().toString() + "_" + getPeerName(); + return bspRoot + "/" + taskid.getJobID().toString() + "/" + getSuperstepCount() + "/" + taskid.toString() ; } @Override