Return-Path: X-Original-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8C6039893 for ; Tue, 28 Feb 2012 05:02:29 +0000 (UTC) Received: (qmail 9549 invoked by uid 500); 28 Feb 2012 05:02:29 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 9230 invoked by uid 500); 28 Feb 2012 05:02:25 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@incubator.apache.org Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 9162 invoked by uid 99); 28 Feb 2012 05:02:23 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Feb 2012 05:02:23 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Feb 2012 05:02:20 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 2E0AC23889ED; Tue, 28 Feb 2012 05:01:59 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1294462 - /incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Date: Tue, 28 Feb 2012 05:01:59 -0000 To: hama-commits@incubator.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120228050159.2E0AC23889ED@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: edwardyoon Date: Tue Feb 28 05:01:58 2012 New Revision: 1294462 URL: http://svn.apache.org/viewvc?rev=1294462&view=rev Log: Exit if there's no update made Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1294462&r1=1294461&r2=1294462&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Feb 28 05:01:58 2012 @@ -28,6 +28,7 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -41,6 +42,8 @@ import org.apache.hama.util.KeyValuePair public class GraphJobRunner extends BSP { public static final Log LOG = LogFactory.getLog(GraphJobRunner.class); private Map vertices = new HashMap(); + private String masterTask; + private String FLAG_MESSAGE = "hama.graph.msg.counts"; @SuppressWarnings("unchecked") @Override @@ -52,6 +55,7 @@ public class GraphJobRunner extends BSP boolean updated = true; int iteration = 0; while (updated && iteration < maxIteration) { + int globalUpdateCounts = 0; peer.sync(); MapWritable msg = null; @@ -60,27 +64,50 @@ public class GraphJobRunner extends BSP for (Entry e : msg.entrySet()) { String vertexID = ((Text) e.getKey()).toString(); - Writable value = e.getValue(); - if (msgMap.containsKey(vertexID)) { - LinkedList msgs = msgMap.get(vertexID); - msgs.add(value); - msgMap.put(vertexID, msgs); + if (vertexID.toString().equals(FLAG_MESSAGE)) { + if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) { + updated = false; + } else { + globalUpdateCounts += ((IntWritable) e.getValue()).get(); + } } else { - LinkedList msgs = new LinkedList(); - msgs.add(value); - msgMap.put(vertexID, msgs); - } + Writable value = e.getValue(); + if (msgMap.containsKey(vertexID)) { + LinkedList msgs = msgMap.get(vertexID); + msgs.add(value); + msgMap.put(vertexID, msgs); + } else { + LinkedList msgs = new LinkedList(); + msgs.add(value); + msgMap.put(vertexID, msgs); + } + } } } - if (msgMap.size() < 1) { - updated = false; + // exit if there's no update made + if (globalUpdateCounts == 0 && peer.getPeerName().equals(masterTask) + && peer.getSuperstepCount() > 1) { + MapWritable updatedCnt = new MapWritable(); + updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable( + Integer.MIN_VALUE)); + + for (String peerName : peer.getAllPeerNames()) { + peer.send(peerName, updatedCnt); + } } + // send msgCounts to the master task + MapWritable updatedCnt = new MapWritable(); + updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable(msgMap.size())); + peer.send(masterTask, updatedCnt); + for (Map.Entry> e : msgMap.entrySet()) { - vertices.get(e.getKey()).compute(e.getValue().iterator()); + if (e.getValue().size() > 0) { + vertices.get(e.getKey()).compute(e.getValue().iterator()); + } } iteration++; } @@ -90,6 +117,8 @@ public class GraphJobRunner extends BSP public void setup(BSPPeer peer) throws IOException, SyncException, InterruptedException { Configuration conf = peer.getConfiguration(); + // Choose one as a master to collect global updates + masterTask = peer.getPeerName(0); LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class")); KeyValuePair next = null;