Return-Path: X-Original-To: apmail-hama-commits-archive@www.apache.org Delivered-To: apmail-hama-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7325D9DAE for ; Thu, 14 Jun 2012 04:06:56 +0000 (UTC) Received: (qmail 8980 invoked by uid 500); 14 Jun 2012 04:06:56 -0000 Delivered-To: apmail-hama-commits-archive@hama.apache.org Received: (qmail 8892 invoked by uid 500); 14 Jun 2012 04:06:53 -0000 Mailing-List: contact commits-help@hama.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hama.apache.org Delivered-To: mailing list commits@hama.apache.org Received: (qmail 8847 invoked by uid 500); 14 Jun 2012 04:06:52 -0000 Delivered-To: apmail-incubator-hama-commits@incubator.apache.org Received: (qmail 8843 invoked by uid 99); 14 Jun 2012 04:06:52 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Jun 2012 04:06:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Jun 2012 04:06:48 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id DBBD52388962; Thu, 14 Jun 2012 04:06:26 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1350083 - in /incubator/hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/ Date: Thu, 14 Jun 2012 04:06:26 -0000 To: hama-commits@incubator.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120614040626.DBBD52388962@eris.apache.org> Author: edwardyoon Date: Thu Jun 14 04:06:25 2012 New Revision: 1350083 URL: http://svn.apache.org/viewvc?rev=1350083&view=rev Log: Add voteToHalt() mechanism in Graph API Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1350083&r1=1350082&r2=1350083&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Thu Jun 14 04:06:25 2012 @@ -4,6 +4,7 @@ Release 0.5 - April 10, 2012 NEW FEATURES + HAMA-588: Add voteToHalt() mechanism in Graph API (edwardyoon) HAMA-566: Add disk-based queue (tjungblut) HAMA-552: Add a sorted message queue (tjungblut) HAMA-556: Graph package to support stopping the interations when the node changes are within the tolerance value as in the case of page rank (tjungblut) Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1350083&r1=1350082&r2=1350083&view=diff ============================================================================== --- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original) +++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Thu Jun 14 04:06:25 2012 @@ -66,10 +66,14 @@ public class MindistSearch { boolean updated = false; while (messages.hasNext()) { Text next = messages.next(); - if (currentComponent.compareTo(next) > 0) { - updated = true; - setValue(next); - } + if(currentComponent != null && next != null) { + if (currentComponent.compareTo(next) > 0) { + updated = true; + setValue(next); + } + } else { + this.voteToHalt(); + } } if (updated) { sendMessageToNeighbors(getValue()); Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1350083&r1=1350082&r2=1350083&view=diff ============================================================================== --- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original) +++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Thu Jun 14 04:06:25 2012 @@ -42,10 +42,10 @@ public class MindistSearchTest extends T String[] input = new String[] { "0", "1\t4\t7", "2\t3\t8", "3\t5", "4\t1", "5\t6", "6", "7", "8\t3", "9\t0" }; - private static String INPUT = "/tmp/pagerank-tmp.seq"; - private static String TEXT_INPUT = "/tmp/pagerank.txt"; - private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq"; - private static String OUTPUT = "/tmp/pagerank-out"; + private static String INPUT = "/tmp/mdst-tmp.seq"; + private static String TEXT_INPUT = "/tmp/mdst.txt"; + private static String TEXT_OUTPUT = INPUT + "mdst.txt.seq"; + private static String OUTPUT = "/tmp/mdst-out"; private Configuration conf = new HamaConfiguration(); private FileSystem fs; @@ -58,7 +58,7 @@ public class MindistSearchTest extends T public void testMindistSearch() throws Exception { generateTestData(); try { - MindistSearch.main(new String[] { INPUT, OUTPUT }); + MindistSearch.main(new String[] { INPUT, OUTPUT, "30", "2" }); verifyResult(); } finally { Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1350083&r1=1350082&r2=1350083&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Jun 14 04:06:25 2012 @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -259,33 +258,39 @@ public final class GraphJobRunner>> iterator = messages.entrySet() - .iterator(); - while (iterator.hasNext()) { - Entry> e = iterator.next(); - LinkedList msgs = e.getValue(); - if (combiner != null) { - M combined = combiner.combine(msgs); + + for (Vertex vertex : vertices.values()) { + LinkedList msgs = messages.get(vertex.getVertexID()); + if (vertex.isHalted() && msgs != null) { + vertex.votedToHalt = false; + } + if (msgs == null) { msgs = new LinkedList(); - msgs.add(combined); } - Vertex vertex = vertices.get(e.getKey()); - M lastValue = vertex.getValue(); - vertex.compute(msgs.iterator()); - if (aggregators != null) { - if (this.aggregators != null) { - for (int i = 0; i < this.aggregators.length; i++) { - Aggregator> aggregator = this.aggregators[i]; - aggregator.aggregate(vertex, vertex.getValue()); - if (isAbstractAggregator[i]) { - AbstractAggregator> intern = ((AbstractAggregator>) aggregator); - intern.aggregate(vertex, lastValue, vertex.getValue()); - intern.aggregateInternal(); + + if (!vertex.isHalted()) { + if (combiner != null) { + M combined = combiner.combine(msgs); + msgs = new LinkedList(); + msgs.add(combined); + } + M lastValue = vertex.getValue(); + vertex.compute(msgs.iterator()); + + if (aggregators != null) { + if (this.aggregators != null) { + for (int i = 0; i < this.aggregators.length; i++) { + Aggregator> aggregator = this.aggregators[i]; + aggregator.aggregate(vertex, vertex.getValue()); + if (isAbstractAggregator[i]) { + AbstractAggregator> intern = ((AbstractAggregator>) aggregator); + intern.aggregate(vertex, lastValue, vertex.getValue()); + intern.aggregateInternal(); + } } } } } - iterator.remove(); } runAggregators(peer, messagesSize); @@ -476,11 +481,11 @@ public final class GraphJobRunner Vertex newVertexInstance( + public static Vertex newVertexInstance( Class vertexClass, Configuration conf) { @SuppressWarnings("unchecked") - Vertex vertex = (Vertex) ReflectionUtils - .newInstance(vertexClass, conf); + Vertex vertex = (Vertex) ReflectionUtils.newInstance( + vertexClass, conf); return vertex; } Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1350083&r1=1350082&r2=1350083&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Jun 14 04:06:25 2012 @@ -36,6 +36,8 @@ public abstract class Vertex peer; private List> edges; + protected boolean votedToHalt = false; + public Configuration getConf() { return peer.getConfiguration(); } @@ -163,6 +165,15 @@ public abstract class Vertex