incubator-giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1245205 [16/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/example...
Date Thu, 16 Feb 2012 22:12:36 GMT
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -40,78 +40,77 @@ import org.apache.hadoop.mapreduce.lib.o
  * @param <E> Edge value
  */
 @SuppressWarnings("rawtypes")
-public abstract class TextVertexOutputFormat<
-        I extends WritableComparable, V extends Writable, E extends Writable>
-        extends VertexOutputFormat<I, V, E> {
-    /** Uses the TextOutputFormat to do everything */
-    protected TextOutputFormat<Text, Text> textOutputFormat =
-        new TextOutputFormat<Text, Text>();
+public abstract class TextVertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends VertexOutputFormat<I, V, E> {
+  /** Uses the TextOutputFormat to do everything */
+  protected TextOutputFormat<Text, Text> textOutputFormat =
+      new TextOutputFormat<Text, Text>();
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * vertex output.  Easiest to ignore the key value separator and only use
+   * key instead.
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   */
+  public abstract static class TextVertexWriter<I extends WritableComparable,
+      V extends Writable, E extends Writable> implements VertexWriter<I, V, E> {
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+    /** Internal line record writer */
+    private final RecordWriter<Text, Text> lineRecordWriter;
 
     /**
-     * Abstract class to be implemented by the user based on their specific
-     * vertex output.  Easiest to ignore the key value separator and only use
-     * key instead.
+     * Initialize with the LineRecordWriter.
      *
-     * @param <I> Vertex index value
-     * @param <V> Vertex value
-     * @param <E> Edge value
+     * @param lineRecordWriter Line record writer from TextOutputFormat
      */
-    public static abstract class TextVertexWriter<I extends WritableComparable,
-            V extends Writable, E extends Writable>
-            implements VertexWriter<I, V, E> {
-        /** Context passed to initialize */
-        private TaskAttemptContext context;
-        /** Internal line record writer */
-        private final RecordWriter<Text, Text> lineRecordWriter;
-
-        /**
-         * Initialize with the LineRecordWriter.
-         *
-         * @param lineRecordWriter Line record writer from TextOutputFormat
-         */
-        public TextVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
-            this.lineRecordWriter = lineRecordWriter;
-        }
-
-        @Override
-        public void initialize(TaskAttemptContext context) throws IOException {
-            this.context = context;
-        }
-
-        @Override
-        public void close(TaskAttemptContext context)
-                throws IOException, InterruptedException {
-            lineRecordWriter.close(context);
-        }
-
-        /**
-         * Get the line record writer.
-         *
-         * @return Record writer to be used for writing.
-         */
-        public RecordWriter<Text, Text> getRecordWriter() {
-            return lineRecordWriter;
-        }
-
-        /**
-         * Get the context.
-         *
-         * @return Context passed to initialize.
-         */
-        public TaskAttemptContext getContext() {
-            return context;
-        }
+    public TextVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+      this.lineRecordWriter = lineRecordWriter;
     }
 
     @Override
-    public void checkOutputSpecs(JobContext context)
-            throws IOException, InterruptedException {
-        textOutputFormat.checkOutputSpecs(context);
+    public void initialize(TaskAttemptContext context) throws IOException {
+      this.context = context;
     }
 
     @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-            throws IOException, InterruptedException {
-        return textOutputFormat.getOutputCommitter(context);
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      lineRecordWriter.close(context);
     }
+
+    /**
+     * Get the line record writer.
+     *
+     * @return Record writer to be used for writing.
+     */
+    public RecordWriter<Text, Text> getRecordWriter() {
+      return lineRecordWriter;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    public TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    textOutputFormat.checkOutputSpecs(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return textOutputFormat.getOutputCommitter(context);
+  }
 }

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.giraph.bsp;
-
 /**
- *  State of the BSP application
+ * Package of reusable library Giraph objects.
  */
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph.lib;

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.giraph.bsp;
-
 /**
- *  State of the BSP application
+ * Base giraph package.
  */
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ComparisonUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ComparisonUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ComparisonUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ComparisonUtils.java Thu Feb 16 22:12:31 2012
@@ -20,29 +20,43 @@ package org.apache.giraph.utils;
 
 import java.util.Iterator;
 
-/** simple helper class for comparisons and equality checking */
+/** Simple helper class for comparisons and equality checking */
 public class ComparisonUtils {
 
-    private ComparisonUtils() {
-    }
+  /** Do not construct this object */
+  private ComparisonUtils() { }
 
-    /** compare elements, sort order and length */
-    public static <T> boolean equal(Iterable<T> first, Iterable<T> second) {
-        return equal(first.iterator(), second.iterator());
-    }
+  /**
+   * Compare elements, sort order and length
+   *
+   * @param <T> Type of iterable to compare.
+   * @param first First iterable to compare.
+   * @param second Second iterable to compare.
+   * @return True if equal, false otherwise.
+   */
+  public static <T> boolean equal(Iterable<T> first, Iterable<T> second) {
+    return equal(first.iterator(), second.iterator());
+  }
 
-    /** compare elements, sort order and length */
-    public static <T> boolean equal(Iterator<T> first, Iterator<T> second) {
-        while (first.hasNext() && second.hasNext()) {
-            T message = first.next();
-            T otherMessage = second.next();
-            /* element-wise equality */
-            if (!(message == null ? otherMessage == null :
-                    message.equals(otherMessage))) {
-                return false;
-            }
-        }
-        /* length must also be equal */
-        return !(first.hasNext() || second.hasNext());
+  /**
+   * Compare elements, sort order and length
+   *
+   * @param <T> Type of iterable to compare.
+   * @param first First iterable to compare.
+   * @param second Second iterable to compare.
+   * @return True if equal, false otherwise.
+   */
+  public static <T> boolean equal(Iterator<T> first, Iterator<T> second) {
+    while (first.hasNext() && second.hasNext()) {
+      T message = first.next();
+      T otherMessage = second.next();
+      /* element-wise equality */
+      if (!(message == null ? otherMessage == null :
+        message.equals(otherMessage))) {
+        return false;
+      }
     }
+    /* length must also be equal */
+    return !(first.hasNext() || second.hasNext());
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/EmptyIterable.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/EmptyIterable.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/EmptyIterable.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/EmptyIterable.java Thu Feb 16 22:12:31 2012
@@ -20,26 +20,30 @@ package org.apache.giraph.utils;
 
 import java.util.Iterator;
 
+/**
+ * Helper empty iterable when there are no messages.
+ *
+ * @param <M> Message data
+ */
 public class EmptyIterable<M> implements Iterable<M>, Iterator<M> {
-
-    @Override
-    public Iterator<M> iterator() {
-        return this;
-    }
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public M next() {
-        return null;
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-    }
+  @Override
+  public Iterator<M> iterator() {
+    return this;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public M next() {
+    return null;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
 }
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Thu Feb 16 22:12:31 2012
@@ -42,208 +42,253 @@ import java.util.concurrent.Executors;
 /**
  * A base class for running internal tests on a vertex
  *
- * Extending classes only have to invoke the run() method to test their vertex. All data
- * is written to a local tmp directory that is removed afterwards. A local zookeeper
- * instance is started in an extra thread and shutdown at the end.
+ * Extending classes only have to invoke the run() method to test their vertex.
+ * All data is written to a local tmp directory that is removed afterwards.
+ * A local zookeeper instance is started in an extra thread and
+ * shut down at the end.
  *
  * Heavily inspired from Apache Mahout's MahoutTestCase
  */
 public class InternalVertexRunner {
+  /** ZooKeeper port to use for tests */
+  public static final int LOCAL_ZOOKEEPER_PORT = 22182;
 
-    public static final int LOCAL_ZOOKEEPER_PORT = 22182;
+  /**
+   * Default constructor.
+   */
+  private InternalVertexRunner() { }
+
+  /**
+   * Attempts to run the vertex internally in the current JVM, reading from
+   * and writing to a temporary folder on local disk. Will start
+   * its own ZooKeeper instance.
+   *
+   * @param vertexClass the vertex class to instantiate
+   * @param vertexInputFormatClass the inputformat to use
+   * @param vertexOutputFormatClass the outputformat to use
+   * @param params a map of parameters to add to the hadoop configuration
+   * @param data linewise input data
+   * @return linewise output data
+   * @throws Exception
+   */
+  public static Iterable<String> run(Class<?> vertexClass,
+      Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
+      Map<String, String> params, String... data) throws Exception {
+    return run(vertexClass, null, vertexInputFormatClass,
+        vertexOutputFormatClass, params, data);
+  }
+
+  /**
+   *  Attempts to run the vertex internally in the current JVM, reading from
+   *  and writing to a temporary folder on local disk. Will start its own
+   *  zookeeper instance.
+   *
+   * @param vertexClass the vertex class to instantiate
+   * @param vertexCombinerClass the vertex combiner to use (or null)
+   * @param vertexInputFormatClass the inputformat to use
+   * @param vertexOutputFormatClass the outputformat to use
+   * @param params a map of parameters to add to the hadoop configuration
+   * @param data linewise input data
+   * @return linewise output data
+   * @throws Exception
+   */
+  public static Iterable<String> run(Class<?> vertexClass,
+      Class<?> vertexCombinerClass, Class<?> vertexInputFormatClass,
+      Class<?> vertexOutputFormatClass, Map<String, String> params,
+      String... data) throws Exception {
+
+    File tmpDir = null;
+    try {
+      // prepare input file, output folder and zookeeper folder
+      tmpDir = createTestDir(vertexClass);
+      File inputFile = createTempFile(tmpDir, "graph.txt");
+      File outputDir = createTempDir(tmpDir, "output");
+      File zkDir = createTempDir(tmpDir, "zooKeeper");
+
+      // write input data to disk
+      writeLines(inputFile, data);
+
+      // create and configure the job to run the vertex
+      GiraphJob job = new GiraphJob(vertexClass.getName());
+      job.setVertexClass(vertexClass);
+      job.setVertexInputFormatClass(vertexInputFormatClass);
+      job.setVertexOutputFormatClass(vertexOutputFormatClass);
+
+      if (vertexCombinerClass != null) {
+        job.setVertexCombinerClass(vertexCombinerClass);
+      }
+
+      job.setWorkerConfiguration(1, 1, 100.0f);
+      Configuration conf = job.getConfiguration();
+      conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
+      conf.setBoolean(GiraphJob.LOCAL_TEST_MODE, true);
+      conf.set(GiraphJob.ZOOKEEPER_LIST, "localhost:" +
+          String.valueOf(LOCAL_ZOOKEEPER_PORT));
+
+      for (Map.Entry<String, String> param : params.entrySet()) {
+        conf.set(param.getKey(), param.getValue());
+      }
+
+      FileInputFormat.addInputPath(job, new Path(inputFile.toString()));
+      FileOutputFormat.setOutputPath(job, new Path(outputDir.toString()));
+
+      // configure a local zookeeper instance
+      Properties zkProperties = new Properties();
+      zkProperties.setProperty("tickTime", "2000");
+      zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
+      zkProperties.setProperty("clientPort",
+          String.valueOf(LOCAL_ZOOKEEPER_PORT));
+      zkProperties.setProperty("maxClientCnxns", "10000");
+      zkProperties.setProperty("minSessionTimeout", "10000");
+      zkProperties.setProperty("maxSessionTimeout", "100000");
+      zkProperties.setProperty("initLimit", "10");
+      zkProperties.setProperty("syncLimit", "5");
+      zkProperties.setProperty("snapCount", "50000");
+
+      QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+      qpConfig.parseProperties(zkProperties);
+
+      // create and run the zookeeper instance
+      final InternalZooKeeper zookeeper = new InternalZooKeeper();
+      final ServerConfig zkConfig = new ServerConfig();
+      zkConfig.readFrom(qpConfig);
 
-    private InternalVertexRunner() {
-    }
-
-    /**
-     *  Attempts to run the vertex internally in the current JVM, reading from and writing to a
-     *  temporary folder on local disk. Will start an own zookeeper instance.
-     *
-     * @param vertexClass the vertex class to instantiate
-     * @param vertexInputFormatClass the inputformat to use
-     * @param vertexOutputFormatClass the outputformat to use
-     * @param params a map of parameters to add to the hadoop configuration
-     * @param data linewise input data
-     * @return linewise output data
-     * @throws Exception
-     */
-    public static Iterable<String> run(Class<?> vertexClass,
-            Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
-            Map<String, String> params, String... data) throws Exception {
-        return run(vertexClass, null, vertexInputFormatClass,
-                vertexOutputFormatClass, params, data);
-    }
-    
-    /**
-     *  Attempts to run the vertex internally in the current JVM, reading from and writing to a
-     *  temporary folder on local disk. Will start an own zookeeper instance.
-     *
-     * @param vertexClass the vertex class to instantiate
-     * @param vertexCombinerClass the vertex combiner to use (or null)
-     * @param vertexInputFormatClass the inputformat to use
-     * @param vertexOutputFormatClass the outputformat to use
-     * @param params a map of parameters to add to the hadoop configuration
-     * @param data linewise input data
-     * @return linewise output data
-     * @throws Exception
-     */
-    public static Iterable<String> run(Class<?> vertexClass,
-            Class<?> vertexCombinerClass, Class<?> vertexInputFormatClass, 
-            Class<?> vertexOutputFormatClass, Map<String, String> params,
-            String... data) throws Exception {
-
-        File tmpDir = null;
-        try {
-            // prepare input file, output folder and zookeeper folder
-            tmpDir = createTestDir(vertexClass);
-            File inputFile = createTempFile(tmpDir, "graph.txt");
-            File outputDir = createTempDir(tmpDir, "output");
-            File zkDir = createTempDir(tmpDir, "zooKeeper");
-
-            // write input data to disk
-            writeLines(inputFile, data);
-
-            // create and configure the job to run the vertex
-            GiraphJob job = new GiraphJob(vertexClass.getName());
-            job.setVertexClass(vertexClass);
-            job.setVertexInputFormatClass(vertexInputFormatClass);
-            job.setVertexOutputFormatClass(vertexOutputFormatClass);
-            
-            if (vertexCombinerClass != null) {
-                job.setVertexCombinerClass(vertexCombinerClass);
-            }
-
-            job.setWorkerConfiguration(1, 1, 100.0f);
-            Configuration conf = job.getConfiguration();
-            conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
-            conf.setBoolean(GiraphJob.LOCAL_TEST_MODE, true);
-            conf.set(GiraphJob.ZOOKEEPER_LIST, "localhost:" +
-                    String.valueOf(LOCAL_ZOOKEEPER_PORT));
-
-            for (Map.Entry<String,String> param : params.entrySet()) {
-                conf.set(param.getKey(), param.getValue());
-            }
-
-            FileInputFormat.addInputPath(job, new Path(inputFile.toString()));
-            FileOutputFormat.setOutputPath(job, new Path(outputDir.toString()));
-
-            // configure a local zookeeper instance
-            Properties zkProperties = new Properties();
-            zkProperties.setProperty("tickTime", "2000");
-            zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
-            zkProperties.setProperty("clientPort",
-                    String.valueOf(LOCAL_ZOOKEEPER_PORT));
-            zkProperties.setProperty("maxClientCnxns", "10000");
-            zkProperties.setProperty("minSessionTimeout", "10000");
-            zkProperties.setProperty("maxSessionTimeout", "100000");
-            zkProperties.setProperty("initLimit", "10");
-            zkProperties.setProperty("syncLimit", "5");
-            zkProperties.setProperty("snapCount", "50000");
-
-            QuorumPeerConfig qpConfig = new QuorumPeerConfig();
-            qpConfig.parseProperties(zkProperties);
-
-            // create and run the zookeeper instance
-            final InternalZooKeeper zookeeper = new InternalZooKeeper();
-            final ServerConfig zkConfig = new ServerConfig();
-            zkConfig.readFrom(qpConfig);
-
-            ExecutorService executorService = Executors.newSingleThreadExecutor();
-            executorService.execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        zookeeper.runFromConfig(zkConfig);
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-            });
-            try {
-                job.run(true);
-            } finally {
-                executorService.shutdown();
-                zookeeper.end();
-            }
-
-            return Files.readLines(new File(outputDir, "part-m-00000"),
-                    Charsets.UTF_8);
-        } finally {
-            if (tmpDir != null) {
-                new DeletingVisitor().accept(tmpDir);
-            }
-        }
-    }
-
-    /**
-     *  Create a temporary folder that will be removed after the test
-     */
-    private static final File createTestDir(Class<?> vertexClass)
-            throws IOException {
-        String systemTmpDir = System.getProperty("java.io.tmpdir");
-        long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random());
-        File testTempDir = new File(systemTmpDir, "giraph-" +
-                vertexClass.getSimpleName() + '-' + simpleRandomLong);
-        if (!testTempDir.mkdir()) {
-            throw new IOException("Could not create " + testTempDir);
-        }
-        testTempDir.deleteOnExit();
-        return testTempDir;
-    }
-
-    private static final File createTempFile(File parent, String name)
-            throws IOException {
-        return createTestTempFileOrDir(parent, name, false);
-    }
-
-    private static final File createTempDir(File parent, String name)
-            throws IOException {
-        File dir = createTestTempFileOrDir(parent, name, true);
-        dir.delete();
-        return dir;
-    }
-
-    private static File createTestTempFileOrDir(File parent, String name,
-            boolean dir) throws IOException {
-        File f = new File(parent, name);
-        f.deleteOnExit();
-        if (dir && !f.mkdirs()) {
-            throw new IOException("Could not make directory " + f);
-        }
-        return f;
-    }
-
-    private static void writeLines(File file, String... lines)
-            throws IOException {
-        Writer writer = Files.newWriter(file, Charsets.UTF_8);
-        try {
-            for (String line : lines) {
-                writer.write(line);
-                writer.write('\n');
-            }
-        } finally {
-            Closeables.closeQuietly(writer);
-        }
-    }
-
-    private static class DeletingVisitor implements FileFilter {
+      ExecutorService executorService = Executors.newSingleThreadExecutor();
+      executorService.execute(new Runnable() {
         @Override
-        public boolean accept(File f) {
-            if (!f.isFile()) {
-                f.listFiles(this);
-            }
-            f.delete();
-            return false;
-        }
-    }
-
+        public void run() {
+          try {
+            zookeeper.runFromConfig(zkConfig);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+      try {
+        job.run(true);
+      } finally {
+        executorService.shutdown();
+        zookeeper.end();
+      }
+
+      return Files.readLines(new File(outputDir, "part-m-00000"),
+          Charsets.UTF_8);
+    } finally {
+      if (tmpDir != null) {
+        new DeletingVisitor().accept(tmpDir);
+      }
+    }
+  }
+
+  /**
+   * Create a temporary folder that will be removed after the test.
+   *
+   * @param vertexClass Used for generating the folder name.
+   * @return File object for the directory.
+   */
+  private static File createTestDir(Class<?> vertexClass)
+    throws IOException {
+    String systemTmpDir = System.getProperty("java.io.tmpdir");
+    long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random());
+    File testTempDir = new File(systemTmpDir, "giraph-" +
+        vertexClass.getSimpleName() + '-' + simpleRandomLong);
+    if (!testTempDir.mkdir()) {
+      throw new IOException("Could not create " + testTempDir);
+    }
+    testTempDir.deleteOnExit();
+    return testTempDir;
+  }
+
+  /**
+   * Make a temporary file.
+   *
+   * @param parent Parent directory.
+   * @param name File name.
+   * @return File object to temporary file.
+   * @throws IOException
+   */
+  private static File createTempFile(File parent, String name)
+    throws IOException {
+    return createTestTempFileOrDir(parent, name, false);
+  }
+
+  /**
+   * Make a temporary directory.
+   *
+   * @param parent Parent directory.
+   * @param name Directory name.
+   * @return File object to temporary file.
+   * @throws IOException
+   */
+  private static File createTempDir(File parent, String name)
+    throws IOException {
+    File dir = createTestTempFileOrDir(parent, name, true);
+    dir.delete();
+    return dir;
+  }
+
+  /**
+   * Create a test temp file or directory.
+   *
+   * @param parent Parent directory
+   * @param name Name of file
+   * @param dir Is directory?
+   * @return File object
+   * @throws IOException
+   */
+  private static File createTestTempFileOrDir(File parent, String name,
+      boolean dir) throws IOException {
+    File f = new File(parent, name);
+    f.deleteOnExit();
+    if (dir && !f.mkdirs()) {
+      throw new IOException("Could not make directory " + f);
+    }
+    return f;
+  }
+
+  /**
+   * Write lines to a file.
+   *
+   * @param file File to write lines to
+   * @param lines Strings written to the file
+   * @throws IOException
+   */
+  private static void writeLines(File file, String... lines)
+    throws IOException {
+    Writer writer = Files.newWriter(file, Charsets.UTF_8);
+    try {
+      for (String line : lines) {
+        writer.write(line);
+        writer.write('\n');
+      }
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+  }
+
+  /**
+   * Deletes files.
+   */
+  private static class DeletingVisitor implements FileFilter {
+    @Override
+    public boolean accept(File f) {
+      if (!f.isFile()) {
+        f.listFiles(this);
+      }
+      f.delete();
+      return false;
+    }
+  }
+
+  /**
+   * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
+   */
+  private static class InternalZooKeeper extends ZooKeeperServerMain {
     /**
-     * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
+     * Shutdown the ZooKeeper instance.
      */
-    private static class InternalZooKeeper extends ZooKeeperServerMain {
-        void end() {
-            shutdown();
-        }
+    void end() {
+      shutdown();
     }
-
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java Thu Feb 16 22:12:31 2012
@@ -22,18 +22,20 @@ package org.apache.giraph.utils;
  * Helper static methods for tracking memory usage.
  */
 public class MemoryUtils {
-    /**
-     * Get stringified runtime memory stats
-     *
-     * @return String of all Runtime stats.
-     */
-    public static String getRuntimeMemoryStats() {
-        return "totalMem = " +
-               (Runtime.getRuntime().totalMemory() / 1024f / 1024f) +
-               "M, maxMem = "  +
-               (Runtime.getRuntime().maxMemory() / 1024f / 1024f) +
-               "M, freeMem = " +
-               (Runtime.getRuntime().freeMemory() / 1024f / 1024f)
-                + "M";
-    }
+  /** Do not instantiate. */
+  private MemoryUtils() { }
+
+  /**
+   * Get stringified runtime memory stats
+   *
+   * @return String of all Runtime stats.
+   */
+  public static String getRuntimeMemoryStats() {
+    return "totalMem = " +
+      (Runtime.getRuntime().totalMemory() / 1024f / 1024f) +
+      "M, maxMem = "  +
+      (Runtime.getRuntime().maxMemory() / 1024f / 1024f) +
+      "M, freeMem = " +
+      (Runtime.getRuntime().freeMemory() / 1024f / 1024f) + "M";
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java Thu Feb 16 22:12:31 2012
@@ -35,114 +35,127 @@ import java.util.Map;
  * generic classes, not interfaces.
  */
 public class ReflectionUtils {
-    /**
-     * Get the underlying class for a type, or null if the type is
-     * a variable type.
-     *
-     * @param type the type
-     * @return the underlying class
-     */
-    public static Class<?> getClass(Type type) {
-        if (type instanceof Class) {
-            return (Class<?>) type;
-        }
-        else if (type instanceof ParameterizedType) {
-            return getClass(((ParameterizedType) type).getRawType());
-        }
-        else if (type instanceof GenericArrayType) {
-            Type componentType =
-                ((GenericArrayType) type).getGenericComponentType();
-            Class<?> componentClass = getClass(componentType);
-            if (componentClass != null ) {
-                return Array.newInstance(componentClass, 0).getClass();
-            }
-            else {
-                return null;
-            }
-        }
-        else {
-            return null;
-        }
+  /**
+   * Do not instantiate.
+   */
+  private ReflectionUtils() { }
+
+  /**
+   * Get the underlying class for a type, or null if the type is
+   * a variable type.
+   *
+   * @param type the type
+   * @return the underlying class
+   */
+  public static Class<?> getClass(Type type) {
+    if (type instanceof Class) {
+      return (Class<?>) type;
+    } else if (type instanceof ParameterizedType) {
+      return getClass(((ParameterizedType) type).getRawType());
+    } else if (type instanceof GenericArrayType) {
+      Type componentType =
+          ((GenericArrayType) type).getGenericComponentType();
+      Class<?> componentClass = getClass(componentType);
+      if (componentClass != null) {
+        return Array.newInstance(componentClass, 0).getClass();
+      } else {
+        return null;
+      }
+    } else {
+      return null;
     }
+  }
 
-    /**
-     * Get the actual type arguments a child class has used to extend a
-     * generic base class.
-     *
-     * @param baseClass the base class
-     * @param childClass the child class
-     * @return a list of the raw classes for the actual type arguments.
-     */
-    public static <T> List<Class<?>> getTypeArguments(
-            Class<T> baseClass, Class<? extends T> childClass) {
-        Map<Type, Type> resolvedTypes = new HashMap<Type, Type>();
-        Type type = childClass;
-        // start walking up the inheritance hierarchy until we hit baseClass
-        while (! getClass(type).equals(baseClass)) {
-            if (type instanceof Class) {
-                // there is no useful information for us in raw types,
-                // so just keep going.
-                type = ((Class<?>) type).getGenericSuperclass();
-            }
-            else {
-                ParameterizedType parameterizedType = (ParameterizedType) type;
-                Class<?> rawType = (Class<?>) parameterizedType.getRawType();
-
-                Type[] actualTypeArguments =
-                    parameterizedType.getActualTypeArguments();
-                TypeVariable<?>[] typeParameters = rawType.getTypeParameters();
-                for (int i = 0; i < actualTypeArguments.length; i++) {
-                    resolvedTypes.put(typeParameters[i],
-                                      actualTypeArguments[i]);
-                }
-
-                if (!rawType.equals(baseClass)) {
-                    type = rawType.getGenericSuperclass();
-                }
-            }
+  /**
+   * Get the actual type arguments a child class has used to extend a
+   * generic base class.
+   *
+   * @param <T> Type to evaluate.
+   * @param baseClass the base class
+   * @param childClass the child class
+   * @return a list of the raw classes for the actual type arguments.
+   */
+  public static <T> List<Class<?>> getTypeArguments(
+      Class<T> baseClass, Class<? extends T> childClass) {
+    Map<Type, Type> resolvedTypes = new HashMap<Type, Type>();
+    Type type = childClass;
+    // start walking up the inheritance hierarchy until we hit baseClass
+    while (! getClass(type).equals(baseClass)) {
+      if (type instanceof Class) {
+        // there is no useful information for us in raw types,
+        // so just keep going.
+        type = ((Class<?>) type).getGenericSuperclass();
+      } else {
+        ParameterizedType parameterizedType = (ParameterizedType) type;
+        Class<?> rawType = (Class<?>) parameterizedType.getRawType();
+
+        Type[] actualTypeArguments =
+            parameterizedType.getActualTypeArguments();
+        TypeVariable<?>[] typeParameters = rawType.getTypeParameters();
+        for (int i = 0; i < actualTypeArguments.length; i++) {
+          resolvedTypes.put(typeParameters[i],
+              actualTypeArguments[i]);
         }
 
-        // finally, for each actual type argument provided to baseClass,
-        // determine (if possible)
-        // the raw class for that type argument.
-        Type[] actualTypeArguments;
-        if (type instanceof Class) {
-            actualTypeArguments = ((Class<?>) type).getTypeParameters();
+        if (!rawType.equals(baseClass)) {
+          type = rawType.getGenericSuperclass();
         }
-        else {
-            actualTypeArguments =
-                ((ParameterizedType) type).getActualTypeArguments();
-        }
-        List<Class<?>> typeArgumentsAsClasses = new ArrayList<Class<?>>();
-        // resolve types by chasing down type variables.
-        for (Type baseType: actualTypeArguments) {
-            while (resolvedTypes.containsKey(baseType)) {
-                baseType = resolvedTypes.get(baseType);
-            }
-            typeArgumentsAsClasses.add(getClass(baseType));
-        }
-        return typeArgumentsAsClasses;
+      }
     }
 
-    /** try to directly set a (possibly private) field on an Object */
-    public static void setField(Object target, String fieldname, Object value)
-            throws NoSuchFieldException, IllegalAccessException {
-        Field field = findDeclaredField(target.getClass(), fieldname);
-        field.setAccessible(true);
-        field.set(target, value);
+    // finally, for each actual type argument provided to baseClass,
+    // determine (if possible)
+    // the raw class for that type argument.
+    Type[] actualTypeArguments;
+    if (type instanceof Class) {
+      actualTypeArguments = ((Class<?>) type).getTypeParameters();
+    } else {
+      actualTypeArguments =
+          ((ParameterizedType) type).getActualTypeArguments();
+    }
+    List<Class<?>> typeArgumentsAsClasses = new ArrayList<Class<?>>();
+    // resolve types by chasing down type variables.
+    for (Type baseType: actualTypeArguments) {
+      while (resolvedTypes.containsKey(baseType)) {
+        baseType = resolvedTypes.get(baseType);
+      }
+      typeArgumentsAsClasses.add(getClass(baseType));
     }
+    return typeArgumentsAsClasses;
+  }
 
-    /** find a declared field in a class or one of its super classes */
-    private static Field findDeclaredField(Class<?> inClass, String fieldname)
-            throws NoSuchFieldException {
-        while (!Object.class.equals(inClass)) {
-            for (Field field : inClass.getDeclaredFields()) {
-                if (field.getName().equalsIgnoreCase(fieldname)) {
-                    return field;
-                }
-            }
-            inClass = inClass.getSuperclass();
+  /**
+   * Try to directly set a (possibly private) field on an Object.
+   *
+   * @param target Target to set the field on.
+   * @param fieldname Name of field.
+   * @param value Value to set on target.
+   */
+  public static void setField(Object target, String fieldname, Object value)
+    throws NoSuchFieldException, IllegalAccessException {
+    Field field = findDeclaredField(target.getClass(), fieldname);
+    field.setAccessible(true);
+    field.set(target, value);
+  }
+
+  /**
+   * Find a declared field in a class or one of its super classes
+   *
+   * @param inClass Class to search for declared field.
+   * @param fieldname Field name to search for
+   * @return Field or will throw.
+   * @throws NoSuchFieldException When field not found.
+   */
+  private static Field findDeclaredField(Class<?> inClass, String fieldname)
+    throws NoSuchFieldException {
+    while (!Object.class.equals(inClass)) {
+      for (Field field : inClass.getDeclaredFields()) {
+        if (field.getName().equalsIgnoreCase(fieldname)) {
+          return field;
         }
-        throw new NoSuchFieldException();
+      }
+      inClass = inClass.getSuperclass();
     }
+    throw new NoSuchFieldException();
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java Thu Feb 16 22:12:31 2012
@@ -24,24 +24,30 @@ import org.apache.hadoop.io.IntWritable;
 /**
  * {@link UnmodifiableIterator} over a primitive int array
  */
-public class UnmodifiableIntArrayIterator
-        extends UnmodifiableIterator<IntWritable> {
+public class UnmodifiableIntArrayIterator extends
+    UnmodifiableIterator<IntWritable> {
+  /** Array to iterate over */
+  private final int[] intArray;
+  /** Offset to array */
+  private int offset;
 
-    private final int[] arr;
-    private int offset;
+  /**
+   * Constructor with array to iterate over.
+   *
+   * @param intArray Array to iterate over.
+   */
+  public UnmodifiableIntArrayIterator(int[] intArray) {
+    this.intArray = intArray;
+    offset = 0;
+  }
 
-    public UnmodifiableIntArrayIterator(int[] arr) {
-        this.arr = arr;
-        offset = 0;
-    }
+  @Override
+  public boolean hasNext() {
+    return offset < intArray.length;
+  }
 
-    @Override
-    public boolean hasNext() {
-        return offset < arr.length;
-    }
-
-    @Override
-    public IntWritable next() {
-        return new IntWritable(arr[offset++]);
-    }
-}
\ No newline at end of file
+  @Override
+  public IntWritable next() {
+    return new IntWritable(intArray[offset++]);
+  }
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java Thu Feb 16 22:12:31 2012
@@ -41,147 +41,215 @@ import org.apache.zookeeper.data.Stat;
  * Helper static methods for working with Writable objects.
  */
 public class WritableUtils {
-    public static void readFieldsFromByteArray(
-            byte[] byteArray, Writable writableObject) {
-        DataInputStream inputStream =
-            new DataInputStream(new ByteArrayInputStream(byteArray));
-        try {
-            writableObject.readFields(inputStream);
-        } catch (IOException e) {
-            throw new IllegalStateException(
-                "readFieldsFromByteArray: IOException", e);
-        }
-    }
-
-    public static void readFieldsFromZnode(ZooKeeperExt zkExt,
-                                           String zkPath,
-                                           boolean watch,
-                                           Stat stat,
-                                           Writable writableObject) {
-        try {
-            byte[] zkData = zkExt.getData(zkPath, false, stat);
-            readFieldsFromByteArray(zkData, writableObject);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-               "readFieldsFromZnode: KeeperException on " + zkPath, e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-               "readFieldsFromZnode: InterrruptedStateException on " + zkPath,
-               e);
-        }
-    }
-
-    public static byte[] writeToByteArray(Writable writableObject) {
-        ByteArrayOutputStream outputStream =
-            new ByteArrayOutputStream();
-        DataOutput output = new DataOutputStream(outputStream);
-        try {
-            writableObject.write(output);
-        } catch (IOException e) {
-            throw new IllegalStateException(
-                "writeToByteArray: IOStateException", e);
-        }
-        return outputStream.toByteArray();
-    }
-
-    public static PathStat writeToZnode(ZooKeeperExt zkExt,
-                                        String zkPath,
-                                        int version,
-                                        Writable writableObject) {
-        try {
-            byte[] byteArray = writeToByteArray(writableObject);
-            return zkExt.createOrSetExt(zkPath,
-                                        byteArray,
-                                        Ids.OPEN_ACL_UNSAFE,
-                                        CreateMode.PERSISTENT,
-                                        true,
-                                        version);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-               "writeToZnode: KeeperException on " + zkPath, e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "writeToZnode: InterruptedException on " + zkPath, e);
-        }
-    }
-
-    public static byte[] writeListToByteArray(
-            List<? extends Writable> writableList) {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutput output = new DataOutputStream(outputStream);
-        try {
-            output.writeInt(writableList.size());
-            for (Writable writable : writableList) {
-                writable.write(output);
-            }
-        } catch (IOException e) {
-            throw new IllegalStateException(
-                "writeListToByteArray: IOException", e);
-        }
-        return outputStream.toByteArray();
-    }
-
-    public static PathStat writeListToZnode(
-            ZooKeeperExt zkExt,
-            String zkPath,
-            int version,
-            List<? extends Writable> writableList) {
-        try {
-            return zkExt.createOrSetExt(
-                zkPath,
-                writeListToByteArray(writableList),
-                Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT,
-                true,
-                version);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-               "writeListToZnode: KeeperException on " + zkPath, e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "writeListToZnode: InterruptedException on " + zkPath, e);
-        }
-    }
-
-    public static List<? extends Writable> readListFieldsFromByteArray(
-            byte[] byteArray,
-            Class<? extends Writable> writableClass,
-            Configuration conf) {
-        try {
-            DataInputStream inputStream =
-                new DataInputStream(new ByteArrayInputStream(byteArray));
-            int size = inputStream.readInt();
-            List<Writable> writableList = new ArrayList<Writable>(size);
-            for (int i = 0; i < size; ++i) {
-                Writable writable =
-                    ReflectionUtils.newInstance(writableClass, conf);
-                writable.readFields(inputStream);
-                writableList.add(writable);
-            }
-            return writableList;
-        } catch (IOException e) {
-            throw new IllegalStateException(
-                    "readListFieldsFromZnode: IOException", e);
-        }
-    }
-
-    public static List<? extends Writable> readListFieldsFromZnode(
-            ZooKeeperExt zkExt,
-            String zkPath,
-            boolean watch,
-            Stat stat,
-            Class<? extends Writable> writableClass,
-            Configuration conf) {
-        try {
-            byte[] zkData = zkExt.getData(zkPath, false, stat);
-            return readListFieldsFromByteArray(zkData, writableClass, conf);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "readListFieldsFromZnode: KeeperException on " + zkPath, e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "readListFieldsFromZnode: InterruptedException on " + zkPath,
-                e);
-        }
+  /**
+   * Don't construct.
+   */
+  private WritableUtils() { }
+
+  /**
+   * Read fields from byteArray to a Writable object.
+   *
+   * @param byteArray Byte array to find the fields in.
+   * @param writableObject Object to fill in the fields.
+   */
+  public static void readFieldsFromByteArray(
+      byte[] byteArray, Writable writableObject) {
+    DataInputStream inputStream =
+      new DataInputStream(new ByteArrayInputStream(byteArray));
+    try {
+      writableObject.readFields(inputStream);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "readFieldsFromByteArray: IOException", e);
+    }
+  }
+
+  /**
+   * Read fields from a ZooKeeper znode.
+   *
+   * @param zkExt ZooKeeper instance.
+   * @param zkPath Path of znode.
+   * @param watch Add a watch?
+   * @param stat Stat of znode if desired.
+   * @param writableObject Object to read into.
+   */
+  public static void readFieldsFromZnode(ZooKeeperExt zkExt,
+                                         String zkPath,
+                                         boolean watch,
+                                         Stat stat,
+                                         Writable writableObject) {
+    try {
+      byte[] zkData = zkExt.getData(zkPath, false, stat);
+      readFieldsFromByteArray(zkData, writableObject);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+        "readFieldsFromZnode: KeeperException on " + zkPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+        "readFieldsFromZnode: InterrruptedStateException on " + zkPath, e);
+    }
+  }
+
+  /**
+   * Write object to a byte array.
+   *
+   * @param writableObject Object to write from.
+   * @return Byte array with serialized object.
+   */
+  public static byte[] writeToByteArray(Writable writableObject) {
+    ByteArrayOutputStream outputStream =
+        new ByteArrayOutputStream();
+    DataOutput output = new DataOutputStream(outputStream);
+    try {
+      writableObject.write(output);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "writeToByteArray: IOStateException", e);
+    }
+    return outputStream.toByteArray();
+  }
+
+  /**
+   * Write object to a ZooKeeper znode.
+   *
+   * @param zkExt ZooKeeper instance.
+   * @param zkPath Path of znode.
+   * @param version Version of the write.
+   * @param writableObject Object to write from.
+   * @return Path and stat information of the znode.
+   */
+  public static PathStat writeToZnode(ZooKeeperExt zkExt,
+                                      String zkPath,
+                                      int version,
+                                      Writable writableObject) {
+    try {
+      byte[] byteArray = writeToByteArray(writableObject);
+      return zkExt.createOrSetExt(zkPath,
+          byteArray,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true,
+          version);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "writeToZnode: KeeperException on " + zkPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "writeToZnode: InterruptedException on " + zkPath, e);
+    }
+  }
+
+  /**
+   * Write a list of objects to a byte array.
+   *
+   * @param writableList List of objects to write from.
+   * @return Byte array with serialized objects.
+   */
+  public static byte[] writeListToByteArray(
+      List<? extends Writable> writableList) {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    DataOutput output = new DataOutputStream(outputStream);
+    try {
+      output.writeInt(writableList.size());
+      for (Writable writable : writableList) {
+        writable.write(output);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "writeListToByteArray: IOException", e);
+    }
+    return outputStream.toByteArray();
+  }
+
+  /**
+   * Write list of objects to a ZooKeeper znode.
+   *
+   * @param zkExt ZooKeeper instance.
+   * @param zkPath Path of znode.
+   * @param version Version of the write.
+   * @param writableList List of objects to write from.
+   * @return Path and stat information of the znode.
+   */
+  public static PathStat writeListToZnode(
+      ZooKeeperExt zkExt,
+      String zkPath,
+      int version,
+      List<? extends Writable> writableList) {
+    try {
+      return zkExt.createOrSetExt(
+          zkPath,
+          writeListToByteArray(writableList),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true,
+          version);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "writeListToZnode: KeeperException on " + zkPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "writeListToZnode: InterruptedException on " + zkPath, e);
+    }
+  }
+
+  /**
+   * Read fields from byteArray to a list of Writable objects.
+   *
+   * @param byteArray Byte array to find the fields in.
+   * @param writableClass Class of the objects to instantiate.
+   * @param conf Configuration used for instantiation (i.e. Configurable)
+   * @return List of writable objects.
+   */
+  public static List<? extends Writable> readListFieldsFromByteArray(
+      byte[] byteArray,
+      Class<? extends Writable> writableClass,
+      Configuration conf) {
+    try {
+      DataInputStream inputStream =
+          new DataInputStream(new ByteArrayInputStream(byteArray));
+      int size = inputStream.readInt();
+      List<Writable> writableList = new ArrayList<Writable>(size);
+      for (int i = 0; i < size; ++i) {
+        Writable writable =
+            ReflectionUtils.newInstance(writableClass, conf);
+        writable.readFields(inputStream);
+        writableList.add(writable);
+      }
+      return writableList;
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "readListFieldsFromZnode: IOException", e);
+    }
+  }
+
+  /**
+   * Read fields from a ZooKeeper znode into a list of objects.
+   *
+   * @param zkExt ZooKeeper instance.
+   * @param zkPath Path of znode.
+   * @param watch Add a watch?
+   * @param stat Stat of znode if desired.
+   * @param writableClass Class of the objects to instantiate.
+   * @param conf Configuration used for instantiation (i.e. Configurable)
+   * @return List of writable objects.
+   */
+  public static List<? extends Writable> readListFieldsFromZnode(
+      ZooKeeperExt zkExt,
+      String zkPath,
+      boolean watch,
+      Stat stat,
+      Class<? extends Writable> writableClass,
+      Configuration conf) {
+    try {
+      byte[] zkData = zkExt.getData(zkPath, false, stat);
+      return readListFieldsFromByteArray(zkData, writableClass, conf);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "readListFieldsFromZnode: KeeperException on " + zkPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "readListFieldsFromZnode: InterruptedException on " + zkPath,
+          e);
     }
+  }
 }

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.giraph.bsp;
-
 /**
- *  State of the BSP application
+ * Package of all generic utility classes.
  */
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph.utils;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/BspEvent.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/BspEvent.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/BspEvent.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/BspEvent.java Thu Feb 16 22:12:31 2012
@@ -23,27 +23,27 @@ package org.apache.giraph.zk;
  * event.
  */
 public interface BspEvent {
-    /**
-     * Reset the permanent signal.
-     */
-    void reset();
+  /**
+   * Reset the permanent signal.
+   */
+  void reset();
 
-    /**
-     * The event occurred and the occurrence has been logged for future
-     * waiters.
-     */
-    void signal();
+  /**
+   * The event occurred and the occurrence has been logged for future
+   * waiters.
+   */
+  void signal();
 
-    /**
-     * Wait until the event occurred or waiting timed out.
-     * @param msecs Milliseconds to wait until the event occurred. 0 indicates
-     *        check immediately.  -1 indicates wait forever.
-     * @return true if event occurred, false if timed out while waiting
-     */
-    boolean waitMsecs(int msecs);
+  /**
+   * Wait until the event occurred or waiting timed out.
+   * @param msecs Milliseconds to wait until the event occurred. 0 indicates
+   *        check immediately.  -1 indicates wait forever.
+   * @return true if event occurred, false if timed out while waiting
+   */
+  boolean waitMsecs(int msecs);
 
-    /**
-     * Wait indefinitely until the event occurs.
-     */
-    void waitForever();
+  /**
+   * Wait indefinitely until the event occurs.
+   */
+  void waitForever();
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ContextLock.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ContextLock.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ContextLock.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ContextLock.java Thu Feb 16 22:12:31 2012
@@ -24,29 +24,29 @@ import org.apache.hadoop.mapreduce.Mappe
  * A lock that will keep the job context updated while waiting.
  */
 public class ContextLock extends PredicateLock {
-    /** Job context (for progress) */
-    @SuppressWarnings("rawtypes")
-    private final Context context;
-    /** Msecs to refresh the progress meter */
-    private static final int msecPeriod = 10000;
+  /** Msecs to refresh the progress meter */
+  private static final int MSEC_PERIOD = 10000;
+  /** Job context (for progress) */
+  @SuppressWarnings("rawtypes")
+  private final Context context;
 
-    /**
-     * Constructor.
-     *
-     * @param context used to call progress()
-     */
-    ContextLock(@SuppressWarnings("rawtypes") Context context) {
-        this.context = context;
-    }
+  /**
+   * Constructor.
+   *
+   * @param context used to call progress()
+   */
+  ContextLock(@SuppressWarnings("rawtypes") Context context) {
+    this.context = context;
+  }
 
-    /**
-     * Specialized version of waitForever() that will keep the job progressing
-     * while waiting.
-     */
-    @Override
-    public void waitForever() {
-        while (waitMsecs(msecPeriod) == false) {
-            context.progress();
-        }
+  /**
+   * Specialized version of waitForever() that will keep the job progressing
+   * while waiting.
+   */
+  @Override
+  public void waitForever() {
+    while (!waitMsecs(MSEC_PERIOD)) {
+      context.progress();
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java Thu Feb 16 22:12:31 2012
@@ -29,89 +29,88 @@ import org.apache.log4j.Logger;
  * A lock with a predicate that was be used to synchronize events.
  */
 public class PredicateLock implements BspEvent {
-    /** Lock */
-    private Lock lock = new ReentrantLock();
-    /** Condition associated with lock */
-    private Condition cond = lock.newCondition();
-    /** Predicate */
-    private boolean eventOccurred = false;
-    /** Class logger */
-    private Logger LOG = Logger.getLogger(PredicateLock.class);
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(PredicateLock.class);
+  /** Lock */
+  private Lock lock = new ReentrantLock();
+  /** Condition associated with lock */
+  private Condition cond = lock.newCondition();
+  /** Predicate */
+  private boolean eventOccurred = false;
 
-    @Override
-    public void reset() {
-        lock.lock();
-        try {
-            eventOccurred = false;
-        } finally {
-            lock.unlock();
-        }
+  @Override
+  public void reset() {
+    lock.lock();
+    try {
+      eventOccurred = false;
+    } finally {
+      lock.unlock();
     }
+  }
 
-    @Override
-    public void signal() {
-        lock.lock();
-        try {
-            eventOccurred = true;
-            cond.signalAll();
-        } finally {
-            lock.unlock();
-        }
+  @Override
+  public void signal() {
+    lock.lock();
+    try {
+      eventOccurred = true;
+      cond.signalAll();
+    } finally {
+      lock.unlock();
     }
+  }
 
-    @Override
-    public boolean waitMsecs(int msecs) {
-        if (msecs < -1) {
-            throw new RuntimeException("msecs < -1");
-        }
+  @Override
+  public boolean waitMsecs(int msecs) {
+    if (msecs < -1) {
+      throw new RuntimeException("msecs < -1");
+    }
 
-        long maxMsecs = System.currentTimeMillis() + msecs;
-        long curMsecTimeout = 0;
-        lock.lock();
-        try {
-            while (eventOccurred == false) {
-                if (msecs == -1) {
-                    try {
-                        cond.await();
-                    } catch (InterruptedException e) {
-                        throw new IllegalStateException(
-                            "waitMsecs: Caught interrupted " +
-                            "exception on cond.await()", e);
-                    }
-                }
-                else {
-                    // Keep the wait non-negative
-                    curMsecTimeout =
-                        Math.max(maxMsecs - System.currentTimeMillis(), 0);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
-                    }
-                    try {
-                        boolean signaled =
-                            cond.await(curMsecTimeout, TimeUnit.MILLISECONDS);
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("waitMsecs: Got timed signaled of " +
-                                      signaled);
-                        }
-                    } catch (InterruptedException e) {
-                        throw new IllegalStateException(
-                            "waitMsecs: Caught interrupted " +
-                            "exception on cond.await() " +
-                            curMsecTimeout, e);
-                    }
-                    if (System.currentTimeMillis() > maxMsecs) {
-                        return false;
-                    }
-                }
+    long maxMsecs = System.currentTimeMillis() + msecs;
+    long curMsecTimeout = 0;
+    lock.lock();
+    try {
+      while (!eventOccurred) {
+        if (msecs == -1) {
+          try {
+            cond.await();
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(
+                "waitMsecs: Caught interrupted " +
+                    "exception on cond.await()", e);
+          }
+        } else {
+          // Keep the wait non-negative
+          curMsecTimeout =
+              Math.max(maxMsecs - System.currentTimeMillis(), 0);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
+          }
+          try {
+            boolean signaled =
+                cond.await(curMsecTimeout, TimeUnit.MILLISECONDS);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("waitMsecs: Got timed signaled of " +
+                  signaled);
             }
-        } finally {
-            lock.unlock();
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(
+                "waitMsecs: Caught interrupted " +
+                    "exception on cond.await() " +
+                    curMsecTimeout, e);
+          }
+          if (System.currentTimeMillis() > maxMsecs) {
+            return false;
+          }
         }
-        return true;
+      }
+    } finally {
+      lock.unlock();
     }
+    return true;
+  }
 
-    @Override
-    public void waitForever() {
-        waitMsecs(-1);
-    }
+  @Override
+  public void waitForever() {
+    waitMsecs(-1);
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java Thu Feb 16 22:12:31 2012
@@ -39,270 +39,268 @@ import org.apache.zookeeper.ZooKeeper;
  * non-atomic operations that are useful.
  */
 public class ZooKeeperExt extends ZooKeeper {
-    /** Internal logger */
-    private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
-    /** Length of the ZK sequence number */
-    private static final int SEQUENCE_NUMBER_LENGTH = 10;
-
-    /**
-     * Constructor to connect to ZooKeeper
-     *
-     * @param connectString Comma separated host:port pairs, each corresponding
-     *        to a zk server. e.g.
-     *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
-     *        chroot suffix is used the example would look
-     *        like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
-     *        where the client would be rooted at "/app/a" and all paths
-     *        would be relative to this root - ie getting/setting/etc...
-     *        "/foo/bar" would result in operations being run on
-     *        "/app/a/foo/bar" (from the server perspective).
-     * @param sessionTimeout Session timeout in milliseconds
-     * @param watcher A watcher object which will be notified of state changes,
-     *        may also be notified for node events
-     * @throws IOException
-     */
-    public ZooKeeperExt(String connectString,
-                        int sessionTimeout,
-                        Watcher watcher) throws IOException {
-        super(connectString, sessionTimeout, watcher);
+  /** Internal logger */
+  private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
+  /** Length of the ZK sequence number */
+  private static final int SEQUENCE_NUMBER_LENGTH = 10;
+
+  /**
+   * Constructor to connect to ZooKeeper
+   *
+   * @param connectString Comma separated host:port pairs, each corresponding
+   *        to a zk server. e.g.
+   *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
+   *        chroot suffix is used the example would look
+   *        like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
+   *        where the client would be rooted at "/app/a" and all paths
+   *        would be relative to this root - ie getting/setting/etc...
+   *        "/foo/bar" would result in operations being run on
+   *        "/app/a/foo/bar" (from the server perspective).
+   * @param sessionTimeout Session timeout in milliseconds
+   * @param watcher A watcher object which will be notified of state changes,
+   *        may also be notified for node events
+   * @throws IOException
+   */
+  public ZooKeeperExt(String connectString,
+      int sessionTimeout,
+      Watcher watcher) throws IOException {
+    super(connectString, sessionTimeout, watcher);
+  }
+
+  /**
+   * Provides a possibility of creating a path consisting of more than one
+   * znode (not atomic).  If recursive is false, operates exactly the
+   * same as create().
+   *
+   * @param path path to create
+   * @param data data to set on the final znode
+   * @param acl acls on each znode created
+   * @param createMode only affects the final znode
+   * @param recursive if true, creates all ancestors
+   * @return Actual created path
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public String createExt(
+      final String path,
+      byte[] data,
+      List<ACL> acl,
+      CreateMode createMode,
+      boolean recursive) throws KeeperException, InterruptedException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("createExt: Creating path " + path);
     }
 
-    /**
-     * Provides a possibility of a creating a path consisting of more than one
-     * znode (not atomic).  If recursive is false, operates exactly the
-     * same as create().
-     *
-     * @param path path to create
-     * @param data data to set on the final znode
-     * @param acl acls on each znode created
-     * @param createMode only affects the final znode
-     * @param recursive if true, creates all ancestors
-     * @return Actual created path
-     * @throws KeeperException
-     * @throws InterruptedException
-     */
-    public String createExt(
-            final String path,
-            byte data[],
-            List<ACL> acl,
-            CreateMode createMode,
-            boolean recursive) throws KeeperException, InterruptedException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("createExt: Creating path " + path);
-        }
-
-        if (!recursive) {
-            return create(path, data, acl, createMode);
-        }
+    if (!recursive) {
+      return create(path, data, acl, createMode);
+    }
 
-        try {
-            return create(path, data, acl, createMode);
-        } catch (KeeperException.NoNodeException e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("createExt: Cannot directly create node " + path);
-            }
-        }
+    try {
+      return create(path, data, acl, createMode);
+    } catch (KeeperException.NoNodeException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("createExt: Cannot directly create node " + path);
+      }
+    }
 
-        int pos = path.indexOf("/", 1);
-        for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
-            try {
-                create(
-                    path.substring(0, pos), null, acl, CreateMode.PERSISTENT);
-            } catch (KeeperException.NodeExistsException e) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("createExt: Znode " + path.substring(0, pos) +
-                              " already exists");
-                }
-            }
+    int pos = path.indexOf("/", 1);
+    for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
+      try {
+        create(
+            path.substring(0, pos), null, acl, CreateMode.PERSISTENT);
+      } catch (KeeperException.NodeExistsException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("createExt: Znode " + path.substring(0, pos) +
+              " already exists");
         }
-        return create(path, data, acl, createMode);
+      }
     }
+    return create(path, data, acl, createMode);
+  }
+
+  /**
+   * Data structure for handling the output of createOrSet()
+   */
+  public class PathStat {
+    /** Path to created znode (if any) */
+    private String path;
+    /** Stat from set znode (if any) */
+    private Stat stat;
 
     /**
-     * Data structure for handling the output of createOrSet()
+     * Put in results from createOrSet()
+     *
+     * @param path Path to created znode (or null)
+     * @param stat Stat from set znode (if set)
      */
-    public class PathStat {
-        private String path;
-        private Stat stat;
-
-        /**
-         * Put in results from createOrSet()
-         *
-         * @param path Path to created znode (or null)
-         * @param stat Stat from set znode (if set)
-         */
-        public PathStat(String path, Stat stat) {
-            this.path = path;
-            this.stat = stat;
-        }
-
-        /**
-         * Get the path of the created znode if it was created.
-         *
-         * @return Path of created znode or null if not created
-         */
-        public String getPath() {
-            return path;
-        }
-
-        /**
-         * Get the stat of the set znode if set
-         *
-         * @return Stat of set znode or null if not set
-         */
-        public Stat getStat() {
-            return stat;
-        }
+    public PathStat(String path, Stat stat) {
+      this.path = path;
+      this.stat = stat;
     }
 
     /**
-     * Create a znode.  Set the znode if the created znode already exists.
+     * Get the path of the created znode if it was created.
      *
-     * @param path path to create
-     * @param data data to set on the final znode
-     * @param acl acls on each znode created
-     * @param createMode only affects the final znode
-     * @param recursive if true, creates all ancestors
-     * @return Path of created znode or Stat of set znode
-     * @throws InterruptedException
-     * @throws KeeperException
+     * @return Path of created znode or null if not created
      */
-    public PathStat createOrSetExt(final String path,
-                                   byte data[],
-                                   List<ACL> acl,
-                                   CreateMode createMode,
-                                   boolean recursive,
-                                   int version)
-            throws KeeperException, InterruptedException {
-        String createdPath = null;
-        Stat setStat = null;
-        try {
-            createdPath = createExt(path, data, acl, createMode, recursive);
-        } catch (KeeperException.NodeExistsException e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("createOrSet: Node exists on path " + path);
-            }
-            setStat = setData(path, data, version);
-        }
-        return new PathStat(createdPath, setStat);
+    public String getPath() {
+      return path;
     }
 
     /**
-     * Create a znode if there is no other znode there
+     * Get the stat of the set znode if set
      *
-     * @param path path to create
-     * @param data data to set on the final znode
-     * @param acl acls on each znode created
-     * @param createMode only affects the final znode
-     * @param recursive if true, creates all ancestors
-     * @return Path of created znode or Stat of set znode
-     * @throws InterruptedException
-     * @throws KeeperException
+     * @return Stat of set znode or null if not set
      */
-    public PathStat createOnceExt(final String path,
-                                   byte data[],
-                                   List<ACL> acl,
-                                   CreateMode createMode,
-                                   boolean recursive)
-            throws KeeperException, InterruptedException {
-        String createdPath = null;
-        Stat setStat = null;
-        try {
-            createdPath = createExt(path, data, acl, createMode, recursive);
-        } catch (KeeperException.NodeExistsException e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("createOnceExt: Node already exists on path " + path);
-            }
-        }
-        return new PathStat(createdPath, setStat);
+    public Stat getStat() {
+      return stat;
     }
+  }
 
-    /**
-     * Delete a path recursively.  When the deletion is recursive, it is a
-     * non-atomic operation, hence, not part of ZooKeeper.
-     * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2)
-     * @param version expected version (-1 for all)
-     * @param recursive if true, remove all children, otherwise behave like
-     *        remove()
-     * @throws InterruptedException
-     * @throws KeeperException
-     */
-    public void deleteExt(final String path, int version, boolean recursive)
-            throws InterruptedException, KeeperException {
-        if (!recursive) {
-            delete(path, version);
-            return;
-        }
+  /**
+   * Create a znode.  Set the znode if the created znode already exists.
+   *
+   * @param path path to create
+   * @param data data to set on the final znode
+   * @param acl acls on each znode created
+   * @param createMode only affects the final znode
+   * @param recursive if true, creates all ancestors
+   * @param version Version to set if setting
+   * @return Path of created znode or Stat of set znode
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public PathStat createOrSetExt(final String path,
+      byte[] data,
+      List<ACL> acl,
+      CreateMode createMode,
+      boolean recursive,
+      int version) throws KeeperException, InterruptedException {
+    String createdPath = null;
+    Stat setStat = null;
+    try {
+      createdPath = createExt(path, data, acl, createMode, recursive);
+    } catch (KeeperException.NodeExistsException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("createOrSet: Node exists on path " + path);
+      }
+      setStat = setData(path, data, version);
+    }
+    return new PathStat(createdPath, setStat);
+  }
 
-        try {
-            delete(path, version);
-            return;
-        } catch (KeeperException.NotEmptyException e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("deleteExt: Cannot directly remove node " + path);
-            }
-        }
+  /**
+   * Create a znode if there is no other znode there
+   *
+   * @param path path to create
+   * @param data data to set on the final znode
+   * @param acl acls on each znode created
+   * @param createMode only affects the final znode
+   * @param recursive if true, creates all ancestors
+   * @return Path of created znode (null if it already existed); Stat is null
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public PathStat createOnceExt(final String path,
+      byte[] data,
+      List<ACL> acl,
+      CreateMode createMode,
+      boolean recursive) throws KeeperException, InterruptedException {
+    String createdPath = null;
+    Stat setStat = null;
+    try {
+      createdPath = createExt(path, data, acl, createMode, recursive);
+    } catch (KeeperException.NodeExistsException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("createOnceExt: Node already exists on path " + path);
+      }
+    }
+    return new PathStat(createdPath, setStat);
+  }
 
-        List<String> childList = getChildren(path, false);
-        for (String child : childList) {
-            deleteExt(path + "/" + child, -1, true);
-        }
+  /**
+   * Delete a path recursively.  When the deletion is recursive, it is a
+   * non-atomic operation, hence, not part of ZooKeeper.
+   * @param path path to remove (e.g. /tmp will remove /tmp/1 and /tmp/2)
+   * @param version expected version (-1 for all)
+   * @param recursive if true, remove all children, otherwise behave like
+   *        delete()
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public void deleteExt(final String path, int version, boolean recursive)
+    throws InterruptedException, KeeperException {
+    if (!recursive) {
+      delete(path, version);
+      return;
+    }
 
-        delete(path, version);
+    try {
+      delete(path, version);
+      return;
+    } catch (KeeperException.NotEmptyException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("deleteExt: Cannot directly remove node " + path);
+      }
     }
 
-    /**
-     * Get the children of the path with extensions.
-     * Extension 1: Sort the children based on sequence number
-     * Extension 2: Get the full path instead of relative path
-     *
-     * @param path path to znode
-     * @param watch set the watch?
-     * @param sequenceSorted sort by the sequence number
-     * @param fullPath if true, get the fully znode path back
-     * @return list of children
-     * @throws InterruptedException
-     * @throws KeeperException
-     */
-    public List<String> getChildrenExt(
-            final String path,
-            boolean watch,
-            boolean sequenceSorted,
-            boolean fullPath)
-            throws KeeperException, InterruptedException {
-        List<String> childList = getChildren(path, watch);
-        /* Sort children according to the sequence number, if desired */
-        if (sequenceSorted) {
-            Collections.sort(childList,
-                new Comparator<String>() {
-                    public int compare(String s1, String s2) {
-                        if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
-                            (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
-                            throw new RuntimeException(
-                                "getChildrenExt: Invalid length for sequence " +
-                                " sorting > " +
-                                SEQUENCE_NUMBER_LENGTH +
-                                " for s1 (" +
-                                s1.length() + ") or s2 (" + s2.length() + ")");
-                        }
-                        int s1sequenceNumber = Integer.parseInt(
-                                s1.substring(s1.length() -
-                                             SEQUENCE_NUMBER_LENGTH));
-                        int s2sequenceNumber = Integer.parseInt(
-                                s2.substring(s2.length() -
-                                             SEQUENCE_NUMBER_LENGTH));
-                        return s1sequenceNumber - s2sequenceNumber;
-                    }
-                }
-            );
-        }
-        if (fullPath) {
-            List<String> fullChildList = new ArrayList<String>();
-            for (String child : childList) {
-                fullChildList.add(path + "/" + child);
-            }
-            return fullChildList;
+    List<String> childList = getChildren(path, false);
+    for (String child : childList) {
+      deleteExt(path + "/" + child, -1, true);
+    }
+
+    delete(path, version);
+  }
+
+  /**
+   * Get the children of the path with extensions.
+   * Extension 1: Sort the children based on sequence number
+   * Extension 2: Get the full path instead of relative path
+   *
+   * @param path path to znode
+   * @param watch set the watch?
+   * @param sequenceSorted sort by the sequence number
+   * @param fullPath if true, get the full znode path back
+   * @return list of children
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public List<String> getChildrenExt(
+      final String path,
+      boolean watch,
+      boolean sequenceSorted,
+      boolean fullPath) throws KeeperException, InterruptedException {
+    List<String> childList = getChildren(path, watch);
+    /* Sort children according to the sequence number, if desired */
+    if (sequenceSorted) {
+      Collections.sort(childList, new Comparator<String>() {
+        public int compare(String s1, String s2) {
+          if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
+              (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
+            throw new RuntimeException(
+                "getChildrenExt: Invalid length for sequence " +
+                    " sorting > " +
+                    SEQUENCE_NUMBER_LENGTH +
+                    " for s1 (" +
+                    s1.length() + ") or s2 (" + s2.length() + ")");
+          }
+          int s1sequenceNumber = Integer.parseInt(
+              s1.substring(s1.length() -
+                  SEQUENCE_NUMBER_LENGTH));
+          int s2sequenceNumber = Integer.parseInt(
+              s2.substring(s2.length() -
+                  SEQUENCE_NUMBER_LENGTH));
+          return s1sequenceNumber - s2sequenceNumber;
         }
-        return childList;
+      });
+    }
+    if (fullPath) {
+      List<String> fullChildList = new ArrayList<String>();
+      for (String child : childList) {
+        fullChildList.add(path + "/" + child);
+      }
+      return fullChildList;
     }
+    return childList;
+  }
 }



Mime
View raw message