incubator-giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r1336743 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/utils/ src/test/java/org/apache/giraph/ src/test/java/org/apache/giraph/graph/
Date Thu, 10 May 2012 15:17:44 GMT
Author: ssc
Date: Thu May 10 15:17:43 2012
New Revision: 1336743

URL: http://svn.apache.org/viewvc?rev=1336743&view=rev
Log:
GIRAPH-20 Move temporary test files from the project directory

Added:
    incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/FileUtils.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestZooKeeperExt.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Thu May 10 15:17:43 2012
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-20. Move temporary test files from the project directory. (ssc)
+
   GIRAPH-37. Implement Netty-backed IPC. (aching)
 
   GIRAPH-184. Upgrade to junit4. (Devaraj K via jghoman)

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Thu May 10 15:17:43 2012
@@ -135,12 +135,9 @@ public class SimplePageRankVertex extend
     @Override
     public void preSuperstep() {
 
-      LongSumAggregator sumAggreg =
-          (LongSumAggregator) getAggregator("sum");
-      MinAggregator minAggreg =
-          (MinAggregator) getAggregator("min");
-      MaxAggregator maxAggreg =
-          (MaxAggregator) getAggregator("max");
+      LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
+      MinAggregator minAggreg = (MinAggregator) getAggregator("min");
+      MaxAggregator maxAggreg = (MaxAggregator) getAggregator("max");
 
       if (getSuperstep() >= 3) {
         LOG.info("aggregatedNumVertices=" +

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu May 10 15:17:43 2012
@@ -130,8 +130,7 @@ public class GraphMapper<I extends Writa
     public void uncaughtException(Thread t, Throwable e) {
       LOG.fatal(
           "uncaughtException: OverrideExceptionHandler on thread " +
-              t.getName() + ", msg = " +  e.getMessage() +
-              ", exiting...", e);
+              t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
       System.exit(1);
     }
   }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java Thu May 10 15:17:43 2012
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import com.google.common.base.Charsets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,6 +43,8 @@ public class TextAggregatorWriter implem
   public static final int NEVER = 0;
   /** Signal for "write only the final values" frequency */
   public static final int AT_THE_END = -1;
+  /**
+   * Signal for "write values in every superstep" frequency. Must differ from
+   * AT_THE_END (-1), otherwise the two settings cannot be told apart; 1 means
+   * a write frequency of one superstep.
+   */
+  public static final int ALWAYS = 1;
   /** The frequency of writing:
    *  - NEVER: never write, files aren't created at all
    *  - AT_THE_END: aggregators are written only when the computation is over
@@ -78,11 +81,10 @@ public class TextAggregatorWriter implem
       Map<String, Aggregator<Writable>> aggregators,
       long superstep) throws IOException {
     if (shouldWrite(superstep)) {
-      for (Entry<String, Aggregator<Writable>> a:
-        aggregators.entrySet()) {
-        output.writeUTF(aggregatorToString(a.getKey(),
-            a.getValue(),
-            superstep));
+      for (Entry<String, Aggregator<Writable>> a: aggregators.entrySet()) {
+        byte[] bytes = aggregatorToString(a.getKey(), a.getValue(), superstep)
+            .getBytes(Charsets.UTF_8);
+        output.write(bytes, 0, bytes.length);
       }
       output.flush();
     }

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/FileUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/FileUtils.java?rev=1336743&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/FileUtils.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/FileUtils.java Thu May 10 15:17:43 2012
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.Writer;
+
+/**
+ * Helper class for filesystem operations during testing
+ */
+public class FileUtils {
+
+  /**
+   * Utility class should not be instantiatable
+   */
+  private FileUtils() {
+  }
+
+  /**
+   * Create a temporary folder that will be removed after the test.
+   *
+   * @param vertexClass Used for generating the folder name.
+   * @return File object for the directory.
+   */
+  public static File createTestDir(Class<?> vertexClass)
+    throws IOException {
+    String systemTmpDir = System.getProperty("java.io.tmpdir");
+    long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random());
+    File testTempDir = new File(systemTmpDir, "giraph-" +
+        vertexClass.getSimpleName() + '-' + simpleRandomLong);
+    if (!testTempDir.mkdir()) {
+      throw new IOException("Could not create " + testTempDir);
+    }
+    testTempDir.deleteOnExit();
+    return testTempDir;
+  }
+
+  /**
+   * Make a temporary file.
+   *
+   * @param parent Parent directory.
+   * @param name File name.
+   * @return File object to temporary file.
+   * @throws IOException
+   */
+  public static File createTempFile(File parent, String name)
+    throws IOException {
+    return createTestTempFileOrDir(parent, name, false);
+  }
+
+  /**
+   * Make a temporary directory.
+   *
+   * @param parent Parent directory.
+   * @param name Directory name.
+   * @return File object to temporary file.
+   * @throws IOException
+   */
+  public static File createTempDir(File parent, String name)
+    throws IOException {
+    File dir = createTestTempFileOrDir(parent, name, true);
+    dir.delete();
+    return dir;
+  }
+
+  /**
+   * Create a test temp file or directory.
+   *
+   * @param parent Parent directory
+   * @param name Name of file
+   * @param dir Is directory?
+   * @return File object
+   * @throws IOException
+   */
+  public static File createTestTempFileOrDir(File parent, String name,
+    boolean dir) throws IOException {
+    File f = new File(parent, name);
+    f.deleteOnExit();
+    if (dir && !f.mkdirs()) {
+      throw new IOException("Could not make directory " + f);
+    }
+    return f;
+  }
+
+  /**
+   * Write lines to a file.
+   *
+   * @param file File to write lines to
+   * @param lines Strings written to the file
+   * @throws IOException
+   */
+  public static void writeLines(File file, String... lines)
+    throws IOException {
+    Writer writer = Files.newWriter(file, Charsets.UTF_8);
+    try {
+      for (String line : lines) {
+        writer.write(line);
+        writer.write('\n');
+      }
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+  }
+
+  /**
+   * Recursively delete a directory
+   *
+   * @param dir Directory to delete
+   */
+  public static void delete(File dir) {
+    if (dir != null) {
+      new DeletingVisitor().accept(dir);
+    }
+  }
+
+  /**
+   * Deletes files.
+   */
+  private static class DeletingVisitor implements FileFilter {
+    @Override
+    public boolean accept(File f) {
+      if (!f.isFile()) {
+        f.listFiles(this);
+      }
+      f.delete();
+      return false;
+    }
+  }
+
+  /**
+   * Helper method to remove a path if it exists.
+   *
+   * @param conf Configuration to load FileSystem from
+   * @param path Path to remove
+   * @throws IOException
+   */
+  public static void deletePath(Configuration conf, String path)
+    throws IOException {
+    deletePath(conf, new Path(path));
+  }
+
+  /**
+   * Helper method to remove a path if it exists.
+   *
+   * @param conf Configuration to load FileSystem from
+   * @param path Path to remove
+   * @throws IOException
+   */
+  public static void deletePath(Configuration conf, Path path)
+    throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(path, true);
+  }
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Thu May 10 15:17:43 2012
@@ -19,7 +19,6 @@
 package org.apache.giraph.utils;
 
 import com.google.common.base.Charsets;
-import com.google.common.io.Closeables;
 import com.google.common.io.Files;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.hadoop.conf.Configuration;
@@ -31,9 +30,7 @@ import org.apache.zookeeper.server.ZooKe
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 
 import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
-import java.io.Writer;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
@@ -99,16 +96,18 @@ public class InternalVertexRunner {
 
     File tmpDir = null;
     try {
-      // prepare input file, output folder and zookeeper folder
-      tmpDir = createTestDir(vertexClass);
-      File inputFile = createTempFile(tmpDir, "graph.txt");
-      File outputDir = createTempDir(tmpDir, "output");
-      File zkDir = createTempDir(tmpDir, "zooKeeper");
+      // Prepare input file, output folder and temporary folders
+      tmpDir = FileUtils.createTestDir(vertexClass);
+      File inputFile = FileUtils.createTempFile(tmpDir, "graph.txt");
+      File outputDir = FileUtils.createTempDir(tmpDir, "output");
+      File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
+      File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
+      File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints");
 
-      // write input data to disk
-      writeLines(inputFile, data);
+      // Write input data to disk
+      FileUtils.writeLines(inputFile, data);
 
-      // create and configure the job to run the vertex
+      // Create and configure the job to run the vertex
       GiraphJob job = new GiraphJob(vertexClass.getName());
       job.setVertexClass(vertexClass);
       job.setVertexInputFormatClass(vertexInputFormatClass);
@@ -125,6 +124,11 @@ public class InternalVertexRunner {
       conf.set(GiraphJob.ZOOKEEPER_LIST, "localhost:" +
           String.valueOf(LOCAL_ZOOKEEPER_PORT));
 
+      conf.set(GiraphJob.ZOOKEEPER_DIR, zkDir.toString());
+      conf.set(GiraphJob.ZOOKEEPER_MANAGER_DIRECTORY,
+          zkMgrDir.toString());
+      conf.set(GiraphJob.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
+
       for (Map.Entry<String, String> param : params.entrySet()) {
         conf.set(param.getKey(), param.getValue());
       }
@@ -134,7 +138,7 @@ public class InternalVertexRunner {
       FileOutputFormat.setOutputPath(job.getInternalJob(),
                                      new Path(outputDir.toString()));
 
-      // configure a local zookeeper instance
+      // Configure a local zookeeper instance
       Properties zkProperties = new Properties();
       zkProperties.setProperty("tickTime", "2000");
       zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
@@ -150,7 +154,7 @@ public class InternalVertexRunner {
       QuorumPeerConfig qpConfig = new QuorumPeerConfig();
       qpConfig.parseProperties(zkProperties);
 
-      // create and run the zookeeper instance
+      // Create and run the zookeeper instance
       final InternalZooKeeper zookeeper = new InternalZooKeeper();
       final ServerConfig zkConfig = new ServerConfig();
       zkConfig.readFrom(qpConfig);
@@ -176,111 +180,11 @@ public class InternalVertexRunner {
       return Files.readLines(new File(outputDir, "part-m-00000"),
           Charsets.UTF_8);
     } finally {
-      if (tmpDir != null) {
-        new DeletingVisitor().accept(tmpDir);
-      }
+      FileUtils.delete(tmpDir);
     }
   }
 
-  /**
-   * Create a temporary folder that will be removed after the test.
-   *
-   * @param vertexClass Used for generating the folder name.
-   * @return File object for the directory.
-   */
-  private static File createTestDir(Class<?> vertexClass)
-    throws IOException {
-    String systemTmpDir = System.getProperty("java.io.tmpdir");
-    long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random());
-    File testTempDir = new File(systemTmpDir, "giraph-" +
-        vertexClass.getSimpleName() + '-' + simpleRandomLong);
-    if (!testTempDir.mkdir()) {
-      throw new IOException("Could not create " + testTempDir);
-    }
-    testTempDir.deleteOnExit();
-    return testTempDir;
-  }
 
-  /**
-   * Make a temporary file.
-   *
-   * @param parent Parent directory.
-   * @param name File name.
-   * @return File object to temporary file.
-   * @throws IOException
-   */
-  private static File createTempFile(File parent, String name)
-    throws IOException {
-    return createTestTempFileOrDir(parent, name, false);
-  }
-
-  /**
-   * Make a temporary directory.
-   *
-   * @param parent Parent directory.
-   * @param name Directory name.
-   * @return File object to temporary file.
-   * @throws IOException
-   */
-  private static File createTempDir(File parent, String name)
-    throws IOException {
-    File dir = createTestTempFileOrDir(parent, name, true);
-    dir.delete();
-    return dir;
-  }
-
-  /**
-   * Creae a test temp file or directory.
-   *
-   * @param parent Parent directory
-   * @param name Name of file
-   * @param dir Is directory?
-   * @return File object
-   * @throws IOException
-   */
-  private static File createTestTempFileOrDir(File parent, String name,
-      boolean dir) throws IOException {
-    File f = new File(parent, name);
-    f.deleteOnExit();
-    if (dir && !f.mkdirs()) {
-      throw new IOException("Could not make directory " + f);
-    }
-    return f;
-  }
-
-  /**
-   * Write lines to a file.
-   *
-   * @param file File to write lines to
-   * @param lines Strings written to the file
-   * @throws IOException
-   */
-  private static void writeLines(File file, String... lines)
-    throws IOException {
-    Writer writer = Files.newWriter(file, Charsets.UTF_8);
-    try {
-      for (String line : lines) {
-        writer.write(line);
-        writer.write('\n');
-      }
-    } finally {
-      Closeables.closeQuietly(writer);
-    }
-  }
-
-  /**
-   * Deletes files.
-   */
-  private static class DeletingVisitor implements FileFilter {
-    @Override
-    public boolean accept(File f) {
-      if (!f.isFile()) {
-        f.listFiles(this);
-      }
-      f.delete();
-      return false;
-    }
-  }
 
   /**
    * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java Thu May 10 15:17:43 2012
@@ -18,20 +18,24 @@
 
 package org.apache.giraph;
 
-import java.io.FileNotFoundException;
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.List;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
 import org.apache.giraph.examples.GeneratedVertexReader;
 import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.utils.FileUtils;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.junit.After;
 import org.junit.Before;
 
 /**
@@ -50,39 +54,142 @@ public class BspCase implements Watcher 
   private final String zkList = System.getProperty("prop.zookeeper.list");
   private String testName;
 
+  /**
+   * Default path for temporary files: the "_giraphTests" entry below the
+   * directory named by the java.io.tmpdir system property.
+   */
+  static final Path DEFAULT_TEMP_DIR =
+      new Path(System.getProperty("java.io.tmpdir"), "_giraphTests");
+
+  /**
+   * A filter for listing part files: accepts exactly those paths whose last
+   * name component starts with "part-".
+   */
+  static final PathFilter PARTS_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith("part-");
+    }
+  };
+
   /**
    * Adjust the configuration to the basic test case
    */
-  public final void setupConfiguration(GiraphJob job) {
+  public final Configuration setupConfiguration(GiraphJob job)
+      throws IOException {
     Configuration conf = job.getConfiguration();
     conf.set("mapred.jar", getJarLocation());
 
     // Allow this test to be run on a real Hadoop setup
-    if (getJobTracker() != null) {
+    if (runningInDistributedMode()) {
       System.out.println("setup: Sending job to job tracker " +
-          getJobTracker() + " with jar path " + getJarLocation()
+          jobTracker + " with jar path " + getJarLocation()
           + " for " + getName());
-      conf.set("mapred.job.tracker", getJobTracker());
-      job.setWorkerConfiguration(getNumWorkers(),
-          getNumWorkers(),
-          100.0f);
+      conf.set("mapred.job.tracker", jobTracker);
+      job.setWorkerConfiguration(getNumWorkers(), getNumWorkers(), 100.0f);
     }
     else {
       System.out.println("setup: Using local job runner with " +
-          "location " + getJarLocation() + " for "
-          + getName());
+          "location " + getJarLocation() + " for " + getName());
       job.setWorkerConfiguration(1, 1, 100.0f);
       // Single node testing
       conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
     }
     conf.setInt(GiraphJob.POLL_ATTEMPTS, 10);
-    conf.setInt(GiraphJob.POLL_MSECS, 3*1000);
+    conf.setInt(GiraphJob.POLL_MSECS, 3 * 1000);
     conf.setInt(GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500);
     if (getZooKeeperList() != null) {
       job.setZooKeeperConfiguration(getZooKeeperList());
     }
     // GeneratedInputSplit will generate 5 vertices
     conf.setLong(GeneratedVertexReader.READER_VERTICES, 5);
+
+    // Set up paths for temporary files
+    Path zookeeperDir = getTempPath("_bspZooKeeper");
+    Path zkManagerDir = getTempPath("_defaultZkManagerDir");
+    Path checkPointDir = getTempPath("_checkpoints");
+
+    // We might start several jobs per test, so we need to clean up here
+    FileUtils.deletePath(conf, zookeeperDir);
+    FileUtils.deletePath(conf, zkManagerDir);
+    FileUtils.deletePath(conf, checkPointDir);
+
+    conf.set(GiraphJob.ZOOKEEPER_DIR, zookeeperDir.toString());
+    conf.set(GiraphJob.ZOOKEEPER_MANAGER_DIRECTORY,
+        zkManagerDir.toString());
+    conf.set(GiraphJob.CHECKPOINT_DIRECTORY, checkPointDir.toString());
+
+    return conf;
+  }
+
+  /**
+   * Build the path of an entry inside the temporary test folder
+   * (DEFAULT_TEMP_DIR). Only the Path object is built; nothing is created
+   * on the file system.
+   *
+   * @param name  name of the entry below the temporary folder
+   * @return  path of that entry below DEFAULT_TEMP_DIR
+   */
+  protected Path getTempPath(String name) {
+    return new Path(DEFAULT_TEMP_DIR, name);
+  }
+
+  /**
+   * Prepare a GiraphJob for test purposes without an output format and
+   * without an output path (both are passed on as null).
+   *
+   * @param name  identifying name for the job
+   * @param vertexClass class of the vertex to run
+   * @param vertexInputFormatClass  inputformat to use
+   * @return  fully configured job instance
+   * @throws IOException if configuring the job fails
+   */
+  protected GiraphJob prepareJob(String name, Class<?> vertexClass,
+      Class<?> vertexInputFormatClass) throws IOException {
+    return prepareJob(name, vertexClass, vertexInputFormatClass, null,
+        null);
+  }
+
+  /**
+   * Prepare a GiraphJob for test purposes without a worker context (null is
+   * passed on for it).
+   *
+   * @param name  identifying name for the job
+   * @param vertexClass class of the vertex to run
+   * @param vertexInputFormatClass  inputformat to use
+   * @param vertexOutputFormatClass outputformat to use, may be null
+   * @param outputPath  destination path for the output, may be null
+   * @return  fully configured job instance
+   * @throws IOException if configuring the job fails
+   */
+  protected GiraphJob prepareJob(String name, Class<?> vertexClass,
+      Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
+      Path outputPath) throws IOException {
+    return prepareJob(name, vertexClass, null, vertexInputFormatClass,
+        vertexOutputFormatClass, outputPath);
+  }
+
+  /**
+   * Prepare a GiraphJob for test purposes.
+   *
+   * Creates the job, applies setupConfiguration to it and sets the vertex
+   * and input format classes. Worker context, output format and output path
+   * are only applied when they are not null; a non-null output path is
+   * handed to removeAndSetOutput.
+   *
+   * @param name  identifying name for the job
+   * @param vertexClass class of the vertex to run
+   * @param workerContextClass class of the workercontext to use, may be null
+   * @param vertexInputFormatClass  inputformat to use
+   * @param vertexOutputFormatClass outputformat to use, may be null
+   * @param outputPath  destination path for the output, may be null
+   * @return  fully configured job instance
+   * @throws IOException if configuring the job or preparing the output
+   *         path fails
+   */
+  protected GiraphJob prepareJob(String name, Class<?> vertexClass,
+      Class<?> workerContextClass, Class<?> vertexInputFormatClass,
+      Class<?> vertexOutputFormatClass, Path outputPath) throws IOException {
+    GiraphJob job = new GiraphJob(name);
+    setupConfiguration(job);
+    job.setVertexClass(vertexClass);
+    job.setVertexInputFormatClass(vertexInputFormatClass);
+
+    if (workerContextClass != null) {
+      job.setWorkerContextClass(workerContextClass);
+    }
+    if (vertexOutputFormatClass != null) {
+      job.setVertexOutputFormatClass(vertexOutputFormatClass);
+    }
+    if (outputPath != null) {
+      removeAndSetOutput(job, outputPath);
+    }
+
+    return job;
+  }
 
   private String getName() {
@@ -102,7 +209,7 @@ public class BspCase implements Watcher 
   /**
    * Get the number of workers used in the BSP application
    *
-   * @param numProcs number of processes to use
+   * @return number of workers
    */
   public int getNumWorkers() {
     return numWorkers;
@@ -125,29 +232,28 @@ public class BspCase implements Watcher 
   }
 
   /**
-   * Get the job tracker location
+   *  Are the tests executed on a real hadoop instance?
    *
-   * @return job tracker location as a string
+   *  @return whether we use a real hadoop instance or not
    */
-  String getJobTracker() {
-    return jobTracker;
+  boolean runningInDistributedMode() {
+    return jobTracker != null;
   }
 
   /**
    * Get the single part file status and make sure there is only one part
    *
-   * @param job Job to get the file system from
+   * @param conf Configuration to get the file system from
    * @param partDirPath Directory where the single part file should exist
    * @return Single part file status
    * @throws IOException
    */
-  public static FileStatus getSinglePartFileStatus(GiraphJob job,
+  public static FileStatus getSinglePartFileStatus(Configuration conf,
       Path partDirPath) throws IOException {
-    FileSystem fs = FileSystem.get(job.getConfiguration());
-    FileStatus[] statusArray = fs.listStatus(partDirPath);
+    FileSystem fs = FileSystem.get(conf);
     FileStatus singlePartFileStatus = null;
     int partFiles = 0;
-    for (FileStatus fileStatus : statusArray) {
+    for (FileStatus fileStatus : fs.listStatus(partDirPath)) {
       if (fileStatus.getPath().getName().equals("part-m-00000")) {
         singlePartFileStatus = fileStatus;
       }
@@ -155,46 +261,58 @@ public class BspCase implements Watcher 
         ++partFiles;
       }
     }
-    if (partFiles != 1) {
-      throw new ArithmeticException(
-          "getSinglePartFile: Part file count should be 1, but is " +
-              partFiles);
-    }
+
+    Preconditions.checkState(partFiles == 1, "getSinglePartFile: Part file " +
+        "count should be 1, but is " + partFiles);
+
     return singlePartFileStatus;
   }
 
+  /**
+   * Read all part files in the output and count their lines. This works
+   * only for textual output!
+   *
+   * Only entries of the output path accepted by PARTS_FILTER are read; each
+   * one is decoded as UTF-8 and every line read adds one to the result.
+   *
+   * @param conf Configuration to get the file system from
+   * @param outputPath Directory whose part files are counted
+   * @return Total number of lines over all part files
+   * @throws IOException if listing the directory or reading a file fails
+   */
+  public int getNumResults(Configuration conf, Path outputPath)
+      throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    int numResults = 0;
+    for (FileStatus status : fs.listStatus(outputPath, PARTS_FILTER)) {
+      FSDataInputStream in = null;
+      BufferedReader reader = null;
+      try {
+        in = fs.open(status.getPath());
+        reader = new BufferedReader(new InputStreamReader(in, Charsets.UTF_8));
+        while (reader.readLine() != null) {
+          numResults++;
+        }
+      } finally {
+        Closeables.closeQuietly(in);
+        Closeables.closeQuietly(reader);
+      }
+    }
+    return numResults;
+  }
+
   @Before
   public void setUp() {
-    if (jobTracker != null) {
+    if (runningInDistributedMode()) {
       System.out.println("Setting tasks to 3 for " + getName() +
           " since JobTracker exists...");
       numWorkers = 3;
     }
     try {
-      Configuration conf = new Configuration();
-      FileSystem hdfs = FileSystem.get(conf);
-      // Since local jobs always use the same paths, remove them
-      Path oldLocalJobPaths = new Path(
-          GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT);
-      FileStatus[] fileStatusArr;
-      try {
-        fileStatusArr = hdfs.listStatus(oldLocalJobPaths);
-        for (FileStatus fileStatus : fileStatusArr) {
-          if (fileStatus.isDir() &&
-              fileStatus.getPath().getName().contains("job_local")) {
-            System.out.println("Cleaning up local job path " +
-                fileStatus.getPath().getName());
-            hdfs.delete(oldLocalJobPaths, true);
-          }
-        }
-      } catch (FileNotFoundException e) {
-        // ignore this FileNotFound exception and continue.
-      }
+
+      cleanupTemporaryFiles();
+
       if (zkList == null) {
         return;
       }
       ZooKeeperExt zooKeeperExt =
-          new ZooKeeperExt(zkList, 30*1000, this);
+          new ZooKeeperExt(zkList, 30 * 1000, this);
       List<String> rootChildren = zooKeeperExt.getChildren("/", false);
       for (String rootChild : rootChildren) {
         if (rootChild.startsWith("_hadoopBsp")) {
@@ -216,6 +334,18 @@ public class BspCase implements Watcher 
     }
   }
 
+  /**
+   * Runs after each test (JUnit After hook) and removes the temporary
+   * files of the test.
+   *
+   * @throws IOException if the cleanup fails
+   */
+  @After
+  public void tearDown() throws IOException {
+    cleanupTemporaryFiles();
+  }
+
+  /**
+   * Remove temporary files: deletes DEFAULT_TEMP_DIR through
+   * FileUtils.deletePath, using a freshly created Configuration.
+   *
+   * @throws IOException if the deletion fails
+   */
+  private void cleanupTemporaryFiles() throws IOException {
+    FileUtils.deletePath(new Configuration(), DEFAULT_TEMP_DIR);
+  }
+
   @Override
   public void process(WatchedEvent event) {
     // Do nothing
@@ -227,28 +357,15 @@ public class BspCase implements Watcher 
    * FileOutputFormat.
    *
    * @param job Job to set the output path for
-   * @param outputPathString Path to output as a string
+   * @param outputPath Path to output
    * @throws IOException
    */
   public static void removeAndSetOutput(GiraphJob job,
       Path outputPath) throws IOException {
-    remove(job.getConfiguration(), outputPath);
+    FileUtils.deletePath(job.getConfiguration(), outputPath);
     FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
   }
 
-  /**
-   * Helper method to remove a path if it exists.
-   *
-   * @param conf Configuration to load FileSystem from
-   * @param path Path to remove
-   * @throws IOException
-   */
-  public static void remove(Configuration conf, Path path)
-      throws IOException {
-    FileSystem hdfs = FileSystem.get(conf);
-    hdfs.delete(path, true);
-  }
-
   public static String getCallingMethodName() {
     return Thread.currentThread().getStackTrace()[2].getMethodName();
   }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java Thu May 10 15:17:43 2012
@@ -26,6 +26,7 @@ import org.apache.giraph.examples.Simple
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
 import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 
@@ -33,19 +34,6 @@ import org.junit.Test;
  * Unit test for automated checkpoint restarting
  */
 public class TestAutoCheckpoint extends BspCase {
-  /** Where the checkpoints will be stored and restarted */
-  private final String HDFS_CHECKPOINT_DIR =
-      "/tmp/testBspCheckpoints";
-
-  /**
-   * Create the test case
-   *
-   * @param testName name of the test case
-   */
-  public TestAutoCheckpoint(String testName) {
-    super(testName);
-  }
-
 
   public TestAutoCheckpoint() {
     super(TestAutoCheckpoint.class.getName());
@@ -62,31 +50,30 @@ public class TestAutoCheckpoint extends 
   @Test
   public void testSingleFault()
     throws IOException, InterruptedException, ClassNotFoundException {
-    if (getJobTracker() == null) {
+    if (!runningInDistributedMode()) {
       System.out.println(
           "testSingleFault: Ignore this test in local mode.");
       return;
     }
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
-    job.getConfiguration().setBoolean(SimpleCheckpointVertex.ENABLE_FAULT,
-        true);
-    job.getConfiguration().setInt("mapred.map.max.attempts", 4);
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        SimpleCheckpointVertex.class,
+        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+        SimpleSuperstepVertexInputFormat.class,
+        SimpleSuperstepVertexOutputFormat.class,
+        outputPath);
+
+    Configuration conf = job.getConfiguration();
+    conf.setBoolean(SimpleCheckpointVertex.ENABLE_FAULT, true);
+    conf.setInt("mapred.map.max.attempts", 4);
     // Trigger failure faster
-    job.getConfiguration().setInt("mapred.task.timeout", 30000);
-    job.getConfiguration().setInt(GiraphJob.POLL_MSECS, 5000);
-    job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 2);
-    job.getConfiguration().set(GiraphJob.CHECKPOINT_DIRECTORY,
-        HDFS_CHECKPOINT_DIR);
-    job.getConfiguration().setBoolean(
-        GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
-    job.setVertexClass(SimpleCheckpointVertex.class);
-    job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-    job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
-    job.setWorkerContextClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
-    Path outputPath = new Path("/tmp/" + getCallingMethodName());
-    removeAndSetOutput(job, outputPath);
+    conf.setInt("mapred.task.timeout", 30000);
+    conf.setInt(GiraphJob.POLL_MSECS, 5000);
+    conf.setInt(GiraphJob.CHECKPOINT_FREQUENCY, 2);
+    conf.set(GiraphJob.CHECKPOINT_DIRECTORY,
+        getTempPath("_singleFaultCheckpoints").toString());
+    conf.setBoolean(GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
+
     assertTrue(job.run(true));
   }
 }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Thu May 10 15:17:43 2012
@@ -20,62 +20,63 @@ package org.apache.giraph;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
-import org.apache.giraph.examples.SimpleAggregatorWriter;
-import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
-import org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
 import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.examples.LongSumAggregator;
+import org.apache.giraph.examples.MaxAggregator;
+import org.apache.giraph.examples.MinAggregator;
 import org.apache.giraph.examples.SimpleCombinerVertex;
 import org.apache.giraph.examples.SimpleFailVertex;
 import org.apache.giraph.examples.SimpleMsgVertex;
 import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
 import org.apache.giraph.examples.SimpleShortestPathsVertex;
+import org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat;
 import org.apache.giraph.examples.SimpleSumCombiner;
 import org.apache.giraph.examples.SimpleSuperstepVertex;
+import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.TextAggregatorWriter;
 import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.giraph.graph.BasicVertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobContext;
 /*if[HADOOP_NON_SASL_RPC]
 else[HADOOP_NON_SASL_RPC]*/
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 /*end[HADOOP_NON_SASL_RPC]*/
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
+import java.util.Map;
+
 import org.junit.Test;
 
 /**
  * Unit test for many simple BSP applications.
  */
 public class TestBspBasic extends BspCase {
-  /**
-   * Create the test case
-   *
-   * @param testName name of the test case
-   */
-  public TestBspBasic(String testName) {
-    super(testName);
-  }
 
   public TestBspBasic() {
     super(TestBspBasic.class.getName());
@@ -101,9 +102,8 @@ public class TestBspBasic extends BspCas
       InvocationTargetException, SecurityException, NoSuchMethodException {
     System.out.println("testInstantiateVertex: java.class.path=" +
         System.getProperty("java.class.path"));
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    job.setVertexClass(SimpleSuperstepVertex.class);
-    job.setVertexInputFormatClass(
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        SimpleSuperstepVertex.class,
         SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat.class);
     GraphState<LongWritable, IntWritable, FloatWritable, IntWritable> gs =
         new GraphState<LongWritable, IntWritable,
@@ -126,8 +126,7 @@ public class TestBspBasic extends BspCas
       /*end[HADOOP_NON_SASL_RPC]*/
     ByteArrayOutputStream byteArrayOutputStream =
         new ByteArrayOutputStream();
-    DataOutputStream outputStream =
-        new DataOutputStream(byteArrayOutputStream);
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
     ((Writable) splitArray.get(0)).write(outputStream);
     System.out.println("testInstantiateVertex: Example output split = " +
         byteArrayOutputStream.toString());
@@ -143,27 +142,26 @@ public class TestBspBasic extends BspCas
   @Test
   public void testLocalJobRunnerConfig()
       throws IOException, InterruptedException, ClassNotFoundException {
-    if (getJobTracker() != null) {
+    if (runningInDistributedMode()) {
       System.out.println("testLocalJobRunnerConfig: Skipping for " +
           "non-local");
       return;
     }
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        SimpleSuperstepVertex.class, SimpleSuperstepVertexInputFormat.class);
     job.setWorkerConfiguration(5, 5, 100.0f);
     job.getConfiguration().setBoolean(GiraphJob.SPLIT_MASTER_WORKER, true);
-    job.setVertexClass(SimpleSuperstepVertex.class);
-    job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+
     try {
       job.run(true);
-      assertTrue(false);
+      fail();
     } catch (IllegalArgumentException e) {
     }
 
     job.getConfiguration().setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
     try {
       job.run(true);
-      assertTrue(false);
+      fail();
     } catch (IllegalArgumentException e) {
     }
     job.setWorkerConfiguration(1, 1, 100.0f);
@@ -182,18 +180,15 @@ public class TestBspBasic extends BspCas
   public void testBspFail()
       throws IOException, InterruptedException, ClassNotFoundException {
     // Allow this test only to be run on a real Hadoop setup
-    if (getJobTracker() == null) {
+    if (!runningInDistributedMode()) {
       System.out.println("testBspFail: not executed for local setup.");
       return;
     }
 
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
+    GiraphJob job = prepareJob(getCallingMethodName(), SimpleFailVertex.class,
+        SimplePageRankVertexInputFormat.class, null,
+        getTempPath(getCallingMethodName()));
     job.getConfiguration().setInt("mapred.map.max.attempts", 1);
-    job.setVertexClass(SimpleFailVertex.class);
-    job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
-    Path outputPath = new Path("/tmp/" + getCallingMethodName());
-    removeAndSetOutput(job, outputPath);
     assertTrue(!job.run(true));
   }
 
@@ -207,22 +202,18 @@ public class TestBspBasic extends BspCas
   @Test
   public void testBspSuperStep()
       throws IOException, InterruptedException, ClassNotFoundException {
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
-    job.getConfiguration().setFloat(GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER,
-        2.0f);
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        SimpleSuperstepVertex.class, SimpleSuperstepVertexInputFormat.class,
+        SimpleSuperstepVertexOutputFormat.class, outputPath);
+    Configuration conf = job.getConfiguration();
+    conf.setFloat(GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER, 2.0f);
     // GeneratedInputSplit will generate 10 vertices
-    job.getConfiguration().setLong(GeneratedVertexReader.READER_VERTICES,
-        10);
-    job.setVertexClass(SimpleSuperstepVertex.class);
-    job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-    job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
-    Path outputPath = new Path("/tmp/" + getCallingMethodName());
-    removeAndSetOutput(job, outputPath);
+    conf.setLong(GeneratedVertexReader.READER_VERTICES, 10);
     assertTrue(job.run(true));
-    if (getJobTracker() == null) {
-      FileStatus fileStatus = getSinglePartFileStatus(job, outputPath);
-      assertTrue(fileStatus.getLen() == 49);
+    if (!runningInDistributedMode()) {
+      FileStatus fileStatus = getSinglePartFileStatus(conf, outputPath);
+      assertEquals(49l, fileStatus.getLen());
     }
   }
 
@@ -236,10 +227,8 @@ public class TestBspBasic extends BspCas
   @Test
   public void testBspMsg()
       throws IOException, InterruptedException, ClassNotFoundException {
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
-    job.setVertexClass(SimpleMsgVertex.class);
-    job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), SimpleMsgVertex.class,
+        SimpleSuperstepVertexInputFormat.class);
     assertTrue(job.run(true));
   }
 
@@ -255,12 +244,9 @@ public class TestBspBasic extends BspCas
   @Test
   public void testEmptyVertexInputFormat()
       throws IOException, InterruptedException, ClassNotFoundException {
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
-    job.getConfiguration().setLong(GeneratedVertexReader.READER_VERTICES,
-        0);
-    job.setVertexClass(SimpleMsgVertex.class);
-    job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), SimpleMsgVertex.class,
+        SimpleSuperstepVertexInputFormat.class);
+    job.getConfiguration().setLong(GeneratedVertexReader.READER_VERTICES, 0);
     assertTrue(job.run(true));
   }
 
@@ -274,10 +260,8 @@ public class TestBspBasic extends BspCas
   @Test
   public void testBspCombiner()
       throws IOException, InterruptedException, ClassNotFoundException {
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
-    job.setVertexClass(SimpleCombinerVertex.class);
-    job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        SimpleCombinerVertex.class, SimpleSuperstepVertexInputFormat.class);
     job.setVertexCombinerClass(SimpleSumCombiner.class);
     assertTrue(job.run(true));
   }
@@ -292,14 +276,12 @@ public class TestBspBasic extends BspCas
   @Test
   public void testBspPageRank()
       throws IOException, InterruptedException, ClassNotFoundException {
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
-    job.setVertexClass(SimplePageRankVertex.class);
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        SimplePageRankVertex.class, SimplePageRankVertexInputFormat.class);
     job.setWorkerContextClass(
         SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
-    job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
     assertTrue(job.run(true));
-    if (getJobTracker() == null) {
+    if (!runningInDistributedMode()) {
       double maxPageRank =
           SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
       double minPageRank =
@@ -307,13 +289,10 @@ public class TestBspBasic extends BspCas
       long numVertices =
           SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
       System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
-          " minPageRank=" + minPageRank +
-          " numVertices=" + numVertices);
-      assertTrue("34.030 !< " + maxPageRank + " !< " + " 34.0301",
-          maxPageRank > 34.030 && maxPageRank < 34.0301);
-      assertTrue("0.03 !< " + minPageRank + " !< " + "0.03001",
-          minPageRank > 0.03 && minPageRank < 0.03001);
-      assertTrue("numVertices = " + numVertices + " != 5", numVertices == 5);
+          " minPageRank=" + minPageRank + " numVertices=" + numVertices);
+      assertEquals(34.03, maxPageRank, 0.001);
+      assertEquals(0.03, minPageRank, 0.00001);
+      assertEquals(5l, numVertices);
     }
   }
 
@@ -327,32 +306,21 @@ public class TestBspBasic extends BspCas
   @Test
   public void testBspShortestPaths()
       throws IOException, InterruptedException, ClassNotFoundException {
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
-    job.setVertexClass(SimpleShortestPathsVertex.class);
-    job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
-    job.setVertexOutputFormatClass(
-        SimpleShortestPathsVertexOutputFormat.class);
-    job.getConfiguration().setLong(SimpleShortestPathsVertex.SOURCE_ID, 0);
-    Path outputPath = new Path("/tmp/" + getCallingMethodName());
-    removeAndSetOutput(job, outputPath);
-    assertTrue(job.run(true));
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        SimpleShortestPathsVertex.class,
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class,
+        SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat.class,
+        outputPath);
+    Configuration conf = job.getConfiguration();
+    conf.setLong(SimpleShortestPathsVertex.SOURCE_ID, 0);
 
-    job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
-    job.setVertexClass(SimpleShortestPathsVertex.class);
-    job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
-    job.setVertexOutputFormatClass(
-        SimpleShortestPathsVertexOutputFormat.class);
-    job.getConfiguration().setLong(SimpleShortestPathsVertex.SOURCE_ID, 0);
-    Path outputPath2 = new Path("/tmp/" + getCallingMethodName() + "2");
-    removeAndSetOutput(job, outputPath2);
     assertTrue(job.run(true));
-    if (getJobTracker() == null) {
-      FileStatus fileStatus = getSinglePartFileStatus(job, outputPath);
-      FileStatus fileStatus2 = getSinglePartFileStatus(job, outputPath2);
-      assertTrue(fileStatus.getLen() == fileStatus2.getLen());
-    }
+
+    int numResults = getNumResults(job.getConfiguration(), outputPath);
+
+    int expectedNumResults = runningInDistributedMode() ? 15 : 5;
+    assertEquals(expectedNumResults, numResults);
   }
 
   /**
@@ -365,56 +333,82 @@ public class TestBspBasic extends BspCas
   @Test
   public void testBspPageRankWithAggregatorWriter()
       throws IOException, InterruptedException, ClassNotFoundException {
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
-    job.setVertexClass(SimplePageRankVertex.class);
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        SimplePageRankVertex.class,
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class,
+        SimplePageRankVertex.SimplePageRankVertexOutputFormat.class,
+        outputPath);
     job.setWorkerContextClass(
         SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
-    job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
-    job.setAggregatorWriterClass(SimpleAggregatorWriter.class);
-    Path outputPath = new Path("/tmp/" + getCallingMethodName());
-    removeAndSetOutput(job, outputPath);
+
+    Configuration conf = job.getConfiguration();
+
+    job.setAggregatorWriterClass(TextAggregatorWriter.class);
+    Path aggregatorValues = getTempPath("aggregatorValues");
+    conf.setInt(TextAggregatorWriter.FREQUENCY, TextAggregatorWriter.ALWAYS);
+    conf.set(TextAggregatorWriter.FILENAME, aggregatorValues.toString());
+
     assertTrue(job.run(true));
-    if (getJobTracker() == null) {
-      double maxPageRank =
-          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
-      double minPageRank =
-          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
-      long numVertices =
-          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
-      System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
-          " minPageRank=" + minPageRank +
-          " numVertices=" + numVertices);
-      FileSystem fs = FileSystem.get(new Configuration());
-      FSDataInputStream input =
-          fs.open(new Path(SimpleAggregatorWriter.getFilename()));
-      int i, all;
-      for (i = 0; ; i++) {
-        all = 0;
+
+    FileSystem fs = FileSystem.get(conf);
+    Path valuesFile = new Path(aggregatorValues.toString() + "_0");
+
+    try {
+      if (!runningInDistributedMode()) {
+        double maxPageRank =
+            SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
+        double minPageRank =
+            SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
+        long numVertices =
+            SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
+        System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
+            " minPageRank=" + minPageRank + " numVertices=" + numVertices);
+
+        FSDataInputStream in = null;
+        BufferedReader reader = null;
         try {
-          DoubleWritable max = new DoubleWritable();
-          max.readFields(input);
-          all++;
-          DoubleWritable min = new DoubleWritable();
-          min.readFields(input);
-          all++;
-          LongWritable sum = new LongWritable();
-          sum.readFields(input);
-          all++;
-          if (i > 0) {
-            assertTrue(max.get() == maxPageRank);
-            assertTrue(min.get() == minPageRank);
-            assertTrue(sum.get() == numVertices);
+          Map<Integer, Double> minValues = Maps.newHashMap();
+          Map<Integer, Double> maxValues = Maps.newHashMap();
+          Map<Integer, Long> vertexCounts = Maps.newHashMap();
+
+          in = fs.open(valuesFile);
+          reader = new BufferedReader(new InputStreamReader(in,
+              Charsets.UTF_8));
+          String line;
+          while ((line = reader.readLine()) != null) {
+            String[] tokens = line.split("\t");
+            int superstep = Integer.parseInt(tokens[0].split("=")[1]);
+            String value = (tokens[1].split("=")[1]);
+            String aggregatorName = tokens[2];
+
+            if (MinAggregator.class.getName().equals(aggregatorName)) {
+              minValues.put(superstep, Double.parseDouble(value));
+            }
+            if (MaxAggregator.class.getName().equals(aggregatorName)) {
+              maxValues.put(superstep, Double.parseDouble(value));
+            }
+            if (LongSumAggregator.class.getName().equals(aggregatorName)) {
+              vertexCounts.put(superstep, Long.parseLong(value));
+            }
           }
-        } catch (IOException e) {
-          break;
+
+          int maxSuperstep = SimplePageRankVertex.MAX_SUPERSTEPS;
+          assertEquals(maxSuperstep + 1, minValues.size());
+          assertEquals(maxSuperstep + 1, maxValues.size());
+          assertEquals(maxSuperstep + 1, vertexCounts.size());
+
+          assertEquals(maxPageRank, maxValues.get(maxSuperstep));
+          assertEquals(minPageRank, minValues.get(maxSuperstep));
+          assertEquals(numVertices, vertexCounts.get(maxSuperstep));
+
+        } finally {
+          Closeables.closeQuietly(in);
+          Closeables.closeQuietly(reader);
         }
       }
-      input.close();
-      // contained all supersteps
-      assertTrue(i == SimplePageRankVertex.MAX_SUPERSTEPS+1 && all == 0);
-      remove(new Configuration(),
-          new Path(SimpleAggregatorWriter.getFilename()));
+    } finally {
+      fs.delete(valuesFile, false);
     }
   }
 }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java Thu May 10 15:17:43 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -39,19 +40,26 @@ import org.junit.Test;
  * Unit test for manual checkpoint restarting
  */
 public class TestGraphPartitioner extends BspCase {
-    /**
-     * Create the test case
-     *
-     * @param testName name of the test case
-     */
-    public TestGraphPartitioner(String testName) {
-        super(testName);
-    }
-    
+
     public TestGraphPartitioner() {
         super(TestGraphPartitioner.class.getName());
     }
 
+    private void verifyOutput(FileSystem fs, Path outputPath)
+        throws IOException {
+      final int correctLen = 123;
+      if (runningInDistributedMode()) {
+        FileStatus [] fileStatusArr = fs.listStatus(outputPath);
+        int totalLen = 0;
+        for (FileStatus fileStatus : fileStatusArr) {
+          if (fileStatus.getPath().toString().contains("/part-m-")) {
+            totalLen += fileStatus.getLen();
+          }
+        }
+        assertEquals(correctLen, totalLen);
+      }
+    }
+
     /**
      * Run a sample BSP job locally and test various partitioners and
      * partition algorithms.
@@ -63,76 +71,43 @@ public class TestGraphPartitioner extend
     @Test
     public void testPartitioners()
             throws IOException, InterruptedException, ClassNotFoundException {
-        final int correctLen = 123;
 
-        GiraphJob job = new GiraphJob("testVertexBalancer");
-        setupConfiguration(job);
-        job.setVertexClass(SimpleCheckpointVertex.class);
-        job.setWorkerContextClass(
-            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
-        job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-        job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+        Path outputPath = getTempPath("testVertexBalancer");
+        GiraphJob job = prepareJob("testVertexBalancer",
+            SimpleCheckpointVertex.class,
+            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+            SimpleSuperstepVertexInputFormat.class,
+            SimpleSuperstepVertexOutputFormat.class, outputPath);
+
         job.getConfiguration().set(
             PartitionBalancer.PARTITION_BALANCE_ALGORITHM,
             PartitionBalancer.VERTICES_BALANCE_ALGORITHM);
-        Path outputPath = new Path("/tmp/testVertexBalancer");
-        removeAndSetOutput(job, outputPath);
+
         assertTrue(job.run(true));
         FileSystem hdfs = FileSystem.get(job.getConfiguration());
-        if (getJobTracker() != null) {
-            FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
-            int totalLen = 0;
-            for (FileStatus fileStatus : fileStatusArr) {
-                if (fileStatus.getPath().toString().contains("/part-m-")) {
-                    totalLen += fileStatus.getLen();
-                }
-            }
-            assertTrue(totalLen == correctLen);
-        }
 
-        job = new GiraphJob("testHashPartitioner");
-        setupConfiguration(job);
-        job.setVertexClass(SimpleCheckpointVertex.class);
-        job.setWorkerContextClass(
-            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
-        job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-        job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
-        outputPath = new Path("/tmp/testHashPartitioner");
-        removeAndSetOutput(job, outputPath);
+
+        outputPath = getTempPath("testHashPartitioner");
+        job = prepareJob("testHashPartitioner", SimpleCheckpointVertex.class,
+            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+            SimpleSuperstepVertexInputFormat.class,
+            SimpleSuperstepVertexOutputFormat.class, outputPath);
         assertTrue(job.run(true));
-        if (getJobTracker() != null) {
-            FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
-            int totalLen = 0;
-            for (FileStatus fileStatus : fileStatusArr) {
-                if (fileStatus.getPath().toString().contains("/part-m-")) {
-                    totalLen += fileStatus.getLen();
-                }
-            }
-            assertTrue(totalLen == correctLen);
-        }
+        verifyOutput(hdfs, outputPath);
+
+        outputPath = getTempPath("testSuperstepHashPartitioner");
+        job = prepareJob("testSuperstepHashPartitioner",
+            SimpleCheckpointVertex.class,
+            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+            SimpleSuperstepVertexInputFormat.class,
+            SimpleSuperstepVertexOutputFormat.class,
+            outputPath);
 
-        job = new GiraphJob("testSuperstepHashPartitioner");
-        setupConfiguration(job);
-        job.setVertexClass(SimpleCheckpointVertex.class);
-        job.setWorkerContextClass(
-            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
-        job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-        job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
         job.setGraphPartitionerFactoryClass(
             SuperstepHashPartitionerFactory.class);
-        outputPath = new Path("/tmp/testSuperstepHashPartitioner");
-        removeAndSetOutput(job, outputPath);
+
         assertTrue(job.run(true));
-        if (getJobTracker() != null) {
-            FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
-            int totalLen = 0;
-            for (FileStatus fileStatus : fileStatusArr) {
-                if (fileStatus.getPath().toString().contains("/part-m-")) {
-                    totalLen += fileStatus.getLen();
-                }
-            }
-            assertTrue(totalLen == correctLen);
-        }
+        verifyOutput(hdfs, outputPath);
 
         job = new GiraphJob("testHashRangePartitioner");
         setupConfiguration(job);
@@ -146,41 +121,19 @@ public class TestGraphPartitioner extend
         outputPath = new Path("/tmp/testHashRangePartitioner");
         removeAndSetOutput(job, outputPath);
         assertTrue(job.run(true));
-        if (getJobTracker() != null) {
-            FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
-            int totalLen = 0;
-            for (FileStatus fileStatus : fileStatusArr) {
-                if (fileStatus.getPath().toString().contains("/part-m-")) {
-                    totalLen += fileStatus.getLen();
-                }
-            }
-            assertTrue(totalLen == correctLen);
-        }
+        verifyOutput(hdfs, outputPath);
 
-        job = new GiraphJob("testReverseIdSuperstepHashPartitioner");
-        setupConfiguration(job);
-        job.setVertexClass(SimpleCheckpointVertex.class);
-        job.setWorkerContextClass(
-            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
-        job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-        job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+        outputPath = getTempPath("testReverseIdSuperstepHashPartitioner");
+        job = prepareJob("testReverseIdSuperstepHashPartitioner",
+            SimpleCheckpointVertex.class,
+            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+            SimpleSuperstepVertexInputFormat.class,
+            SimpleSuperstepVertexOutputFormat.class, outputPath);
         job.setGraphPartitionerFactoryClass(
             SuperstepHashPartitionerFactory.class);
         job.getConfiguration().setBoolean(
-            GeneratedVertexReader.REVERSE_ID_ORDER,
-            true);
-        outputPath = new Path("/tmp/testReverseIdSuperstepHashPartitioner");
-        removeAndSetOutput(job, outputPath);
+            GeneratedVertexReader.REVERSE_ID_ORDER, true);
         assertTrue(job.run(true));
-        if (getJobTracker() != null) {
-            FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
-            int totalLen = 0;
-            for (FileStatus fileStatus : fileStatusArr) {
-                if (fileStatus.getPath().toString().contains("/part-m-")) {
-                    totalLen += fileStatus.getLen();
-                }
-            }
-            assertTrue(totalLen == correctLen);
-        }
+        verifyOutput(hdfs, outputPath);
     }
 }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java Thu May 10 15:17:43 2012
@@ -17,6 +17,7 @@
  */
 
 package org.apache.giraph;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -27,6 +28,7 @@ import org.apache.giraph.benchmark.Pseud
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.lib.JsonBase64VertexInputFormat;
 import org.apache.giraph.lib.JsonBase64VertexOutputFormat;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -60,54 +62,43 @@ public class TestJsonBase64Format extend
    */
   @Test
   public void testContinue()
-    throws IOException, InterruptedException, ClassNotFoundException {
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
-    job.setVertexClass(PageRankBenchmark.class);
-    job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
-    job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
+      throws IOException, InterruptedException, ClassNotFoundException {
+
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphJob job = prepareJob(getCallingMethodName(), PageRankBenchmark.class,
+        PseudoRandomVertexInputFormat.class,
+        JsonBase64VertexOutputFormat.class, outputPath);
     job.getConfiguration().setLong(
         PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101);
     job.getConfiguration().setLong(
         PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2);
     job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 2);
-    Path outputPath = new Path("/tmp/" + getCallingMethodName());
-    removeAndSetOutput(job, outputPath);
+
     assertTrue(job.run(true));
 
-    job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
-    job.setVertexClass(PageRankBenchmark.class);
-    job.setVertexInputFormatClass(JsonBase64VertexInputFormat.class);
-    job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
+    Path outputPath2 = getTempPath(getCallingMethodName() + "2");
+    job = prepareJob(getCallingMethodName(), PageRankBenchmark.class,
+        JsonBase64VertexInputFormat.class, JsonBase64VertexOutputFormat.class,
+        outputPath2);
     job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 3);
     FileInputFormat.setInputPaths(job.getInternalJob(), outputPath);
-    Path outputPath2 = new Path("/tmp/" + getCallingMethodName() + "2");
-    removeAndSetOutput(job, outputPath2);
     assertTrue(job.run(true));
 
-    FileStatus twoJobsFile = null;
-    if (getJobTracker() == null) {
-      twoJobsFile = getSinglePartFileStatus(job, outputPath);
-    }
-
-    job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
-    job.setVertexClass(PageRankBenchmark.class);
-    job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
-    job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
+    Path outputPath3 = getTempPath(getCallingMethodName() + "3");
+    job = prepareJob(getCallingMethodName(), PageRankBenchmark.class,
+        PseudoRandomVertexInputFormat.class,
+        JsonBase64VertexOutputFormat.class, outputPath3);
     job.getConfiguration().setLong(
         PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101);
     job.getConfiguration().setLong(
         PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2);
     job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 5);
-    Path outputPath3 = new Path("/tmp/" + getCallingMethodName() + "3");
-    removeAndSetOutput(job, outputPath3);
     assertTrue(job.run(true));
 
-    if (getJobTracker() == null) {
-      FileStatus oneJobFile = getSinglePartFileStatus(job, outputPath3);
-      assertTrue(twoJobsFile.getLen() == oneJobFile.getLen());
-    }
+    Configuration conf = job.getConfiguration();
+
+    assertEquals(101, getNumResults(conf, outputPath));
+    assertEquals(101, getNumResults(conf, outputPath2));
+    assertEquals(101, getNumResults(conf, outputPath3));
   }
 }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java Thu May 10 15:17:43 2012
@@ -17,6 +17,7 @@
  */
 
 package org.apache.giraph;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -33,18 +34,6 @@ import org.junit.Test;
  * Unit test for manual checkpoint restarting
  */
 public class TestManualCheckpoint extends BspCase {
-  /** Where the checkpoints will be stored and restarted */
-  private final String HDFS_CHECKPOINT_DIR =
-      "/tmp/testBspCheckpoints";
-
-  /**
-   * Create the test case
-   *
-   * @param testName name of the test case
-   */
-  public TestManualCheckpoint(String testName) {
-    super(testName);
-  }
 
   public TestManualCheckpoint() {
     super(TestManualCheckpoint.class.getName());
@@ -59,61 +48,52 @@ public class TestManualCheckpoint extend
   @Test
   public void testBspCheckpoint()
       throws IOException, InterruptedException, ClassNotFoundException {
-    GiraphJob job = new GiraphJob(getCallingMethodName());
-    setupConfiguration(job);
+    Path checkpointsDir = getTempPath("checkPointsForTesting");
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        SimpleCheckpointVertex.class,
+        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+        SimpleSuperstepVertexInputFormat.class,
+        SimpleSuperstepVertexOutputFormat.class, outputPath);
+
     job.getConfiguration().set(GiraphJob.CHECKPOINT_DIRECTORY,
-        HDFS_CHECKPOINT_DIR);
+        checkpointsDir.toString());
     job.getConfiguration().setBoolean(
         GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
     job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 2);
-    job.setVertexClass(SimpleCheckpointVertex.class);
-    job.setWorkerContextClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
-    job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-    job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
-    Path outputPath = new Path("/tmp/" + getCallingMethodName());
-    removeAndSetOutput(job, outputPath);
+
     assertTrue(job.run(true));
-    long fileLen = 0;
+
     long idSum = 0;
-    if (getJobTracker() == null) {
-      FileStatus fileStatus = getSinglePartFileStatus(job, outputPath);
-      fileLen = fileStatus.getLen();
-      idSum =
-          SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.getFinalSum();
+    if (!runningInDistributedMode()) {
+      FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
+          outputPath);
+      idSum = SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext
+          .getFinalSum();
       System.out.println("testBspCheckpoint: idSum = " + idSum +
-          " fileLen = " + fileLen);
+          " fileLen = " + fileStatus.getLen());
     }
 
     // Restart the test from superstep 2
-    System.out.println(
-        "testBspCheckpoint: Restarting from superstep 2" +
-            " with checkpoint path = " + HDFS_CHECKPOINT_DIR);
-    GiraphJob restartedJob = new GiraphJob(getCallingMethodName() +
-        "Restarted");
-    setupConfiguration(restartedJob);
+    System.out.println("testBspCheckpoint: Restarting from superstep 2" +
+        " with checkpoint path = " + checkpointsDir);
+    outputPath = getTempPath(getCallingMethodName() + "Restarted");
+    GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
+        SimpleCheckpointVertex.class,
+        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+        SimpleSuperstepVertexInputFormat.class,
+        SimpleSuperstepVertexOutputFormat.class, outputPath);
     restartedJob.getConfiguration().set(GiraphJob.CHECKPOINT_DIRECTORY,
-        HDFS_CHECKPOINT_DIR);
-    restartedJob.getConfiguration().setLong(GiraphJob.RESTART_SUPERSTEP, 2);
-    restartedJob.setVertexClass(SimpleCheckpointVertex.class);
-    restartedJob.setWorkerContextClass(
-        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
-    restartedJob.setVertexInputFormatClass(
-        SimpleSuperstepVertexInputFormat.class);
-    restartedJob.setVertexOutputFormatClass(
-        SimpleSuperstepVertexOutputFormat.class);
-    outputPath = new Path("/tmp/" + getCallingMethodName() + "Restarted");
-    removeAndSetOutput(restartedJob, outputPath);
+        checkpointsDir.toString());
+
     assertTrue(restartedJob.run(true));
-    if (getJobTracker() == null) {
-      FileStatus fileStatus = getSinglePartFileStatus(job, outputPath);
-      fileLen = fileStatus.getLen();
-      assertTrue(fileStatus.getLen() == fileLen);
+    if (!runningInDistributedMode()) {
       long idSumRestarted =
-          SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.getFinalSum();
+          SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext
+              .getFinalSum();
       System.out.println("testBspCheckpoint: idSumRestarted = " +
           idSumRestarted);
-      assertTrue(idSum == idSumRestarted);
+      assertEquals(idSum, idSumRestarted);
     }
   }
 }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java Thu May 10 15:17:43 2012
@@ -33,15 +33,7 @@ import org.junit.Test;
  * Unit test for graph mutation
  */
 public class TestMutateGraphVertex extends BspCase {
-    /**
-     * Create the test case
-     *
-     * @param testName name of the test case
-     */
-    public TestMutateGraphVertex(String testName) {
-        super(testName);
-    }
-    
+
     public TestMutateGraphVertex() {
         super(TestMutateGraphVertex.class.getName());
     }
@@ -56,15 +48,12 @@ public class TestMutateGraphVertex exten
     @Test
     public void testMutateGraph()
             throws IOException, InterruptedException, ClassNotFoundException {
-        GiraphJob job = new GiraphJob(getCallingMethodName());
-        setupConfiguration(job);
-        job.setVertexClass(SimpleMutateGraphVertex.class);
-        job.setWorkerContextClass(
-            SimpleMutateGraphVertex.SimpleMutateGraphVertexWorkerContext.class);
-        job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
-        job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
-        Path outputPath = new Path("/tmp/" + getCallingMethodName());
-        removeAndSetOutput(job, outputPath);
+        GiraphJob job = prepareJob(getCallingMethodName(),
+            SimpleMutateGraphVertex.class,
+            SimpleMutateGraphVertex.SimpleMutateGraphVertexWorkerContext.class,
+            SimplePageRankVertexInputFormat.class,
+            SimplePageRankVertexOutputFormat.class,
+            getTempPath(getCallingMethodName()));
         assertTrue(job.run(true));
     }
 }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java Thu May 10 15:17:43 2012
@@ -33,15 +33,7 @@ import org.junit.Test;
  * Unit test for not enough map tasks
  */
 public class TestNotEnoughMapTasks extends BspCase {
-    /**
-     * Create the test case
-     *
-     * @param testName name of the test case
-     */
-    public TestNotEnoughMapTasks(String testName) {
-        super(testName);
-    }
-    
+
     public TestNotEnoughMapTasks() {
         super(TestNotEnoughMapTasks.class.getName());
     }
@@ -56,25 +48,23 @@ public class TestNotEnoughMapTasks exten
     @Test
     public void testNotEnoughMapTasks()
             throws IOException, InterruptedException, ClassNotFoundException {
-        if (getJobTracker() == null) {
+        if (!runningInDistributedMode()) {
             System.out.println(
                 "testNotEnoughMapTasks: Ignore this test in local mode.");
             return;
         }
-        GiraphJob job = new GiraphJob(getCallingMethodName());
-        setupConfiguration(job);
+        Path outputPath = getTempPath(getCallingMethodName());
+        GiraphJob job = prepareJob(getCallingMethodName(),
+            SimpleCheckpointVertex.class,
+            SimpleSuperstepVertexInputFormat.class,
+            SimpleSuperstepVertexOutputFormat.class, outputPath);
+
         // An unlikely impossible number of workers to achieve
         final int unlikelyWorkers = Short.MAX_VALUE;
-        job.setWorkerConfiguration(
-            unlikelyWorkers, unlikelyWorkers, 100.0f);
+        job.setWorkerConfiguration(unlikelyWorkers, unlikelyWorkers, 100.0f);
         // Only one poll attempt of one second to make failure faster
         job.getConfiguration().setInt(GiraphJob.POLL_ATTEMPTS, 1);
         job.getConfiguration().setInt(GiraphJob.POLL_MSECS, 1);
-        job.setVertexClass(SimpleCheckpointVertex.class);
-        job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-        job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
-        Path outputPath = new Path("/tmp/" + getCallingMethodName());
-        removeAndSetOutput(job, outputPath);
         assertFalse(job.run(false));
     }
 }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestZooKeeperExt.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestZooKeeperExt.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestZooKeeperExt.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestZooKeeperExt.java Thu May 10 15:17:43 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.List;
@@ -52,7 +53,7 @@ public class TestZooKeeperExt implements
                 return;
             }
             zooKeeperExt =
-                new ZooKeeperExt(zkList, 30*1000, this);
+                new ZooKeeperExt(zkList, 30 * 1000, this);
             zooKeeperExt.deleteExt(BASE_PATH, -1, true);
         } catch (KeeperException.NoNodeException e) {
             System.out.println("Clean start: No node " + BASE_PATH);
@@ -153,7 +154,7 @@ public class TestZooKeeperExt implements
         for (String fullPath : sequenceOrderedList) {
             assertTrue(fullPath.contains(BASE_PATH + "/"));
         }
-        assertTrue(sequenceOrderedList.size() == 4);
+        assertEquals(4, sequenceOrderedList.size());
         assertTrue(sequenceOrderedList.get(0).contains("/b"));
         assertTrue(sequenceOrderedList.get(1).contains("/a"));
         assertTrue(sequenceOrderedList.get(2).contains("/d"));

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java?rev=1336743&r1=1336742&r2=1336743&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java Thu May 10 15:17:43 2012
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -50,7 +51,7 @@ public class TestEdgeListVertex {
   private GiraphJob job;
 
   /**
-   * Simple instantiable class that extends {@link EdgeArrayVertex}.
+   * Simple instantiable class that extends {@link EdgeListVertex}.
    */
   private static class IFDLEdgeListVertex extends
       EdgeListVertex<IntWritable, FloatWritable, DoubleWritable,
@@ -69,17 +70,18 @@ public class TestEdgeListVertex {
       throw new RuntimeException("setUp: Failed", e);
     }
     job.setVertexClass(IFDLEdgeListVertex.class);
-    job.getConfiguration().setClass(GiraphJob.VERTEX_INDEX_CLASS,
-        IntWritable.class, WritableComparable.class);
-    job.getConfiguration().setClass(GiraphJob.VERTEX_VALUE_CLASS,
-        FloatWritable.class, Writable.class);
-    job.getConfiguration().setClass(GiraphJob.EDGE_VALUE_CLASS,
-        DoubleWritable.class, Writable.class);
-    job.getConfiguration().setClass(GiraphJob.MESSAGE_VALUE_CLASS,
-        LongWritable.class, Writable.class);
+    Configuration conf = job.getConfiguration();
+    conf.setClass(GiraphJob.VERTEX_INDEX_CLASS, IntWritable.class,
+        WritableComparable.class);
+    conf.setClass(GiraphJob.VERTEX_VALUE_CLASS, FloatWritable.class,
+        Writable.class);
+    conf.setClass(GiraphJob.EDGE_VALUE_CLASS, DoubleWritable.class,
+        Writable.class);
+    conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS, LongWritable.class,
+        Writable.class);
     vertex = (IFDLEdgeListVertex)
       BspUtils.<IntWritable, FloatWritable, DoubleWritable, LongWritable>
-      createVertex(job.getConfiguration());
+      createVertex(conf);
   }
 
   @Test



Mime
View raw message