Return-Path: X-Original-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 55844CD10 for ; Sat, 19 May 2012 07:58:43 +0000 (UTC) Received: (qmail 51650 invoked by uid 500); 19 May 2012 07:58:43 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 51559 invoked by uid 500); 19 May 2012 07:58:40 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@incubator.apache.org Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 51511 invoked by uid 99); 19 May 2012 07:58:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 19 May 2012 07:58:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 19 May 2012 07:58:34 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9236423889BF; Sat, 19 May 2012 07:58:14 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1340369 - in /incubator/hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/type/ core/src/test/java/org/apache/hama/ core/src/test/java/org/apache/hama/bsp/ graph/ graph/src/main/jav... Date: Sat, 19 May 2012 07:58:13 -0000 To: hama-commits@incubator.apache.org From: tjungblut@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120519075814.9236423889BF@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tjungblut Date: Sat May 19 07:58:12 2012 New Revision: 1340369 URL: http://svn.apache.org/viewvc?rev=1340369&view=rev Log: [HAMA-576]: Improve sendMessages in Vertex Added: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/core/pom.xml incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BSPMessage.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BooleanMessage.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/ByteMessage.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/DoubleMessage.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerDoubleMessage.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerMessage.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/LongMessage.java incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java incubator/hama/trunk/graph/pom.xml incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Sat May 19 07:58:12 2012 @@ -17,7 +17,8 @@ Release 0.5 - April 10, 2012 BUG FIXES IMPROVEMENTS - + + HAMA-576: Improve sendMessages in Vertex (tjungblut) HAMA-575: Generify graph package (tjungblut) HAMA-571: Provide graph repair function in GraphJobRunner (tjungblut) HAMA-521: Improve message buffering to save memory (Thomas Jungblut via edwardyoon) Modified: incubator/hama/trunk/core/pom.xml URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/pom.xml?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/pom.xml (original) +++ incubator/hama/trunk/core/pom.xml Sat May 19 07:58:12 2012 @@ -204,6 +204,18 @@ + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + test-jar + + + + + org.codehaus.mojo build-helper-maven-plugin Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Sat May 19 07:58:12 2012 @@ -150,7 +150,7 @@ public final class BSPTask extends Task } @SuppressWarnings("unchecked") - private final void runBSP( + private final static void runBSP( final BSPJob job, BSPPeerImpl bspPeer, final BytesWritable rawSplit, final BSPPeerProtocol umbilical) @@ -190,10 +190,12 @@ public final class BSPTask extends Task } } + @Override public final BSPJob getConf() { return conf; } + @Override public final void setConf(BSPJob conf) { this.conf = conf; } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Sat May 19 07:58:12 2012 @@ -61,6 +61,7 @@ import org.apache.hama.HamaConfiguration import org.apache.hama.bsp.sync.SyncException; import org.apache.hama.ipc.BSPPeerProtocol; import org.apache.hama.ipc.GroomProtocol; +import org.apache.hama.ipc.HamaRPCProtocolVersion; import org.apache.hama.ipc.MasterProtocol; import org.apache.hama.monitor.Monitor; import org.apache.hama.util.BSPNetUtils; @@ -143,6 +144,7 @@ public class GroomServer implements Runn private class DispatchTasksHandler implements DirectiveHandler { + @Override public void handle(Directive directive) throws DirectiveException { GroomServerAction[] actions = ((DispatchTasksDirective) directive) .getActions(); @@ -206,6 +208,7 @@ public class GroomServer implements Runn } } + @Override public void run() { while (true) { try { @@ -237,6 +240,7 @@ public class GroomServer implements Runn conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3)); } + @Override public void run() { getObliviousTasks(outOfContactTasks); @@ -353,7 +357,7 @@ public class GroomServer implements Runn // establish the communication link to bsp master this.masterClient = (MasterProtocol) RPC.waitForProxy(MasterProtocol.class, - MasterProtocol.versionID, bspMasterAddr, conf); + HamaRPCProtocolVersion.versionID, bspMasterAddr, conf); // enroll in bsp master if (-1 == rpcPort || null == rpcAddr) @@ -638,7 +642,7 @@ public class GroomServer implements Runn launchTaskForJob(tip, jobConf); } - private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) { + private static void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) { try { tip.setJobConf(jobConf); tip.launchTask(); @@ -749,6 +753,7 @@ public class GroomServer implements Runn return result; } + @Override public void run() { try { initialize(); @@ -798,6 +803,7 @@ public class GroomServer implements Runn close(); } + @Override public synchronized void close() throws IOException { try { zk.close(); @@ -1011,9 +1017,9 @@ public class GroomServer implements Runn public long getProtocolVersion(String protocol, long clientVersion) throws IOException { if (protocol.equals(GroomProtocol.class.getName())) { - return GroomProtocol.versionID; + return HamaRPCProtocolVersion.versionID; } else if (protocol.equals(BSPPeerProtocol.class.getName())) { - return BSPPeerProtocol.versionID; + return HamaRPCProtocolVersion.versionID; } else { throw new IOException("Unknown protocol to GroomServer: " + protocol); } @@ -1068,7 +1074,7 @@ public class GroomServer implements Runn // ////////////////// BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy( - BSPPeerProtocol.class, BSPPeerProtocol.versionID, address, + BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, address, defaultConf); final BSPTask task = (BSPTask) umbilical.getTask(taskid); Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Sat May 19 07:58:12 2012 @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.RunJar; /** @@ -50,7 +51,7 @@ public class TaskRunner extends Thread { .getProperty("path.separator"); private enum LogType { - STDOUT, ERROR + STDOUT, ERROR, CONSOLE } boolean bspKilled = false; @@ -59,7 +60,7 @@ public class TaskRunner extends Thread { private Thread infoLog; private final Task task; - private final BSPJob conf; + private final BSPJob bspJob; private final GroomServer groomServer; private File logDir; @@ -92,22 +93,29 @@ public class TaskRunner extends Thread { this.future.get().get(); } + @Override public Object call() throws Exception { + final boolean consoleRedirect = bspJob.getConf().getBoolean( + "hama.child.redirect.log.console", false); ProcessBuilder builder = new ProcessBuilder(commands); builder.directory(workDir); try { bspProcess = builder.start(); errorLog = new Thread() { + @Override public void run() { - logStream(bspProcess.getErrorStream(), LogType.ERROR); + logStream(bspProcess.getErrorStream(), + consoleRedirect ? LogType.CONSOLE : LogType.ERROR); } }; errorLog.start(); infoLog = new Thread() { + @Override public void run() { - logStream(bspProcess.getInputStream(), LogType.STDOUT); + logStream(bspProcess.getInputStream(), + consoleRedirect ? LogType.CONSOLE : LogType.STDOUT); } }; infoLog.start(); @@ -130,7 +138,7 @@ public class TaskRunner extends Thread { public TaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) { this.task = bspTask; - this.conf = conf; + this.bspJob = conf; this.groomServer = groom; } @@ -156,7 +164,7 @@ public class TaskRunner extends Thread { return workDir; } - private String assembleClasspath(BSPJob jobConf, File workDir) { + private static String assembleClasspath(BSPJob jobConf, File workDir) { StringBuffer classPath = new StringBuffer(); // start with same classpath as parent process classPath.append(System.getProperty("java.class.path")); @@ -222,12 +230,13 @@ public class TaskRunner extends Thread { /** * Build working environment and launch BSPPeer processes. */ + @Override public void run() { File workDir = createWorkDirectory(); logDir = createLogDirectory(); - String classPath = assembleClasspath(conf, workDir); + String classPath = assembleClasspath(bspJob, workDir); LOG.debug("Spawned child's classpath " + classPath); - List bspArgs = buildJvmArgs(conf, classPath, + List bspArgs = buildJvmArgs(bspJob, classPath, GroomServer.BSPPeerChild.class); BspChildRunner bspPeer = new BspChildRunner(bspArgs, workDir); @@ -285,6 +294,14 @@ public class TaskRunner extends Thread { * @param stdout type of the log */ private void logStream(InputStream input, LogType type) { + if (type == LogType.CONSOLE) { + try { + IOUtils.copyBytes(input, System.out, bspJob.getConf()); + } catch (IOException e) { + // gracefully ignore any occuring exceptions here + } + return; + } // STDOUT file can be found under LOG_DIR/task_attempt_id.log // ERROR file can be found under LOG_DIR/task_attempt_id.err File taskLogFile = new File(logDir, task.getTaskAttemptId() @@ -322,7 +339,7 @@ public class TaskRunner extends Thread { * @param type * @return an ending, including a dot. */ - private String getFileEndingForType(LogType type) { + private static String getFileEndingForType(LogType type) { if (type != LogType.ERROR) return ".err"; else Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BSPMessage.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BSPMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BSPMessage.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BSPMessage.java Sat May 19 07:58:12 2012 @@ -28,7 +28,7 @@ public abstract class BSPMessage impleme public BSPMessage() { } - + /** * BSP messages are typically identified with tags. This allows to get the tag * of data. @@ -43,7 +43,7 @@ public abstract class BSPMessage impleme public abstract Object getData(); public abstract void setTag(Object tag); - + public abstract void setData(Object data); - + } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BooleanMessage.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BooleanMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BooleanMessage.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BooleanMessage.java Sat May 19 07:58:12 2012 @@ -22,12 +22,12 @@ import java.io.DataOutput; import java.io.IOException; /** - * A message that consists of a string tag and a boolean value. + * A message that consists of a string tag and a boolean value. */ public class BooleanMessage extends BSPMessage { - String tag; - boolean data; + public String tag; + public boolean data; public BooleanMessage() { super(); @@ -70,4 +70,4 @@ public class BooleanMessage extends BSPM public void setData(Object data) { this.data = (Boolean) data; } -} \ No newline at end of file +} Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/ByteMessage.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/ByteMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/ByteMessage.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/ByteMessage.java Sat May 19 07:58:12 2012 @@ -26,8 +26,8 @@ import java.io.IOException; */ public class ByteMessage extends BSPMessage { - private byte[] tag; - private byte[] data; + public byte[] tag; + public byte[] data; public ByteMessage() { super(); Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/DoubleMessage.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/DoubleMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/DoubleMessage.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/DoubleMessage.java Sat May 19 07:58:12 2012 @@ -22,12 +22,12 @@ import java.io.DataOutput; import java.io.IOException; /** - * A message that consists of a string tag and a double data. + * A message that consists of a string tag and a double data. */ public class DoubleMessage extends BSPMessage { - private String tag; - private Double data; + public String tag; + public Double data; public DoubleMessage() { super(); Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerDoubleMessage.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerDoubleMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerDoubleMessage.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerDoubleMessage.java Sat May 19 07:58:12 2012 @@ -22,12 +22,12 @@ import java.io.DataOutput; import java.io.IOException; /** - * A message that consists of a int tag and a double data. + * A message that consists of a int tag and a double data. */ public class IntegerDoubleMessage extends BSPMessage { - int tag; - double data; + public int tag; + public double data; public IntegerDoubleMessage() { super(); Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerMessage.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerMessage.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerMessage.java Sat May 19 07:58:12 2012 @@ -22,12 +22,12 @@ import java.io.DataOutput; import java.io.IOException; /** - * A message that consists of a string tag and a int data. + * A message that consists of a string tag and a int data. */ public class IntegerMessage extends BSPMessage { - String tag; - int data; + public String tag; + public int data; public IntegerMessage() { super(); @@ -71,4 +71,4 @@ public class IntegerMessage extends BSPM this.data = (Integer) data; } -} \ No newline at end of file +} Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/LongMessage.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/LongMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/LongMessage.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/LongMessage.java Sat May 19 07:58:12 2012 @@ -26,8 +26,8 @@ import java.io.IOException; */ public class LongMessage extends BSPMessage { - private String tag; - private long data; + public String tag; + public long data; public LongMessage() { super(); Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java Sat May 19 07:58:12 2012 @@ -37,6 +37,7 @@ public abstract class HamaCluster extend super(startDfs); } + @Override protected void setUp() throws Exception { super.setUp(); } Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java Sat May 19 07:58:12 2012 @@ -58,6 +58,7 @@ public class MiniBSPCluster { throw new NullPointerException("No Configuration for BSPMaster."); } + @Override public void run(){ try{ LOG.info("Starting BSP Master."); @@ -97,6 +98,7 @@ public class MiniBSPCluster { this.conf = conf; } + @Override public void run(){ try{ this.gs = GroomServer.constructGroomServer(GroomServer.class, conf); @@ -203,7 +205,7 @@ public class MiniBSPCluster { } - private void randomPort(HamaConfiguration conf){ + private static void randomPort(HamaConfiguration conf){ try{ ServerSocket skt = new ServerSocket(0); int p = skt.getLocalPort(); Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Sat May 19 07:58:12 2012 @@ -19,9 +19,6 @@ */ package org.apache.hama.bsp; -import java.io.IOException; -import java.util.ArrayList; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,27 +33,20 @@ import org.apache.hama.HamaCluster; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.message.DiskQueue; import org.apache.hama.examples.ClassSerializePrinting; -import org.apache.hama.zookeeper.QuorumPeer; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; public class TestBSPMasterGroomServer extends HamaCluster { - private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class); - static String TMP_OUTPUT = "/tmp/test-example/"; + protected static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class); + public static String TMP_OUTPUT = "/tmp/test-example/"; public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue"; - static Path OUTPUT_PATH = new Path(TMP_OUTPUT + "serialout"); + public static Path OUTPUT_PATH = new Path(TMP_OUTPUT + "serialout"); - private HamaConfiguration configuration; + protected HamaConfiguration configuration; public TestBSPMasterGroomServer() { configuration = new HamaConfiguration(); configuration.set("bsp.master.address", "localhost"); + configuration.set("hama.child.redirect.log.console", "true"); assertEquals("Make sure master addr is set to localhost:", "localhost", configuration.get("bsp.master.address")); configuration.set("bsp.local.dir", "/tmp/hama-test"); @@ -68,10 +58,12 @@ public class TestBSPMasterGroomServer ex .getCanonicalName()); } + @Override public void setUp() throws Exception { super.setUp(); } + @Override public void tearDown() throws Exception { super.tearDown(); } Modified: incubator/hama/trunk/graph/pom.xml URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/pom.xml?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/graph/pom.xml (original) +++ incubator/hama/trunk/graph/pom.xml Sat May 19 07:58:12 2012 @@ -37,6 +37,13 @@ hama-core ${project.version} + + org.apache.hama + hama-core + test-jar + test + ${project.version} + hama-graph-${project.version} Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Sat May 19 07:58:12 2012 @@ -30,7 +30,7 @@ public class GraphJob extends BSPJob { public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class"; public final static String VERTEX_ID_CLASS_ATTR = "hama.graph.vertex.id.class"; - public final static String VERTEX_VALUE_CLASS_ATTR = "hama.graph.vertex.id.class"; + public final static String VERTEX_VALUE_CLASS_ATTR = "hama.graph.vertex.value.class"; public final static String VERTEX_EDGE_VALUE_CLASS_ATTR = "hama.graph.vertex.edge.value.class"; public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class"; Added: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1340369&view=auto ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (added) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Sat May 19 07:58:12 2012 @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * A message that is either MapWritable (for meta communication purposes) or a + * real message (vertex ID and value). It can be extended by adding flags, for + * example for a graph repair call. + */ +public final class GraphJobMessage implements Writable { + + public static final int MAP_FLAG = 0x01; + public static final int VERTEX_FLAG = 0x02; + public static final int REPAIR_FLAG = 0x04; + + // staticly defined because it is process-wide information, therefore in caps + // considered as a constant + public static Class VERTEX_ID_CLASS; + public static Class VERTEX_VALUE_CLASS; + + private int flag = MAP_FLAG; + private MapWritable map; + private Writable vertexId; + private Writable vertexValue; + + public GraphJobMessage() { + } + + public GraphJobMessage(MapWritable map) { + this.flag = MAP_FLAG; + this.map = map; + } + + public GraphJobMessage(Writable vertexId) { + this.flag = REPAIR_FLAG; + this.vertexId = vertexId; + } + + public GraphJobMessage(Writable vertexId, Writable vertexValue) { + this.flag = VERTEX_FLAG; + this.vertexId = vertexId; + this.vertexValue = vertexValue; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeByte(this.flag); + if (isVertexMessage()) { + // we don't need to write the classes because the other side has the same + // classes for the two entities. + vertexId.write(out); + vertexValue.write(out); + } else if (isMapMessage()) { + map.write(out); + } else { + vertexId.write(out); + } + + } + + @Override + public void readFields(DataInput in) throws IOException { + flag = in.readByte(); + if (isVertexMessage()) { + vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null); + vertexId.readFields(in); + vertexValue = ReflectionUtils.newInstance(VERTEX_VALUE_CLASS, null); + vertexValue.readFields(in); + } else if (isMapMessage()) { + map = new MapWritable(); + map.readFields(in); + } else { + vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null); + vertexId.readFields(in); + } + + } + + public MapWritable getMap() { + return map; + } + + public Writable getVertexId() { + return vertexId; + } + + public Writable getVertexValue() { + return vertexValue; + } + + public boolean isMapMessage() { + return flag == MAP_FLAG; + } + + public boolean isVertexMessage() { + return flag == VERTEX_FLAG; + } + + public boolean isRepairMessage() { + return flag == REPAIR_FLAG; + } + + @Override + public String toString() { + return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId=" + + vertexId + ", vertexValue=" + vertexValue + "]"; + } + +} Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Sat May 19 07:58:12 2012 @@ -51,7 +51,7 @@ import org.apache.hama.util.KeyValuePair */ public class GraphJobRunner extends - BSP, VertexArrayWritable, Writable, Writable, Writable> { + BSP, VertexArrayWritable, Writable, Writable, GraphJobMessage> { private static final Log LOG = LogFactory.getLog(GraphJobRunner.class); @@ -100,7 +100,7 @@ public class GraphJobRunner, VertexArrayWritable, Writable, Writable, Writable> peer) + BSPPeer, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException { this.conf = peer.getConfiguration(); VertexWritable.CONFIGURATION = conf; @@ -115,6 +115,9 @@ public class GraphJobRunner, VertexArrayWritable, Writable, Writable, Writable> peer) + BSPPeer, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException { maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration", @@ -199,7 +202,7 @@ public class GraphJobRunner 1) { peer.sync(); - MapWritable updatedValues = (MapWritable) peer.getCurrentMessage(); + MapWritable updatedValues = peer.getCurrentMessage().getMap(); globalAggregatorResult = updatedValues.get(FLAG_AGGREGATOR_VALUE); globalAggregatorIncrement = (IntWritable) updatedValues .get(FLAG_AGGREGATOR_INCREMENT); @@ -262,54 +265,62 @@ public class GraphJobRunner> parseMessages( - BSPPeer, VertexArrayWritable, Writable, Writable, Writable> peer) + BSPPeer, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer) throws IOException { - MapWritable msg = null; + GraphJobMessage msg = null; Map> msgMap = new HashMap>(); - while ((msg = (MapWritable) peer.getCurrentMessage()) != null) { - for (Entry e : msg.entrySet()) { - VERTEX_ID vertexID = (VERTEX_ID) e.getKey(); - if (FLAG_MESSAGE_COUNTS.equals(vertexID)) { - if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) { - updated = false; - } else { - globalUpdateCounts += ((IntWritable) e.getValue()).get(); - } - } else if (aggregator != null && FLAG_AGGREGATOR_VALUE.equals(vertexID)) { - masterAggregator.aggregate((VERTEX_VALUE) e.getValue()); - } else if (aggregator != null - && FLAG_AGGREGATOR_INCREMENT.equals(vertexID)) { - if (isAbstractAggregator) { - ((AbstractAggregator) masterAggregator) - .addTimesAggregated(((IntWritable) e.getValue()).get()); - } + while ((msg = peer.getCurrentMessage()) != null) { + // either this is a vertex message or a directive that must be read as map + if (msg.isVertexMessage()) { + VERTEX_ID vertexID = (VERTEX_ID) msg.getVertexId(); + VERTEX_VALUE value = (VERTEX_VALUE) msg.getVertexValue(); + if (msgMap.containsKey(vertexID)) { + LinkedList msgs = msgMap.get(vertexID); + msgs.add(value); + msgMap.put(vertexID, msgs); } else { - VERTEX_VALUE value = (VERTEX_VALUE) e.getValue(); - if (msgMap.containsKey(vertexID)) { - LinkedList msgs = msgMap.get(vertexID); - msgs.add(value); - msgMap.put(vertexID, msgs); - } else { - LinkedList msgs = new LinkedList(); - msgs.add(value); - msgMap.put(vertexID, msgs); + LinkedList msgs = new LinkedList(); + msgs.add(value); + msgMap.put(vertexID, msgs); + } + } else if (msg.isMapMessage()) { + for (Entry e : msg.getMap().entrySet()) { + VERTEX_ID vertexID = (VERTEX_ID) e.getKey(); + if (FLAG_MESSAGE_COUNTS.equals(vertexID)) { + if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) { + updated = false; + } else { + globalUpdateCounts += ((IntWritable) e.getValue()).get(); + } + } else if (aggregator != null + && FLAG_AGGREGATOR_VALUE.equals(vertexID)) { + masterAggregator.aggregate((VERTEX_VALUE) e.getValue()); + } else if (aggregator != null + && FLAG_AGGREGATOR_INCREMENT.equals(vertexID)) { + if (isAbstractAggregator) { + ((AbstractAggregator) masterAggregator) + .addTimesAggregated(((IntWritable) e.getValue()).get()); + } } } + } else { + throw new UnsupportedOperationException("Unknown message type? " + msg); } + } return msgMap; } @SuppressWarnings("unchecked") private void loadVertices( - BSPPeer, VertexArrayWritable, Writable, Writable, Writable> peer, + BSPPeer, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer, boolean repairNeeded) throws IOException { LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class")); boolean selfReference = conf.getBoolean("hama.graph.self.ref", false); @@ -360,7 +371,8 @@ public class GraphJobRunner entry : entries) { List> outEdges = entry.getOutEdges(); for (Edge e : outEdges) { - peer.send(e.getDestinationPeerName(), e.getDestinationVertexID()); + peer.send(e.getDestinationPeerName(), + new GraphJobMessage(e.getDestinationVertexID())); } } try { @@ -369,8 +381,9 @@ public class GraphJobRunner vertex = (Vertex) ReflectionUtils .newInstance( @@ -400,7 +413,7 @@ public class GraphJobRunner, VertexArrayWritable, Writable, Writable, Writable> peer) + BSPPeer, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer) throws IOException { for (Entry> e : vertices .entrySet()) { @@ -415,7 +428,7 @@ public class GraphJobRunner, VertexArrayWritable, Writable, Writable, Writable> peer) { + BSPPeer, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer) { return peer.getPeerName().equals(masterTask); } Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1340369&r1=1340368&r2=1340369&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Sat May 19 07:58:12 2012 @@ -22,7 +22,6 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; import org.apache.hama.bsp.BSPPeer; @@ -32,7 +31,7 @@ public abstract class Vertex runner; - protected BSPPeer, VertexArrayWritable, Writable, Writable, Writable> peer; + protected BSPPeer, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer; public List> edges; public Configuration getConf() { @@ -51,9 +50,8 @@ public abstract class Vertex e, MSG_TYPE msg) throws IOException { - MapWritable message = new MapWritable(); - message.put(e.getDestinationVertexID(), msg); - peer.send(e.getDestinationPeerName(), message); + peer.send(e.getDestinationPeerName(), + new GraphJobMessage(e.getDestinationVertexID(), msg)); } @Override Added: incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1340369&view=auto ============================================================================== --- incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (added) +++ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Sat May 19 07:58:12 2012 @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hama.Constants; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSPJobClient; +import org.apache.hama.bsp.ClusterStatus; +import org.apache.hama.bsp.HashPartitioner; +import org.apache.hama.bsp.SequenceFileInputFormat; +import org.apache.hama.bsp.SequenceFileOutputFormat; +import org.apache.hama.bsp.TestBSPMasterGroomServer; +import org.apache.hama.graph.example.PageRank; + +public class TestSubmitGraphJob extends TestBSPMasterGroomServer { + + private static final Map, VertexArrayWritable> tmp = new HashMap, VertexArrayWritable>(); + static { + Configuration conf = new HamaConfiguration(); + VertexWritable.CONFIGURATION = conf; + // our first entry is null, because our indices in hama 3.0 pre calculated + // example starts at 1. + // FIXME This is really ugly. + String[] pages = new String[] { null, "twitter.com", "google.com", + "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com", + "youtube.com" }; + String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6", + "5;4;6", "6;4", "7;2;4" }; + + for (int i = 0; i < lineArray.length; i++) { + + String[] adjacencyStringArray = lineArray[i].split(";"); + int vertexId = Integer.parseInt(adjacencyStringArray[0]); + String name = pages[vertexId]; + @SuppressWarnings("unchecked") + VertexWritable[] arr = new VertexWritable[adjacencyStringArray.length - 1]; + for (int j = 1; j < adjacencyStringArray.length; j++) { + arr[j - 1] = new VertexWritable( + new DoubleWritable(0.0d), new Text( + pages[Integer.parseInt(adjacencyStringArray[j])]), Text.class, + DoubleWritable.class); + } + VertexArrayWritable wr = new VertexArrayWritable(); + wr.set(arr); + tmp.put(new VertexWritable(name), wr); + } + } + + private static String INPUT = "/tmp/pagerank-real-tmp.seq"; + private static String OUTPUT = "/tmp/pagerank-real-out"; + + @Override + public void testSubmitJob() throws Exception { + + generateSeqTestData(tmp); + + GraphJob bsp = new GraphJob(configuration, PageRank.class); + bsp.setInputPath(new Path(INPUT)); + bsp.setOutputPath(new Path(OUTPUT)); + BSPJobClient jobClient = new BSPJobClient(configuration); + configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000); + ClusterStatus cluster = jobClient.getClusterStatus(false); + assertEquals(this.numOfGroom, cluster.getGroomServers()); + bsp.setNumBspTask(2); + LOG.info("Client finishes execution job."); + bsp.setJobName("Pagerank"); + bsp.setVertexClass(PageRank.PageRankVertex.class); + // set the defaults + bsp.setMaxIteration(30); + bsp.set("hama.pagerank.alpha", "0.85"); + // we need to include a vertex in its adjacency list, + // otherwise the pagerank result has a constant loss + bsp.set("hama.graph.self.ref", "true"); + bsp.setAggregatorClass(AverageAggregator.class); + + bsp.setVertexIDClass(Text.class); + bsp.setVertexValueClass(DoubleWritable.class); + bsp.setEdgeValueClass(NullWritable.class); + + bsp.setInputFormat(SequenceFileInputFormat.class); + bsp.setPartitioner(HashPartitioner.class); + bsp.setOutputFormat(SequenceFileOutputFormat.class); + bsp.setOutputKeyClass(Text.class); + bsp.setOutputValueClass(DoubleWritable.class); + + long startTime = System.currentTimeMillis(); + if (bsp.waitForCompletion(true)) { + verifyResult(); + LOG.info("Job Finished in " + + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); + } else { + fail(); + } + } + + private void verifyResult() throws IOException { + double sum = 0.0; + FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*")); + for (FileStatus fts : globStatus) { + SequenceFile.Reader reader = new SequenceFile.Reader(fs, fts.getPath(), + conf); + Text key = new Text(); + DoubleWritable value = new DoubleWritable(); + + while (reader.next(key, value)) { + sum += value.get(); + } + } + LOG.info("Sum is: " + sum); + assertTrue(sum > 0.99d && sum <= 1.1d); + } + + private void generateSeqTestData( + Map, VertexArrayWritable> tmp) + throws IOException { + SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path( + INPUT), VertexWritable.class, VertexArrayWritable.class); + for (Entry, VertexArrayWritable> e : tmp + .entrySet()) { + writer.append(e.getKey(), e.getValue()); + } + writer.close(); + } +} Added: incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1340369&view=auto ============================================================================== --- incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (added) +++ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Sat May 19 07:58:12 2012 @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph.example; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.graph.Vertex; + +public class PageRank { + public static class PageRankVertex extends + Vertex { + + static double DAMPING_FACTOR = 0.85; + static double MAXIMUM_CONVERGENCE_ERROR = 0.001; + + int numEdges; + + @Override + public void setup(Configuration conf) { + String val = conf.get("hama.pagerank.alpha"); + if (val != null) { + DAMPING_FACTOR = Double.parseDouble(val); + } + val = conf.get("hama.graph.max.convergence.error"); + if (val != null) { + MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val); + } + numEdges = this.getOutEdges().size(); + } + + @Override + public void compute(Iterator messages) throws IOException { + // initialize this vertex to 1 / count of global vertices in this graph + if (this.getSuperstepCount() == 0) { + this.setValue(new DoubleWritable(1.0 / this.getNumVertices())); + } + + // in the first superstep, there are no messages to check + if (this.getSuperstepCount() >= 1) { + double sum = 0; + while (messages.hasNext()) { + DoubleWritable msg = messages.next(); + sum += msg.get(); + } + double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices(); + this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum))); + } + + // if we have not reached our global error yet, then proceed. + DoubleWritable globalError = getLastAggregatedValue(); + if (globalError != null && this.getSuperstepCount() > 2 + && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) { + return; + } + // in each superstep we are going to send a new rank to our neighbours + sendMessageToNeighbors(new DoubleWritable(this.getValue().get() + / numEdges)); + } + } +}