giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [3/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:30 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
new file mode 100644
index 0000000..75e4d8f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.benchmark;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.examples.DoubleSumCombiner;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.io.PseudoRandomEdgeInputFormat;
+import org.apache.giraph.io.PseudoRandomVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * Default Pregel-style PageRank computation.
+ */
+public class PageRankBenchmark implements Tool {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(PageRankBenchmark.class);
+  /** Configuration supplied through {@link #setConf(Configuration)} */
+  private Configuration conf;
+  /**
+   * @return Configuration last stored by setConf (null if never set)
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  /**
+   * @param conf Configuration that run() hands to the GiraphJob it creates
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Parse the command line, configure the PageRank benchmark job and run it.
+   *
+   * @param args Command-line options (-w, -s, -V and -e are required)
+   * @return 0 if help was printed or the job succeeded, -1 otherwise
+   * @throws Exception Any exception from option parsing or the job
+   */
+  @Override
+  public final int run(final String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("w",
+        "workers",
+        true,
+        "Number of workers");
+    options.addOption("s",
+        "supersteps",
+        true,
+        "Supersteps to execute before finishing");
+    options.addOption("V",
+        "aggregateVertices",
+        true,
+        "Aggregate vertices");
+    options.addOption("e",
+        "edgesPerVertex",
+        true,
+        "Edges per vertex");
+    options.addOption("c",
+        "vertexClass",
+        true,
+        "Vertex class (0 for HashMapVertex, 1 for EdgeListVertex, " +
+            "2 for RepresentativeVertex, " +
+            "3 for RepresentativeVertex with unsafe, " +
+            "4 for HashMapVertex (using EdgeInputFormat), " +
+            "5 for MultiGraphEdgeListVertex (using EdgeInputFormat), " +
+            "6 for MultiGraphRepresentativeVertex (using " +
+            "EdgeInputFormat), " +
+            "7 for MultiGraphRepresentativeVertex with unsafe (using " +
+            "EdgeInputFormat))");
+    options.addOption("N",
+        "name",
+        true,
+        "Name of the job");
+    options.addOption("t",
+        "combinerType",
+        true,
+        "Combiner type (0 for no combiner, 1 for DoubleSumCombiner (default))");
+
+    HelpFormatter formatter = new HelpFormatter();
+    if (args.length == 0) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+    if (cmd.hasOption('h')) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    if (!cmd.hasOption('w')) {
+      LOG.info("Need to choose the number of workers (-w)");
+      return -1;
+    }
+    if (!cmd.hasOption('s')) {
+      LOG.info("Need to set the number of supersteps (-s)");
+      return -1;
+    }
+    if (!cmd.hasOption('V')) {
+      LOG.info("Need to set the aggregate vertices (-V)");
+      return -1;
+    }
+    if (!cmd.hasOption('e')) {
+      LOG.info("Need to set the number of edges " +
+          "per vertex (-e)");
+      return -1;
+    }
+
+    int workers = Integer.parseInt(cmd.getOptionValue('w'));
+    String name = getClass().getName();
+    if (cmd.hasOption("N")) {
+      name = name + " " + cmd.getOptionValue("N");
+    }
+
+    GiraphJob job = new GiraphJob(getConf(), name);
+    GiraphConfiguration configuration = job.getConfiguration();
+    setVertexAndInputFormatClasses(cmd, configuration);
+    configuration.setWorkerConfiguration(workers, workers, 100.0f);
+    configuration.setInt(
+        PageRankComputation.SUPERSTEP_COUNT,
+        Integer.parseInt(cmd.getOptionValue('s')));
+
+    boolean isVerbose = cmd.hasOption('v');
+    return job.run(isVerbose) ? 0 : -1;
+  }
+
+  /**
+   * Set vertex, combiner and input format classes from command-line arguments.
+   *
+   * @param cmd Command line arguments
+   * @param configuration Giraph job configuration
+   */
+  protected void setVertexAndInputFormatClasses(
+      CommandLine cmd, GiraphConfiguration configuration) {
+    int vertexClassOption =
+        cmd.hasOption('c') ? Integer.parseInt(cmd.getOptionValue('c')) : 1;
+    switch (vertexClassOption) {
+      case 0:
+      case 4:
+        configuration.setVertexClass(HashMapVertexPageRankBenchmark.class);
+        break;
+      case 1:
+        configuration.setVertexClass(EdgeListVertexPageRankBenchmark.class);
+        break;
+      case 2:
+      case 3:
+        configuration.setVertexClass(
+            RepresentativeVertexPageRankBenchmark.class);
+        configuration.useUnsafeSerialization(vertexClassOption == 3);
+        break;
+      case 5:
+        configuration.setVertexClass(
+            MultiGraphEdgeListVertexPageRankBenchmark.class);
+        break;
+      case 6:
+      case 7:
+        configuration.setVertexClass(
+            MultiGraphRepresentativeVertexPageRankBenchmark.class);
+        configuration.useUnsafeSerialization(vertexClassOption == 7);
+        break;
+      default:
+        // Any other value leaves the vertex class untouched
+        break;
+    }
+    LOG.info("Using class " +
+        configuration.get(GiraphConstants.VERTEX_CLASS));
+    // Only an explicit -t other than 1 or 2 leaves the combiner unset
+    int combinerType =
+        cmd.hasOption('t') ? Integer.parseInt(cmd.getOptionValue('t')) : 1;
+    if (combinerType == 1 || combinerType == 2) {
+      configuration.setVertexCombinerClass(DoubleSumCombiner.class);
+    }
+    if (vertexClassOption > 3) {
+      configuration.setEdgeInputFormatClass(
+          PseudoRandomEdgeInputFormat.class);
+      configuration.setLong(
+          PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES,
+          Long.parseLong(cmd.getOptionValue('V')));
+      configuration.setLong(
+          PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX,
+          Long.parseLong(cmd.getOptionValue('e')));
+    } else {
+      configuration.setVertexInputFormatClass(
+          PseudoRandomVertexInputFormat.class);
+      configuration.setLong(
+          PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+          Long.parseLong(cmd.getOptionValue('V')));
+      configuration.setLong(
+          PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+          Long.parseLong(cmd.getOptionValue('e')));
+    }
+  }
+
+  /**
+   * Execute the benchmark.
+   *
+   * @param args Typically the command line arguments.
+   * @throws Exception Any exception from the computation.
+   */
+  public static void main(final String[] args) throws Exception {
+    System.exit(ToolRunner.run(new PageRankBenchmark(), args));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java
new file mode 100644
index 0000000..d6a8cb7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.graph.MutableVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Shared implementation of the classic Pregel-style PageRank computation used
+ * by the benchmark vertex classes.
+ */
+public class PageRankComputation {
+  /** Configuration key holding the number of supersteps to run */
+  public static final String SUPERSTEP_COUNT =
+      "PageRankBenchmark.superstepCount";
+
+  /**
+   * Do not construct.
+   */
+  private PageRankComputation() { }
+
+  /**
+   * Generic page rank algorithm.
+   *
+   * @param vertex Vertex to compute on.
+   * @param messages Messages received from the previous superstep.
+   */
+  public static void computePageRank(
+      MutableVertex<LongWritable, DoubleWritable, DoubleWritable,
+      DoubleWritable> vertex, Iterable<DoubleWritable> messages) {
+    if (vertex.getSuperstep() >= 1) {
+      double sum = 0;
+      for (DoubleWritable message : messages) {
+        sum += message.get();
+      }
+      // Double literals keep the whole expression in double precision
+      vertex.setValue(new DoubleWritable(
+          (0.15d / vertex.getTotalNumVertices()) + 0.85d * sum));
+    }
+
+    if (vertex.getSuperstep() < vertex.getConf().getInt(SUPERSTEP_COUNT, -1)) {
+      long edges = vertex.getNumEdges();
+      vertex.sendMessageToAllEdges(
+          new DoubleWritable(vertex.getValue().get() / edges));
+    } else {
+      vertex.voteToHalt();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
new file mode 100644
index 0000000..604c4a9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.DefaultMasterCompute;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.giraph.io.PseudoRandomVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+import java.util.Random;
+
+/**
+ * Random Message Benchmark for evaluating the messaging performance.
+ */
+public class RandomMessageBenchmark implements Tool {
+  /** Configuration key: how many supersteps to run */
+  public static final String SUPERSTEP_COUNT =
+      "RandomMessageBenchmark.superstepCount";
+  /** Configuration key: how many bytes per message */
+  public static final String NUM_BYTES_PER_MESSAGE =
+      "RandomMessageBenchmark.numBytesPerMessage";
+  /** Default bytes per message */
+  public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16;
+  /** Configuration key: how many messages per edge */
+  public static final String NUM_MESSAGES_PER_EDGE =
+      "RandomMessageBenchmark.numMessagesPerEdge";
+  /** Default messages per edge */
+  public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1;
+  /** Aggregator name: all bytes sent during this superstep */
+  public static final String AGG_SUPERSTEP_TOTAL_BYTES =
+      "superstep total bytes sent";
+  /** Log label: all bytes sent during this application */
+  public static final String AGG_TOTAL_BYTES = "total bytes sent";
+  /** Aggregator name: all messages during this superstep */
+  public static final String AGG_SUPERSTEP_TOTAL_MESSAGES =
+      "superstep total messages";
+  /** Log label: all messages during this application */
+  public static final String AGG_TOTAL_MESSAGES = "total messages";
+  /** Aggregator name: all millis during this superstep */
+  public static final String AGG_SUPERSTEP_TOTAL_MILLIS =
+      "superstep total millis";
+  /** Log label: all millis during this application */
+  public static final String AGG_TOTAL_MILLIS = "total millis";
+  /** Aggregator name: workers for that superstep */
+  public static final String WORKERS = "workers";
+  /** Class logger */
+  private static final Logger LOG =
+    Logger.getLogger(RandomMessageBenchmark.class);
+  /** Configuration from Configurable */
+  private Configuration conf;
+
+  /**
+   * {@link WorkerContext} for RandomMessageBenchmark.
+   */
+  public static class RandomMessageBenchmarkWorkerContext extends
+      WorkerContext {
+    /** Class logger */
+    private static final Logger LOG =
+      Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
+    /** Buffer holding the bytes of the message to send */
+    private byte[] messageBytes;
+    /** Number of messages sent per edge (-1 until preApplication runs) */
+    private int numMessagesPerEdge = -1;
+    /** Number of supersteps (-1 until preApplication runs) */
+    private int numSupersteps = -1;
+    /** Random generator used to fill the message bytes */
+    private final Random random = new Random(System.currentTimeMillis());
+    /** Millis timestamp from which the current superstep is timed */
+    private long startSuperstepMillis = 0;
+    /** Bytes accumulated over the supersteps seen so far */
+    private long totalBytes = 0;
+    /** Messages accumulated over the supersteps seen so far */
+    private long totalMessages = 0;
+    /** Millis accumulated over the supersteps seen so far */
+    private long totalMillis = 0;
+
+    @Override
+    public void preApplication()
+      throws InstantiationException, IllegalAccessException {
+      // Read every benchmark setting once from the job configuration
+      Configuration jobConf = getContext().getConfiguration();
+      int bytesPerMessage =
+          jobConf.getInt(NUM_BYTES_PER_MESSAGE, DEFAULT_NUM_BYTES_PER_MESSAGE);
+      // The same byte array is handed out by every getMessageBytes() call
+      messageBytes = new byte[bytesPerMessage];
+      numMessagesPerEdge =
+          jobConf.getInt(NUM_MESSAGES_PER_EDGE, DEFAULT_NUM_MESSAGES_PER_EDGE);
+      // Stays at -1 when no superstep count was configured
+      numSupersteps = jobConf.getInt(SUPERSTEP_COUNT, -1);
+    }
+
+    @Override
+    public void preSuperstep() {
+      long stepBytes = this.<LongWritable>
+          getAggregatedValue(AGG_SUPERSTEP_TOTAL_BYTES).get();
+      long stepMessages = this.<LongWritable>
+          getAggregatedValue(AGG_SUPERSTEP_TOTAL_MESSAGES).get();
+      long stepMillis = this.<LongWritable>
+          getAggregatedValue(AGG_SUPERSTEP_TOTAL_MILLIS).get();
+      long workers = this.<LongWritable>getAggregatedValue(WORKERS).get();
+
+      // Superstep 0 only starts the clock: nothing has been aggregated yet,
+      // so there are no statistics to show. Every later superstep folds the
+      // aggregated values read above into the running totals and logs the
+      // resulting throughput figures.
+      if (getSuperstep() != 0) {
+        totalBytes += stepBytes;
+        totalMessages += stepMessages;
+        totalMillis += stepMillis;
+        double stepMegabytesPerSecond =
+            stepBytes * workers * 1000d / 1024d / 1024d / stepMillis;
+        double megabytesPerSecond = totalBytes *
+            workers * 1000d / 1024d / 1024d / totalMillis;
+        double stepMessagesPerSecond =
+            stepMessages * workers * 1000d / stepMillis;
+        double messagesPerSecond =
+            totalMessages * workers * 1000d / totalMillis;
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Outputing statistics for superstep " + getSuperstep());
+          LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " + stepBytes);
+          LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
+          LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " + stepMessages);
+          LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
+          LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " + stepMillis);
+          LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
+          LOG.info(WORKERS + " : " + workers);
+          LOG.info("Superstep megabytes / second = " +
+              stepMegabytesPerSecond);
+          LOG.info("Total megabytes / second = " +
+              megabytesPerSecond);
+          LOG.info("Superstep messages / second = " +
+              stepMessagesPerSecond);
+          LOG.info("Total messages / second = " +
+              messagesPerSecond);
+          LOG.info("Superstep megabytes / second / worker = " +
+              stepMegabytesPerSecond / workers);
+          LOG.info("Total megabytes / second / worker = " +
+              megabytesPerSecond / workers);
+          LOG.info("Superstep messages / second / worker = " +
+              stepMessagesPerSecond / workers);
+          LOG.info("Total messages / second / worker = " +
+              messagesPerSecond / workers);
+        }
+      } else {
+        startSuperstepMillis = System.currentTimeMillis();
+      }
+
+      aggregate(WORKERS, new LongWritable(1));
+    }
+
+    @Override
+    public void postSuperstep() {
+      long now = System.currentTimeMillis();
+      long elapsedMillis = now - startSuperstepMillis;
+      startSuperstepMillis = now;
+      aggregate(AGG_SUPERSTEP_TOTAL_MILLIS, new LongWritable(elapsedMillis));
+    }
+
+    @Override
+    public void postApplication() { }
+
+    /**
+     * Get the message bytes to be used for sending.
+     *
+     * @return Byte array used for messages.
+     */
+    public byte[] getMessageBytes() {
+      return messageBytes;
+    }
+
+    /**
+     * Get the number of messages to send per edge.
+     *
+     * @return Messages per edge.
+     */
+    public int getNumMessagePerEdge() {
+      return numMessagesPerEdge;
+    }
+
+    /**
+     * Get the number of supersteps.
+     *
+     * @return Number of supersteps.
+     */
+    public int getNumSupersteps() {
+      return numSupersteps;
+    }
+
+    /**
+     * Overwrite the message bytes in place with fresh random values.
+     */
+    public void randomizeMessageBytes() {
+      random.nextBytes(messageBytes);
+    }
+  }
+
+  /**
+   * Master compute associated with {@link RandomMessageBenchmark}.
+   * It registers required aggregators.
+   */
+  public static class RandomMessageBenchmarkMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
+          LongSumAggregator.class);
+      registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
+          LongSumAggregator.class);
+      registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
+          LongSumAggregator.class);
+      registerAggregator(WORKERS,
+          LongSumAggregator.class);
+    }
+  }
+
+  /**
+   * Vertex that sends random messages on all edges until the superstep limit.
+   */
+  public static class RandomMessageVertex extends EdgeListVertex<
+      LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
+    @Override
+    public void compute(Iterable<BytesWritable> messages) {
+      RandomMessageBenchmarkWorkerContext workerContext =
+          (RandomMessageBenchmarkWorkerContext) getWorkerContext();
+      if (getSuperstep() >= workerContext.getNumSupersteps()) {
+        voteToHalt();
+        return;
+      }
+      for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) {
+        workerContext.randomizeMessageBytes();
+        byte[] payload = workerContext.getMessageBytes();
+        sendMessageToAllEdges(new BytesWritable(payload));
+        // Widen before multiplying so large byte counts cannot overflow int
+        long bytesSent = (long) payload.length * getNumEdges();
+        aggregate(AGG_SUPERSTEP_TOTAL_BYTES, new LongWritable(bytesSent));
+        aggregate(AGG_SUPERSTEP_TOTAL_MESSAGES,
+            new LongWritable(getNumEdges()));
+      }
+    }
+  }
+
+  /** @return Configuration last stored by setConf (null if never set) */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /** @param conf Configuration that run() passes to the GiraphJob */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Parse the command line, configure the messaging benchmark and run it.
+   * @param args Command-line options (-w, -s, -V, -e, -b, -n are required)
+   * @return 0 if help was printed or the job succeeded, -1 otherwise
+   * @throws Exception Any exception from option parsing or the job
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("w",
+        "workers",
+        true,
+        "Number of workers");
+    options.addOption("b",
+        "bytes",
+        true,
+        "Message bytes per message");
+    options.addOption("n",
+        "number",
+        true,
+        "Number of messages per edge");
+    options.addOption("s",
+        "supersteps",
+        true,
+        "Supersteps to execute before finishing");
+    options.addOption("V",
+        "aggregateVertices",
+        true,
+        "Aggregate vertices");
+    options.addOption("e",
+        "edgesPerVertex",
+        true,
+        "Edges per vertex");
+    options.addOption("f",
+        "flusher",
+        true,
+        "Number of flush threads");
+
+    HelpFormatter formatter = new HelpFormatter();
+    if (args.length == 0) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+    if (cmd.hasOption('h')) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    if (!cmd.hasOption('w')) {
+      LOG.info("Need to choose the number of workers (-w)");
+      return -1;
+    }
+    if (!cmd.hasOption('s')) {
+      LOG.info("Need to set the number of supersteps (-s)");
+      return -1;
+    }
+    if (!cmd.hasOption('V')) {
+      LOG.info("Need to set the aggregate vertices (-V)");
+      return -1;
+    }
+    if (!cmd.hasOption('e')) {
+      LOG.info("Need to set the number of edges per vertex (-e)");
+      return -1;
+    }
+    if (!cmd.hasOption('b')) {
+      LOG.info("Need to set the number of message bytes (-b)");
+      return -1;
+    }
+    if (!cmd.hasOption('n')) {
+      LOG.info("Need to set the number of messages per edge (-n)");
+      return -1;
+    }
+    int workers = Integer.parseInt(cmd.getOptionValue('w'));
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    job.getConfiguration().setVertexClass(RandomMessageVertex.class);
+    job.getConfiguration().setVertexInputFormatClass(
+        PseudoRandomVertexInputFormat.class);
+    job.getConfiguration().setWorkerContextClass(
+        RandomMessageBenchmarkWorkerContext.class);
+    job.getConfiguration().setMasterComputeClass(
+        RandomMessageBenchmarkMasterCompute.class);
+    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
+    job.getConfiguration().setLong(
+        PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+        Long.parseLong(cmd.getOptionValue('V')));
+    job.getConfiguration().setLong(
+        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+        Long.parseLong(cmd.getOptionValue('e')));
+    job.getConfiguration().setInt(
+        SUPERSTEP_COUNT,
+        Integer.parseInt(cmd.getOptionValue('s')));
+    job.getConfiguration().setInt(
+        RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
+        Integer.parseInt(cmd.getOptionValue('b')));
+    job.getConfiguration().setInt(
+        RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE,
+        Integer.parseInt(cmd.getOptionValue('n')));
+
+    boolean isVerbose = cmd.hasOption('v');
+    // NOTE(review): SUPERSTEP_COUNT is already set on the job configuration
+    // above; confirm this extra write to the tool's own conf is still needed
+    getConf().setInt(SUPERSTEP_COUNT,
+        Integer.parseInt(cmd.getOptionValue('s')));
+    if (cmd.hasOption('f')) {
+      job.getConfiguration().setInt(
+          GiraphConstants.MSG_NUM_FLUSH_THREADS,
+          Integer.parseInt(cmd.getOptionValue('f')));
+    }
+    return job.run(isVerbose) ? 0 : -1;
+  }
+
+  /**
+   * Execute the benchmark.
+   *
+   * @param args Typically, this is the command line arguments.
+   * @throws Exception Any exception thrown during computation.
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new RandomMessageBenchmark(), args));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java
new file mode 100644
index 0000000..f12f8ac
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import java.io.IOException;
+import org.apache.giraph.graph.RepresentativeVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * PageRank benchmark vertex built on {@link RepresentativeVertex}; the
+ * per-superstep work is delegated to
+ * {@link PageRankComputation#computePageRank}.
+ */
+public class RepresentativeVertexPageRankBenchmark extends
+    RepresentativeVertex<LongWritable, DoubleWritable,
+        DoubleWritable, DoubleWritable> {
+  @Override
+  public void compute(Iterable<DoubleWritable> messages) throws
+      IOException {
+    PageRankComputation.computePageRank(this, messages);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
new file mode 100644
index 0000000..21fc0ac
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.examples.MinimumDoubleCombiner;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.io.PseudoRandomVertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Benchmark driver for single-source shortest paths on pseudo-random input.
+ */
+public class ShortestPathsBenchmark implements Tool {
+  /** Logger for this class */
+  private static final Logger LOG =
+      Logger.getLogger(ShortestPathsBenchmark.class);
+  /** Hadoop configuration supplied through {@link #setConf} */
+  private Configuration conf;
+
+  /**
+   * Edge-list backed vertex that delegates to ShortestPathsComputation.
+   */
+  public static class ShortestPathsBenchmarkVertex extends
+      EdgeListVertex<LongWritable, DoubleWritable, DoubleWritable,
+          DoubleWritable> {
+    @Override
+    public void compute(Iterable<DoubleWritable> messages) throws IOException {
+      ShortestPathsComputation.computeShortestPaths(this, messages);
+    }
+  }
+
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Declare the command line options understood by this benchmark.
+   *
+   * @return Options accepted by {@link #run(String[])}
+   */
+  private static Options createOptions() {
+    Options options = new Options();
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("w", "workers", true, "Number of workers");
+    options.addOption("V", "aggregateVertices", true, "Aggregate vertices");
+    options.addOption("e", "edgesPerVertex", true, "Edges per vertex");
+    options.addOption("c", "vertexClass", true,
+        "Vertex class (0 for HashMapVertex, 1 for EdgeListVertex)");
+    options.addOption("nc", "noCombiner", false, "Don't use a combiner");
+    return options;
+  }
+
+  /**
+   * Parse the arguments, configure the job and run it.
+   *
+   * @param args Command line arguments
+   * @return 0 when help was printed or job.run returned true, -1 otherwise
+   * @throws Exception Anything raised while parsing or running the job
+   */
+  @Override
+  public final int run(final String[] args) throws Exception {
+    Options options = createOptions();
+    HelpFormatter formatter = new HelpFormatter();
+    if (args.length == 0) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+    if (cmd.hasOption('h')) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+
+    // -w, -V and -e are mandatory; report only the first one that is absent.
+    String missingOption = null;
+    if (!cmd.hasOption('w')) {
+      missingOption = "Need to choose the number of workers (-w)";
+    } else if (!cmd.hasOption('V')) {
+      missingOption = "Need to set the aggregate vertices (-V)";
+    } else if (!cmd.hasOption('e')) {
+      missingOption = "Need to set the number of edges per vertex (-e)";
+    }
+    if (missingOption != null) {
+      LOG.info(missingOption);
+      return -1;
+    }
+
+    int workers = Integer.parseInt(cmd.getOptionValue('w'));
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    // EdgeListVertex flavour unless -c is given with a value other than 1.
+    boolean useHashMapVertex = cmd.hasOption('c') &&
+        Integer.parseInt(cmd.getOptionValue('c')) != 1;
+    if (useHashMapVertex) {
+      job.getConfiguration().setVertexClass(
+          HashMapVertexShortestPathsBenchmark.class);
+    } else {
+      job.getConfiguration().setVertexClass(ShortestPathsBenchmarkVertex.class);
+    }
+    LOG.info("Using class " +
+        job.getConfiguration().get(GiraphConstants.VERTEX_CLASS));
+    job.getConfiguration().setVertexInputFormatClass(
+        PseudoRandomVertexInputFormat.class);
+    if (!cmd.hasOption("nc")) {
+      job.getConfiguration().setVertexCombinerClass(
+          MinimumDoubleCombiner.class);
+    }
+    job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f);
+    job.getConfiguration().setLong(
+        PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+        Long.parseLong(cmd.getOptionValue('V')));
+    job.getConfiguration().setLong(
+        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+        Long.parseLong(cmd.getOptionValue('e')));
+
+    boolean isVerbose = cmd.hasOption('v');
+    return job.run(isVerbose) ? 0 : -1;
+  }
+
+  /**
+   * Entry point: runs the tool via ToolRunner and exits with its status.
+   *
+   * @param args Typically the command line arguments.
+   * @throws Exception Any exception from the computation.
+   */
+  public static void main(final String[] args) throws Exception {
+    int exitCode = ToolRunner.run(new ShortestPathsBenchmark(), args);
+    System.exit(exitCode);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java
new file mode 100644
index 0000000..1a5128d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Static single-source shortest paths logic called from benchmark vertices.
+ */
+public class ShortestPathsComputation {
+  /** Configuration key holding the id of the source vertex. */
+  public static final String SOURCE_ID = "ShortestPathsBenchmark.sourceId";
+  /** Source vertex id used when {@link #SOURCE_ID} is not configured. */
+  public static final long SOURCE_ID_DEFAULT = 1;
+
+  /** Static helpers only; never instantiated. */
+  private ShortestPathsComputation() { }
+
+  /**
+   * Check whether a vertex is the configured source.
+   *
+   * @param vertex Candidate vertex
+   * @return true when the vertex id equals the configured source id
+   */
+  private static boolean isSource(Vertex<LongWritable, DoubleWritable,
+      DoubleWritable, DoubleWritable> vertex) {
+    long vertexId = vertex.getId().get();
+    long sourceId = vertex.getContext().getConfiguration().getLong(
+        SOURCE_ID, SOURCE_ID_DEFAULT);
+    return vertexId == sourceId;
+  }
+
+  /**
+   * Run one superstep of single-source shortest paths for a vertex.
+   *
+   * @param vertex Vertex to run
+   * @param messages Incoming messages for vertex
+   */
+  public static void computeShortestPaths(
+      Vertex<LongWritable, DoubleWritable, DoubleWritable,
+          DoubleWritable> vertex,
+      Iterable<DoubleWritable> messages) {
+    if (vertex.getSuperstep() == 0) {
+      vertex.setValue(new DoubleWritable(Double.MAX_VALUE));
+    }
+    // The source starts at distance 0, every other vertex at MAX_VALUE.
+    double best = isSource(vertex) ? 0d : Double.MAX_VALUE;
+    for (DoubleWritable candidate : messages) {
+      best = Math.min(best, candidate.get());
+    }
+
+    if (best < vertex.getValue().get()) {
+      vertex.setValue(new DoubleWritable(best));
+      for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+        DoubleWritable relayed =
+            new DoubleWritable(best + edge.getValue().get());
+        vertex.sendMessage(edge.getTargetVertexId(), relayed);
+      }
+    }
+    vertex.voteToHalt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/package-info.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/package-info.java
new file mode 100644
index 0000000..66743fc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of benchmarks for performance testing and optimization.
+ */
+package org.apache.giraph.benchmark;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/ApplicationState.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/ApplicationState.java b/giraph-core/src/main/java/org/apache/giraph/bsp/ApplicationState.java
new file mode 100644
index 0000000..303ed06
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/ApplicationState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+/**
+ * State of the BSP application.
+ */
+public enum ApplicationState {
+  /** Initial placeholder state; should not be observed */
+  UNKNOWN,
+  /** Start from a desired superstep */
+  START_SUPERSTEP,
+  /** Unrecoverable failure */
+  FAILED,
+  /** Successful completion */
+  FINISHED,
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
new file mode 100644
index 0000000..bce84b1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This InputFormat supports the BSP model by ensuring that the user specifies
+ * how many splits (number of mappers) should be started simultaneously.
+ * The number of splits depends on whether the master and worker processes are
+ * separate.  It is not meant to do any meaningful split of user-data.
+ */
+public class BspInputFormat extends InputFormat<Text, Text> {
+  /** Class Logger */
+  private static final Logger LOG = Logger.getLogger(BspInputFormat.class);
+
+  /**
+   * Get the number of mappers to start: the configured maximum number of
+   * workers, plus the ZooKeeper server count if master and worker are split.
+   *
+   * @param conf Configuration to determine the number of mappers
+   * @return Maximum number of tasks
+   */
+  public static int getMaxTasks(Configuration conf) {
+    int maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0);
+    boolean splitMasterWorker = conf.getBoolean(
+        GiraphConstants.SPLIT_MASTER_WORKER,
+        GiraphConstants.SPLIT_MASTER_WORKER_DEFAULT);
+    int maxTasks = maxWorkers;
+    if (splitMasterWorker) {
+      maxTasks += conf.getInt(GiraphConstants.ZOOKEEPER_SERVER_COUNT,
+          GiraphConstants.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
+          ", split master/worker = " + splitMasterWorker +
+          ", total max tasks = " + maxTasks);
+    }
+    return maxTasks;
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context)
+    throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    int maxTasks = getMaxTasks(conf);
+    if (maxTasks <= 0) {
+      throw new InterruptedException(
+          "getSplits: Cannot have maxTasks <= 0 - " + maxTasks);
+    }
+    // One placeholder split per task; each records its index and the total.
+    List<InputSplit> inputSplitList = new ArrayList<InputSplit>(maxTasks);
+    for (int i = 0; i < maxTasks; ++i) {
+      inputSplitList.add(new BspInputSplit(i, maxTasks));
+    }
+    return inputSplitList;
+  }
+
+  @Override
+  public RecordReader<Text, Text> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    // Every split is served by the same kind of reader.
+    return new BspRecordReader();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputSplit.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputSplit.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputSplit.java
new file mode 100644
index 0000000..2258917
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputSplit.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * This InputSplit will not give any ordering or location data.
+ * It is used internally by BspInputFormat (which determines
+ * how many tasks to run the application on).  Users should not use this
+ * directly.
+ */
+public class BspInputSplit extends InputSplit implements Writable {
+  /** Number of splits (-1 until set) */
+  private int numSplits = -1;
+  /** Split index (-1 until set) */
+  private int splitIndex = -1;
+
+  /** Reflection constructor; index and count stay at -1. */
+  public BspInputSplit() { }
+
+  /**
+   * Create a split with an explicit index and split count.
+   *
+   * @param splitIndex Index of this split.
+   * @param numSplits Total number of splits.
+   */
+  public BspInputSplit(int splitIndex, int numSplits) {
+    this.splitIndex = splitIndex;
+    this.numSplits = numSplits;
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return 0;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    return new String[]{};
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // Must read in the same order that write() emits: index, then count.
+    splitIndex = in.readInt();
+    numSplits = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // Index first, then count; readFields() relies on this order.
+    out.writeInt(splitIndex);
+    out.writeInt(numSplits);
+  }
+
+  /**
+   * Get the index of this split.
+   *
+   * @return Index of this split.
+   */
+  public int getSplitIndex() {
+    return splitIndex;
+  }
+
+  /**
+   * Get the number of splits for this application.
+   *
+   * @return Total number of splits.
+   */
+  public int getNumSplits() {
+    return numSplits;
+  }
+
+  @Override
+  public String toString() {
+    return "'" + getClass().getCanonicalName() + ", index=" +
+        getSplitIndex() + ", num=" + getNumSplits() + "'";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
new file mode 100644
index 0000000..9e43ca6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ * Internal use only.  Exposes the configured vertex output format to Hadoop
+ * as a normal OutputFormat; without one, nothing is checked or committed.
+ */
+public class BspOutputFormat extends OutputFormat<Text, Text> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(BspOutputFormat.class);
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    if (BspUtils.getVertexOutputFormatClass(context.getConfiguration()) ==
+        null) {
+      LOG.warn("checkOutputSpecs: ImmutableOutputCommitter" +
+          " will not check anything");
+      return;
+    }
+    BspUtils.createVertexOutputFormat(context.getConfiguration()).
+      checkOutputSpecs(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    if (BspUtils.getVertexOutputFormatClass(context.getConfiguration()) ==
+        null) {
+      LOG.warn("getOutputCommitter: Returning " +
+          "ImmutableOutputCommitter (does nothing).");
+      return new ImmutableOutputCommitter();
+    }
+    return BspUtils.createVertexOutputFormat(context.getConfiguration()).
+        getOutputCommitter(context);
+  }
+
+  @Override
+  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new BspRecordWriter();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordReader.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordReader.java
new file mode 100644
index 0000000..5646018
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Record reader that yields exactly one key-value pair so map() can run.
+ */
+class BspRecordReader extends RecordReader<Text, Text> {
+  /** The one key that is ever returned */
+  private static final Text ONLY_KEY = new Text("only key");
+  /** The one value that is ever returned */
+  private static final Text ONLY_VALUE = new Text("only value");
+  /** True once the single record has been handed out */
+  private boolean recordConsumed = false;
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+    throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    boolean firstCall = !recordConsumed;
+    recordConsumed = true;
+    return firstCall;
+  }
+
+  @Override
+  public Text getCurrentKey() throws IOException, InterruptedException {
+    return ONLY_KEY;
+  }
+
+  @Override
+  public Text getCurrentValue() throws IOException, InterruptedException {
+    return ONLY_VALUE;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    if (recordConsumed) {
+      return 1f;
+    }
+    return 0f;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Nothing to release.
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java
new file mode 100644
index 0000000..c6bde1e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Placeholder writer returned by {@link BspOutputFormat}, because some
+ * versions of Hadoop require getRecordWriter to return a RecordWriter.
+ * Closing is a no-op and any attempt to write fails with an IOException.
+ */
+public class BspRecordWriter extends RecordWriter<Text, Text> {
+
+  @Override
+  public void write(Text key, Text value)
+    throws IOException, InterruptedException {
+    String writerName = getClass().getName();
+    throw new IOException("write: Cannot write with " + writerName +
+        ".  Should never be called");
+  }
+
+  @Override
+  public void close(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    // Nothing to close
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
new file mode 100644
index 0000000..e28ebe1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Basic service interface shared by both {@link CentralizedServiceMaster} and
+ * {@link CentralizedServiceWorker}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface CentralizedService<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+
+
+  /**
+   * Get the current global superstep of the application to work on.
+   *
+   * @return Global superstep (begins at INPUT_SUPERSTEP)
+   */
+  long getSuperstep();
+
+  /**
+   * Get the superstep the application was manually restarted from, if any.
+   *
+   * @return -1 if not manually restarted, otherwise the superstep id
+   */
+  long getRestartedSuperstep();
+
+  /**
+   * Given a superstep, should it be checkpointed based on the
+   * checkpoint frequency?
+   *
+   * @param superstep Superstep to check against the checkpoint frequency
+   * @return true if checkpoint frequency met or superstep is 1.
+   */
+  boolean checkpointFrequencyMet(long superstep);
+
+  /**
+   * Get the list of workers.
+   *
+   * @return List of workers
+   */
+  List<WorkerInfo> getWorkerInfoList();
+
+  /**
+   * Clean up the service (no calls may be issued after this).
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void cleanup() throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
new file mode 100644
index 0000000..a328737
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import org.apache.giraph.graph.MasterAggregatorHandler;
+import org.apache.giraph.graph.MasterInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+
+/**
+ * At most, there will be one active master at a time, but many threads can
+ * be trying to be the active master.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface CentralizedServiceMaster<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> extends
+    CentralizedService<I, V, E, M> {
+  /**
+   * Setup (must be called prior to any other function)
+   */
+  void setup();
+
+  /**
+   * Become the master.
+   * @return true if became the master, false if the application is done.
+   */
+  boolean becomeMaster();
+
+  /**
+   * Get master information
+   *
+   * @return Master information
+   */
+  MasterInfo getMasterInfo();
+
+  /**
+   * Create the {@link InputSplit} objects from the index range based on the
+   * user-defined VertexInputFormat.  The {@link InputSplit} objects will
+   * be processed by the workers later on during the INPUT_SUPERSTEP.
+   *
+   * @return Number of splits. Returns -1 on failure to create
+   *         valid input splits.
+   */
+  int createVertexInputSplits();
+
+  /**
+   * Create the {@link InputSplit} objects from the index range based on the
+   * user-defined EdgeInputFormat.  The {@link InputSplit} objects will
+   * be processed by the workers later on during the INPUT_SUPERSTEP.
+   *
+   * @return Number of splits. Returns -1 on failure to create
+   *         valid input splits.
+   */
+  int createEdgeInputSplits();
+
+  /**
+   * Master coordinates the superstep
+   *
+   * @return State of the application as a result of this superstep
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  SuperstepState coordinateSuperstep()
+    throws KeeperException, InterruptedException;
+
+  /**
+   * Master can decide to restart from the last good checkpoint if a
+   * worker fails during a superstep.
+   *
+   * @param checkpoint Checkpoint to restart from
+   */
+  void restartFromCheckpoint(long checkpoint);
+
+  /**
+   * Get the last known good checkpoint
+   *
+   * @return Last good superstep number
+   * @throws IOException
+   */
+  long getLastGoodCheckpoint() throws IOException;
+
+  /**
+   * If the master decides that this job doesn't have the resources to
+   * continue, it can fail the job.  It can also designate what to do next.
+   * Typically this is mainly informative.
+   *
+   * @param state State of the application.
+   * @param applicationAttempt Attempt to start on
+   * @param desiredSuperstep Superstep to restart from (if applicable)
+   */
+  void setJobState(ApplicationState state,
+    long applicationAttempt,
+    long desiredSuperstep);
+
+  /**
+   * Get master aggregator handler
+   *
+   * @return Master aggregator handler
+   */
+  MasterAggregatorHandler getAggregatorHandler();
+
+  /**
+   * Superstep has finished.
+   */
+  void postSuperstep();
+
+  /**
+   * Application has finished.
+   */
+  void postApplication();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
new file mode 100644
index 0000000..f359bd4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.graph.FinishedSuperstepStats;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.MasterInfo;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.graph.WorkerAggregatorHandler;
+import org.apache.giraph.graph.partition.PartitionStore;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GraphMapper;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.WorkerContext;
+
+/**
+ * All workers should have access to this centralized service to
+ * execute the following methods.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface CentralizedServiceWorker<I extends WritableComparable,
+  V extends Writable, E extends Writable, M extends Writable>
+  extends CentralizedService<I, V, E, M> {
+  /**
+   * Setup (must be called prior to any other function)
+   *
+   * @return Finished superstep stats for the input superstep
+   */
+  FinishedSuperstepStats setup();
+
+  /**
+   * Get the worker information
+   *
+   * @return Worker information
+   */
+  WorkerInfo getWorkerInfo();
+
+  /**
+   * Get the worker client (for instantiating WorkerClientRequestProcessor
+   * instances).
+   *
+   * @return Worker client
+   */
+  WorkerClient<I, V, E, M> getWorkerClient();
+
+  /**
+   * Get the worker context.
+   *
+   * @return worker's WorkerContext
+   */
+  WorkerContext getWorkerContext();
+
+  /**
+   * Get the partition store for this worker.
+   * The partitions contain the vertices for
+   * this worker and can be used to run compute() for the vertices or do
+   * checkpointing.
+   *
+   * @return The partition store for this worker.
+   */
+  PartitionStore<I, V, E, M> getPartitionStore();
+
+  /**
+   * Both the vertices and the messages need to be checkpointed in order
+   * for them to be used.  This is done after all messages have been
+   * delivered, but prior to a superstep starting.
+   */
+  void storeCheckpoint() throws IOException;
+
+  /**
+   * Load the vertices, edges, messages from the beginning of a superstep.
+   * Will load the vertex partitions as designated by the master and set the
+   * appropriate superstep.
+   *
+   * @param superstep which checkpoint to use
+   * @return Graph-wide vertex and edge counts
+   * @throws IOException
+   */
+  VertexEdgeCount loadCheckpoint(long superstep) throws IOException;
+
+  /**
+   * Take all steps prior to actually beginning the computation of a
+   * superstep.
+   *
+   * @param graphState Current graph state
+   * @return Collection of all the partition owners from the master for this
+   *         superstep.
+   */
+  Collection<? extends PartitionOwner> startSuperstep(
+      GraphState<I, V, E, M> graphState);
+
+  /**
+   * Worker is done with its portion of the superstep.  Report the
+   * worker level statistics after the computation.
+   *
+   * @param graphState Current graph state
+   * @param partitionStatsList All the partition stats for this worker
+   * @return Stats of the superstep completion
+   */
+  FinishedSuperstepStats finishSuperstep(
+      GraphState<I, V, E, M> graphState,
+      List<PartitionStats> partitionStatsList);
+
+  /**
+   * Get the partition that a vertex id would belong to.
+   *
+   * @param vertexId Id of the vertex that is used to find the correct
+   *        partition.
+   * @return Correct partition if exists on this worker, null otherwise.
+   */
+  Partition<I, V, E, M> getPartition(I vertexId);
+
+  /**
+   * Get the partition id that a vertex id would belong to.
+   *
+   * @param vertexId Vertex id
+   * @return Partition id
+   */
+  Integer getPartitionId(I vertexId);
+
+  /**
+   * Whether a partition with given id exists on this worker.
+   *
+   * @param partitionId Partition id
+   * @return True iff this worker has the specified partition
+   */
+  boolean hasPartition(Integer partitionId);
+
+  /**
+   * Every client will need to get a partition owner from a vertex id so that
+   * they know which worker to send the request to.
+   *
+   * @param vertexId Vertex index to look for
+   * @return PartitionOwner that should contain this vertex if it exists
+   */
+  PartitionOwner getVertexPartitionOwner(I vertexId);
+
+  /**
+   * Get all partition owners.
+   *
+   * @return Iterable through partition owners
+   */
+  Iterable<? extends PartitionOwner> getPartitionOwners();
+
+  /**
+   * Look up a vertex on a worker given its vertex index.
+   *
+   * @param vertexId Vertex index to look for
+   * @return Vertex if it exists on this worker.
+   */
+  Vertex<I, V, E, M> getVertex(I vertexId);
+
+  /**
+   * If desired by the user, vertex partitions are redistributed among
+   * workers according to the chosen WorkerGraphPartitioner.
+   *
+   * @param masterSetPartitionOwners Partition owner info passed from the
+   *        master.
+   */
+  void exchangeVertexPartitions(
+      Collection<? extends PartitionOwner> masterSetPartitionOwners);
+
+  /**
+   * Get master info
+   *
+   * @return Master info
+   */
+  MasterInfo getMasterInfo();
+
+  /**
+   * Get the GraphMapper that this service is using.  Vertices need to know
+   * this.
+   *
+   * @return GraphMapper
+   */
+  GraphMapper<I, V, E, M> getGraphMapper();
+
+  /**
+   * Operations that will be called if there is a failure by a worker.
+   */
+  void failureCleanup();
+
+  /**
+   * Get server data
+   *
+   * @return Server data
+   */
+  ServerData<I, V, E, M> getServerData();
+
+  /**
+   * Get worker aggregator handler
+   *
+   * @return Worker aggregator handler
+   */
+  WorkerAggregatorHandler getAggregatorHandler();
+
+  /**
+   * Final preparation for superstep, called after startSuperstep and
+   * potential loading from checkpoint, right before the computation starts.
+   * TODO: how to avoid this additional function?
+   */
+  void prepareSuperstep();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java b/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java
new file mode 100644
index 0000000..feb4041
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * No-op {@link OutputCommitter}: every hook is empty and needsTaskCommit()
+ * returns false.  Meant for the case where output isn't desired, or as a
+ * base for not using FileOutputCommitter.
+ */
+public class ImmutableOutputCommitter extends OutputCommitter {
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context)
+    throws IOException {
+    return false;
+  }
+
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+  }
+
+  @Override
+  /*if[HADOOP_NON_SECURE]
+  public void cleanupJob(JobContext jobContext) throws IOException {
+  }
+  else[HADOOP_NON_SECURE]*/
+  /*end[HADOOP_NON_SECURE]*/
+  public void commitJob(JobContext jobContext) throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java b/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
new file mode 100644
index 0000000..c384fbf
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.bsp;
+
+/**
+ * Result state of a superstep coordinated by {@link CentralizedServiceMaster}.
+ */
+public enum SuperstepState {
+  /** Nothing happened yet */
+  INITIAL,
+  /** A worker died during this superstep */
+  WORKER_FAILURE,
+  /** This superstep completed correctly */
+  THIS_SUPERSTEP_DONE,
+  /** All supersteps are complete */
+  ALL_SUPERSTEPS_DONE,
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/package-info.java b/giraph-core/src/main/java/org/apache/giraph/bsp/package-info.java
new file mode 100644
index 0000000..b5e7dc3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of generic bulk synchronous processing objects.
+ */
+package org.apache.giraph.bsp;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
new file mode 100644
index 0000000..65f2079
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.graph.Aggregator;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+/**
+ * Interface for master to send messages to workers
+ */
+public interface MasterClient {
+  /**
+   * Make sure that all the connections to workers have been established.
+   */
+  void openConnections();
+
+  /**
+   * Sends aggregator to its owner
+   *
+   * @param aggregatorName Name of the aggregator
+   * @param aggregatorClass Class of the aggregator
+   * @param aggregatedValue Value of the aggregator
+   * @throws IOException
+   */
+  void sendAggregator(String aggregatorName,
+      Class<? extends Aggregator> aggregatorClass,
+      Writable aggregatedValue) throws IOException;
+
+  /**
+   * Flush aggregated values cache.
+   */
+  void finishSendingAggregatedValues() throws IOException;
+
+  /**
+   * Flush all outgoing messages.  This will synchronously ensure that all
+   * messages have been sent and delivered prior to returning.
+   */
+  void flush();
+
+  /**
+   * Closes all connections.
+   */
+  void closeConnections();
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java
new file mode 100644
index 0000000..de991f8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Interface for the master to receive messages from workers.
+ */
+public interface MasterServer {
+  /**
+   * Get the server address.
+   *
+   * @return Address used by this server
+   */
+  InetSocketAddress getMyAddress();
+
+  /**
+   * Shuts down this server.
+   */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/MsgList.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MsgList.java b/giraph-core/src/main/java/org/apache/giraph/comm/MsgList.java
new file mode 100644
index 0000000..7f7fa43
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MsgList.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.utils.ArrayListWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Extension of {@link ArrayListWritable} that allows the message class to
+ * be set prior to calling readFields().
+ *
+ * @param <M> message type
+ */
+public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+  /** Defining a layout version for a serializable class. */
+  private static final long serialVersionUID = 100L;
+
+  /**
+   * Default constructor.
+   */
+  public MsgList() {
+    super();
+  }
+
+  /**
+   * Copy constructor.
+   *
+   * @param msgList List of messages for writing.
+   */
+  public MsgList(MsgList<M> msgList) {
+    super(msgList);
+  }
+
+  @SuppressWarnings("unchecked") // cast of the configured class to Class<M>
+  @Override
+  public void setClass() {
+    setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
+  }
+}


Mime
View raw message