hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1340369 - in /incubator/hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/type/ core/src/test/java/org/apache/hama/ core/src/test/java/org/apache/hama/bsp/ graph/ graph/src/main/jav...
Date Sat, 19 May 2012 07:58:13 GMT
Author: tjungblut
Date: Sat May 19 07:58:12 2012
New Revision: 1340369

URL: http://svn.apache.org/viewvc?rev=1340369&view=rev
Log:
[HAMA-576]: Improve sendMessages in Vertex

Added:
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
    incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/
    incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/pom.xml
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BSPMessage.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BooleanMessage.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/ByteMessage.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/DoubleMessage.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerDoubleMessage.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerMessage.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/LongMessage.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/graph/pom.xml
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sat May 19 07:58:12 2012
@@ -17,7 +17,8 @@ Release 0.5 - April 10, 2012 
   BUG FIXES
 
   IMPROVEMENTS
-
+    
+    HAMA-576: Improve sendMessages in Vertex (tjungblut)
     HAMA-575: Generify graph package (tjungblut)    
     HAMA-571: Provide graph repair function in GraphJobRunner (tjungblut)
     HAMA-521: Improve message buffering to save memory (Thomas Jungblut via edwardyoon)

Modified: incubator/hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/pom.xml?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/pom.xml (original)
+++ incubator/hama/trunk/core/pom.xml Sat May 19 07:58:12 2012
@@ -204,6 +204,18 @@
         </executions>
       </plugin>
       <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>build-helper-maven-plugin</artifactId>
         <executions>

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Sat May 19 07:58:12 2012
@@ -150,7 +150,7 @@ public final class BSPTask extends Task 
   }
 
   @SuppressWarnings("unchecked")
-  private final <KEYIN, VALUEIN, KEYOUT, VALUEOUT, M extends Writable> void runBSP(
+  private final static <KEYIN, VALUEIN, KEYOUT, VALUEOUT, M extends Writable> void runBSP(
       final BSPJob job,
       BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bspPeer,
       final BytesWritable rawSplit, final BSPPeerProtocol umbilical)
@@ -190,10 +190,12 @@ public final class BSPTask extends Task 
     }
   }
 
+  @Override
   public final BSPJob getConf() {
     return conf;
   }
 
+  @Override
   public final void setConf(BSPJob conf) {
     this.conf = conf;
   }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Sat May 19 07:58:12 2012
@@ -61,6 +61,7 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.ipc.GroomProtocol;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.ipc.MasterProtocol;
 import org.apache.hama.monitor.Monitor;
 import org.apache.hama.util.BSPNetUtils;
@@ -143,6 +144,7 @@ public class GroomServer implements Runn
 
   private class DispatchTasksHandler implements DirectiveHandler {
 
+    @Override
     public void handle(Directive directive) throws DirectiveException {
       GroomServerAction[] actions = ((DispatchTasksDirective) directive)
           .getActions();
@@ -206,6 +208,7 @@ public class GroomServer implements Runn
       }
     }
 
+    @Override
     public void run() {
       while (true) {
         try {
@@ -237,6 +240,7 @@ public class GroomServer implements Runn
           conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3));
     }
 
+    @Override
     public void run() {
 
       getObliviousTasks(outOfContactTasks);
@@ -353,7 +357,7 @@ public class GroomServer implements Runn
 
     // establish the communication link to bsp master
     this.masterClient = (MasterProtocol) RPC.waitForProxy(MasterProtocol.class,
-        MasterProtocol.versionID, bspMasterAddr, conf);
+        HamaRPCProtocolVersion.versionID, bspMasterAddr, conf);
 
     // enroll in bsp master
     if (-1 == rpcPort || null == rpcAddr)
@@ -638,7 +642,7 @@ public class GroomServer implements Runn
     launchTaskForJob(tip, jobConf);
   }
 
-  private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
+  private static void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
     try {
       tip.setJobConf(jobConf);
       tip.launchTask();
@@ -749,6 +753,7 @@ public class GroomServer implements Runn
     return result;
   }
 
+  @Override
   public void run() {
     try {
       initialize();
@@ -798,6 +803,7 @@ public class GroomServer implements Runn
     close();
   }
 
+  @Override
   public synchronized void close() throws IOException {
     try {
       zk.close();
@@ -1011,9 +1017,9 @@ public class GroomServer implements Runn
   public long getProtocolVersion(String protocol, long clientVersion)
       throws IOException {
     if (protocol.equals(GroomProtocol.class.getName())) {
-      return GroomProtocol.versionID;
+      return HamaRPCProtocolVersion.versionID;
     } else if (protocol.equals(BSPPeerProtocol.class.getName())) {
-      return BSPPeerProtocol.versionID;
+      return HamaRPCProtocolVersion.versionID;
     } else {
       throw new IOException("Unknown protocol to GroomServer: " + protocol);
     }
@@ -1068,7 +1074,7 @@ public class GroomServer implements Runn
 
       // //////////////////
       BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
-          BSPPeerProtocol.class, BSPPeerProtocol.versionID, address,
+          BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, address,
           defaultConf);
 
       final BSPTask task = (BSPTask) umbilical.getTask(taskid);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Sat May 19 07:58:12 2012
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.RunJar;
 
 /**
@@ -50,7 +51,7 @@ public class TaskRunner extends Thread {
       .getProperty("path.separator");
 
   private enum LogType {
-    STDOUT, ERROR
+    STDOUT, ERROR, CONSOLE
   }
 
   boolean bspKilled = false;
@@ -59,7 +60,7 @@ public class TaskRunner extends Thread {
   private Thread infoLog;
 
   private final Task task;
-  private final BSPJob conf;
+  private final BSPJob bspJob;
   private final GroomServer groomServer;
 
   private File logDir;
@@ -92,22 +93,29 @@ public class TaskRunner extends Thread {
       this.future.get().get();
     }
 
+    @Override
     public Object call() throws Exception {
+      final boolean consoleRedirect = bspJob.getConf().getBoolean(
+          "hama.child.redirect.log.console", false);
       ProcessBuilder builder = new ProcessBuilder(commands);
       builder.directory(workDir);
       try {
         bspProcess = builder.start();
 
         errorLog = new Thread() {
+          @Override
           public void run() {
-            logStream(bspProcess.getErrorStream(), LogType.ERROR);
+            logStream(bspProcess.getErrorStream(),
+                consoleRedirect ? LogType.CONSOLE : LogType.ERROR);
           }
         };
         errorLog.start();
 
         infoLog = new Thread() {
+          @Override
           public void run() {
-            logStream(bspProcess.getInputStream(), LogType.STDOUT);
+            logStream(bspProcess.getInputStream(),
+                consoleRedirect ? LogType.CONSOLE : LogType.STDOUT);
           }
         };
         infoLog.start();
@@ -130,7 +138,7 @@ public class TaskRunner extends Thread {
 
   public TaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
     this.task = bspTask;
-    this.conf = conf;
+    this.bspJob = conf;
     this.groomServer = groom;
   }
 
@@ -156,7 +164,7 @@ public class TaskRunner extends Thread {
     return workDir;
   }
 
-  private String assembleClasspath(BSPJob jobConf, File workDir) {
+  private static String assembleClasspath(BSPJob jobConf, File workDir) {
     StringBuffer classPath = new StringBuffer();
     // start with same classpath as parent process
     classPath.append(System.getProperty("java.class.path"));
@@ -222,12 +230,13 @@ public class TaskRunner extends Thread {
   /**
    * Build working environment and launch BSPPeer processes.
    */
+  @Override
   public void run() {
     File workDir = createWorkDirectory();
     logDir = createLogDirectory();
-    String classPath = assembleClasspath(conf, workDir);
+    String classPath = assembleClasspath(bspJob, workDir);
     LOG.debug("Spawned child's classpath " + classPath);
-    List<String> bspArgs = buildJvmArgs(conf, classPath,
+    List<String> bspArgs = buildJvmArgs(bspJob, classPath,
         GroomServer.BSPPeerChild.class);
 
     BspChildRunner bspPeer = new BspChildRunner(bspArgs, workDir);
@@ -285,6 +294,14 @@ public class TaskRunner extends Thread {
    * @param stdout type of the log
    */
   private void logStream(InputStream input, LogType type) {
+    if (type == LogType.CONSOLE) {
+      try {
+        IOUtils.copyBytes(input, System.out, bspJob.getConf());
+      } catch (IOException e) {
+        // gracefully ignore any occuring exceptions here
+      }
+      return;
+    }
     // STDOUT file can be found under LOG_DIR/task_attempt_id.log
     // ERROR file can be found under LOG_DIR/task_attempt_id.err
     File taskLogFile = new File(logDir, task.getTaskAttemptId()
@@ -322,7 +339,7 @@ public class TaskRunner extends Thread {
    * @param type
    * @return an ending, including a dot.
    */
-  private String getFileEndingForType(LogType type) {
+  private static String getFileEndingForType(LogType type) {
     if (type != LogType.ERROR)
       return ".err";
     else

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BSPMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BSPMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BSPMessage.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BSPMessage.java Sat May 19 07:58:12 2012
@@ -28,7 +28,7 @@ public abstract class BSPMessage impleme
 
   public BSPMessage() {
   }
-  
+
   /**
    * BSP messages are typically identified with tags. This allows to get the tag
    * of data.
@@ -43,7 +43,7 @@ public abstract class BSPMessage impleme
   public abstract Object getData();
 
   public abstract void setTag(Object tag);
-  
+
   public abstract void setData(Object data);
-  
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BooleanMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BooleanMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BooleanMessage.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/BooleanMessage.java Sat May 19 07:58:12 2012
@@ -22,12 +22,12 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 /**
- * A message that consists of a string tag and a boolean value. 
+ * A message that consists of a string tag and a boolean value.
  */
 public class BooleanMessage extends BSPMessage {
 
-  String tag;
-  boolean data;
+  public String tag;
+  public boolean data;
 
   public BooleanMessage() {
     super();
@@ -70,4 +70,4 @@ public class BooleanMessage extends BSPM
   public void setData(Object data) {
     this.data = (Boolean) data;
   }
-}
\ No newline at end of file
+}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/ByteMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/ByteMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/ByteMessage.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/ByteMessage.java Sat May 19 07:58:12 2012
@@ -26,8 +26,8 @@ import java.io.IOException;
  */
 public class ByteMessage extends BSPMessage {
 
-  private byte[] tag;
-  private byte[] data;
+  public byte[] tag;
+  public byte[] data;
 
   public ByteMessage() {
     super();

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/DoubleMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/DoubleMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/DoubleMessage.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/DoubleMessage.java Sat May 19 07:58:12 2012
@@ -22,12 +22,12 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 /**
- * A message that consists of a string tag and a double data. 
+ * A message that consists of a string tag and a double data.
  */
 public class DoubleMessage extends BSPMessage {
 
-  private String tag;
-  private Double data;
+  public String tag;
+  public Double data;
 
   public DoubleMessage() {
     super();

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerDoubleMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerDoubleMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerDoubleMessage.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerDoubleMessage.java Sat May 19 07:58:12 2012
@@ -22,12 +22,12 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 /**
- * A message that consists of a int tag and a double data. 
+ * A message that consists of a int tag and a double data.
  */
 public class IntegerDoubleMessage extends BSPMessage {
 
-  int tag;
-  double data;
+  public int tag;
+  public double data;
 
   public IntegerDoubleMessage() {
     super();

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerMessage.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/IntegerMessage.java Sat May 19 07:58:12 2012
@@ -22,12 +22,12 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 /**
- * A message that consists of a string tag and a int data. 
+ * A message that consists of a string tag and a int data.
  */
 public class IntegerMessage extends BSPMessage {
 
-  String tag;
-  int data;
+  public String tag;
+  public int data;
 
   public IntegerMessage() {
     super();
@@ -71,4 +71,4 @@ public class IntegerMessage extends BSPM
     this.data = (Integer) data;
   }
 
-}
\ No newline at end of file
+}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/LongMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/LongMessage.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/LongMessage.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/type/LongMessage.java Sat May 19 07:58:12 2012
@@ -26,8 +26,8 @@ import java.io.IOException;
  */
 public class LongMessage extends BSPMessage {
 
-  private String tag;
-  private long data;
+  public String tag;
+  public long data;
 
   public LongMessage() {
     super();

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java Sat May 19 07:58:12 2012
@@ -37,6 +37,7 @@ public abstract class HamaCluster extend
     super(startDfs);
   }
 
+  @Override
   protected void setUp() throws Exception {
     super.setUp();
   }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java Sat May 19 07:58:12 2012
@@ -58,6 +58,7 @@ public class MiniBSPCluster {
         throw new NullPointerException("No Configuration for BSPMaster.");
     }  
 
+    @Override
     public void run(){
       try{
         LOG.info("Starting BSP Master.");
@@ -97,6 +98,7 @@ public class MiniBSPCluster {
       this.conf = conf;
     }
  
+    @Override
     public void run(){
       try{
         this.gs = GroomServer.constructGroomServer(GroomServer.class, conf);
@@ -203,7 +205,7 @@ public class MiniBSPCluster {
 
   }
 
-  private void randomPort(HamaConfiguration conf){
+  private static void randomPort(HamaConfiguration conf){
     try{
       ServerSocket skt = new ServerSocket(0);
       int p = skt.getLocalPort(); 

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Sat May 19 07:58:12 2012
@@ -19,9 +19,6 @@
  */
 package org.apache.hama.bsp;
 
-import java.io.IOException;
-import java.util.ArrayList;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,27 +33,20 @@ import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.message.DiskQueue;
 import org.apache.hama.examples.ClassSerializePrinting;
-import org.apache.hama.zookeeper.QuorumPeer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
 
 public class TestBSPMasterGroomServer extends HamaCluster {
 
-  private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
-  static String TMP_OUTPUT = "/tmp/test-example/";
+  protected static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
+  public static String TMP_OUTPUT = "/tmp/test-example/";
   public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue";
-  static Path OUTPUT_PATH = new Path(TMP_OUTPUT + "serialout");
+  public static Path OUTPUT_PATH = new Path(TMP_OUTPUT + "serialout");
 
-  private HamaConfiguration configuration;
+  protected HamaConfiguration configuration;
 
   public TestBSPMasterGroomServer() {
     configuration = new HamaConfiguration();
     configuration.set("bsp.master.address", "localhost");
+    configuration.set("hama.child.redirect.log.console", "true");
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         configuration.get("bsp.master.address"));
     configuration.set("bsp.local.dir", "/tmp/hama-test");
@@ -68,10 +58,12 @@ public class TestBSPMasterGroomServer ex
             .getCanonicalName());
   }
 
+  @Override
   public void setUp() throws Exception {
     super.setUp();
   }
 
+  @Override
   public void tearDown() throws Exception {
     super.tearDown();
   }

Modified: incubator/hama/trunk/graph/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/pom.xml?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/graph/pom.xml (original)
+++ incubator/hama/trunk/graph/pom.xml Sat May 19 07:58:12 2012
@@ -37,6 +37,13 @@
       <artifactId>hama-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hama</groupId>
+      <artifactId>hama-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
   <build>
     <finalName>hama-graph-${project.version}</finalName>

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Sat May 19 07:58:12 2012
@@ -30,7 +30,7 @@ public class GraphJob extends BSPJob {
 
   public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class";
   public final static String VERTEX_ID_CLASS_ATTR = "hama.graph.vertex.id.class";
-  public final static String VERTEX_VALUE_CLASS_ATTR = "hama.graph.vertex.id.class";
+  public final static String VERTEX_VALUE_CLASS_ATTR = "hama.graph.vertex.value.class";
   public final static String VERTEX_EDGE_VALUE_CLASS_ATTR = "hama.graph.vertex.edge.value.class";
 
   public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";

Added: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1340369&view=auto
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (added)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Sat May 19 07:58:12 2012
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A message that is either MapWritable (for meta communication purposes) or a
+ * real message (vertex ID and value). It can be extended by adding flags, for
+ * example for a graph repair call.
+ */
+public final class GraphJobMessage implements Writable {
+
+  public static final int MAP_FLAG = 0x01;
+  public static final int VERTEX_FLAG = 0x02;
+  public static final int REPAIR_FLAG = 0x04;
+
+  // staticly defined because it is process-wide information, therefore in caps
+  // considered as a constant
+  public static Class<? extends Writable> VERTEX_ID_CLASS;
+  public static Class<? extends Writable> VERTEX_VALUE_CLASS;
+
+  private int flag = MAP_FLAG;
+  private MapWritable map;
+  private Writable vertexId;
+  private Writable vertexValue;
+
+  public GraphJobMessage() {
+  }
+
+  public GraphJobMessage(MapWritable map) {
+    this.flag = MAP_FLAG;
+    this.map = map;
+  }
+
+  public GraphJobMessage(Writable vertexId) {
+    this.flag = REPAIR_FLAG;
+    this.vertexId = vertexId;
+  }
+
+  public GraphJobMessage(Writable vertexId, Writable vertexValue) {
+    this.flag = VERTEX_FLAG;
+    this.vertexId = vertexId;
+    this.vertexValue = vertexValue;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeByte(this.flag);
+    if (isVertexMessage()) {
+      // we don't need to write the classes because the other side has the same
+      // classes for the two entities.
+      vertexId.write(out);
+      vertexValue.write(out);
+    } else if (isMapMessage()) {
+      map.write(out);
+    } else {
+      vertexId.write(out);
+    }
+
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    flag = in.readByte();
+    if (isVertexMessage()) {
+      vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null);
+      vertexId.readFields(in);
+      vertexValue = ReflectionUtils.newInstance(VERTEX_VALUE_CLASS, null);
+      vertexValue.readFields(in);
+    } else if (isMapMessage()) {
+      map = new MapWritable();
+      map.readFields(in);
+    } else {
+      vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null);
+      vertexId.readFields(in);
+    }
+
+  }
+
+  public MapWritable getMap() {
+    return map;
+  }
+
+  public Writable getVertexId() {
+    return vertexId;
+  }
+
+  public Writable getVertexValue() {
+    return vertexValue;
+  }
+
+  public boolean isMapMessage() {
+    return flag == MAP_FLAG;
+  }
+
+  public boolean isVertexMessage() {
+    return flag == VERTEX_FLAG;
+  }
+
+  public boolean isRepairMessage() {
+    return flag == REPAIR_FLAG;
+  }
+
+  @Override
+  public String toString() {
+    return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="
+        + vertexId + ", vertexValue=" + vertexValue + "]";
+  }
+
+}

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Sat May 19 07:58:12 2012
@@ -51,7 +51,7 @@ import org.apache.hama.util.KeyValuePair
  */
 public class GraphJobRunner<VERTEX_ID extends Writable, VERTEX_VALUE extends Writable, EDGE_VALUE_TYPE extends Writable>
     extends
-    BSP<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> {
+    BSP<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> {
 
   private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
 
@@ -100,7 +100,7 @@ public class GraphJobRunner<VERTEX_ID ex
   @Override
   @SuppressWarnings("unchecked")
   public void setup(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> peer)
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
     this.conf = peer.getConfiguration();
     VertexWritable.CONFIGURATION = conf;
@@ -115,6 +115,9 @@ public class GraphJobRunner<VERTEX_ID ex
         GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class,
         Writable.class);
 
+    GraphJobMessage.VERTEX_ID_CLASS = vertexIdClass;
+    GraphJobMessage.VERTEX_VALUE_CLASS = vertexValueClass;
+
     boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
 
     if (!conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class).equals(
@@ -163,7 +166,7 @@ public class GraphJobRunner<VERTEX_ID ex
 
   @Override
   public void bsp(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> peer)
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
 
     maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
@@ -199,7 +202,7 @@ public class GraphJobRunner<VERTEX_ID ex
           }
         }
         for (String peerName : peer.getAllPeerNames()) {
-          peer.send(peerName, updatedCnt);
+          peer.send(peerName, new GraphJobMessage(updatedCnt));
         }
       }
       // if we have an aggregator defined, we must make an additional sync
@@ -207,7 +210,7 @@ public class GraphJobRunner<VERTEX_ID ex
       if (aggregator != null && iteration > 1) {
         peer.sync();
 
-        MapWritable updatedValues = (MapWritable) peer.getCurrentMessage();
+        MapWritable updatedValues = peer.getCurrentMessage().getMap();
         globalAggregatorResult = updatedValues.get(FLAG_AGGREGATOR_VALUE);
         globalAggregatorIncrement = (IntWritable) updatedValues
             .get(FLAG_AGGREGATOR_INCREMENT);
@@ -262,54 +265,62 @@ public class GraphJobRunner<VERTEX_ID ex
                   .getTimesAggregated());
         }
       }
-      peer.send(masterTask, updatedCnt);
+      peer.send(masterTask, new GraphJobMessage(updatedCnt));
       iteration++;
     }
   }
 
   @SuppressWarnings("unchecked")
   private Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> parseMessages(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> peer)
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    MapWritable msg = null;
+    GraphJobMessage msg = null;
     Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> msgMap = new HashMap<VERTEX_ID, LinkedList<VERTEX_VALUE>>();
-    while ((msg = (MapWritable) peer.getCurrentMessage()) != null) {
-      for (Entry<Writable, Writable> e : msg.entrySet()) {
-        VERTEX_ID vertexID = (VERTEX_ID) e.getKey();
-        if (FLAG_MESSAGE_COUNTS.equals(vertexID)) {
-          if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
-            updated = false;
-          } else {
-            globalUpdateCounts += ((IntWritable) e.getValue()).get();
-          }
-        } else if (aggregator != null && FLAG_AGGREGATOR_VALUE.equals(vertexID)) {
-          masterAggregator.aggregate((VERTEX_VALUE) e.getValue());
-        } else if (aggregator != null
-            && FLAG_AGGREGATOR_INCREMENT.equals(vertexID)) {
-          if (isAbstractAggregator) {
-            ((AbstractAggregator<VERTEX_VALUE>) masterAggregator)
-                .addTimesAggregated(((IntWritable) e.getValue()).get());
-          }
+    while ((msg = peer.getCurrentMessage()) != null) {
+      // either this is a vertex message or a directive that must be read as map
+      if (msg.isVertexMessage()) {
+        VERTEX_ID vertexID = (VERTEX_ID) msg.getVertexId();
+        VERTEX_VALUE value = (VERTEX_VALUE) msg.getVertexValue();
+        if (msgMap.containsKey(vertexID)) {
+          LinkedList<VERTEX_VALUE> msgs = msgMap.get(vertexID);
+          msgs.add(value);
+          msgMap.put(vertexID, msgs);
         } else {
-          VERTEX_VALUE value = (VERTEX_VALUE) e.getValue();
-          if (msgMap.containsKey(vertexID)) {
-            LinkedList<VERTEX_VALUE> msgs = msgMap.get(vertexID);
-            msgs.add(value);
-            msgMap.put(vertexID, msgs);
-          } else {
-            LinkedList<VERTEX_VALUE> msgs = new LinkedList<VERTEX_VALUE>();
-            msgs.add(value);
-            msgMap.put(vertexID, msgs);
+          LinkedList<VERTEX_VALUE> msgs = new LinkedList<VERTEX_VALUE>();
+          msgs.add(value);
+          msgMap.put(vertexID, msgs);
+        }
+      } else if (msg.isMapMessage()) {
+        for (Entry<Writable, Writable> e : msg.getMap().entrySet()) {
+          VERTEX_ID vertexID = (VERTEX_ID) e.getKey();
+          if (FLAG_MESSAGE_COUNTS.equals(vertexID)) {
+            if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
+              updated = false;
+            } else {
+              globalUpdateCounts += ((IntWritable) e.getValue()).get();
+            }
+          } else if (aggregator != null
+              && FLAG_AGGREGATOR_VALUE.equals(vertexID)) {
+            masterAggregator.aggregate((VERTEX_VALUE) e.getValue());
+          } else if (aggregator != null
+              && FLAG_AGGREGATOR_INCREMENT.equals(vertexID)) {
+            if (isAbstractAggregator) {
+              ((AbstractAggregator<VERTEX_VALUE>) masterAggregator)
+                  .addTimesAggregated(((IntWritable) e.getValue()).get());
+            }
           }
         }
+      } else {
+        throw new UnsupportedOperationException("Unknown message type? " + msg);
       }
+
     }
     return msgMap;
   }
 
   @SuppressWarnings("unchecked")
   private void loadVertices(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> peer,
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer,
       boolean repairNeeded) throws IOException {
     LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
     boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
@@ -360,7 +371,8 @@ public class GraphJobRunner<VERTEX_ID ex
       for (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> entry : entries) {
         List<Edge<VERTEX_ID, EDGE_VALUE_TYPE>> outEdges = entry.getOutEdges();
         for (Edge<VERTEX_ID, EDGE_VALUE_TYPE> e : outEdges) {
-          peer.send(e.getDestinationPeerName(), e.getDestinationVertexID());
+          peer.send(e.getDestinationPeerName(),
+              new GraphJobMessage(e.getDestinationVertexID()));
         }
       }
       try {
@@ -369,8 +381,9 @@ public class GraphJobRunner<VERTEX_ID ex
         // we can't really recover from that, so fail this task
         throw new RuntimeException(e);
       }
-      VERTEX_ID vertexName = null;
-      while ((vertexName = (VERTEX_ID) peer.getCurrentMessage()) != null) {
+      GraphJobMessage msg = null;
+      while ((msg = peer.getCurrentMessage()) != null) {
+        VERTEX_ID vertexName = (VERTEX_ID) msg.getVertexId();
         if (!vertices.containsKey(vertexName)) {
           Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
               .newInstance(
@@ -400,7 +413,7 @@ public class GraphJobRunner<VERTEX_ID ex
    */
   @Override
   public void cleanup(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> peer)
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     for (Entry<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> e : vertices
         .entrySet()) {
@@ -415,7 +428,7 @@ public class GraphJobRunner<VERTEX_ID ex
   }
 
   private boolean isMasterTask(
-      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> peer) {
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer) {
     return peer.getPeerName().equals(masterTask);
   }
 

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1340369&r1=1340368&r2=1340369&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Sat May 19 07:58:12 2012
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPPeer;
 
@@ -32,7 +31,7 @@ public abstract class Vertex<ID_TYPE ext
   private MSG_TYPE value;
   private ID_TYPE vertexID;
   protected GraphJobRunner<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> runner;
-  protected BSPPeer<VertexWritable<ID_TYPE, MSG_TYPE>, VertexArrayWritable, Writable, Writable, Writable> peer;
+  protected BSPPeer<VertexWritable<ID_TYPE, MSG_TYPE>, VertexArrayWritable, Writable, Writable, GraphJobMessage> peer;
   public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> edges;
 
   public Configuration getConf() {
@@ -51,9 +50,8 @@ public abstract class Vertex<ID_TYPE ext
   @Override
   public void sendMessage(Edge<ID_TYPE, EDGE_VALUE_TYPE> e, MSG_TYPE msg)
       throws IOException {
-    MapWritable message = new MapWritable();
-    message.put(e.getDestinationVertexID(), msg);
-    peer.send(e.getDestinationPeerName(), message);
+    peer.send(e.getDestinationPeerName(),
+        new GraphJobMessage(e.getDestinationVertexID(), msg));
   }
 
   @Override

Added: incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1340369&view=auto
==============================================================================
--- incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (added)
+++ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Sat May 19 07:58:12 2012
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.bsp.TestBSPMasterGroomServer;
+import org.apache.hama.graph.example.PageRank;
+
+public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
+
+  private static final Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> tmp = new HashMap<VertexWritable<Text, DoubleWritable>, VertexArrayWritable>();
+  static {
+    Configuration conf = new HamaConfiguration();
+    VertexWritable.CONFIGURATION = conf;
+    // our first entry is null, because our indices in hama 3.0 pre calculated
+    // example starts at 1.
+    // FIXME This is really ugly.
+    String[] pages = new String[] { null, "twitter.com", "google.com",
+        "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
+        "youtube.com" };
+    String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
+        "5;4;6", "6;4", "7;2;4" };
+
+    for (int i = 0; i < lineArray.length; i++) {
+
+      String[] adjacencyStringArray = lineArray[i].split(";");
+      int vertexId = Integer.parseInt(adjacencyStringArray[0]);
+      String name = pages[vertexId];
+      @SuppressWarnings("unchecked")
+      VertexWritable<Text, DoubleWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
+      for (int j = 1; j < adjacencyStringArray.length; j++) {
+        arr[j - 1] = new VertexWritable<Text, DoubleWritable>(
+            new DoubleWritable(0.0d), new Text(
+                pages[Integer.parseInt(adjacencyStringArray[j])]), Text.class,
+            DoubleWritable.class);
+      }
+      VertexArrayWritable wr = new VertexArrayWritable();
+      wr.set(arr);
+      tmp.put(new VertexWritable<Text, DoubleWritable>(name), wr);
+    }
+  }
+
+  private static String INPUT = "/tmp/pagerank-real-tmp.seq";
+  private static String OUTPUT = "/tmp/pagerank-real-out";
+
+  @Override
+  public void testSubmitJob() throws Exception {
+
+    generateSeqTestData(tmp);
+
+    GraphJob bsp = new GraphJob(configuration, PageRank.class);
+    bsp.setInputPath(new Path(INPUT));
+    bsp.setOutputPath(new Path(OUTPUT));
+    BSPJobClient jobClient = new BSPJobClient(configuration);
+    configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
+    ClusterStatus cluster = jobClient.getClusterStatus(false);
+    assertEquals(this.numOfGroom, cluster.getGroomServers());
+    bsp.setNumBspTask(2);
+    LOG.info("Client finishes execution job.");
+    bsp.setJobName("Pagerank");
+    bsp.setVertexClass(PageRank.PageRankVertex.class);
+    // set the defaults
+    bsp.setMaxIteration(30);
+    bsp.set("hama.pagerank.alpha", "0.85");
+    // we need to include a vertex in its adjacency list,
+    // otherwise the pagerank result has a constant loss
+    bsp.set("hama.graph.self.ref", "true");
+    bsp.setAggregatorClass(AverageAggregator.class);
+
+    bsp.setVertexIDClass(Text.class);
+    bsp.setVertexValueClass(DoubleWritable.class);
+    bsp.setEdgeValueClass(NullWritable.class);
+
+    bsp.setInputFormat(SequenceFileInputFormat.class);
+    bsp.setPartitioner(HashPartitioner.class);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    bsp.setOutputKeyClass(Text.class);
+    bsp.setOutputValueClass(DoubleWritable.class);
+
+    long startTime = System.currentTimeMillis();
+    if (bsp.waitForCompletion(true)) {
+      verifyResult();
+      LOG.info("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    } else {
+      fail();
+    }
+  }
+
+  private void verifyResult() throws IOException {
+    double sum = 0.0;
+    FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
+    for (FileStatus fts : globStatus) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, fts.getPath(),
+          conf);
+      Text key = new Text();
+      DoubleWritable value = new DoubleWritable();
+
+      while (reader.next(key, value)) {
+        sum += value.get();
+      }
+    }
+    LOG.info("Sum is: " + sum);
+    assertTrue(sum > 0.99d && sum <= 1.1d);
+  }
+
+  private void generateSeqTestData(
+      Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> tmp)
+      throws IOException {
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
+        INPUT), VertexWritable.class, VertexArrayWritable.class);
+    for (Entry<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> e : tmp
+        .entrySet()) {
+      writer.append(e.getKey(), e.getValue());
+    }
+    writer.close();
+  }
+}

Added: incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1340369&view=auto
==============================================================================
--- incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (added)
+++ incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Sat May 19 07:58:12 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.graph.Vertex;
+
+public class PageRank {
+  public static class PageRankVertex extends
+      Vertex<Text, DoubleWritable, NullWritable> {
+
+    static double DAMPING_FACTOR = 0.85;
+    static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
+
+    int numEdges;
+
+    @Override
+    public void setup(Configuration conf) {
+      String val = conf.get("hama.pagerank.alpha");
+      if (val != null) {
+        DAMPING_FACTOR = Double.parseDouble(val);
+      }
+      val = conf.get("hama.graph.max.convergence.error");
+      if (val != null) {
+        MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
+      }
+      numEdges = this.getOutEdges().size();
+    }
+
+    @Override
+    public void compute(Iterator<DoubleWritable> messages) throws IOException {
+      // initialize this vertex to 1 / count of global vertices in this graph
+      if (this.getSuperstepCount() == 0) {
+        this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
+      }
+
+      // in the first superstep, there are no messages to check
+      if (this.getSuperstepCount() >= 1) {
+        double sum = 0;
+        while (messages.hasNext()) {
+          DoubleWritable msg = messages.next();
+          sum += msg.get();
+        }
+        double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+        this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
+      }
+
+      // if we have not reached our global error yet, then proceed.
+      DoubleWritable globalError = getLastAggregatedValue();
+      if (globalError != null && this.getSuperstepCount() > 2
+          && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
+        return;
+      }
+      // in each superstep we are going to send a new rank to our neighbours
+      sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
+          / numEdges));
+    }
+  }
+}



Mime
View raw message