http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
new file mode 100644
index 0000000..75e4d8f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.benchmark;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.examples.DoubleSumCombiner;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.io.PseudoRandomEdgeInputFormat;
+import org.apache.giraph.io.PseudoRandomVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * Default Pregel-style PageRank computation.
+ */
+public class PageRankBenchmark implements Tool {
+ /**
+ * Class logger
+ */
+ private static final Logger LOG = Logger.getLogger(PageRankBenchmark.class);
+ /**
+ * Configuration from Configurable
+ */
+ private Configuration conf;
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public final int run(final String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption("h", "help", false, "Help");
+ options.addOption("v", "verbose", false, "Verbose");
+ options.addOption("w",
+ "workers",
+ true,
+ "Number of workers");
+ options.addOption("s",
+ "supersteps",
+ true,
+ "Supersteps to execute before finishing");
+ options.addOption("V",
+ "aggregateVertices",
+ true,
+ "Aggregate vertices");
+ options.addOption("e",
+ "edgesPerVertex",
+ true,
+ "Edges per vertex");
+ options.addOption("c",
+ "vertexClass",
+ true,
+ "Vertex class (0 for HashMapVertex, 1 for EdgeListVertex, " +
+ "2 for RepresentativeVertex, " +
+ "3 for RepresentativeVertex with unsafe, " +
+ "4 for HashMapVertex (using EdgeInputFormat), " +
+ "5 for MultiGraphEdgeListVertex (using EdgeInputFormat), " +
+ "6 for MultiGraphRepresentativeVertex (using " +
+ "EdgeInputFormat), " +
+ "7 for MultiGraphRepresentativeVertex with unsafe (using " +
+ "EdgeInputFormat))");
+ options.addOption("N",
+ "name",
+ true,
+ "Name of the job");
+ options.addOption("t",
+ "combinerType",
+ true,
+ "Combiner type (0 for no combiner, 1 for DoubleSumCombiner (default)");
+
+ HelpFormatter formatter = new HelpFormatter();
+ if (args.length == 0) {
+ formatter.printHelp(getClass().getName(), options, true);
+ return 0;
+ }
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+ if (cmd.hasOption('h')) {
+ formatter.printHelp(getClass().getName(), options, true);
+ return 0;
+ }
+ if (!cmd.hasOption('w')) {
+ LOG.info("Need to choose the number of workers (-w)");
+ return -1;
+ }
+ if (!cmd.hasOption('s')) {
+ LOG.info("Need to set the number of supersteps (-s)");
+ return -1;
+ }
+ if (!cmd.hasOption('V')) {
+ LOG.info("Need to set the aggregate vertices (-V)");
+ return -1;
+ }
+ if (!cmd.hasOption('e')) {
+ LOG.info("Need to set the number of edges " +
+ "per vertex (-e)");
+ return -1;
+ }
+
+ int workers = Integer.parseInt(cmd.getOptionValue('w'));
+ String name = getClass().getName();
+ if (cmd.hasOption("N")) {
+ name = name + " " + cmd.getOptionValue("N");
+ }
+
+ GiraphJob job = new GiraphJob(getConf(), name);
+ GiraphConfiguration configuration = job.getConfiguration();
+ setVertexAndInputFormatClasses(cmd, configuration);
+ configuration.setWorkerConfiguration(workers, workers, 100.0f);
+ configuration.setInt(
+ PageRankComputation.SUPERSTEP_COUNT,
+ Integer.parseInt(cmd.getOptionValue('s')));
+
+ boolean isVerbose = false;
+ if (cmd.hasOption('v')) {
+ isVerbose = true;
+ }
+ if (job.run(isVerbose)) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * Set vertex class and input format class based on command-line arguments.
+ *
+ * @param cmd Command line arguments
+ * @param configuration Giraph job configuration
+ */
+ protected void setVertexAndInputFormatClasses(
+ CommandLine cmd, GiraphConfiguration configuration) {
+ int vertexClassOption = cmd.hasOption('c') ? Integer.parseInt(
+ cmd.getOptionValue('c')) : 1;
+ if (vertexClassOption == 1) {
+ configuration.setVertexClass(
+ EdgeListVertexPageRankBenchmark.class);
+ } else if (vertexClassOption == 0 || vertexClassOption == 4) {
+ configuration.setVertexClass(
+ HashMapVertexPageRankBenchmark.class);
+ } else if (vertexClassOption == 2) {
+ configuration.setVertexClass(
+ RepresentativeVertexPageRankBenchmark.class);
+ configuration.useUnsafeSerialization(false);
+ } else if (vertexClassOption == 3) {
+ configuration.setVertexClass(
+ RepresentativeVertexPageRankBenchmark.class);
+ configuration.useUnsafeSerialization(true);
+ } else if (vertexClassOption == 5) {
+ configuration.setVertexClass(
+ MultiGraphEdgeListVertexPageRankBenchmark.class);
+ } else if (vertexClassOption == 6) {
+ configuration.setVertexClass(
+ MultiGraphRepresentativeVertexPageRankBenchmark.class);
+ configuration.useUnsafeSerialization(false);
+ } else if (vertexClassOption == 7) {
+ configuration.setVertexClass(
+ MultiGraphRepresentativeVertexPageRankBenchmark.class);
+ configuration.useUnsafeSerialization(true);
+ }
+ LOG.info("Using class " +
+ configuration.get(GiraphConstants.VERTEX_CLASS));
+ if (!cmd.hasOption('t') ||
+ (Integer.parseInt(cmd.getOptionValue('t')) == 2)) {
+ configuration.setVertexCombinerClass(
+ DoubleSumCombiner.class);
+ } else if (Integer.parseInt(cmd.getOptionValue('t')) == 1) {
+ configuration.setVertexCombinerClass(
+ DoubleSumCombiner.class);
+ }
+ if (vertexClassOption <= 3) {
+ configuration.setVertexInputFormatClass(
+ PseudoRandomVertexInputFormat.class);
+ configuration.setLong(
+ PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+ Long.parseLong(cmd.getOptionValue('V')));
+ configuration.setLong(
+ PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+ Long.parseLong(cmd.getOptionValue('e')));
+ } else {
+ configuration.setEdgeInputFormatClass(
+ PseudoRandomEdgeInputFormat.class);
+ configuration.setLong(
+ PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES,
+ Long.parseLong(cmd.getOptionValue('V')));
+ configuration.setLong(
+ PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX,
+ Long.parseLong(cmd.getOptionValue('e')));
+ }
+ }
+
+ /**
+ * Execute the benchmark.
+ *
+ * @param args Typically the command line arguments.
+ * @throws Exception Any exception from the computation.
+ */
+ public static void main(final String[] args) throws Exception {
+ System.exit(ToolRunner.run(new PageRankBenchmark(), args));
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java
new file mode 100644
index 0000000..d6a8cb7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.graph.MutableVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Shared implementation of the classic Pregel-style PageRank computation,
+ * used by the PageRank benchmark vertex classes.
+ */
+public class PageRankComputation {
+  /** Number of supersteps */
+  public static final String SUPERSTEP_COUNT =
+      "PageRankBenchmark.superstepCount";
+  /** Weight given to the rank received from neighbors. */
+  private static final double DAMPING_FACTOR = 0.85d;
+  /** Weight of the uniform random-jump term. */
+  private static final double RANDOM_JUMP_PROBABILITY = 0.15d;
+
+  /**
+   * Do not construct.
+   */
+  private PageRankComputation() { }
+
+  /**
+   * Generic page rank algorithm.
+   *
+   * From superstep 1 on, the vertex value becomes
+   * {@code 0.15 / totalNumVertices + 0.85 * sum(messages)}. While the
+   * superstep is below {@link #SUPERSTEP_COUNT} (default -1, i.e. halt at
+   * once), the vertex sends value / numEdges along every out-edge;
+   * afterwards it votes to halt. A vertex without edges sends nothing.
+   *
+   * @param vertex Vertex to compute on.
+   * @param messages Messages (rank contributions) from the previous
+   *                 superstep.
+   */
+  public static void computePageRank(
+      MutableVertex<LongWritable, DoubleWritable, DoubleWritable,
+      DoubleWritable> vertex, Iterable<DoubleWritable> messages) {
+    if (vertex.getSuperstep() >= 1) {
+      double sum = 0;
+      for (DoubleWritable message : messages) {
+        sum += message.get();
+      }
+      // Double literals on purpose: float constants (0.15f / 0.85f) would
+      // inject float rounding error into every rank value.
+      vertex.setValue(new DoubleWritable(
+          RANDOM_JUMP_PROBABILITY / vertex.getTotalNumVertices() +
+          DAMPING_FACTOR * sum));
+    }
+
+    if (vertex.getSuperstep() < vertex.getConf().getInt(SUPERSTEP_COUNT, -1)) {
+      long edges = vertex.getNumEdges();
+      // With no edges there is nobody to send to; skip the value / 0 divide.
+      if (edges > 0) {
+        vertex.sendMessageToAllEdges(
+            new DoubleWritable(vertex.getValue().get() / edges));
+      }
+    } else {
+      vertex.voteToHalt();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
new file mode 100644
index 0000000..604c4a9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.DefaultMasterCompute;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.giraph.io.PseudoRandomVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+import java.util.Random;
+
+/**
+ * Random Message Benchmark for evaluating the messaging performance.
+ */
+public class RandomMessageBenchmark implements Tool {
+  /** How many supersteps to run */
+  public static final String SUPERSTEP_COUNT =
+      "RandomMessageBenchmark.superstepCount";
+  /** How many bytes per message */
+  public static final String NUM_BYTES_PER_MESSAGE =
+      "RandomMessageBenchmark.numBytesPerMessage";
+  /** Default bytes per message */
+  public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16;
+  /** How many messages per edge */
+  public static final String NUM_MESSAGES_PER_EDGE =
+      "RandomMessageBenchmark.numMessagesPerEdge";
+  /** Default messages per edge */
+  public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1;
+  /** All bytes sent during this superstep */
+  public static final String AGG_SUPERSTEP_TOTAL_BYTES =
+      "superstep total bytes sent";
+  /** All bytes sent during this application */
+  public static final String AGG_TOTAL_BYTES = "total bytes sent";
+  /** All messages during this superstep */
+  public static final String AGG_SUPERSTEP_TOTAL_MESSAGES =
+      "superstep total messages";
+  /** All messages during this application */
+  public static final String AGG_TOTAL_MESSAGES = "total messages";
+  /** All millis during this superstep */
+  public static final String AGG_SUPERSTEP_TOTAL_MILLIS =
+      "superstep total millis";
+  /** All millis during this application */
+  public static final String AGG_TOTAL_MILLIS = "total millis";
+  /** Workers for that superstep */
+  public static final String WORKERS = "workers";
+  /** Class logger (for this class, not the nested worker context) */
+  private static final Logger LOG =
+      Logger.getLogger(RandomMessageBenchmark.class);
+  /** Configuration from Configurable */
+  private Configuration conf;
+
+  /**
+   * {@link WorkerContext} for RandomMessageBenchmark. It holds the message
+   * buffer and per-edge/superstep settings read from the configuration, and
+   * logs throughput statistics at the start of each superstep after the
+   * first.
+   */
+  public static class RandomMessageBenchmarkWorkerContext extends
+      WorkerContext {
+    /** Class logger */
+    private static final Logger LOG =
+        Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
+    /** Bytes to be sent */
+    private byte[] messageBytes;
+    /** Number of messages sent per edge */
+    private int numMessagesPerEdge = -1;
+    /** Number of supersteps */
+    private int numSupersteps = -1;
+    /** Random generator for random bytes message */
+    private final Random random = new Random(System.currentTimeMillis());
+    /** Start superstep millis */
+    private long startSuperstepMillis = 0;
+    /** Total bytes */
+    private long totalBytes = 0;
+    /** Total messages */
+    private long totalMessages = 0;
+    /** Total millis */
+    private long totalMillis = 0;
+
+    @Override
+    public void preApplication()
+      throws InstantiationException, IllegalAccessException {
+      messageBytes =
+          new byte[getContext().getConfiguration().
+                   getInt(NUM_BYTES_PER_MESSAGE,
+                          DEFAULT_NUM_BYTES_PER_MESSAGE)];
+      numMessagesPerEdge =
+          getContext().getConfiguration().
+              getInt(NUM_MESSAGES_PER_EDGE,
+                     DEFAULT_NUM_MESSAGES_PER_EDGE);
+      numSupersteps = getContext().getConfiguration().
+          getInt(SUPERSTEP_COUNT, -1);
+    }
+
+    @Override
+    public void preSuperstep() {
+      long superstepBytes = this.<LongWritable>
+          getAggregatedValue(AGG_SUPERSTEP_TOTAL_BYTES).get();
+      long superstepMessages = this.<LongWritable>
+          getAggregatedValue(AGG_SUPERSTEP_TOTAL_MESSAGES).get();
+      long superstepMillis = this.<LongWritable>
+          getAggregatedValue(AGG_SUPERSTEP_TOTAL_MILLIS).get();
+      long workers = this.<LongWritable>getAggregatedValue(WORKERS).get();
+
+      // For timing and tracking the supersteps
+      // - superstep 0 starts the time, but cannot display any stats
+      //   since nothing has been aggregated yet
+      // - supersteps > 0 can display the stats
+      // Rates are double divisions, so zero millis print Infinity/NaN
+      // rather than throwing.
+      if (getSuperstep() == 0) {
+        startSuperstepMillis = System.currentTimeMillis();
+      } else {
+        totalBytes += superstepBytes;
+        totalMessages += superstepMessages;
+        totalMillis += superstepMillis;
+        double superstepMegabytesPerSecond =
+            superstepBytes * workers * 1000d / 1024d / 1024d / superstepMillis;
+        double megabytesPerSecond = totalBytes *
+            workers * 1000d / 1024d / 1024d / totalMillis;
+        double superstepMessagesPerSecond =
+            superstepMessages * workers * 1000d / superstepMillis;
+        double messagesPerSecond =
+            totalMessages * workers * 1000d / totalMillis;
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Outputing statistics for superstep " + getSuperstep());
+          LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " + superstepBytes);
+          LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
+          LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " + superstepMessages);
+          LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
+          LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " + superstepMillis);
+          LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
+          LOG.info(WORKERS + " : " + workers);
+          LOG.info("Superstep megabytes / second = " +
+              superstepMegabytesPerSecond);
+          LOG.info("Total megabytes / second = " +
+              megabytesPerSecond);
+          LOG.info("Superstep messages / second = " +
+              superstepMessagesPerSecond);
+          LOG.info("Total messages / second = " +
+              messagesPerSecond);
+          LOG.info("Superstep megabytes / second / worker = " +
+              superstepMegabytesPerSecond / workers);
+          LOG.info("Total megabytes / second / worker = " +
+              megabytesPerSecond / workers);
+          LOG.info("Superstep messages / second / worker = " +
+              superstepMessagesPerSecond / workers);
+          LOG.info("Total messages / second / worker = " +
+              messagesPerSecond / workers);
+        }
+      }
+
+      aggregate(WORKERS, new LongWritable(1));
+    }
+
+    @Override
+    public void postSuperstep() {
+      long endSuperstepMillis = System.currentTimeMillis();
+      long superstepMillis = endSuperstepMillis - startSuperstepMillis;
+      startSuperstepMillis = endSuperstepMillis;
+      aggregate(AGG_SUPERSTEP_TOTAL_MILLIS, new LongWritable(superstepMillis));
+    }
+
+    @Override
+    public void postApplication() { }
+
+    /**
+     * Get the message bytes to be used for sending.
+     *
+     * @return Byte array used for messages.
+     */
+    public byte[] getMessageBytes() {
+      return messageBytes;
+    }
+
+    /**
+     * Get the number of messages sent per edge.
+     *
+     * @return Messages per edge.
+     */
+    public int getNumMessagePerEdge() {
+      return numMessagesPerEdge;
+    }
+
+    /**
+     * Get the number of supersteps.
+     *
+     * @return Number of supersteps.
+     */
+    public int getNumSupersteps() {
+      return numSupersteps;
+    }
+
+    /**
+     * Randomize the message bytes.
+     */
+    public void randomizeMessageBytes() {
+      random.nextBytes(messageBytes);
+    }
+  }
+
+  /**
+   * Master compute associated with {@link RandomMessageBenchmark}.
+   * It registers required aggregators.
+   */
+  public static class RandomMessageBenchmarkMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
+          LongSumAggregator.class);
+      registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
+          LongSumAggregator.class);
+      registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
+          LongSumAggregator.class);
+      registerAggregator(WORKERS,
+          LongSumAggregator.class);
+    }
+  }
+
+  /**
+   * Actual message computation (messaging in this case)
+   */
+  public static class RandomMessageVertex extends EdgeListVertex<
+      LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
+    @Override
+    public void compute(Iterable<BytesWritable> messages) {
+      RandomMessageBenchmarkWorkerContext workerContext =
+          (RandomMessageBenchmarkWorkerContext) getWorkerContext();
+      if (getSuperstep() < workerContext.getNumSupersteps()) {
+        for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) {
+          workerContext.randomizeMessageBytes();
+          sendMessageToAllEdges(
+              new BytesWritable(workerContext.getMessageBytes()));
+          // Widen before multiplying: bytes-per-message * edges can exceed
+          // Integer.MAX_VALUE for large messages or high-degree vertices.
+          long bytesSent =
+              (long) workerContext.getMessageBytes().length * getNumEdges();
+          aggregate(AGG_SUPERSTEP_TOTAL_BYTES, new LongWritable(bytesSent));
+          aggregate(AGG_SUPERSTEP_TOTAL_MESSAGES,
+              new LongWritable(getNumEdges()));
+        }
+      } else {
+        voteToHalt();
+      }
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Parse the command line, build the messaging benchmark job and run it.
+   *
+   * @param args Command-line arguments
+   * @return 0 on success or when only help was printed; -1 when a required
+   *         option is missing or the job fails
+   * @throws Exception Any exception from argument parsing or the job run
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("w",
+        "workers",
+        true,
+        "Number of workers");
+    options.addOption("b",
+        "bytes",
+        true,
+        "Message bytes per message");
+    options.addOption("n",
+        "number",
+        true,
+        "Number of messages per edge");
+    options.addOption("s",
+        "supersteps",
+        true,
+        "Supersteps to execute before finishing");
+    options.addOption("V",
+        "aggregateVertices",
+        true,
+        "Aggregate vertices");
+    options.addOption("e",
+        "edgesPerVertex",
+        true,
+        "Edges per vertex");
+    options.addOption("f",
+        "flusher",
+        true,
+        "Number of flush threads");
+
+    HelpFormatter formatter = new HelpFormatter();
+    if (args.length == 0) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+    if (cmd.hasOption('h')) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    if (!cmd.hasOption('w')) {
+      LOG.info("Need to choose the number of workers (-w)");
+      return -1;
+    }
+    if (!cmd.hasOption('s')) {
+      LOG.info("Need to set the number of supersteps (-s)");
+      return -1;
+    }
+    if (!cmd.hasOption('V')) {
+      LOG.info("Need to set the aggregate vertices (-V)");
+      return -1;
+    }
+    if (!cmd.hasOption('e')) {
+      LOG.info("Need to set the number of edges " +
+          "per vertex (-e)");
+      return -1;
+    }
+    if (!cmd.hasOption('b')) {
+      LOG.info("Need to set the number of message bytes (-b)");
+      return -1;
+    }
+    if (!cmd.hasOption('n')) {
+      LOG.info("Need to set the number of messages per edge (-n)");
+      return -1;
+    }
+    int workers = Integer.parseInt(cmd.getOptionValue('w'));
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    job.getConfiguration().setVertexClass(RandomMessageVertex.class);
+    job.getConfiguration().setVertexInputFormatClass(
+        PseudoRandomVertexInputFormat.class);
+    job.getConfiguration().setWorkerContextClass(
+        RandomMessageBenchmarkWorkerContext.class);
+    job.getConfiguration().setMasterComputeClass(
+        RandomMessageBenchmarkMasterCompute.class);
+    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
+    job.getConfiguration().setLong(
+        PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+        Long.parseLong(cmd.getOptionValue('V')));
+    job.getConfiguration().setLong(
+        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+        Long.parseLong(cmd.getOptionValue('e')));
+    // The superstep count goes on the job's configuration; the caller's
+    // getConf() is not what the running job reads.
+    job.getConfiguration().setInt(
+        SUPERSTEP_COUNT,
+        Integer.parseInt(cmd.getOptionValue('s')));
+    job.getConfiguration().setInt(
+        RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
+        Integer.parseInt(cmd.getOptionValue('b')));
+    job.getConfiguration().setInt(
+        RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE,
+        Integer.parseInt(cmd.getOptionValue('n')));
+    if (cmd.hasOption('f')) {
+      job.getConfiguration().setInt(
+          GiraphConstants.MSG_NUM_FLUSH_THREADS,
+          Integer.parseInt(cmd.getOptionValue('f')));
+    }
+
+    boolean isVerbose = cmd.hasOption('v');
+    return job.run(isVerbose) ? 0 : -1;
+  }
+
+  /**
+   * Execute the benchmark.
+   *
+   * @param args Typically, this is the command line arguments.
+   * @throws Exception Any exception thrown during computation.
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new RandomMessageBenchmark(), args));
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java
new file mode 100644
index 0000000..f12f8ac
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import java.io.IOException;
+import org.apache.giraph.graph.RepresentativeVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Same benchmark code as {@link PageRankBenchmark}, but uses
+ * {@link org.apache.giraph.graph.RepresentativeVertex}
+ * implementation.
+ */
+public class RepresentativeVertexPageRankBenchmark extends
+ RepresentativeVertex<LongWritable, DoubleWritable,
+ DoubleWritable, DoubleWritable> {
+ @Override
+ public void compute(Iterable<DoubleWritable> messages) throws
+ IOException {
+ PageRankComputation.computePageRank(this, messages);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
new file mode 100644
index 0000000..21fc0ac
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.examples.MinimumDoubleCombiner;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.io.PseudoRandomVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Single-source shortest paths benchmark.
+ */
+public class ShortestPathsBenchmark implements Tool {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(ShortestPathsBenchmark.class);
+  /** Configuration */
+  private Configuration conf;
+
+  /**
+   * {@link EdgeListVertex}-based vertex that delegates to
+   * {@link ShortestPathsComputation}.
+   */
+  public static class ShortestPathsBenchmarkVertex extends
+      EdgeListVertex<LongWritable, DoubleWritable, DoubleWritable,
+      DoubleWritable> {
+    @Override
+    public void compute(Iterable<DoubleWritable> messages) throws IOException {
+      ShortestPathsComputation.computeShortestPaths(this, messages);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Parse the command line, build the shortest-paths job and run it.
+   *
+   * @param args Command-line arguments
+   * @return 0 on success or when only help was printed; -1 when a required
+   *         option is missing or invalid, or the job fails
+   * @throws Exception Any exception from argument parsing or the job run
+   */
+  @Override
+  public final int run(final String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("w",
+        "workers",
+        true,
+        "Number of workers");
+    options.addOption("V",
+        "aggregateVertices",
+        true,
+        "Aggregate vertices");
+    options.addOption("e",
+        "edgesPerVertex",
+        true,
+        "Edges per vertex");
+    options.addOption("c",
+        "vertexClass",
+        true,
+        "Vertex class (0 for HashMapVertex, 1 for EdgeListVertex)");
+    options.addOption("nc",
+        "noCombiner",
+        false,
+        "Don't use a combiner");
+    HelpFormatter formatter = new HelpFormatter();
+    if (args.length == 0) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+    if (cmd.hasOption('h')) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    if (!cmd.hasOption('w')) {
+      LOG.info("Need to choose the number of workers (-w)");
+      return -1;
+    }
+    if (!cmd.hasOption('V')) {
+      LOG.info("Need to set the aggregate vertices (-V)");
+      return -1;
+    }
+    if (!cmd.hasOption('e')) {
+      LOG.info("Need to set the number of edges " +
+          "per vertex (-e)");
+      return -1;
+    }
+    // Only 0 and 1 are documented; reject anything else instead of
+    // silently falling back to HashMapVertex.
+    int vertexClassOption = cmd.hasOption('c') ?
+        Integer.parseInt(cmd.getOptionValue('c')) : 1;
+    if (vertexClassOption != 0 && vertexClassOption != 1) {
+      LOG.info("Vertex class (-c) must be 0 (HashMapVertex) or " +
+          "1 (EdgeListVertex), got " + vertexClassOption);
+      return -1;
+    }
+
+    int workers = Integer.parseInt(cmd.getOptionValue('w'));
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    if (vertexClassOption == 1) {
+      job.getConfiguration().setVertexClass(ShortestPathsBenchmarkVertex.class);
+    } else {
+      job.getConfiguration().setVertexClass(
+          HashMapVertexShortestPathsBenchmark.class);
+    }
+    LOG.info("Using class " +
+        job.getConfiguration().get(GiraphConstants.VERTEX_CLASS));
+    job.getConfiguration().setVertexInputFormatClass(
+        PseudoRandomVertexInputFormat.class);
+    if (!cmd.hasOption("nc")) {
+      job.getConfiguration().setVertexCombinerClass(
+          MinimumDoubleCombiner.class);
+    }
+    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
+    job.getConfiguration().setLong(
+        PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+        Long.parseLong(cmd.getOptionValue('V')));
+    job.getConfiguration().setLong(
+        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+        Long.parseLong(cmd.getOptionValue('e')));
+
+    boolean isVerbose = cmd.hasOption('v');
+    return job.run(isVerbose) ? 0 : -1;
+  }
+
+  /**
+   * Execute the benchmark.
+   *
+   * @param args Typically the command line arguments.
+   * @throws Exception Any exception from the computation.
+   */
+  public static void main(final String[] args) throws Exception {
+    System.exit(ToolRunner.run(new ShortestPathsBenchmark(), args));
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java
new file mode 100644
index 0000000..1a5128d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Default single-source shortest paths computation.
+ */
+public class ShortestPathsComputation {
+ /** Source id. */
+ public static final String SOURCE_ID = "ShortestPathsBenchmark.sourceId";
+ /** Default source id. */
+ public static final long SOURCE_ID_DEFAULT = 1;
+
+ /** Do not construct. */
+ private ShortestPathsComputation() { };
+
+ /**
+ * Is this vertex the source?
+ *
+ * @param vertex Candidate vertex
+ * @return Whether the vertex is the source.
+ */
+ private static boolean isSource(Vertex<LongWritable, DoubleWritable,
+ DoubleWritable, DoubleWritable> vertex) {
+ return vertex.getId().get() ==
+ vertex.getContext().getConfiguration().getLong(SOURCE_ID,
+ SOURCE_ID_DEFAULT);
+ }
+
+ /**
+ * Generic single-source shortest paths algorithm.
+ *
+ * @param vertex Vertex to run
+ * @param messages Incoming messages for vertex
+ */
+ public static void computeShortestPaths(
+ Vertex<LongWritable, DoubleWritable, DoubleWritable,
+ DoubleWritable> vertex,
+ Iterable<DoubleWritable> messages) {
+ if (vertex.getSuperstep() == 0) {
+ vertex.setValue(new DoubleWritable(Double.MAX_VALUE));
+ }
+
+ double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE;
+ for (DoubleWritable message : messages) {
+ minDist = Math.min(minDist, message.get());
+ }
+
+ if (minDist < vertex.getValue().get()) {
+ vertex.setValue(new DoubleWritable(minDist));
+ for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+ double distance = minDist + edge.getValue().get();
+ vertex.sendMessage(edge.getTargetVertexId(),
+ new DoubleWritable(distance));
+ }
+ }
+
+ vertex.voteToHalt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/package-info.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/package-info.java
new file mode 100644
index 0000000..66743fc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of benchmarks for performance testing and optimization
+ */
+package org.apache.giraph.benchmark;
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/ApplicationState.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/ApplicationState.java b/giraph-core/src/main/java/org/apache/giraph/bsp/ApplicationState.java
new file mode 100644
index 0000000..303ed06
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/ApplicationState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
/**
 * Lifecycle state of a BSP application.
 */
public enum ApplicationState {
  /** Placeholder initial value; not expected to be observed in practice. */
  UNKNOWN,
  /** Begin execution from a requested superstep. */
  START_SUPERSTEP,
  /** The application failed and cannot be recovered. */
  FAILED,
  /** The application completed successfully. */
  FINISHED;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
new file mode 100644
index 0000000..bce84b1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This InputFormat supports the BSP model by ensuring that the user specifies
+ * how many splits (number of mappers) should be started simultaneously.
+ * The number of splits depends on whether the master and worker processes are
+ * separate. It is not meant to do any meaningful split of user-data.
+ */
+public class BspInputFormat extends InputFormat<Text, Text> {
+ /** Class Logger */
+ private static final Logger LOG = Logger.getLogger(BspInputFormat.class);
+
+ /**
+ * Get the correct number of mappers based on the configuration
+ *
+ * @param conf Configuration to determine the number of mappers
+ * @return Maximum number of tasks
+ */
+ public static int getMaxTasks(Configuration conf) {
+ int maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0);
+ boolean splitMasterWorker =
+ conf.getBoolean(GiraphConstants.SPLIT_MASTER_WORKER,
+ GiraphConstants.SPLIT_MASTER_WORKER_DEFAULT);
+ int maxTasks = maxWorkers;
+ if (splitMasterWorker) {
+ int zkServers =
+ conf.getInt(GiraphConstants.ZOOKEEPER_SERVER_COUNT,
+ GiraphConstants.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+ maxTasks += zkServers;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
+ ", split master/worker = " + splitMasterWorker +
+ ", total max tasks = " + maxTasks);
+ }
+ return maxTasks;
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ int maxTasks = getMaxTasks(conf);
+ if (maxTasks <= 0) {
+ throw new InterruptedException(
+ "getSplits: Cannot have maxTasks <= 0 - " + maxTasks);
+ }
+ List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+ for (int i = 0; i < maxTasks; ++i) {
+ inputSplitList.add(new BspInputSplit());
+ }
+ return inputSplitList;
+ }
+
+ @Override
+ public RecordReader<Text, Text>
+ createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new BspRecordReader();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputSplit.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputSplit.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputSplit.java
new file mode 100644
index 0000000..2258917
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputSplit.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * This InputSplit will not give any ordering or location data.
+ * It is used internally by BspInputFormat (which determines
+ * how many tasks to run the application on). Users should not use this
+ * directly.
+ */
+public class BspInputSplit extends InputSplit implements Writable {
+ /** Number of splits */
+ private int numSplits = -1;
+ /** Split index */
+ private int splitIndex = -1;
+
+ /**
+ * Reflection constructor.
+ */
+ public BspInputSplit() { }
+
+ /**
+ * Constructor used by {@link BspInputFormat}.
+ *
+ * @param splitIndex Index of this split.
+ * @param numSplits Total number of splits.
+ */
+ public BspInputSplit(int splitIndex, int numSplits) {
+ this.splitIndex = splitIndex;
+ this.numSplits = numSplits;
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[]{};
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ splitIndex = in.readInt();
+ numSplits = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(splitIndex);
+ out.writeInt(numSplits);
+ }
+
+ /**
+ * Get the index of this split.
+ *
+ * @return Index of this split.
+ */
+ public int getSplitIndex() {
+ return splitIndex;
+ }
+
+ /**
+ * Get the number of splits for this application.
+ *
+ * @return Total number of splits.
+ */
+ public int getNumSplits() {
+ return numSplits;
+ }
+
+ @Override
+ public String toString() {
+ return "'" + getClass().getCanonicalName() +
+ ", index=" + getSplitIndex() + ", num=" + getNumSplits();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
new file mode 100644
index 0000000..9e43ca6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ * This is for internal use only. Allows the vertex output format routines
+ * to be called as if a normal Hadoop job.
+ */
+public class BspOutputFormat extends OutputFormat<Text, Text> {
+ /** Class logger */
+ private static Logger LOG = Logger.getLogger(BspOutputFormat.class);
+
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+ if (BspUtils.getVertexOutputFormatClass(context.getConfiguration()) ==
+ null) {
+ LOG.warn("checkOutputSpecs: ImmutableOutputCommiter" +
+ " will not check anything");
+ return;
+ }
+ BspUtils.createVertexOutputFormat(context.getConfiguration()).
+ checkOutputSpecs(context);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ if (BspUtils.getVertexOutputFormatClass(context.getConfiguration()) ==
+ null) {
+ LOG.warn("getOutputCommitter: Returning " +
+ "ImmutableOutputCommiter (does nothing).");
+ return new ImmutableOutputCommitter();
+ }
+ return BspUtils.createVertexOutputFormat(context.getConfiguration()).
+ getOutputCommitter(context);
+ }
+
+ @Override
+ public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new BspRecordWriter();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordReader.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordReader.java
new file mode 100644
index 0000000..5646018
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Only returns a single key-value pair so that the map() can run.
+ */
+class BspRecordReader extends RecordReader<Text, Text> {
+ /** Singular key object */
+ private static final Text ONLY_KEY = new Text("only key");
+ /** Single value object */
+ private static final Text ONLY_VALUE = new Text("only value");
+
+ /** Has the one record been seen? */
+ private boolean seenRecord = false;
+
+ @Override
+ public void close() throws IOException {
+ return;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return seenRecord ? 1f : 0f;
+ }
+
+ @Override
+ public Text getCurrentKey() throws IOException, InterruptedException {
+ return ONLY_KEY;
+ }
+
+ @Override
+ public Text getCurrentValue() throws IOException, InterruptedException {
+ return ONLY_VALUE;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (!seenRecord) {
+ seenRecord = true;
+ return true;
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java
new file mode 100644
index 0000000..c6bde1e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Used by {@link BspOutputFormat} since some versions of Hadoop
+ * require that a RecordWriter is returned from getRecordWriter.
+ * Does nothing, except insures that write is never called.
+ */
+public class BspRecordWriter extends RecordWriter<Text, Text> {
+
+ @Override
+ public void close(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ // Do nothing
+ }
+
+ @Override
+ public void write(Text key, Text value)
+ throws IOException, InterruptedException {
+ throw new IOException("write: Cannot write with " +
+ getClass().getName() +
+ ". Should never be called");
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
new file mode 100644
index 0000000..e28ebe1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Basic service interface shared by both {@link CentralizedServiceMaster} and
+ * {@link CentralizedServiceWorker}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface CentralizedService<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
+
+
+ /**
+ * Get the current global superstep of the application to work on.
+ *
+ * @return global superstep (begins at INPUT_SUPERSTEP)
+ */
+ long getSuperstep();
+
+ /**
+ * Get the restarted superstep
+ *
+ * @return -1 if not manually restarted, otherwise the superstep id
+ */
+ long getRestartedSuperstep();
+
+ /**
+ * Given a superstep, should it be checkpointed based on the
+ * checkpoint frequency?
+ *
+ * @param superstep superstep to check against frequency
+ * @return true if checkpoint frequency met or superstep is 1.
+ */
+ boolean checkpointFrequencyMet(long superstep);
+
+ /**
+ * Get list of workers
+ *
+ * @return List of workers
+ */
+ List<WorkerInfo> getWorkerInfoList();
+
+ /**
+ * Clean up the service (no calls may be issued after this)
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void cleanup() throws IOException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
new file mode 100644
index 0000000..a328737
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import org.apache.giraph.graph.MasterAggregatorHandler;
+import org.apache.giraph.graph.MasterInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+
+/**
+ * At most, there will be one active master at a time, but many threads can
+ * be trying to be the active master.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface CentralizedServiceMaster<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> extends
+ CentralizedService<I, V, E, M> {
+ /**
+ * Setup (must be called prior to any other function)
+ */
+ void setup();
+
+ /**
+ * Become the master.
+ * @return true if became the master, false if the application is done.
+ */
+ boolean becomeMaster();
+
+ /**
+ * Get master information
+ *
+ * @return Master information
+ */
+ MasterInfo getMasterInfo();
+
+ /**
+ * Create the {@link InputSplit} objects from the index range based on the
+ * user-defined VertexInputFormat. The {@link InputSplit} objects will
+ * processed by the workers later on during the INPUT_SUPERSTEP.
+ *
+ * @return Number of splits. Returns -1 on failure to create
+ * valid input splits.
+ */
+ int createVertexInputSplits();
+
+ /**
+ * Create the {@link InputSplit} objects from the index range based on the
+ * user-defined EdgeInputFormat. The {@link InputSplit} objects will
+ * processed by the workers later on during the INPUT_SUPERSTEP.
+ *
+ * @return Number of splits. Returns -1 on failure to create
+ * valid input splits.
+ */
+ int createEdgeInputSplits();
+
+ /**
+ * Master coordinates the superstep
+ *
+ * @return State of the application as a result of this superstep
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ SuperstepState coordinateSuperstep()
+ throws KeeperException, InterruptedException;
+
+ /**
+ * Master can decide to restart from the last good checkpoint if a
+ * worker fails during a superstep.
+ *
+ * @param checkpoint Checkpoint to restart from
+ */
+ void restartFromCheckpoint(long checkpoint);
+
+ /**
+ * Get the last known good checkpoint
+ *
+ * @return Last good superstep number
+ * @throws IOException
+ */
+ long getLastGoodCheckpoint() throws IOException;
+
+ /**
+ * If the master decides that this job doesn't have the resources to
+ * continue, it can fail the job. It can also designate what to do next.
+ * Typically this is mainly informative.
+ *
+ * @param state State of the application.
+ * @param applicationAttempt Attempt to start on
+ * @param desiredSuperstep Superstep to restart from (if applicable)
+ */
+ void setJobState(ApplicationState state,
+ long applicationAttempt,
+ long desiredSuperstep);
+
+ /**
+ * Get master aggregator handler
+ *
+ * @return Master aggregator handler
+ */
+ MasterAggregatorHandler getAggregatorHandler();
+
+ /**
+ * Superstep has finished.
+ */
+ void postSuperstep();
+
+ /**
+ * Application has finished.
+ */
+ void postApplication();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
new file mode 100644
index 0000000..f359bd4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.graph.FinishedSuperstepStats;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.MasterInfo;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.graph.WorkerAggregatorHandler;
+import org.apache.giraph.graph.partition.PartitionStore;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GraphMapper;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.WorkerContext;
+
+/**
+ * All workers should have access to this centralized service to
+ * execute the following methods.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface CentralizedServiceWorker<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends CentralizedService<I, V, E, M> {
+ /**
+ * Set up this worker (must be called prior to any other method).
+ *
+ * @return Finished superstep stats for the input superstep
+ */
+ FinishedSuperstepStats setup();
+
+ /**
+ * Get the worker information.
+ *
+ * @return Worker information
+ */
+ WorkerInfo getWorkerInfo();
+
+ /**
+ * Get the worker client (for instantiating WorkerClientRequestProcessor
+ * instances).
+ *
+ * @return Worker client
+ */
+ WorkerClient<I, V, E, M> getWorkerClient();
+
+ /**
+ * Get the worker context.
+ *
+ * @return worker's WorkerContext
+ */
+ WorkerContext getWorkerContext();
+
+ /**
+ * Get the partition store for this worker.
+ * The partitions contain the vertices for
+ * this worker and can be used to run compute() for the vertices or do
+ * checkpointing.
+ *
+ * @return The partition store for this worker.
+ */
+ PartitionStore<I, V, E, M> getPartitionStore();
+
+ /**
+ * Checkpoint both the vertices and the messages so they can be reloaded;
+ * done after all messages are delivered, but before a superstep starts.
+ * @throws IOException If an I/O error occurs while storing the checkpoint
+ */
+ void storeCheckpoint() throws IOException;
+
+ /**
+ * Load the vertices, edges, and messages from the beginning of a superstep.
+ * Will load the vertex partitions as designated by the master and set the
+ * appropriate superstep.
+ *
+ * @param superstep Superstep whose checkpoint to use
+ * @return Graph-wide vertex and edge counts
+ * @throws IOException If an I/O error occurs while loading the checkpoint
+ */
+ VertexEdgeCount loadCheckpoint(long superstep) throws IOException;
+
+ /**
+ * Take all steps prior to actually beginning the computation of a
+ * superstep.
+ *
+ * @param graphState Current graph state
+ * @return Collection of all the partition owners from the master for this
+ * superstep.
+ */
+ Collection<? extends PartitionOwner> startSuperstep(
+ GraphState<I, V, E, M> graphState);
+
+ /**
+ * Worker is done with its portion of the superstep. Report the
+ * worker-level statistics after the computation.
+ *
+ * @param graphState Current graph state
+ * @param partitionStatsList All the partition stats for this worker
+ * @return Stats of the superstep completion
+ */
+ FinishedSuperstepStats finishSuperstep(
+ GraphState<I, V, E, M> graphState,
+ List<PartitionStats> partitionStatsList);
+
+ /**
+ * Get the partition that a vertex id would belong to.
+ *
+ * @param vertexId Id of the vertex that is used to find the correct
+ * partition.
+ * @return Correct partition if it exists on this worker, null otherwise.
+ */
+ Partition<I, V, E, M> getPartition(I vertexId);
+
+ /**
+ * Get the partition id that a vertex id would belong to.
+ *
+ * @param vertexId Vertex id
+ * @return Partition id
+ */
+ Integer getPartitionId(I vertexId);
+
+ /**
+ * Whether a partition with given id exists on this worker.
+ *
+ * @param partitionId Partition id
+ * @return True iff this worker has the specified partition
+ */
+ boolean hasPartition(Integer partitionId);
+
+ /**
+ * Every client will need to get a partition owner from a vertex id so that
+ * they know which worker to send the request to.
+ *
+ * @param vertexId Id of the vertex to look for
+ * @return PartitionOwner that should contain this vertex if it exists
+ */
+ PartitionOwner getVertexPartitionOwner(I vertexId);
+
+ /**
+ * Get all partition owners.
+ *
+ * @return Iterable through partition owners
+ */
+ Iterable<? extends PartitionOwner> getPartitionOwners();
+
+ /**
+ * Look up a vertex on a worker given its vertex id.
+ *
+ * @param vertexId Id of the vertex to look for
+ * @return Vertex if it exists on this worker.
+ */
+ Vertex<I, V, E, M> getVertex(I vertexId);
+
+ /**
+ * If desired by the user, vertex partitions are redistributed among
+ * workers according to the chosen WorkerGraphPartitioner.
+ *
+ * @param masterSetPartitionOwners Partition owner info passed from the
+ * master.
+ */
+ void exchangeVertexPartitions(
+ Collection<? extends PartitionOwner> masterSetPartitionOwners);
+
+ /**
+ * Get master info.
+ *
+ * @return Master info
+ */
+ MasterInfo getMasterInfo();
+
+ /**
+ * Get the GraphMapper that this service is using. Vertices need to know
+ * this.
+ *
+ * @return GraphMapper used by this service
+ */
+ GraphMapper<I, V, E, M> getGraphMapper();
+
+ /**
+ * Cleanup operations to run if there is a failure by a worker.
+ */
+ void failureCleanup();
+
+ /**
+ * Get server data.
+ *
+ * @return Server data
+ */
+ ServerData<I, V, E, M> getServerData();
+
+ /**
+ * Get the worker aggregator handler.
+ *
+ * @return Worker aggregator handler
+ */
+ WorkerAggregatorHandler getAggregatorHandler();
+
+ /**
+ * Final preparation for superstep, called after startSuperstep and
+ * potential loading from checkpoint, right before the computation starts.
+ * TODO how to avoid this additional function
+ */
+ void prepareSuperstep();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java b/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java
new file mode 100644
index 0000000..feb4041
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Output committer whose operations are all no-ops and which never needs a
+ * task commit. Meant for jobs where output isn't desired, or as a base class
+ * for committers that should not use FileOutputCommitter.
+ */
+public class ImmutableOutputCommitter extends OutputCommitter {
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext context)
+ throws IOException {
+ return false; // nothing is written, so no task ever needs a commit
+ }
+
+ @Override
+ public void setupJob(JobContext context) throws IOException {
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext context) throws IOException {
+ }
+
+ @Override // cleanupJob under HADOOP_NON_SECURE, otherwise commitJob
+ /*if[HADOOP_NON_SECURE]
+ public void cleanupJob(JobContext jobContext) throws IOException {
+ }
+ else[HADOOP_NON_SECURE]*/
+ /*end[HADOOP_NON_SECURE]*/
+ public void commitJob(JobContext jobContext) throws IOException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java b/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
new file mode 100644
index 0000000..c384fbf
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+/**
+ * State of a superstep as coordinated by the master.
+ */
+public enum SuperstepState {
+ /** Nothing has happened yet */
+ INITIAL,
+ /** A worker died during this superstep */
+ WORKER_FAILURE,
+ /** This superstep completed successfully */
+ THIS_SUPERSTEP_DONE,
+ /** All supersteps are complete */
+ ALL_SUPERSTEPS_DONE,
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/package-info.java b/giraph-core/src/main/java/org/apache/giraph/bsp/package-info.java
new file mode 100644
index 0000000..b5e7dc3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of generic bulk synchronous processing objects.
+ */
+package org.apache.giraph.bsp;
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
new file mode 100644
index 0000000..65f2079
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.graph.Aggregator;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+/**
+ * Interface for the master to send messages to workers.
+ */
+public interface MasterClient {
+ /**
+ * Make sure that all the connections to workers have been established.
+ */
+ void openConnections();
+
+ /**
+ * Send an aggregator to its owner.
+ *
+ * @param aggregatorName Name of the aggregator
+ * @param aggregatorClass Class of the aggregator
+ * @param aggregatedValue Value of the aggregator
+ * @throws IOException If the aggregator cannot be sent
+ */
+ void sendAggregator(String aggregatorName,
+ Class<? extends Aggregator> aggregatorClass,
+ Writable aggregatedValue) throws IOException;
+
+ /**
+ * Flush the aggregated values cache (finishes sending aggregated values).
+ */
+ void finishSendingAggregatedValues() throws IOException;
+
+ /**
+ * Flush all outgoing messages. This will synchronously ensure that all
+ * messages have been sent and delivered prior to returning.
+ */
+ void flush();
+
+ /**
+ * Close all connections.
+ */
+ void closeConnections();
+}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java
new file mode 100644
index 0000000..de991f8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Interface for the master to receive messages from workers.
+ */
+public interface MasterServer {
+ /**
+ * Get the server address.
+ *
+ * @return Address used by this server
+ */
+ InetSocketAddress getMyAddress();
+
+ /**
+ * Shut down this server.
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/MsgList.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MsgList.java b/giraph-core/src/main/java/org/apache/giraph/comm/MsgList.java
new file mode 100644
index 0000000..7f7fa43
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MsgList.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.utils.ArrayListWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Wrapper around {@link ArrayListWritable} that allows the message class to
+ * be set prior to calling readFields().
+ *
+ * @param <M> Message type
+ */
+public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+ /** Layout version of this serializable class. */
+ private static final long serialVersionUID = 100L;
+
+ /**
+ * Default constructor.
+ */
+ public MsgList() {
+ super();
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param msgList Message list to copy
+ */
+ public MsgList(MsgList<M> msgList) {
+ super(msgList);
+ }
+
+ @SuppressWarnings("unchecked") // for the cast to Class<M> below
+ @Override
+ public void setClass() { // uses the message value class from getConf()
+ setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
+ }
+}
|