Return-Path: X-Original-To: apmail-incubator-giraph-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-giraph-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 848079E8F for ; Thu, 16 Feb 2012 22:13:55 +0000 (UTC) Received: (qmail 4328 invoked by uid 500); 16 Feb 2012 22:13:55 -0000 Delivered-To: apmail-incubator-giraph-commits-archive@incubator.apache.org Received: (qmail 4296 invoked by uid 500); 16 Feb 2012 22:13:55 -0000 Mailing-List: contact giraph-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: giraph-dev@incubator.apache.org Delivered-To: mailing list giraph-commits@incubator.apache.org Received: (qmail 4289 invoked by uid 99); 16 Feb 2012 22:13:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2012 22:13:55 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2012 22:13:50 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 932022388C28; Thu, 16 Feb 2012 22:12:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1245205 [16/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/example... Date: Thu, 16 Feb 2012 22:12:36 -0000 To: giraph-commits@incubator.apache.org From: aching@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120216221244.932022388C28@eris.apache.org> Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexOutputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexOutputFormat.java Thu Feb 16 22:12:31 2012 @@ -40,78 +40,77 @@ import org.apache.hadoop.mapreduce.lib.o * @param Edge value */ @SuppressWarnings("rawtypes") -public abstract class TextVertexOutputFormat< - I extends WritableComparable, V extends Writable, E extends Writable> - extends VertexOutputFormat { - /** Uses the TextOutputFormat to do everything */ - protected TextOutputFormat textOutputFormat = - new TextOutputFormat(); +public abstract class TextVertexOutputFormat + extends VertexOutputFormat { + /** Uses the TextOutputFormat to do everything */ + protected TextOutputFormat textOutputFormat = + new TextOutputFormat(); + + /** + * Abstract class to be implemented by the user based on their specific + * vertex output. Easiest to ignore the key value separator and only use + * key instead. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + */ + public abstract static class TextVertexWriter implements VertexWriter { + /** Context passed to initialize */ + private TaskAttemptContext context; + /** Internal line record writer */ + private final RecordWriter lineRecordWriter; /** - * Abstract class to be implemented by the user based on their specific - * vertex output. Easiest to ignore the key value separator and only use - * key instead. + * Initialize with the LineRecordWriter. * - * @param Vertex index value - * @param Vertex value - * @param Edge value + * @param lineRecordWriter Line record writer from TextOutputFormat */ - public static abstract class TextVertexWriter - implements VertexWriter { - /** Context passed to initialize */ - private TaskAttemptContext context; - /** Internal line record writer */ - private final RecordWriter lineRecordWriter; - - /** - * Initialize with the LineRecordWriter. - * - * @param lineRecordWriter Line record writer from TextOutputFormat - */ - public TextVertexWriter(RecordWriter lineRecordWriter) { - this.lineRecordWriter = lineRecordWriter; - } - - @Override - public void initialize(TaskAttemptContext context) throws IOException { - this.context = context; - } - - @Override - public void close(TaskAttemptContext context) - throws IOException, InterruptedException { - lineRecordWriter.close(context); - } - - /** - * Get the line record writer. - * - * @return Record writer to be used for writing. - */ - public RecordWriter getRecordWriter() { - return lineRecordWriter; - } - - /** - * Get the context. - * - * @return Context passed to initialize. - */ - public TaskAttemptContext getContext() { - return context; - } + public TextVertexWriter(RecordWriter lineRecordWriter) { + this.lineRecordWriter = lineRecordWriter; } @Override - public void checkOutputSpecs(JobContext context) - throws IOException, InterruptedException { - textOutputFormat.checkOutputSpecs(context); + public void initialize(TaskAttemptContext context) throws IOException { + this.context = context; } @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) - throws IOException, InterruptedException { - return textOutputFormat.getOutputCommitter(context); + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + lineRecordWriter.close(context); } + + /** + * Get the line record writer. + * + * @return Record writer to be used for writing. + */ + public RecordWriter getRecordWriter() { + return lineRecordWriter; + } + + /** + * Get the context. + * + * @return Context passed to initialize. + */ + public TaskAttemptContext getContext() { + return context; + } + } + + @Override + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + textOutputFormat.checkOutputSpecs(context); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return textOutputFormat.getOutputCommitter(context); + } } Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java) URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/package-info.java Thu Feb 16 22:12:31 2012 @@ -15,15 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.giraph.bsp; - /** - * State of the BSP application + * Package of reusable library Giraph objects. */ -public enum ApplicationState { - UNKNOWN, ///< Shouldn't be seen, just an initial state - START_SUPERSTEP, ///< Start from a desired superstep - FAILED, ///< Unrecoverable - FINISHED ///< Successful completion -} +package org.apache.giraph.lib; Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java) URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/package-info.java Thu Feb 16 22:12:31 2012 @@ -15,15 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.giraph.bsp; - /** - * State of the BSP application + * Base giraph package. */ -public enum ApplicationState { - UNKNOWN, ///< Shouldn't be seen, just an initial state - START_SUPERSTEP, ///< Start from a desired superstep - FAILED, ///< Unrecoverable - FINISHED ///< Successful completion -} +package org.apache.giraph; Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ComparisonUtils.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ComparisonUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ComparisonUtils.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ComparisonUtils.java Thu Feb 16 22:12:31 2012 @@ -20,29 +20,43 @@ package org.apache.giraph.utils; import java.util.Iterator; -/** simple helper class for comparisons and equality checking */ +/** Simple helper class for comparisons and equality checking */ public class ComparisonUtils { - private ComparisonUtils() { - } + /** Do not construct this object */ + private ComparisonUtils() { } - /** compare elements, sort order and length */ - public static boolean equal(Iterable first, Iterable second) { - return equal(first.iterator(), second.iterator()); - } + /** + * Compare elements, sort order and length + * + * @param Type of iterable to compare. + * @param first First iterable to compare. + * @param second Second iterable to compare. + * @return True if equal, false otherwise. + */ + public static boolean equal(Iterable first, Iterable second) { + return equal(first.iterator(), second.iterator()); + } - /** compare elements, sort order and length */ - public static boolean equal(Iterator first, Iterator second) { - while (first.hasNext() && second.hasNext()) { - T message = first.next(); - T otherMessage = second.next(); - /* element-wise equality */ - if (!(message == null ? otherMessage == null : - message.equals(otherMessage))) { - return false; - } - } - /* length must also be equal */ - return !(first.hasNext() || second.hasNext()); + /** + * Compare elements, sort order and length + * + * @param Type of iterable to compare. + * @param first First iterable to compare. + * @param second Second iterable to compare. + * @return True if equal, false otherwise. + */ + public static boolean equal(Iterator first, Iterator second) { + while (first.hasNext() && second.hasNext()) { + T message = first.next(); + T otherMessage = second.next(); + /* element-wise equality */ + if (!(message == null ? otherMessage == null : + message.equals(otherMessage))) { + return false; + } } + /* length must also be equal */ + return !(first.hasNext() || second.hasNext()); + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/EmptyIterable.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/EmptyIterable.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/EmptyIterable.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/EmptyIterable.java Thu Feb 16 22:12:31 2012 @@ -20,26 +20,30 @@ package org.apache.giraph.utils; import java.util.Iterator; +/** + * Helper empty iterable when there are no messages. + * + * @param Message data + */ public class EmptyIterable implements Iterable, Iterator { - - @Override - public Iterator iterator() { - return this; - } - - @Override - public boolean hasNext() { - return false; - } - - @Override - public M next() { - return null; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } + @Override + public Iterator iterator() { + return this; + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public M next() { + return null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Thu Feb 16 22:12:31 2012 @@ -42,208 +42,253 @@ import java.util.concurrent.Executors; /** * A base class for running internal tests on a vertex * - * Extending classes only have to invoke the run() method to test their vertex. All data - * is written to a local tmp directory that is removed afterwards. A local zookeeper - * instance is started in an extra thread and shutdown at the end. + * Extending classes only have to invoke the run() method to test their vertex. + * All data is written to a local tmp directory that is removed afterwards. + * A local zookeeper instance is started in an extra thread and + * shutdown at the end. * * Heavily inspired from Apache Mahout's MahoutTestCase */ public class InternalVertexRunner { + /** ZooKeeper port to use for tests */ + public static final int LOCAL_ZOOKEEPER_PORT = 22182; - public static final int LOCAL_ZOOKEEPER_PORT = 22182; + /** + * Default constructor. + */ + private InternalVertexRunner() { } + + /** + * Attempts to run the vertex internally in the current JVM, reading from + * and writing to a temporary folder on local disk. Will start + * its own ZooKeeper instance. + * + * @param vertexClass the vertex class to instantiate + * @param vertexInputFormatClass the inputformat to use + * @param vertexOutputFormatClass the outputformat to use + * @param params a map of parameters to add to the hadoop configuration + * @param data linewise input data + * @return linewise output data + * @throws Exception + */ + public static Iterable run(Class vertexClass, + Class vertexInputFormatClass, Class vertexOutputFormatClass, + Map params, String... data) throws Exception { + return run(vertexClass, null, vertexInputFormatClass, + vertexOutputFormatClass, params, data); + } + + /** + * Attempts to run the vertex internally in the current JVM, reading from + * and writing to a temporary folder on local disk. Will start its own + * zookeeper instance. + * + * @param vertexClass the vertex class to instantiate + * @param vertexCombinerClass the vertex combiner to use (or null) + * @param vertexInputFormatClass the inputformat to use + * @param vertexOutputFormatClass the outputformat to use + * @param params a map of parameters to add to the hadoop configuration + * @param data linewise input data + * @return linewise output data + * @throws Exception + */ + public static Iterable run(Class vertexClass, + Class vertexCombinerClass, Class vertexInputFormatClass, + Class vertexOutputFormatClass, Map params, + String... data) throws Exception { + + File tmpDir = null; + try { + // prepare input file, output folder and zookeeper folder + tmpDir = createTestDir(vertexClass); + File inputFile = createTempFile(tmpDir, "graph.txt"); + File outputDir = createTempDir(tmpDir, "output"); + File zkDir = createTempDir(tmpDir, "zooKeeper"); + + // write input data to disk + writeLines(inputFile, data); + + // create and configure the job to run the vertex + GiraphJob job = new GiraphJob(vertexClass.getName()); + job.setVertexClass(vertexClass); + job.setVertexInputFormatClass(vertexInputFormatClass); + job.setVertexOutputFormatClass(vertexOutputFormatClass); + + if (vertexCombinerClass != null) { + job.setVertexCombinerClass(vertexCombinerClass); + } + + job.setWorkerConfiguration(1, 1, 100.0f); + Configuration conf = job.getConfiguration(); + conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false); + conf.setBoolean(GiraphJob.LOCAL_TEST_MODE, true); + conf.set(GiraphJob.ZOOKEEPER_LIST, "localhost:" + + String.valueOf(LOCAL_ZOOKEEPER_PORT)); + + for (Map.Entry param : params.entrySet()) { + conf.set(param.getKey(), param.getValue()); + } + + FileInputFormat.addInputPath(job, new Path(inputFile.toString())); + FileOutputFormat.setOutputPath(job, new Path(outputDir.toString())); + + // configure a local zookeeper instance + Properties zkProperties = new Properties(); + zkProperties.setProperty("tickTime", "2000"); + zkProperties.setProperty("dataDir", zkDir.getAbsolutePath()); + zkProperties.setProperty("clientPort", + String.valueOf(LOCAL_ZOOKEEPER_PORT)); + zkProperties.setProperty("maxClientCnxns", "10000"); + zkProperties.setProperty("minSessionTimeout", "10000"); + zkProperties.setProperty("maxSessionTimeout", "100000"); + zkProperties.setProperty("initLimit", "10"); + zkProperties.setProperty("syncLimit", "5"); + zkProperties.setProperty("snapCount", "50000"); + + QuorumPeerConfig qpConfig = new QuorumPeerConfig(); + qpConfig.parseProperties(zkProperties); + + // create and run the zookeeper instance + final InternalZooKeeper zookeeper = new InternalZooKeeper(); + final ServerConfig zkConfig = new ServerConfig(); + zkConfig.readFrom(qpConfig); - private InternalVertexRunner() { - } - - /** - * Attempts to run the vertex internally in the current JVM, reading from and writing to a - * temporary folder on local disk. Will start an own zookeeper instance. - * - * @param vertexClass the vertex class to instantiate - * @param vertexInputFormatClass the inputformat to use - * @param vertexOutputFormatClass the outputformat to use - * @param params a map of parameters to add to the hadoop configuration - * @param data linewise input data - * @return linewise output data - * @throws Exception - */ - public static Iterable run(Class vertexClass, - Class vertexInputFormatClass, Class vertexOutputFormatClass, - Map params, String... data) throws Exception { - return run(vertexClass, null, vertexInputFormatClass, - vertexOutputFormatClass, params, data); - } - - /** - * Attempts to run the vertex internally in the current JVM, reading from and writing to a - * temporary folder on local disk. Will start an own zookeeper instance. - * - * @param vertexClass the vertex class to instantiate - * @param vertexCombinerClass the vertex combiner to use (or null) - * @param vertexInputFormatClass the inputformat to use - * @param vertexOutputFormatClass the outputformat to use - * @param params a map of parameters to add to the hadoop configuration - * @param data linewise input data - * @return linewise output data - * @throws Exception - */ - public static Iterable run(Class vertexClass, - Class vertexCombinerClass, Class vertexInputFormatClass, - Class vertexOutputFormatClass, Map params, - String... data) throws Exception { - - File tmpDir = null; - try { - // prepare input file, output folder and zookeeper folder - tmpDir = createTestDir(vertexClass); - File inputFile = createTempFile(tmpDir, "graph.txt"); - File outputDir = createTempDir(tmpDir, "output"); - File zkDir = createTempDir(tmpDir, "zooKeeper"); - - // write input data to disk - writeLines(inputFile, data); - - // create and configure the job to run the vertex - GiraphJob job = new GiraphJob(vertexClass.getName()); - job.setVertexClass(vertexClass); - job.setVertexInputFormatClass(vertexInputFormatClass); - job.setVertexOutputFormatClass(vertexOutputFormatClass); - - if (vertexCombinerClass != null) { - job.setVertexCombinerClass(vertexCombinerClass); - } - - job.setWorkerConfiguration(1, 1, 100.0f); - Configuration conf = job.getConfiguration(); - conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false); - conf.setBoolean(GiraphJob.LOCAL_TEST_MODE, true); - conf.set(GiraphJob.ZOOKEEPER_LIST, "localhost:" + - String.valueOf(LOCAL_ZOOKEEPER_PORT)); - - for (Map.Entry param : params.entrySet()) { - conf.set(param.getKey(), param.getValue()); - } - - FileInputFormat.addInputPath(job, new Path(inputFile.toString())); - FileOutputFormat.setOutputPath(job, new Path(outputDir.toString())); - - // configure a local zookeeper instance - Properties zkProperties = new Properties(); - zkProperties.setProperty("tickTime", "2000"); - zkProperties.setProperty("dataDir", zkDir.getAbsolutePath()); - zkProperties.setProperty("clientPort", - String.valueOf(LOCAL_ZOOKEEPER_PORT)); - zkProperties.setProperty("maxClientCnxns", "10000"); - zkProperties.setProperty("minSessionTimeout", "10000"); - zkProperties.setProperty("maxSessionTimeout", "100000"); - zkProperties.setProperty("initLimit", "10"); - zkProperties.setProperty("syncLimit", "5"); - zkProperties.setProperty("snapCount", "50000"); - - QuorumPeerConfig qpConfig = new QuorumPeerConfig(); - qpConfig.parseProperties(zkProperties); - - // create and run the zookeeper instance - final InternalZooKeeper zookeeper = new InternalZooKeeper(); - final ServerConfig zkConfig = new ServerConfig(); - zkConfig.readFrom(qpConfig); - - ExecutorService executorService = Executors.newSingleThreadExecutor(); - executorService.execute(new Runnable() { - @Override - public void run() { - try { - zookeeper.runFromConfig(zkConfig); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }); - try { - job.run(true); - } finally { - executorService.shutdown(); - zookeeper.end(); - } - - return Files.readLines(new File(outputDir, "part-m-00000"), - Charsets.UTF_8); - } finally { - if (tmpDir != null) { - new DeletingVisitor().accept(tmpDir); - } - } - } - - /** - * Create a temporary folder that will be removed after the test - */ - private static final File createTestDir(Class vertexClass) - throws IOException { - String systemTmpDir = System.getProperty("java.io.tmpdir"); - long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random()); - File testTempDir = new File(systemTmpDir, "giraph-" + - vertexClass.getSimpleName() + '-' + simpleRandomLong); - if (!testTempDir.mkdir()) { - throw new IOException("Could not create " + testTempDir); - } - testTempDir.deleteOnExit(); - return testTempDir; - } - - private static final File createTempFile(File parent, String name) - throws IOException { - return createTestTempFileOrDir(parent, name, false); - } - - private static final File createTempDir(File parent, String name) - throws IOException { - File dir = createTestTempFileOrDir(parent, name, true); - dir.delete(); - return dir; - } - - private static File createTestTempFileOrDir(File parent, String name, - boolean dir) throws IOException { - File f = new File(parent, name); - f.deleteOnExit(); - if (dir && !f.mkdirs()) { - throw new IOException("Could not make directory " + f); - } - return f; - } - - private static void writeLines(File file, String... lines) - throws IOException { - Writer writer = Files.newWriter(file, Charsets.UTF_8); - try { - for (String line : lines) { - writer.write(line); - writer.write('\n'); - } - } finally { - Closeables.closeQuietly(writer); - } - } - - private static class DeletingVisitor implements FileFilter { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.execute(new Runnable() { @Override - public boolean accept(File f) { - if (!f.isFile()) { - f.listFiles(this); - } - f.delete(); - return false; - } - } - + public void run() { + try { + zookeeper.runFromConfig(zkConfig); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + try { + job.run(true); + } finally { + executorService.shutdown(); + zookeeper.end(); + } + + return Files.readLines(new File(outputDir, "part-m-00000"), + Charsets.UTF_8); + } finally { + if (tmpDir != null) { + new DeletingVisitor().accept(tmpDir); + } + } + } + + /** + * Create a temporary folder that will be removed after the test. + * + * @param vertexClass Used for generating the folder name. + * @return File object for the directory. + */ + private static File createTestDir(Class vertexClass) + throws IOException { + String systemTmpDir = System.getProperty("java.io.tmpdir"); + long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random()); + File testTempDir = new File(systemTmpDir, "giraph-" + + vertexClass.getSimpleName() + '-' + simpleRandomLong); + if (!testTempDir.mkdir()) { + throw new IOException("Could not create " + testTempDir); + } + testTempDir.deleteOnExit(); + return testTempDir; + } + + /** + * Make a temporary file. + * + * @param parent Parent directory. + * @param name File name. + * @return File object to temporary file. + * @throws IOException + */ + private static File createTempFile(File parent, String name) + throws IOException { + return createTestTempFileOrDir(parent, name, false); + } + + /** + * Make a temporary directory. + * + * @param parent Parent directory. + * @param name Directory name. + * @return File object to temporary file. + * @throws IOException + */ + private static File createTempDir(File parent, String name) + throws IOException { + File dir = createTestTempFileOrDir(parent, name, true); + dir.delete(); + return dir; + } + + /** + * Creae a test temp file or directory. + * + * @param parent Parent directory + * @param name Name of file + * @param dir Is directory? + * @return File object + * @throws IOException + */ + private static File createTestTempFileOrDir(File parent, String name, + boolean dir) throws IOException { + File f = new File(parent, name); + f.deleteOnExit(); + if (dir && !f.mkdirs()) { + throw new IOException("Could not make directory " + f); + } + return f; + } + + /** + * Write lines to a file. + * + * @param file File to write lines to + * @param lines Strings written to the file + * @throws IOException + */ + private static void writeLines(File file, String... lines) + throws IOException { + Writer writer = Files.newWriter(file, Charsets.UTF_8); + try { + for (String line : lines) { + writer.write(line); + writer.write('\n'); + } + } finally { + Closeables.closeQuietly(writer); + } + } + + /** + * Deletes files. + */ + private static class DeletingVisitor implements FileFilter { + @Override + public boolean accept(File f) { + if (!f.isFile()) { + f.listFiles(this); + } + f.delete(); + return false; + } + } + + /** + * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown + */ + private static class InternalZooKeeper extends ZooKeeperServerMain { /** - * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown + * Shutdown the ZooKeeper instance. */ - private static class InternalZooKeeper extends ZooKeeperServerMain { - void end() { - shutdown(); - } + void end() { + shutdown(); } - + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java Thu Feb 16 22:12:31 2012 @@ -22,18 +22,20 @@ package org.apache.giraph.utils; * Helper static methods for tracking memory usage. */ public class MemoryUtils { - /** - * Get stringified runtime memory stats - * - * @return String of all Runtime stats. - */ - public static String getRuntimeMemoryStats() { - return "totalMem = " + - (Runtime.getRuntime().totalMemory() / 1024f / 1024f) + - "M, maxMem = " + - (Runtime.getRuntime().maxMemory() / 1024f / 1024f) + - "M, freeMem = " + - (Runtime.getRuntime().freeMemory() / 1024f / 1024f) - + "M"; - } + /** Do not instantiate. */ + private MemoryUtils() { } + + /** + * Get stringified runtime memory stats + * + * @return String of all Runtime stats. + */ + public static String getRuntimeMemoryStats() { + return "totalMem = " + + (Runtime.getRuntime().totalMemory() / 1024f / 1024f) + + "M, maxMem = " + + (Runtime.getRuntime().maxMemory() / 1024f / 1024f) + + "M, freeMem = " + + (Runtime.getRuntime().freeMemory() / 1024f / 1024f) + "M"; + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/ReflectionUtils.java Thu Feb 16 22:12:31 2012 @@ -35,114 +35,127 @@ import java.util.Map; * generic classes, not interfaces. */ public class ReflectionUtils { - /** - * Get the underlying class for a type, or null if the type is - * a variable type. - * - * @param type the type - * @return the underlying class - */ - public static Class getClass(Type type) { - if (type instanceof Class) { - return (Class) type; - } - else if (type instanceof ParameterizedType) { - return getClass(((ParameterizedType) type).getRawType()); - } - else if (type instanceof GenericArrayType) { - Type componentType = - ((GenericArrayType) type).getGenericComponentType(); - Class componentClass = getClass(componentType); - if (componentClass != null ) { - return Array.newInstance(componentClass, 0).getClass(); - } - else { - return null; - } - } - else { - return null; - } + /** + * Do not instantiate. + */ + private ReflectionUtils() { } + + /** + * Get the underlying class for a type, or null if the type is + * a variable type. + * + * @param type the type + * @return the underlying class + */ + public static Class getClass(Type type) { + if (type instanceof Class) { + return (Class) type; + } else if (type instanceof ParameterizedType) { + return getClass(((ParameterizedType) type).getRawType()); + } else if (type instanceof GenericArrayType) { + Type componentType = + ((GenericArrayType) type).getGenericComponentType(); + Class componentClass = getClass(componentType); + if (componentClass != null) { + return Array.newInstance(componentClass, 0).getClass(); + } else { + return null; + } + } else { + return null; } + } - /** - * Get the actual type arguments a child class has used to extend a - * generic base class. - * - * @param baseClass the base class - * @param childClass the child class - * @return a list of the raw classes for the actual type arguments. - */ - public static List> getTypeArguments( - Class baseClass, Class childClass) { - Map resolvedTypes = new HashMap(); - Type type = childClass; - // start walking up the inheritance hierarchy until we hit baseClass - while (! getClass(type).equals(baseClass)) { - if (type instanceof Class) { - // there is no useful information for us in raw types, - // so just keep going. - type = ((Class) type).getGenericSuperclass(); - } - else { - ParameterizedType parameterizedType = (ParameterizedType) type; - Class rawType = (Class) parameterizedType.getRawType(); - - Type[] actualTypeArguments = - parameterizedType.getActualTypeArguments(); - TypeVariable[] typeParameters = rawType.getTypeParameters(); - for (int i = 0; i < actualTypeArguments.length; i++) { - resolvedTypes.put(typeParameters[i], - actualTypeArguments[i]); - } - - if (!rawType.equals(baseClass)) { - type = rawType.getGenericSuperclass(); - } - } + /** + * Get the actual type arguments a child class has used to extend a + * generic base class. + * + * @param Type to evaluate. + * @param baseClass the base class + * @param childClass the child class + * @return a list of the raw classes for the actual type arguments. + */ + public static List> getTypeArguments( + Class baseClass, Class childClass) { + Map resolvedTypes = new HashMap(); + Type type = childClass; + // start walking up the inheritance hierarchy until we hit baseClass + while (! getClass(type).equals(baseClass)) { + if (type instanceof Class) { + // there is no useful information for us in raw types, + // so just keep going. + type = ((Class) type).getGenericSuperclass(); + } else { + ParameterizedType parameterizedType = (ParameterizedType) type; + Class rawType = (Class) parameterizedType.getRawType(); + + Type[] actualTypeArguments = + parameterizedType.getActualTypeArguments(); + TypeVariable[] typeParameters = rawType.getTypeParameters(); + for (int i = 0; i < actualTypeArguments.length; i++) { + resolvedTypes.put(typeParameters[i], + actualTypeArguments[i]); } - // finally, for each actual type argument provided to baseClass, - // determine (if possible) - // the raw class for that type argument. - Type[] actualTypeArguments; - if (type instanceof Class) { - actualTypeArguments = ((Class) type).getTypeParameters(); + if (!rawType.equals(baseClass)) { + type = rawType.getGenericSuperclass(); } - else { - actualTypeArguments = - ((ParameterizedType) type).getActualTypeArguments(); - } - List> typeArgumentsAsClasses = new ArrayList>(); - // resolve types by chasing down type variables. - for (Type baseType: actualTypeArguments) { - while (resolvedTypes.containsKey(baseType)) { - baseType = resolvedTypes.get(baseType); - } - typeArgumentsAsClasses.add(getClass(baseType)); - } - return typeArgumentsAsClasses; + } } - /** try to directly set a (possibly private) field on an Object */ - public static void setField(Object target, String fieldname, Object value) - throws NoSuchFieldException, IllegalAccessException { - Field field = findDeclaredField(target.getClass(), fieldname); - field.setAccessible(true); - field.set(target, value); + // finally, for each actual type argument provided to baseClass, + // determine (if possible) + // the raw class for that type argument. + Type[] actualTypeArguments; + if (type instanceof Class) { + actualTypeArguments = ((Class) type).getTypeParameters(); + } else { + actualTypeArguments = + ((ParameterizedType) type).getActualTypeArguments(); + } + List> typeArgumentsAsClasses = new ArrayList>(); + // resolve types by chasing down type variables. + for (Type baseType: actualTypeArguments) { + while (resolvedTypes.containsKey(baseType)) { + baseType = resolvedTypes.get(baseType); + } + typeArgumentsAsClasses.add(getClass(baseType)); } + return typeArgumentsAsClasses; + } - /** find a declared field in a class or one of its super classes */ - private static Field findDeclaredField(Class inClass, String fieldname) - throws NoSuchFieldException { - while (!Object.class.equals(inClass)) { - for (Field field : inClass.getDeclaredFields()) { - if (field.getName().equalsIgnoreCase(fieldname)) { - return field; - } - } - inClass = inClass.getSuperclass(); + /** + * Try to directly set a (possibly private) field on an Object. + * + * @param target Target to set the field on. + * @param fieldname Name of field. + * @param value Value to set on target. + */ + public static void setField(Object target, String fieldname, Object value) + throws NoSuchFieldException, IllegalAccessException { + Field field = findDeclaredField(target.getClass(), fieldname); + field.setAccessible(true); + field.set(target, value); + } + + /** + * Find a declared field in a class or one of its super classes + * + * @param inClass Class to search for declared field. + * @param fieldname Field name to search for + * @return Field or will throw. + * @throws NoSuchFieldException When field not found. + */ + private static Field findDeclaredField(Class inClass, String fieldname) + throws NoSuchFieldException { + while (!Object.class.equals(inClass)) { + for (Field field : inClass.getDeclaredFields()) { + if (field.getName().equalsIgnoreCase(fieldname)) { + return field; } - throw new NoSuchFieldException(); + } + inClass = inClass.getSuperclass(); } + throw new NoSuchFieldException(); + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java Thu Feb 16 22:12:31 2012 @@ -24,24 +24,30 @@ import org.apache.hadoop.io.IntWritable; /** * {@link UnmodifiableIterator} over a primitive int array */ -public class UnmodifiableIntArrayIterator - extends UnmodifiableIterator { +public class UnmodifiableIntArrayIterator extends + UnmodifiableIterator { + /** Array to iterate over */ + private final int[] intArray; + /** Offset to array */ + private int offset; - private final int[] arr; - private int offset; + /** + * Constructor with array to iterate over. + * + * @param intArray Array to iterate over. + */ + public UnmodifiableIntArrayIterator(int[] intArray) { + this.intArray = intArray; + offset = 0; + } - public UnmodifiableIntArrayIterator(int[] arr) { - this.arr = arr; - offset = 0; - } + @Override + public boolean hasNext() { + return offset < intArray.length; + } - @Override - public boolean hasNext() { - return offset < arr.length; - } - - @Override - public IntWritable next() { - return new IntWritable(arr[offset++]); - } -} \ No newline at end of file + @Override + public IntWritable next() { + return new IntWritable(intArray[offset++]); + } +} Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java Thu Feb 16 22:12:31 2012 @@ -41,147 +41,215 @@ import org.apache.zookeeper.data.Stat; * Helper static methods for working with Writable objects. */ public class WritableUtils { - public static void readFieldsFromByteArray( - byte[] byteArray, Writable writableObject) { - DataInputStream inputStream = - new DataInputStream(new ByteArrayInputStream(byteArray)); - try { - writableObject.readFields(inputStream); - } catch (IOException e) { - throw new IllegalStateException( - "readFieldsFromByteArray: IOException", e); - } - } - - public static void readFieldsFromZnode(ZooKeeperExt zkExt, - String zkPath, - boolean watch, - Stat stat, - Writable writableObject) { - try { - byte[] zkData = zkExt.getData(zkPath, false, stat); - readFieldsFromByteArray(zkData, writableObject); - } catch (KeeperException e) { - throw new IllegalStateException( - "readFieldsFromZnode: KeeperException on " + zkPath, e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "readFieldsFromZnode: InterrruptedStateException on " + zkPath, - e); - } - } - - public static byte[] writeToByteArray(Writable writableObject) { - ByteArrayOutputStream outputStream = - new ByteArrayOutputStream(); - DataOutput output = new DataOutputStream(outputStream); - try { - writableObject.write(output); - } catch (IOException e) { - throw new IllegalStateException( - "writeToByteArray: IOStateException", e); - } - return outputStream.toByteArray(); - } - - public static PathStat writeToZnode(ZooKeeperExt zkExt, - String zkPath, - int version, - Writable writableObject) { - try { - byte[] byteArray = writeToByteArray(writableObject); - return zkExt.createOrSetExt(zkPath, - byteArray, - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true, - version); - } catch (KeeperException e) { - throw new IllegalStateException( - "writeToZnode: KeeperException on " + zkPath, e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "writeToZnode: InterruptedException on " + zkPath, e); - } - } - - public static byte[] writeListToByteArray( - List writableList) { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - DataOutput output = new DataOutputStream(outputStream); - try { - output.writeInt(writableList.size()); - for (Writable writable : writableList) { - writable.write(output); - } - } catch (IOException e) { - throw new IllegalStateException( - "writeListToByteArray: IOException", e); - } - return outputStream.toByteArray(); - } - - public static PathStat writeListToZnode( - ZooKeeperExt zkExt, - String zkPath, - int version, - List writableList) { - try { - return zkExt.createOrSetExt( - zkPath, - writeListToByteArray(writableList), - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true, - version); - } catch (KeeperException e) { - throw new IllegalStateException( - "writeListToZnode: KeeperException on " + zkPath, e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "writeListToZnode: InterruptedException on " + zkPath, e); - } - } - - public static List readListFieldsFromByteArray( - byte[] byteArray, - Class writableClass, - Configuration conf) { - try { - DataInputStream inputStream = - new DataInputStream(new ByteArrayInputStream(byteArray)); - int size = inputStream.readInt(); - List writableList = new ArrayList(size); - for (int i = 0; i < size; ++i) { - Writable writable = - ReflectionUtils.newInstance(writableClass, conf); - writable.readFields(inputStream); - writableList.add(writable); - } - return writableList; - } catch (IOException e) { - throw new IllegalStateException( - "readListFieldsFromZnode: IOException", e); - } - } - - public static List readListFieldsFromZnode( - ZooKeeperExt zkExt, - String zkPath, - boolean watch, - Stat stat, - Class writableClass, - Configuration conf) { - try { - byte[] zkData = zkExt.getData(zkPath, false, stat); - return readListFieldsFromByteArray(zkData, writableClass, conf); - } catch (KeeperException e) { - throw new IllegalStateException( - "readListFieldsFromZnode: KeeperException on " + zkPath, e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "readListFieldsFromZnode: InterruptedException on " + zkPath, - e); - } + /** + * Don't construct. + */ + private WritableUtils() { } + + /** + * Read fields from byteArray to a Writeable object. + * + * @param byteArray Byte array to find the fields in. + * @param writableObject Object to fill in the fields. + */ + public static void readFieldsFromByteArray( + byte[] byteArray, Writable writableObject) { + DataInputStream inputStream = + new DataInputStream(new ByteArrayInputStream(byteArray)); + try { + writableObject.readFields(inputStream); + } catch (IOException e) { + throw new IllegalStateException( + "readFieldsFromByteArray: IOException", e); + } + } + + /** + * Read fields from a ZooKeeper znode. + * + * @param zkExt ZooKeeper instance. + * @param zkPath Path of znode. + * @param watch Add a watch? + * @param stat Stat of znode if desired. + * @param writableObject Object to read into. + */ + public static void readFieldsFromZnode(ZooKeeperExt zkExt, + String zkPath, + boolean watch, + Stat stat, + Writable writableObject) { + try { + byte[] zkData = zkExt.getData(zkPath, false, stat); + readFieldsFromByteArray(zkData, writableObject); + } catch (KeeperException e) { + throw new IllegalStateException( + "readFieldsFromZnode: KeeperException on " + zkPath, e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "readFieldsFromZnode: InterrruptedStateException on " + zkPath, e); + } + } + + /** + * Write object to a byte array. + * + * @param writableObject Object to write from. + * @return Byte array with serialized object. + */ + public static byte[] writeToByteArray(Writable writableObject) { + ByteArrayOutputStream outputStream = + new ByteArrayOutputStream(); + DataOutput output = new DataOutputStream(outputStream); + try { + writableObject.write(output); + } catch (IOException e) { + throw new IllegalStateException( + "writeToByteArray: IOStateException", e); + } + return outputStream.toByteArray(); + } + + /** + * Write object to a ZooKeeper znode. + * + * @param zkExt ZooKeeper instance. + * @param zkPath Path of znode. + * @param version Version of the write. + * @param writableObject Object to write from. + * @return Path and stat information of the znode. + */ + public static PathStat writeToZnode(ZooKeeperExt zkExt, + String zkPath, + int version, + Writable writableObject) { + try { + byte[] byteArray = writeToByteArray(writableObject); + return zkExt.createOrSetExt(zkPath, + byteArray, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true, + version); + } catch (KeeperException e) { + throw new IllegalStateException( + "writeToZnode: KeeperException on " + zkPath, e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "writeToZnode: InterruptedException on " + zkPath, e); + } + } + + /** + * Write list of object to a byte array. + * + * @param writableList List of object to write from. + * @return Byte array with serialized objects. + */ + public static byte[] writeListToByteArray( + List writableList) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutput output = new DataOutputStream(outputStream); + try { + output.writeInt(writableList.size()); + for (Writable writable : writableList) { + writable.write(output); + } + } catch (IOException e) { + throw new IllegalStateException( + "writeListToByteArray: IOException", e); + } + return outputStream.toByteArray(); + } + + /** + * Write list of objects to a ZooKeeper znode. + * + * @param zkExt ZooKeeper instance. + * @param zkPath Path of znode. + * @param version Version of the write. + * @param writableList List of objects to write from. + * @return Path and stat information of the znode. + */ + public static PathStat writeListToZnode( + ZooKeeperExt zkExt, + String zkPath, + int version, + List writableList) { + try { + return zkExt.createOrSetExt( + zkPath, + writeListToByteArray(writableList), + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true, + version); + } catch (KeeperException e) { + throw new IllegalStateException( + "writeListToZnode: KeeperException on " + zkPath, e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "writeListToZnode: InterruptedException on " + zkPath, e); + } + } + + /** + * Read fields from byteArray to a list of Writeable objects. + * + * @param byteArray Byte array to find the fields in. + * @param writableClass Class of the objects to instantiate. + * @param conf Configuration used for instantiation (i.e Configurable) + * @return List of writable objects. + */ + public static List readListFieldsFromByteArray( + byte[] byteArray, + Class writableClass, + Configuration conf) { + try { + DataInputStream inputStream = + new DataInputStream(new ByteArrayInputStream(byteArray)); + int size = inputStream.readInt(); + List writableList = new ArrayList(size); + for (int i = 0; i < size; ++i) { + Writable writable = + ReflectionUtils.newInstance(writableClass, conf); + writable.readFields(inputStream); + writableList.add(writable); + } + return writableList; + } catch (IOException e) { + throw new IllegalStateException( + "readListFieldsFromZnode: IOException", e); + } + } + + /** + * Read fields from a ZooKeeper znode into a list of objects. + * + * @param zkExt ZooKeeper instance. + * @param zkPath Path of znode. + * @param watch Add a watch? + * @param stat Stat of znode if desired. + * @param writableClass Class of the objects to instantiate. + * @param conf Configuration used for instantiation (i.e Configurable) + * @return List of writable objects. + */ + public static List readListFieldsFromZnode( + ZooKeeperExt zkExt, + String zkPath, + boolean watch, + Stat stat, + Class writableClass, + Configuration conf) { + try { + byte[] zkData = zkExt.getData(zkPath, false, stat); + return readListFieldsFromByteArray(zkData, writableClass, conf); + } catch (KeeperException e) { + throw new IllegalStateException( + "readListFieldsFromZnode: KeeperException on " + zkPath, e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "readListFieldsFromZnode: InterruptedException on " + zkPath, + e); } + } } Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java) URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/package-info.java Thu Feb 16 22:12:31 2012 @@ -15,15 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.giraph.bsp; - /** - * State of the BSP application + * Package of all generic utility classes. */ -public enum ApplicationState { - UNKNOWN, ///< Shouldn't be seen, just an initial state - START_SUPERSTEP, ///< Start from a desired superstep - FAILED, ///< Unrecoverable - FINISHED ///< Successful completion -} +package org.apache.giraph.utils; Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/BspEvent.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/BspEvent.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/BspEvent.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/BspEvent.java Thu Feb 16 22:12:31 2012 @@ -23,27 +23,27 @@ package org.apache.giraph.zk; * event. */ public interface BspEvent { - /** - * Reset the permanent signal. - */ - void reset(); + /** + * Reset the permanent signal. + */ + void reset(); - /** - * The event occurred and the occurrence has been logged for future - * waiters. - */ - void signal(); + /** + * The event occurred and the occurrence has been logged for future + * waiters. + */ + void signal(); - /** - * Wait until the event occurred or waiting timed out. - * @param msecs Milliseconds to wait until the event occurred. 0 indicates - * check immediately. -1 indicates wait forever. - * @return true if event occurred, false if timed out while waiting - */ - boolean waitMsecs(int msecs); + /** + * Wait until the event occurred or waiting timed out. + * @param msecs Milliseconds to wait until the event occurred. 0 indicates + * check immediately. -1 indicates wait forever. + * @return true if event occurred, false if timed out while waiting + */ + boolean waitMsecs(int msecs); - /** - * Wait indefinitely until the event occurs. - */ - void waitForever(); + /** + * Wait indefinitely until the event occurs. + */ + void waitForever(); } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ContextLock.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ContextLock.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ContextLock.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ContextLock.java Thu Feb 16 22:12:31 2012 @@ -24,29 +24,29 @@ import org.apache.hadoop.mapreduce.Mappe * A lock that will keep the job context updated while waiting. */ public class ContextLock extends PredicateLock { - /** Job context (for progress) */ - @SuppressWarnings("rawtypes") - private final Context context; - /** Msecs to refresh the progress meter */ - private static final int msecPeriod = 10000; + /** Msecs to refresh the progress meter */ + private static final int MSEC_PERIOD = 10000; + /** Job context (for progress) */ + @SuppressWarnings("rawtypes") + private final Context context; - /** - * Constructor. - * - * @param context used to call progress() - */ - ContextLock(@SuppressWarnings("rawtypes") Context context) { - this.context = context; - } + /** + * Constructor. + * + * @param context used to call progress() + */ + ContextLock(@SuppressWarnings("rawtypes") Context context) { + this.context = context; + } - /** - * Specialized version of waitForever() that will keep the job progressing - * while waiting. - */ - @Override - public void waitForever() { - while (waitMsecs(msecPeriod) == false) { - context.progress(); - } + /** + * Specialized version of waitForever() that will keep the job progressing + * while waiting. + */ + @Override + public void waitForever() { + while (!waitMsecs(MSEC_PERIOD)) { + context.progress(); } + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/PredicateLock.java Thu Feb 16 22:12:31 2012 @@ -29,89 +29,88 @@ import org.apache.log4j.Logger; * A lock with a predicate that was be used to synchronize events. */ public class PredicateLock implements BspEvent { - /** Lock */ - private Lock lock = new ReentrantLock(); - /** Condition associated with lock */ - private Condition cond = lock.newCondition(); - /** Predicate */ - private boolean eventOccurred = false; - /** Class logger */ - private Logger LOG = Logger.getLogger(PredicateLock.class); + /** Class logger */ + private static final Logger LOG = Logger.getLogger(PredicateLock.class); + /** Lock */ + private Lock lock = new ReentrantLock(); + /** Condition associated with lock */ + private Condition cond = lock.newCondition(); + /** Predicate */ + private boolean eventOccurred = false; - @Override - public void reset() { - lock.lock(); - try { - eventOccurred = false; - } finally { - lock.unlock(); - } + @Override + public void reset() { + lock.lock(); + try { + eventOccurred = false; + } finally { + lock.unlock(); } + } - @Override - public void signal() { - lock.lock(); - try { - eventOccurred = true; - cond.signalAll(); - } finally { - lock.unlock(); - } + @Override + public void signal() { + lock.lock(); + try { + eventOccurred = true; + cond.signalAll(); + } finally { + lock.unlock(); } + } - @Override - public boolean waitMsecs(int msecs) { - if (msecs < -1) { - throw new RuntimeException("msecs < -1"); - } + @Override + public boolean waitMsecs(int msecs) { + if (msecs < -1) { + throw new RuntimeException("msecs < -1"); + } - long maxMsecs = System.currentTimeMillis() + msecs; - long curMsecTimeout = 0; - lock.lock(); - try { - while (eventOccurred == false) { - if (msecs == -1) { - try { - cond.await(); - } catch (InterruptedException e) { - throw new IllegalStateException( - "waitMsecs: Caught interrupted " + - "exception on cond.await()", e); - } - } - else { - // Keep the wait non-negative - curMsecTimeout = - Math.max(maxMsecs - System.currentTimeMillis(), 0); - if (LOG.isDebugEnabled()) { - LOG.debug("waitMsecs: Wait for " + curMsecTimeout); - } - try { - boolean signaled = - cond.await(curMsecTimeout, TimeUnit.MILLISECONDS); - if (LOG.isDebugEnabled()) { - LOG.debug("waitMsecs: Got timed signaled of " + - signaled); - } - } catch (InterruptedException e) { - throw new IllegalStateException( - "waitMsecs: Caught interrupted " + - "exception on cond.await() " + - curMsecTimeout, e); - } - if (System.currentTimeMillis() > maxMsecs) { - return false; - } - } + long maxMsecs = System.currentTimeMillis() + msecs; + long curMsecTimeout = 0; + lock.lock(); + try { + while (!eventOccurred) { + if (msecs == -1) { + try { + cond.await(); + } catch (InterruptedException e) { + throw new IllegalStateException( + "waitMsecs: Caught interrupted " + + "exception on cond.await()", e); + } + } else { + // Keep the wait non-negative + curMsecTimeout = + Math.max(maxMsecs - System.currentTimeMillis(), 0); + if (LOG.isDebugEnabled()) { + LOG.debug("waitMsecs: Wait for " + curMsecTimeout); + } + try { + boolean signaled = + cond.await(curMsecTimeout, TimeUnit.MILLISECONDS); + if (LOG.isDebugEnabled()) { + LOG.debug("waitMsecs: Got timed signaled of " + + signaled); } - } finally { - lock.unlock(); + } catch (InterruptedException e) { + throw new IllegalStateException( + "waitMsecs: Caught interrupted " + + "exception on cond.await() " + + curMsecTimeout, e); + } + if (System.currentTimeMillis() > maxMsecs) { + return false; + } } - return true; + } + } finally { + lock.unlock(); } + return true; + } - @Override - public void waitForever() { - waitMsecs(-1); - } + @Override + public void waitForever() { + waitMsecs(-1); + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java Thu Feb 16 22:12:31 2012 @@ -39,270 +39,268 @@ import org.apache.zookeeper.ZooKeeper; * non-atomic operations that are useful. */ public class ZooKeeperExt extends ZooKeeper { - /** Internal logger */ - private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class); - /** Length of the ZK sequence number */ - private static final int SEQUENCE_NUMBER_LENGTH = 10; - - /** - * Constructor to connect to ZooKeeper - * - * @param connectString Comma separated host:port pairs, each corresponding - * to a zk server. e.g. - * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional - * chroot suffix is used the example would look - * like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" - * where the client would be rooted at "/app/a" and all paths - * would be relative to this root - ie getting/setting/etc... - * "/foo/bar" would result in operations being run on - * "/app/a/foo/bar" (from the server perspective). - * @param sessionTimeout Session timeout in milliseconds - * @param watcher A watcher object which will be notified of state changes, - * may also be notified for node events - * @throws IOException - */ - public ZooKeeperExt(String connectString, - int sessionTimeout, - Watcher watcher) throws IOException { - super(connectString, sessionTimeout, watcher); + /** Internal logger */ + private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class); + /** Length of the ZK sequence number */ + private static final int SEQUENCE_NUMBER_LENGTH = 10; + + /** + * Constructor to connect to ZooKeeper + * + * @param connectString Comma separated host:port pairs, each corresponding + * to a zk server. e.g. + * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional + * chroot suffix is used the example would look + * like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" + * where the client would be rooted at "/app/a" and all paths + * would be relative to this root - ie getting/setting/etc... + * "/foo/bar" would result in operations being run on + * "/app/a/foo/bar" (from the server perspective). + * @param sessionTimeout Session timeout in milliseconds + * @param watcher A watcher object which will be notified of state changes, + * may also be notified for node events + * @throws IOException + */ + public ZooKeeperExt(String connectString, + int sessionTimeout, + Watcher watcher) throws IOException { + super(connectString, sessionTimeout, watcher); + } + + /** + * Provides a possibility of a creating a path consisting of more than one + * znode (not atomic). If recursive is false, operates exactly the + * same as create(). + * + * @param path path to create + * @param data data to set on the final znode + * @param acl acls on each znode created + * @param createMode only affects the final znode + * @param recursive if true, creates all ancestors + * @return Actual created path + * @throws KeeperException + * @throws InterruptedException + */ + public String createExt( + final String path, + byte[] data, + List acl, + CreateMode createMode, + boolean recursive) throws KeeperException, InterruptedException { + if (LOG.isDebugEnabled()) { + LOG.debug("createExt: Creating path " + path); } - /** - * Provides a possibility of a creating a path consisting of more than one - * znode (not atomic). If recursive is false, operates exactly the - * same as create(). - * - * @param path path to create - * @param data data to set on the final znode - * @param acl acls on each znode created - * @param createMode only affects the final znode - * @param recursive if true, creates all ancestors - * @return Actual created path - * @throws KeeperException - * @throws InterruptedException - */ - public String createExt( - final String path, - byte data[], - List acl, - CreateMode createMode, - boolean recursive) throws KeeperException, InterruptedException { - if (LOG.isDebugEnabled()) { - LOG.debug("createExt: Creating path " + path); - } - - if (!recursive) { - return create(path, data, acl, createMode); - } + if (!recursive) { + return create(path, data, acl, createMode); + } - try { - return create(path, data, acl, createMode); - } catch (KeeperException.NoNodeException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("createExt: Cannot directly create node " + path); - } - } + try { + return create(path, data, acl, createMode); + } catch (KeeperException.NoNodeException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("createExt: Cannot directly create node " + path); + } + } - int pos = path.indexOf("/", 1); - for (; pos != -1; pos = path.indexOf("/", pos + 1)) { - try { - create( - path.substring(0, pos), null, acl, CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("createExt: Znode " + path.substring(0, pos) + - " already exists"); - } - } + int pos = path.indexOf("/", 1); + for (; pos != -1; pos = path.indexOf("/", pos + 1)) { + try { + create( + path.substring(0, pos), null, acl, CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("createExt: Znode " + path.substring(0, pos) + + " already exists"); } - return create(path, data, acl, createMode); + } } + return create(path, data, acl, createMode); + } + + /** + * Data structure for handling the output of createOrSet() + */ + public class PathStat { + /** Path to created znode (if any) */ + private String path; + /** Stat from set znode (if any) */ + private Stat stat; /** - * Data structure for handling the output of createOrSet() + * Put in results from createOrSet() + * + * @param path Path to created znode (or null) + * @param stat Stat from set znode (if set) */ - public class PathStat { - private String path; - private Stat stat; - - /** - * Put in results from createOrSet() - * - * @param path Path to created znode (or null) - * @param stat Stat from set znode (if set) - */ - public PathStat(String path, Stat stat) { - this.path = path; - this.stat = stat; - } - - /** - * Get the path of the created znode if it was created. - * - * @return Path of created znode or null if not created - */ - public String getPath() { - return path; - } - - /** - * Get the stat of the set znode if set - * - * @return Stat of set znode or null if not set - */ - public Stat getStat() { - return stat; - } + public PathStat(String path, Stat stat) { + this.path = path; + this.stat = stat; } /** - * Create a znode. Set the znode if the created znode already exists. + * Get the path of the created znode if it was created. * - * @param path path to create - * @param data data to set on the final znode - * @param acl acls on each znode created - * @param createMode only affects the final znode - * @param recursive if true, creates all ancestors - * @return Path of created znode or Stat of set znode - * @throws InterruptedException - * @throws KeeperException + * @return Path of created znode or null if not created */ - public PathStat createOrSetExt(final String path, - byte data[], - List acl, - CreateMode createMode, - boolean recursive, - int version) - throws KeeperException, InterruptedException { - String createdPath = null; - Stat setStat = null; - try { - createdPath = createExt(path, data, acl, createMode, recursive); - } catch (KeeperException.NodeExistsException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("createOrSet: Node exists on path " + path); - } - setStat = setData(path, data, version); - } - return new PathStat(createdPath, setStat); + public String getPath() { + return path; } /** - * Create a znode if there is no other znode there + * Get the stat of the set znode if set * - * @param path path to create - * @param data data to set on the final znode - * @param acl acls on each znode created - * @param createMode only affects the final znode - * @param recursive if true, creates all ancestors - * @return Path of created znode or Stat of set znode - * @throws InterruptedException - * @throws KeeperException + * @return Stat of set znode or null if not set */ - public PathStat createOnceExt(final String path, - byte data[], - List acl, - CreateMode createMode, - boolean recursive) - throws KeeperException, InterruptedException { - String createdPath = null; - Stat setStat = null; - try { - createdPath = createExt(path, data, acl, createMode, recursive); - } catch (KeeperException.NodeExistsException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("createOnceExt: Node already exists on path " + path); - } - } - return new PathStat(createdPath, setStat); + public Stat getStat() { + return stat; } + } - /** - * Delete a path recursively. When the deletion is recursive, it is a - * non-atomic operation, hence, not part of ZooKeeper. - * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2) - * @param version expected version (-1 for all) - * @param recursive if true, remove all children, otherwise behave like - * remove() - * @throws InterruptedException - * @throws KeeperException - */ - public void deleteExt(final String path, int version, boolean recursive) - throws InterruptedException, KeeperException { - if (!recursive) { - delete(path, version); - return; - } + /** + * Create a znode. Set the znode if the created znode already exists. + * + * @param path path to create + * @param data data to set on the final znode + * @param acl acls on each znode created + * @param createMode only affects the final znode + * @param recursive if true, creates all ancestors + * @param version Version to set if setting + * @return Path of created znode or Stat of set znode + * @throws InterruptedException + * @throws KeeperException + */ + public PathStat createOrSetExt(final String path, + byte[] data, + List acl, + CreateMode createMode, + boolean recursive, + int version) throws KeeperException, InterruptedException { + String createdPath = null; + Stat setStat = null; + try { + createdPath = createExt(path, data, acl, createMode, recursive); + } catch (KeeperException.NodeExistsException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("createOrSet: Node exists on path " + path); + } + setStat = setData(path, data, version); + } + return new PathStat(createdPath, setStat); + } - try { - delete(path, version); - return; - } catch (KeeperException.NotEmptyException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("deleteExt: Cannot directly remove node " + path); - } - } + /** + * Create a znode if there is no other znode there + * + * @param path path to create + * @param data data to set on the final znode + * @param acl acls on each znode created + * @param createMode only affects the final znode + * @param recursive if true, creates all ancestors + * @return Path of created znode or Stat of set znode + * @throws InterruptedException + * @throws KeeperException + */ + public PathStat createOnceExt(final String path, + byte[] data, + List acl, + CreateMode createMode, + boolean recursive) throws KeeperException, InterruptedException { + String createdPath = null; + Stat setStat = null; + try { + createdPath = createExt(path, data, acl, createMode, recursive); + } catch (KeeperException.NodeExistsException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("createOnceExt: Node already exists on path " + path); + } + } + return new PathStat(createdPath, setStat); + } - List childList = getChildren(path, false); - for (String child : childList) { - deleteExt(path + "/" + child, -1, true); - } + /** + * Delete a path recursively. When the deletion is recursive, it is a + * non-atomic operation, hence, not part of ZooKeeper. + * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2) + * @param version expected version (-1 for all) + * @param recursive if true, remove all children, otherwise behave like + * remove() + * @throws InterruptedException + * @throws KeeperException + */ + public void deleteExt(final String path, int version, boolean recursive) + throws InterruptedException, KeeperException { + if (!recursive) { + delete(path, version); + return; + } - delete(path, version); + try { + delete(path, version); + return; + } catch (KeeperException.NotEmptyException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("deleteExt: Cannot directly remove node " + path); + } } - /** - * Get the children of the path with extensions. - * Extension 1: Sort the children based on sequence number - * Extension 2: Get the full path instead of relative path - * - * @param path path to znode - * @param watch set the watch? - * @param sequenceSorted sort by the sequence number - * @param fullPath if true, get the fully znode path back - * @return list of children - * @throws InterruptedException - * @throws KeeperException - */ - public List getChildrenExt( - final String path, - boolean watch, - boolean sequenceSorted, - boolean fullPath) - throws KeeperException, InterruptedException { - List childList = getChildren(path, watch); - /* Sort children according to the sequence number, if desired */ - if (sequenceSorted) { - Collections.sort(childList, - new Comparator() { - public int compare(String s1, String s2) { - if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) || - (s2.length() <= SEQUENCE_NUMBER_LENGTH)) { - throw new RuntimeException( - "getChildrenExt: Invalid length for sequence " + - " sorting > " + - SEQUENCE_NUMBER_LENGTH + - " for s1 (" + - s1.length() + ") or s2 (" + s2.length() + ")"); - } - int s1sequenceNumber = Integer.parseInt( - s1.substring(s1.length() - - SEQUENCE_NUMBER_LENGTH)); - int s2sequenceNumber = Integer.parseInt( - s2.substring(s2.length() - - SEQUENCE_NUMBER_LENGTH)); - return s1sequenceNumber - s2sequenceNumber; - } - } - ); - } - if (fullPath) { - List fullChildList = new ArrayList(); - for (String child : childList) { - fullChildList.add(path + "/" + child); - } - return fullChildList; + List childList = getChildren(path, false); + for (String child : childList) { + deleteExt(path + "/" + child, -1, true); + } + + delete(path, version); + } + + /** + * Get the children of the path with extensions. + * Extension 1: Sort the children based on sequence number + * Extension 2: Get the full path instead of relative path + * + * @param path path to znode + * @param watch set the watch? + * @param sequenceSorted sort by the sequence number + * @param fullPath if true, get the fully znode path back + * @return list of children + * @throws InterruptedException + * @throws KeeperException + */ + public List getChildrenExt( + final String path, + boolean watch, + boolean sequenceSorted, + boolean fullPath) throws KeeperException, InterruptedException { + List childList = getChildren(path, watch); + /* Sort children according to the sequence number, if desired */ + if (sequenceSorted) { + Collections.sort(childList, new Comparator() { + public int compare(String s1, String s2) { + if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) || + (s2.length() <= SEQUENCE_NUMBER_LENGTH)) { + throw new RuntimeException( + "getChildrenExt: Invalid length for sequence " + + " sorting > " + + SEQUENCE_NUMBER_LENGTH + + " for s1 (" + + s1.length() + ") or s2 (" + s2.length() + ")"); + } + int s1sequenceNumber = Integer.parseInt( + s1.substring(s1.length() - + SEQUENCE_NUMBER_LENGTH)); + int s2sequenceNumber = Integer.parseInt( + s2.substring(s2.length() - + SEQUENCE_NUMBER_LENGTH)); + return s1sequenceNumber - s2sequenceNumber; } - return childList; + }); + } + if (fullPath) { + List fullChildList = new ArrayList(); + for (String child : childList) { + fullChildList.add(path + "/" + child); + } + return fullChildList; } + return childList; + } }