Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3D5E1D8AC for ; Thu, 20 Dec 2012 04:25:41 +0000 (UTC) Received: (qmail 54828 invoked by uid 500); 20 Dec 2012 04:25:39 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 54681 invoked by uid 500); 20 Dec 2012 04:25:38 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 53984 invoked by uid 99); 20 Dec 2012 04:25:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Dec 2012 04:25:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 675583247A9; Thu, 20 Dec 2012 04:25:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: nitay@apache.org To: commits@giraph.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [3/51] [partial] GIRAPH-457: update module names (nitay) Message-Id: <20121220042530.675583247A9@tyr.zones.apache.org> Date: Thu, 20 Dec 2012 04:25:30 +0000 (UTC) http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java new file mode 100644 index 0000000..75e4d8f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.benchmark; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.examples.DoubleSumCombiner; +import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.io.PseudoRandomEdgeInputFormat; +import org.apache.giraph.io.PseudoRandomVertexInputFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; + +/** + * Default Pregel-style PageRank computation. 
+ */ +public class PageRankBenchmark implements Tool { + /** + * Class logger + */ + private static final Logger LOG = Logger.getLogger(PageRankBenchmark.class); + /** + * Configuration from Configurable + */ + private Configuration conf; + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public final int run(final String[] args) throws Exception { + Options options = new Options(); + options.addOption("h", "help", false, "Help"); + options.addOption("v", "verbose", false, "Verbose"); + options.addOption("w", + "workers", + true, + "Number of workers"); + options.addOption("s", + "supersteps", + true, + "Supersteps to execute before finishing"); + options.addOption("V", + "aggregateVertices", + true, + "Aggregate vertices"); + options.addOption("e", + "edgesPerVertex", + true, + "Edges per vertex"); + options.addOption("c", + "vertexClass", + true, + "Vertex class (0 for HashMapVertex, 1 for EdgeListVertex, " + + "2 for RepresentativeVertex, " + + "3 for RepresentativeVertex with unsafe, " + + "4 for HashMapVertex (using EdgeInputFormat), " + + "5 for MultiGraphEdgeListVertex (using EdgeInputFormat), " + + "6 for MultiGraphRepresentativeVertex (using " + + "EdgeInputFormat), " + + "7 for MultiGraphRepresentativeVertex with unsafe (using " + + "EdgeInputFormat))"); + options.addOption("N", + "name", + true, + "Name of the job"); + options.addOption("t", + "combinerType", + true, + "Combiner type (0 for no combiner, 1 for DoubleSumCombiner (default)"); + + HelpFormatter formatter = new HelpFormatter(); + if (args.length == 0) { + formatter.printHelp(getClass().getName(), options, true); + return 0; + } + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args); + if (cmd.hasOption('h')) { + formatter.printHelp(getClass().getName(), options, true); + return 0; + } + if (!cmd.hasOption('w')) { + LOG.info("Need to choose 
the number of workers (-w)"); + return -1; + } + if (!cmd.hasOption('s')) { + LOG.info("Need to set the number of supersteps (-s)"); + return -1; + } + if (!cmd.hasOption('V')) { + LOG.info("Need to set the aggregate vertices (-V)"); + return -1; + } + if (!cmd.hasOption('e')) { + LOG.info("Need to set the number of edges " + + "per vertex (-e)"); + return -1; + } + + int workers = Integer.parseInt(cmd.getOptionValue('w')); + String name = getClass().getName(); + if (cmd.hasOption("N")) { + name = name + " " + cmd.getOptionValue("N"); + } + + GiraphJob job = new GiraphJob(getConf(), name); + GiraphConfiguration configuration = job.getConfiguration(); + setVertexAndInputFormatClasses(cmd, configuration); + configuration.setWorkerConfiguration(workers, workers, 100.0f); + configuration.setInt( + PageRankComputation.SUPERSTEP_COUNT, + Integer.parseInt(cmd.getOptionValue('s'))); + + boolean isVerbose = false; + if (cmd.hasOption('v')) { + isVerbose = true; + } + if (job.run(isVerbose)) { + return 0; + } else { + return -1; + } + } + + /** + * Set vertex class and input format class based on command-line arguments. + * + * @param cmd Command line arguments + * @param configuration Giraph job configuration + */ + protected void setVertexAndInputFormatClasses( + CommandLine cmd, GiraphConfiguration configuration) { + int vertexClassOption = cmd.hasOption('c') ? 
Integer.parseInt( + cmd.getOptionValue('c')) : 1; + if (vertexClassOption == 1) { + configuration.setVertexClass( + EdgeListVertexPageRankBenchmark.class); + } else if (vertexClassOption == 0 || vertexClassOption == 4) { + configuration.setVertexClass( + HashMapVertexPageRankBenchmark.class); + } else if (vertexClassOption == 2) { + configuration.setVertexClass( + RepresentativeVertexPageRankBenchmark.class); + configuration.useUnsafeSerialization(false); + } else if (vertexClassOption == 3) { + configuration.setVertexClass( + RepresentativeVertexPageRankBenchmark.class); + configuration.useUnsafeSerialization(true); + } else if (vertexClassOption == 5) { + configuration.setVertexClass( + MultiGraphEdgeListVertexPageRankBenchmark.class); + } else if (vertexClassOption == 6) { + configuration.setVertexClass( + MultiGraphRepresentativeVertexPageRankBenchmark.class); + configuration.useUnsafeSerialization(false); + } else if (vertexClassOption == 7) { + configuration.setVertexClass( + MultiGraphRepresentativeVertexPageRankBenchmark.class); + configuration.useUnsafeSerialization(true); + } + LOG.info("Using class " + + configuration.get(GiraphConstants.VERTEX_CLASS)); + if (!cmd.hasOption('t') || + (Integer.parseInt(cmd.getOptionValue('t')) == 2)) { + configuration.setVertexCombinerClass( + DoubleSumCombiner.class); + } else if (Integer.parseInt(cmd.getOptionValue('t')) == 1) { + configuration.setVertexCombinerClass( + DoubleSumCombiner.class); + } + if (vertexClassOption <= 3) { + configuration.setVertexInputFormatClass( + PseudoRandomVertexInputFormat.class); + configuration.setLong( + PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, + Long.parseLong(cmd.getOptionValue('V'))); + configuration.setLong( + PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, + Long.parseLong(cmd.getOptionValue('e'))); + } else { + configuration.setEdgeInputFormatClass( + PseudoRandomEdgeInputFormat.class); + configuration.setLong( + PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES, + 
Long.parseLong(cmd.getOptionValue('V'))); + configuration.setLong( + PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX, + Long.parseLong(cmd.getOptionValue('e'))); + } + } + + /** + * Execute the benchmark. + * + * @param args Typically the command line arguments. + * @throws Exception Any exception from the computation. + */ + public static void main(final String[] args) throws Exception { + System.exit(ToolRunner.run(new PageRankBenchmark(), args)); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java new file mode 100644 index 0000000..d6a8cb7 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.benchmark; + +import org.apache.giraph.graph.MutableVertex; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; + +/** + * Shared computation of classic Pregel-style PageRank computation for benchmark + * classes. + */ +public class PageRankComputation { + /** Number of supersteps */ + public static final String SUPERSTEP_COUNT = + "PageRankBenchmark.superstepCount"; + + /** + * Do not construct. + */ + private PageRankComputation() { } + + /** + * Generic page rank algorithm. + * + * @param vertex Vertex to compute on. + * @param messages Iterable of messages from previous superstep. + */ + public static void computePageRank( + MutableVertex vertex, Iterable messages) { + if (vertex.getSuperstep() >= 1) { + double sum = 0; + for (DoubleWritable message : messages) { + sum += message.get(); + } + DoubleWritable vertexValue = new DoubleWritable( + (0.15f / vertex.getTotalNumVertices()) + 0.85f * sum); + vertex.setValue(vertexValue); + } + + if (vertex.getSuperstep() < vertex.getConf().getInt(SUPERSTEP_COUNT, -1)) { + long edges = vertex.getNumEdges(); + vertex.sendMessageToAllEdges( + new DoubleWritable(vertex.getValue().get() / edges)); + } else { + vertex.voteToHalt(); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java new file mode 100644 index 0000000..604c4a9 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.benchmark; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.giraph.aggregators.LongSumAggregator; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.graph.DefaultMasterCompute; +import org.apache.giraph.graph.EdgeListVertex; +import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.graph.WorkerContext; +import org.apache.giraph.io.PseudoRandomVertexInputFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; + +import java.util.Random; + +/** + * Random Message Benchmark for evaluating the messaging performance. 
+ */ +public class RandomMessageBenchmark implements Tool { + /** How many supersteps to run */ + public static final String SUPERSTEP_COUNT = + "RandomMessageBenchmark.superstepCount"; + /** How many bytes per message */ + public static final String NUM_BYTES_PER_MESSAGE = + "RandomMessageBenchmark.numBytesPerMessage"; + /** Default bytes per message */ + public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16; + /** How many messages per edge */ + public static final String NUM_MESSAGES_PER_EDGE = + "RandomMessageBenchmark.numMessagesPerEdge"; + /** Default messages per edge */ + public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1; + /** All bytes sent during this superstep */ + public static final String AGG_SUPERSTEP_TOTAL_BYTES = + "superstep total bytes sent"; + /** All bytes sent during this application */ + public static final String AGG_TOTAL_BYTES = "total bytes sent"; + /** All messages during this superstep */ + public static final String AGG_SUPERSTEP_TOTAL_MESSAGES = + "superstep total messages"; + /** All messages during this application */ + public static final String AGG_TOTAL_MESSAGES = "total messages"; + /** All millis during this superstep */ + public static final String AGG_SUPERSTEP_TOTAL_MILLIS = + "superstep total millis"; + /** All millis during this application */ + public static final String AGG_TOTAL_MILLIS = "total millis"; + /** Workers for that superstep */ + public static final String WORKERS = "workers"; + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(RandomMessageBenchmarkWorkerContext.class); + /** Configuration from Configurable */ + private Configuration conf; + + /** + * {@link WorkerContext} for RandomMessageBenchmark. 
+ */ + public static class RandomMessageBenchmarkWorkerContext extends + WorkerContext { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(RandomMessageBenchmarkWorkerContext.class); + /** Bytes to be sent */ + private byte[] messageBytes; + /** Number of messages sent per edge */ + private int numMessagesPerEdge = -1; + /** Number of supersteps */ + private int numSupersteps = -1; + /** Random generator for random bytes message */ + private final Random random = new Random(System.currentTimeMillis()); + /** Start superstep millis */ + private long startSuperstepMillis = 0; + /** Total bytes */ + private long totalBytes = 0; + /** Total messages */ + private long totalMessages = 0; + /** Total millis */ + private long totalMillis = 0; + + @Override + public void preApplication() + throws InstantiationException, IllegalAccessException { + messageBytes = + new byte[getContext().getConfiguration(). + getInt(NUM_BYTES_PER_MESSAGE, + DEFAULT_NUM_BYTES_PER_MESSAGE)]; + numMessagesPerEdge = + getContext().getConfiguration(). + getInt(NUM_MESSAGES_PER_EDGE, + DEFAULT_NUM_MESSAGES_PER_EDGE); + numSupersteps = getContext().getConfiguration(). + getInt(SUPERSTEP_COUNT, -1); + } + + @Override + public void preSuperstep() { + long superstepBytes = this. + getAggregatedValue(AGG_SUPERSTEP_TOTAL_BYTES).get(); + long superstepMessages = this. + getAggregatedValue(AGG_SUPERSTEP_TOTAL_MESSAGES).get(); + long superstepMillis = this. 
+ getAggregatedValue(AGG_SUPERSTEP_TOTAL_MILLIS).get(); + long workers = this.getAggregatedValue(WORKERS).get(); + + // For timing and tracking the supersteps + // - superstep 0 starts the time, but cannot display any stats + // since nothing has been aggregated yet + // - supersteps > 0 can display the stats + if (getSuperstep() == 0) { + startSuperstepMillis = System.currentTimeMillis(); + } else { + totalBytes += superstepBytes; + totalMessages += superstepMessages; + totalMillis += superstepMillis; + double superstepMegabytesPerSecond = + superstepBytes * workers * 1000d / 1024d / 1024d / superstepMillis; + double megabytesPerSecond = totalBytes * + workers * 1000d / 1024d / 1024d / totalMillis; + double superstepMessagesPerSecond = + superstepMessages * workers * 1000d / superstepMillis; + double messagesPerSecond = + totalMessages * workers * 1000d / totalMillis; + if (LOG.isInfoEnabled()) { + LOG.info("Outputing statistics for superstep " + getSuperstep()); + LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " + superstepBytes); + LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes); + LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " + superstepMessages); + LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages); + LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " + superstepMillis); + LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis); + LOG.info(WORKERS + " : " + workers); + LOG.info("Superstep megabytes / second = " + + superstepMegabytesPerSecond); + LOG.info("Total megabytes / second = " + + megabytesPerSecond); + LOG.info("Superstep messages / second = " + + superstepMessagesPerSecond); + LOG.info("Total messages / second = " + + messagesPerSecond); + LOG.info("Superstep megabytes / second / worker = " + + superstepMegabytesPerSecond / workers); + LOG.info("Total megabytes / second / worker = " + + megabytesPerSecond / workers); + LOG.info("Superstep messages / second / worker = " + + superstepMessagesPerSecond / workers); + LOG.info("Total messages / second / worker = " + + 
messagesPerSecond / workers); + } + } + + aggregate(WORKERS, new LongWritable(1)); + } + + @Override + public void postSuperstep() { + long endSuperstepMillis = System.currentTimeMillis(); + long superstepMillis = endSuperstepMillis - startSuperstepMillis; + startSuperstepMillis = endSuperstepMillis; + aggregate(AGG_SUPERSTEP_TOTAL_MILLIS, new LongWritable(superstepMillis)); + } + + @Override + public void postApplication() { } + + /** + * Get the message bytes to be used for sending. + * + * @return Byte array used for messages. + */ + public byte[] getMessageBytes() { + return messageBytes; + } + + /** + * Get the number of messages per edge. + * + * @return Messages per edge. + */ + public int getNumMessagePerEdge() { + return numMessagesPerEdge; + } + + /** + * Get the number of supersteps. + * + * @return Number of supersteps. + */ + public int getNumSupersteps() { + return numSupersteps; + } + + /** + * Randomize the message bytes. + */ + public void randomizeMessageBytes() { + random.nextBytes(messageBytes); + } + } + + /** + * Master compute associated with {@link RandomMessageBenchmark}. + * It registers required aggregators. 
+ */ + public static class RandomMessageBenchmarkMasterCompute extends + DefaultMasterCompute { + @Override + public void initialize() throws InstantiationException, + IllegalAccessException { + registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES, + LongSumAggregator.class); + registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES, + LongSumAggregator.class); + registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS, + LongSumAggregator.class); + registerAggregator(WORKERS, + LongSumAggregator.class); + } + } + + /** + * Actual message computation (messaging in this case) + */ + public static class RandomMessageVertex extends EdgeListVertex< + LongWritable, DoubleWritable, DoubleWritable, BytesWritable> { + @Override + public void compute(Iterable messages) { + RandomMessageBenchmarkWorkerContext workerContext = + (RandomMessageBenchmarkWorkerContext) getWorkerContext(); + if (getSuperstep() < workerContext.getNumSupersteps()) { + for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) { + workerContext.randomizeMessageBytes(); + sendMessageToAllEdges( + new BytesWritable(workerContext.getMessageBytes())); + long bytesSent = workerContext.getMessageBytes().length * + getNumEdges(); + aggregate(AGG_SUPERSTEP_TOTAL_BYTES, new LongWritable(bytesSent)); + aggregate(AGG_SUPERSTEP_TOTAL_MESSAGES, + new LongWritable(getNumEdges())); + } + } else { + voteToHalt(); + } + } + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + options.addOption("h", "help", false, "Help"); + options.addOption("v", "verbose", false, "Verbose"); + options.addOption("w", + "workers", + true, + "Number of workers"); + options.addOption("b", + "bytes", + true, + "Message bytes per memssage"); + options.addOption("n", + "number", + true, + "Number of messages per edge"); + options.addOption("s", + "supersteps", + 
true, + "Supersteps to execute before finishing"); + options.addOption("V", + "aggregateVertices", + true, + "Aggregate vertices"); + options.addOption("e", + "edgesPerVertex", + true, + "Edges per vertex"); + options.addOption("f", + "flusher", + true, + "Number of flush threads"); + + HelpFormatter formatter = new HelpFormatter(); + if (args.length == 0) { + formatter.printHelp(getClass().getName(), options, true); + return 0; + } + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args); + if (cmd.hasOption('h')) { + formatter.printHelp(getClass().getName(), options, true); + return 0; + } + if (!cmd.hasOption('w')) { + LOG.info("Need to choose the number of workers (-w)"); + return -1; + } + if (!cmd.hasOption('s')) { + LOG.info("Need to set the number of supersteps (-s)"); + return -1; + } + if (!cmd.hasOption('V')) { + LOG.info("Need to set the aggregate vertices (-V)"); + return -1; + } + if (!cmd.hasOption('e')) { + LOG.info("Need to set the number of edges " + + "per vertex (-e)"); + return -1; + } + if (!cmd.hasOption('b')) { + LOG.info("Need to set the number of message bytes (-b)"); + return -1; + } + if (!cmd.hasOption('n')) { + LOG.info("Need to set the number of messages per edge (-n)"); + return -1; + } + int workers = Integer.parseInt(cmd.getOptionValue('w')); + GiraphJob job = new GiraphJob(getConf(), getClass().getName()); + job.getConfiguration().setVertexClass(RandomMessageVertex.class); + job.getConfiguration().setVertexInputFormatClass( + PseudoRandomVertexInputFormat.class); + job.getConfiguration().setWorkerContextClass( + RandomMessageBenchmarkWorkerContext.class); + job.getConfiguration().setMasterComputeClass( + RandomMessageBenchmarkMasterCompute.class); + job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f); + job.getConfiguration().setLong( + PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, + Long.parseLong(cmd.getOptionValue('V'))); + job.getConfiguration().setLong( + 
PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, + Long.parseLong(cmd.getOptionValue('e'))); + job.getConfiguration().setInt( + SUPERSTEP_COUNT, + Integer.parseInt(cmd.getOptionValue('s'))); + job.getConfiguration().setInt( + RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE, + Integer.parseInt(cmd.getOptionValue('b'))); + job.getConfiguration().setInt( + RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE, + Integer.parseInt(cmd.getOptionValue('n'))); + + boolean isVerbose = false; + if (cmd.hasOption('v')) { + isVerbose = true; + } + if (cmd.hasOption('s')) { + getConf().setInt(SUPERSTEP_COUNT, + Integer.parseInt(cmd.getOptionValue('s'))); + } + if (cmd.hasOption('f')) { + job.getConfiguration().setInt( + GiraphConstants.MSG_NUM_FLUSH_THREADS, + Integer.parseInt(cmd.getOptionValue('f'))); + } + if (job.run(isVerbose)) { + return 0; + } else { + return -1; + } + } + + /** + * Execute the benchmark. + * + * @param args Typically, this is the command line arguments. + * @throws Exception Any exception thrown during computation. + */ + public static void main(String[] args) throws Exception { + System.exit(ToolRunner.run(new RandomMessageBenchmark(), args)); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java new file mode 100644 index 0000000..f12f8ac --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.benchmark; + +import java.io.IOException; +import org.apache.giraph.graph.RepresentativeVertex; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; + +/** + * Same benchmark code as {@link PageRankBenchmark}, but uses + * {@link org.apache.giraph.graph.RepresentativeVertex} + * implementation. + */ +public class RepresentativeVertexPageRankBenchmark extends + RepresentativeVertex { + @Override + public void compute(Iterable messages) throws + IOException { + PageRankComputation.computePageRank(this, messages); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java new file mode 100644 index 0000000..21fc0ac --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.benchmark; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.examples.MinimumDoubleCombiner; +import org.apache.giraph.graph.EdgeListVertex; +import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.io.PseudoRandomVertexInputFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; + +import java.io.IOException; + +/** + * Single-source shortest paths benchmark. 
+ */ +public class ShortestPathsBenchmark implements Tool { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(ShortestPathsBenchmark.class); + /** Configuration */ + private Configuration conf; + + /** + * Vertex implementation + */ + public static class ShortestPathsBenchmarkVertex extends + EdgeListVertex { + @Override + public void compute(Iterable messages) throws IOException { + ShortestPathsComputation.computeShortestPaths(this, messages); + } + } + + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public final int run(final String[] args) throws Exception { + Options options = new Options(); + options.addOption("h", "help", false, "Help"); + options.addOption("v", "verbose", false, "Verbose"); + options.addOption("w", + "workers", + true, + "Number of workers"); + options.addOption("V", + "aggregateVertices", + true, + "Aggregate vertices"); + options.addOption("e", + "edgesPerVertex", + true, + "Edges per vertex"); + options.addOption("c", + "vertexClass", + true, + "Vertex class (0 for HashMapVertex, 1 for EdgeListVertex)"); + options.addOption("nc", + "noCombiner", + false, + "Don't use a combiner"); + HelpFormatter formatter = new HelpFormatter(); + if (args.length == 0) { + formatter.printHelp(getClass().getName(), options, true); + return 0; + } + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args); + if (cmd.hasOption('h')) { + formatter.printHelp(getClass().getName(), options, true); + return 0; + } + if (!cmd.hasOption('w')) { + LOG.info("Need to choose the number of workers (-w)"); + return -1; + } + if (!cmd.hasOption('V')) { + LOG.info("Need to set the aggregate vertices (-V)"); + return -1; + } + if (!cmd.hasOption('e')) { + LOG.info("Need to set the number of edges " + + "per vertex (-e)"); + return -1; + } + + int workers = 
Integer.parseInt(cmd.getOptionValue('w')); + GiraphJob job = new GiraphJob(getConf(), getClass().getName()); + if (!cmd.hasOption('c') || + (Integer.parseInt(cmd.getOptionValue('c')) == 1)) { + job.getConfiguration().setVertexClass(ShortestPathsBenchmarkVertex.class); + } else { + job.getConfiguration().setVertexClass( + HashMapVertexShortestPathsBenchmark.class); + } + LOG.info("Using class " + + job.getConfiguration().get(GiraphConstants.VERTEX_CLASS)); + job.getConfiguration().setVertexInputFormatClass( + PseudoRandomVertexInputFormat.class); + if (!cmd.hasOption("nc")) { + job.getConfiguration().setVertexCombinerClass( + MinimumDoubleCombiner.class); + } + job.getConfiguration().setWorkerConfiguration(workers, workers, 100.0f); + job.getConfiguration().setLong( + PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, + Long.parseLong(cmd.getOptionValue('V'))); + job.getConfiguration().setLong( + PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, + Long.parseLong(cmd.getOptionValue('e'))); + + boolean isVerbose = false; + if (cmd.hasOption('v')) { + isVerbose = true; + } + if (job.run(isVerbose)) { + return 0; + } else { + return -1; + } + } + + /** + * Execute the benchmark. + * + * @param args Typically the command line arguments. + * @throws Exception Any exception from the computation. 
+ */ + public static void main(final String[] args) throws Exception { + System.exit(ToolRunner.run(new ShortestPathsBenchmark(), args)); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java new file mode 100644 index 0000000..1a5128d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.benchmark; + +import org.apache.giraph.graph.Edge; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; + +/** + * Default single-source shortest paths computation. + */ +public class ShortestPathsComputation { + /** Source id. */ + public static final String SOURCE_ID = "ShortestPathsBenchmark.sourceId"; + /** Default source id. 
*/ + public static final long SOURCE_ID_DEFAULT = 1; + + /** Do not construct. */ + private ShortestPathsComputation() { }; + + /** + * Is this vertex the source? + * + * @param vertex Candidate vertex + * @return Whether the vertex is the source. + */ + private static boolean isSource(Vertex vertex) { + return vertex.getId().get() == + vertex.getContext().getConfiguration().getLong(SOURCE_ID, + SOURCE_ID_DEFAULT); + } + + /** + * Generic single-source shortest paths algorithm. + * + * @param vertex Vertex to run + * @param messages Incoming messages for vertex + */ + public static void computeShortestPaths( + Vertex vertex, + Iterable messages) { + if (vertex.getSuperstep() == 0) { + vertex.setValue(new DoubleWritable(Double.MAX_VALUE)); + } + + double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE; + for (DoubleWritable message : messages) { + minDist = Math.min(minDist, message.get()); + } + + if (minDist < vertex.getValue().get()) { + vertex.setValue(new DoubleWritable(minDist)); + for (Edge edge : vertex.getEdges()) { + double distance = minDist + edge.getValue().get(); + vertex.sendMessage(edge.getTargetVertexId(), + new DoubleWritable(distance)); + } + } + + vertex.voteToHalt(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/benchmark/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/package-info.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/package-info.java new file mode 100644 index 0000000..66743fc --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package of benchmarks for performance testing and optimization + */ +package org.apache.giraph.benchmark; http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/ApplicationState.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/ApplicationState.java b/giraph-core/src/main/java/org/apache/giraph/bsp/ApplicationState.java new file mode 100644 index 0000000..303ed06 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/ApplicationState.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.bsp; + +/** + * State of the BSP application + */ +public enum ApplicationState { + /** Shouldn't be seen, just an initial state */ + UNKNOWN, + /** Start from a desired superstep */ + START_SUPERSTEP, + /** Unrecoverable */ + FAILED, + /** Successful completion */ + FINISHED, +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java new file mode 100644 index 0000000..bce84b1 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.bsp; + +import org.apache.giraph.conf.GiraphConstants; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * This InputFormat supports the BSP model by ensuring that the user specifies + * how many splits (number of mappers) should be started simultaneously. + * The number of splits depends on whether the master and worker processes are + * separate. It is not meant to do any meaningful split of user-data. + */ +public class BspInputFormat extends InputFormat { + /** Class Logger */ + private static final Logger LOG = Logger.getLogger(BspInputFormat.class); + + /** + * Get the correct number of mappers based on the configuration + * + * @param conf Configuration to determine the number of mappers + * @return Maximum number of tasks + */ + public static int getMaxTasks(Configuration conf) { + int maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0); + boolean splitMasterWorker = + conf.getBoolean(GiraphConstants.SPLIT_MASTER_WORKER, + GiraphConstants.SPLIT_MASTER_WORKER_DEFAULT); + int maxTasks = maxWorkers; + if (splitMasterWorker) { + int zkServers = + conf.getInt(GiraphConstants.ZOOKEEPER_SERVER_COUNT, + GiraphConstants.ZOOKEEPER_SERVER_COUNT_DEFAULT); + maxTasks += zkServers; + } + if (LOG.isDebugEnabled()) { + LOG.debug("getMaxTasks: Max workers = " + maxWorkers + + ", split master/worker = " + splitMasterWorker + + ", total max tasks = " + maxTasks); + } + return maxTasks; + } + + @Override + public List getSplits(JobContext context) + throws IOException, InterruptedException { + Configuration conf = 
context.getConfiguration(); + int maxTasks = getMaxTasks(conf); + if (maxTasks <= 0) { + throw new InterruptedException( + "getSplits: Cannot have maxTasks <= 0 - " + maxTasks); + } + List inputSplitList = new ArrayList(); + for (int i = 0; i < maxTasks; ++i) { + inputSplitList.add(new BspInputSplit()); + } + return inputSplitList; + } + + @Override + public RecordReader + createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new BspRecordReader(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputSplit.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputSplit.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputSplit.java new file mode 100644 index 0000000..2258917 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputSplit.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * This InputSplit will not give any ordering or location data. + * It is used internally by BspInputFormat (which determines + * how many tasks to run the application on). Users should not use this + * directly. + */ +public class BspInputSplit extends InputSplit implements Writable { + /** Number of splits */ + private int numSplits = -1; + /** Split index */ + private int splitIndex = -1; + + /** + * Reflection constructor. + */ + public BspInputSplit() { } + + /** + * Constructor used by {@link BspInputFormat}. + * + * @param splitIndex Index of this split. + * @param numSplits Total number of splits. + */ + public BspInputSplit(int splitIndex, int numSplits) { + this.splitIndex = splitIndex; + this.numSplits = numSplits; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return 0; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[]{}; + } + + @Override + public void readFields(DataInput in) throws IOException { + splitIndex = in.readInt(); + numSplits = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(splitIndex); + out.writeInt(numSplits); + } + + /** + * Get the index of this split. + * + * @return Index of this split. + */ + public int getSplitIndex() { + return splitIndex; + } + + /** + * Get the number of splits for this application. + * + * @return Total number of splits. 
+ */ + public int getNumSplits() { + return numSplits; + } + + @Override + public String toString() { + return "'" + getClass().getCanonicalName() + + ", index=" + getSplitIndex() + ", num=" + getNumSplits(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java new file mode 100644 index 0000000..9e43ca6 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.bsp; + +import java.io.IOException; + +import org.apache.giraph.graph.BspUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.log4j.Logger; + +/** + * This is for internal use only. Allows the vertex output format routines + * to be called as if a normal Hadoop job. + */ +public class BspOutputFormat extends OutputFormat { + /** Class logger */ + private static Logger LOG = Logger.getLogger(BspOutputFormat.class); + + @Override + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + if (BspUtils.getVertexOutputFormatClass(context.getConfiguration()) == + null) { + LOG.warn("checkOutputSpecs: ImmutableOutputCommiter" + + " will not check anything"); + return; + } + BspUtils.createVertexOutputFormat(context.getConfiguration()). + checkOutputSpecs(context); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + if (BspUtils.getVertexOutputFormatClass(context.getConfiguration()) == + null) { + LOG.warn("getOutputCommitter: Returning " + + "ImmutableOutputCommiter (does nothing)."); + return new ImmutableOutputCommitter(); + } + return BspUtils.createVertexOutputFormat(context.getConfiguration()). 
+ getOutputCommitter(context); + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new BspRecordWriter(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordReader.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordReader.java new file mode 100644 index 0000000..5646018 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordReader.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.bsp; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.io.Text; + +/** + * Only returns a single key-value pair so that the map() can run. 
+ */ +class BspRecordReader extends RecordReader { + /** Singular key object */ + private static final Text ONLY_KEY = new Text("only key"); + /** Single value object */ + private static final Text ONLY_VALUE = new Text("only value"); + + /** Has the one record been seen? */ + private boolean seenRecord = false; + + @Override + public void close() throws IOException { + return; + } + + @Override + public float getProgress() throws IOException { + return seenRecord ? 1f : 0f; + } + + @Override + public Text getCurrentKey() throws IOException, InterruptedException { + return ONLY_KEY; + } + + @Override + public Text getCurrentValue() throws IOException, InterruptedException { + return ONLY_VALUE; + } + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (!seenRecord) { + seenRecord = true; + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java new file mode 100644 index 0000000..c6bde1e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.bsp; + +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Used by {@link BspOutputFormat} since some versions of Hadoop + * require that a RecordWriter is returned from getRecordWriter. + * Does nothing, except ensures that write is never called. + */ +public class BspRecordWriter extends RecordWriter { + + @Override + public void close(TaskAttemptContext context) + throws IOException, InterruptedException { + // Do nothing + } + + @Override + public void write(Text key, Text value) + throws IOException, InterruptedException { + throw new IOException("write: Cannot write with " + + getClass().getName() + + ". Should never be called"); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java new file mode 100644 index 0000000..e28ebe1 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.bsp; + +import org.apache.giraph.graph.WorkerInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.IOException; +import java.util.List; + +/** + * Basic service interface shared by both {@link CentralizedServiceMaster} and + * {@link CentralizedServiceWorker}. + * + * @param Vertex id + * @param Vertex value + * @param Edge value + * @param Message data + */ +@SuppressWarnings("rawtypes") +public interface CentralizedService { + + + /** + * Get the current global superstep of the application to work on. + * + * @return global superstep (begins at INPUT_SUPERSTEP) + */ + long getSuperstep(); + + /** + * Get the restarted superstep + * + * @return -1 if not manually restarted, otherwise the superstep id + */ + long getRestartedSuperstep(); + + /** + * Given a superstep, should it be checkpointed based on the + * checkpoint frequency? + * + * @param superstep superstep to check against frequency + * @return true if checkpoint frequency met or superstep is 1. 
+ */ + boolean checkpointFrequencyMet(long superstep); + + /** + * Get list of workers + * + * @return List of workers + */ + List getWorkerInfoList(); + + /** + * Clean up the service (no calls may be issued after this) + * + * @throws IOException + * @throws InterruptedException + */ + void cleanup() throws IOException, InterruptedException; +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java new file mode 100644 index 0000000..a328737 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.bsp; + +import org.apache.giraph.graph.MasterAggregatorHandler; +import org.apache.giraph.graph.MasterInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; + +/** + * At most, there will be one active master at a time, but many threads can + * be trying to be the active master. + * + * @param Vertex id + * @param Vertex value + * @param Edge value + * @param Message data + */ +@SuppressWarnings("rawtypes") +public interface CentralizedServiceMaster extends + CentralizedService { + /** + * Setup (must be called prior to any other function) + */ + void setup(); + + /** + * Become the master. + * @return true if became the master, false if the application is done. + */ + boolean becomeMaster(); + + /** + * Get master information + * + * @return Master information + */ + MasterInfo getMasterInfo(); + + /** + * Create the {@link InputSplit} objects from the index range based on the + * user-defined VertexInputFormat. The {@link InputSplit} objects will + * be processed by the workers later on during the INPUT_SUPERSTEP. + * + * @return Number of splits. Returns -1 on failure to create + * valid input splits. + */ + int createVertexInputSplits(); + + /** + * Create the {@link InputSplit} objects from the index range based on the + * user-defined EdgeInputFormat. The {@link InputSplit} objects will + * be processed by the workers later on during the INPUT_SUPERSTEP. + * + * @return Number of splits. Returns -1 on failure to create + * valid input splits. 
+ */ + int createEdgeInputSplits(); + + /** + * Master coordinates the superstep + * + * @return State of the application as a result of this superstep + * @throws InterruptedException + * @throws KeeperException + */ + SuperstepState coordinateSuperstep() + throws KeeperException, InterruptedException; + + /** + * Master can decide to restart from the last good checkpoint if a + * worker fails during a superstep. + * + * @param checkpoint Checkpoint to restart from + */ + void restartFromCheckpoint(long checkpoint); + + /** + * Get the last known good checkpoint + * + * @return Last good superstep number + * @throws IOException + */ + long getLastGoodCheckpoint() throws IOException; + + /** + * If the master decides that this job doesn't have the resources to + * continue, it can fail the job. It can also designate what to do next. + * Typically this is mainly informative. + * + * @param state State of the application. + * @param applicationAttempt Attempt to start on + * @param desiredSuperstep Superstep to restart from (if applicable) + */ + void setJobState(ApplicationState state, + long applicationAttempt, + long desiredSuperstep); + + /** + * Get master aggregator handler + * + * @return Master aggregator handler + */ + MasterAggregatorHandler getAggregatorHandler(); + + /** + * Superstep has finished. + */ + void postSuperstep(); + + /** + * Application has finished. 
+ */ + void postApplication(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java new file mode 100644 index 0000000..f359bd4 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.bsp; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import org.apache.giraph.comm.ServerData; +import org.apache.giraph.comm.WorkerClient; +import org.apache.giraph.graph.FinishedSuperstepStats; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.MasterInfo; +import org.apache.giraph.graph.VertexEdgeCount; +import org.apache.giraph.graph.WorkerAggregatorHandler; +import org.apache.giraph.graph.partition.PartitionStore; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.GraphMapper; +import org.apache.giraph.graph.partition.Partition; +import org.apache.giraph.graph.partition.PartitionOwner; +import org.apache.giraph.graph.partition.PartitionStats; +import org.apache.giraph.graph.WorkerInfo; +import org.apache.giraph.graph.WorkerContext; + +/** + * All workers should have access to this centralized service to + * execute the following methods. + * + * @param Vertex id + * @param Vertex value + * @param Edge value + * @param Message data + */ +@SuppressWarnings("rawtypes") +public interface CentralizedServiceWorker + extends CentralizedService { + /** + * Setup (must be called prior to any other function) + * + * @return Finished superstep stats for the input superstep + */ + FinishedSuperstepStats setup(); + + /** + * Get the worker information + * + * @return Worker information + */ + WorkerInfo getWorkerInfo(); + + /** + * Get the worker client (for instantiating WorkerClientRequestProcessor + * instances). + * + * @return Worker client + */ + WorkerClient getWorkerClient(); + + /** + * Get the worker context. + * + * @return worker's WorkerContext + */ + WorkerContext getWorkerContext(); + + /** + * Get the partition store for this worker. 
+ * The partitions contain the vertices for + * this worker and can be used to run compute() for the vertices or do + * checkpointing. + * + * @return The partition store for this worker. + */ + PartitionStore getPartitionStore(); + + /** + * Both the vertices and the messages need to be checkpointed in order + * for them to be used. This is done after all messages have been + * delivered, but prior to a superstep starting. + */ + void storeCheckpoint() throws IOException; + + /** + * Load the vertices, edges, messages from the beginning of a superstep. + * Will load the vertex partitions as designated by the master and set the + * appropriate superstep. + * + * @param superstep which checkpoint to use + * @return Graph-wide vertex and edge counts + * @throws IOException + */ + VertexEdgeCount loadCheckpoint(long superstep) throws IOException; + + /** + * Take all steps prior to actually beginning the computation of a + * superstep. + * + * @param graphState Current graph state + * @return Collection of all the partition owners from the master for this + * superstep. + */ + Collection startSuperstep( + GraphState graphState); + + /** + * Worker is done with its portion of the superstep. Report the + * worker level statistics after the computation. + * + * @param graphState Current graph state + * @param partitionStatsList All the partition stats for this worker + * @return Stats of the superstep completion + */ + FinishedSuperstepStats finishSuperstep( + GraphState graphState, + List partitionStatsList); + + /** + * Get the partition that a vertex id would belong to. + * + * @param vertexId Id of the vertex that is used to find the correct + * partition. + * @return Correct partition if exists on this worker, null otherwise. + */ + Partition getPartition(I vertexId); + + /** + * Get the partition id that a vertex id would belong to. 
+ * + * @param vertexId Vertex id + * @return Partition id + */ + Integer getPartitionId(I vertexId); + + /** + * Whether a partition with given id exists on this worker. + * + * @param partitionId Partition id + * @return True iff this worker has the specified partition + */ + boolean hasPartition(Integer partitionId); + + /** + * Every client will need to get a partition owner from a vertex id so that + * they know which worker to send the request to. + * + * @param vertexId Vertex index to look for + * @return PartitionOwner that should contain this vertex if it exists + */ + PartitionOwner getVertexPartitionOwner(I vertexId); + + /** + * Get all partition owners. + * + * @return Iterable through partition owners + */ + Iterable getPartitionOwners(); + + /** + * Look up a vertex on a worker given its vertex index. + * + * @param vertexId Vertex index to look for + * @return Vertex if it exists on this worker. + */ + Vertex getVertex(I vertexId); + + /** + * If desired by the user, vertex partitions are redistributed among + * workers according to the chosen WorkerGraphPartitioner. + * + * @param masterSetPartitionOwners Partition owner info passed from the + * master. + */ + void exchangeVertexPartitions( + Collection masterSetPartitionOwners); + + /** + * Get master info + * + * @return Master info + */ + MasterInfo getMasterInfo(); + + /** + * Get the GraphMapper that this service is using. Vertices need to know + * this. + * + * @return BspMapper + */ + GraphMapper getGraphMapper(); + + /** + * Operations that will be called if there is a failure by a worker. 
+ */ + void failureCleanup(); + + /** + * Get server data + * + * @return Server data + */ + ServerData getServerData(); + + /** + * Get worker aggregator handler + * + * @return Worker aggregator handler + */ + WorkerAggregatorHandler getAggregatorHandler(); + + /** + * Final preparation for superstep, called after startSuperstep and + * potential loading from checkpoint, right before the computation started + * TODO how to avoid this additional function + */ + void prepareSuperstep(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java b/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java new file mode 100644 index 0000000..feb4041 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.bsp; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * This output committer doesn't do anything, meant for the case + * where output isn't desired, or as a base for not using + * FileOutputCommitter. + */ +public class ImmutableOutputCommitter extends OutputCommitter { + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) + throws IOException { + return false; + } + + @Override + public void setupJob(JobContext context) throws IOException { + } + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + } + + @Override + /*if[HADOOP_NON_SECURE] + public void cleanupJob(JobContext jobContext) throws IOException { + } + else[HADOOP_NON_SECURE]*/ + /*end[HADOOP_NON_SECURE]*/ + public void commitJob(JobContext jobContext) throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java b/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java new file mode 100644 index 0000000..c384fbf --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.bsp; + +/** + * State of a coordinated superstep + */ +public enum SuperstepState { + /** Nothing happened yet */ + INITIAL, + /** A worker died during this superstep */ + WORKER_FAILURE, + /** This superstep completed correctly */ + THIS_SUPERSTEP_DONE, + /** All supersteps are complete */ + ALL_SUPERSTEPS_DONE, +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/bsp/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/package-info.java b/giraph-core/src/main/java/org/apache/giraph/bsp/package-info.java new file mode 100644 index 0000000..b5e7dc3 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package of generic bulk synchronous processing objects. + */ +package org.apache.giraph.bsp; http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java new file mode 100644 index 0000000..65f2079 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.comm; + +import org.apache.giraph.graph.Aggregator; +import org.apache.hadoop.io.Writable; + +import java.io.IOException; + +/** + * Interface for master to send messages to workers + */ +public interface MasterClient { + /** + * Make sure that all the connections to workers have been established. + */ + void openConnections(); + + /** + * Sends aggregator to its owner + * + * @param aggregatorName Name of the aggregator + * @param aggregatorClass Class of the aggregator + * @param aggregatedValue Value of the aggregator + * @throws IOException + */ + void sendAggregator(String aggregatorName, + Class aggregatorClass, + Writable aggregatedValue) throws IOException; + + /** + * Flush aggregated values cache. + */ + void finishSendingAggregatedValues() throws IOException; + + /** + * Flush all outgoing messages. This will synchronously ensure that all + * messages have been sent and delivered prior to returning. + */ + void flush(); + + /** + * Closes all connections. + */ + void closeConnections(); +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java new file mode 100644 index 0000000..de991f8 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm; + +import java.net.InetSocketAddress; + +/** + * Interface for master to receive messages from workers + */ +public interface MasterServer { + /** + * Get server address + * + * @return Address used by this server + */ + InetSocketAddress getMyAddress(); + + /** + * Shuts down. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/comm/MsgList.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MsgList.java b/giraph-core/src/main/java/org/apache/giraph/comm/MsgList.java new file mode 100644 index 0000000..7f7fa43 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/MsgList.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm; + +import org.apache.giraph.graph.BspUtils; +import org.apache.giraph.utils.ArrayListWritable; +import org.apache.hadoop.io.Writable; + +/** + * Wrapper around {@link ArrayListWritable} that allows the message class to + * be set prior to calling readFields(). + * + * @param <M> message type + */ +public class MsgList<M extends Writable> extends ArrayListWritable<M> { + /** Defining a layout version for a serializable class. */ + private static final long serialVersionUID = 100L; + + /** + * Default constructor. + */ + public MsgList() { + super(); + } + + /** + * Copy constructor. + * + * @param msgList List of messages for writing. + */ + public MsgList(MsgList<M> msgList) { + super(msgList); + } + + @SuppressWarnings("unchecked") + @Override + public void setClass() { + setClass((Class<M>) BspUtils.getMessageValueClass(getConf())); + } +}