Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 620B9D1F6 for ; Thu, 9 Aug 2012 21:44:28 +0000 (UTC) Received: (qmail 67009 invoked by uid 500); 9 Aug 2012 21:44:28 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 66983 invoked by uid 500); 9 Aug 2012 21:44:27 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 66973 invoked by uid 99); 9 Aug 2012 21:44:27 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Aug 2012 21:44:27 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Aug 2012 21:44:23 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id AF94023888FD; Thu, 9 Aug 2012 21:43:39 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1371498 [1/2] - in /giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/io/ src/main/java/org/apache/giraph/lib/ src/test/java/org/apache/giraph/ src/test/ja... 
Date: Thu, 09 Aug 2012 21:43:37 -0000 To: commits@giraph.apache.org From: jghoman@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120809214339.AF94023888FD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jghoman Date: Thu Aug 9 21:43:36 2012 New Revision: 1371498 URL: http://svn.apache.org/viewvc?rev=1371498&view=rev Log: GIRAPH-218: Consolidate all I/O Format classes under one roof in lib/ directory. Contributed by Eli Reisman. Added: giraph/trunk/src/main/java/org/apache/giraph/io/ giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListVertexReader.java giraph/trunk/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java giraph/trunk/src/main/java/org/apache/giraph/io/package-info.java 
giraph/trunk/src/test/java/org/apache/giraph/io/ giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java Removed: giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java giraph/trunk/src/main/java/org/apache/giraph/lib/ giraph/trunk/src/test/java/org/apache/giraph/lib/ Modified: giraph/trunk/CHANGELOG giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java giraph/trunk/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java 
giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java Modified: giraph/trunk/CHANGELOG URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1371498&r1=1371497&r2=1371498&view=diff ============================================================================== --- giraph/trunk/CHANGELOG (original) +++ giraph/trunk/CHANGELOG Thu Aug 9 21:43:36 2012 @@ -2,6 +2,9 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-218: Consolidate all I/O Format classes under one roof in lib/ directory. + (Eli Reisman via jghoman) + GIRAPH-259: TestBspBasic.testBspPageRank is broken (majakabiljo via apresta) GIRAPH-256: Partitioning outgoing graph data during INPUT_SUPERSTEP by # of Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java?rev=1371498&r1=1371497&r2=1371498&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java Thu Aug 9 21:43:36 2012 @@ -26,6 +26,7 @@ import org.apache.commons.cli.PosixParse import org.apache.giraph.graph.BspUtils; import org.apache.giraph.graph.EdgeListVertex; import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.io.PseudoRandomVertexInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1371498&r1=1371497&r2=1371498&view=diff 
============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Thu Aug 9 21:43:36 2012 @@ -28,6 +28,7 @@ import org.apache.giraph.graph.DefaultMa import org.apache.giraph.graph.EdgeListVertex; import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.WorkerContext; +import org.apache.giraph.io.PseudoRandomVertexInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DoubleWritable; Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java?rev=1371498&r1=1371497&r2=1371498&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java Thu Aug 9 21:43:36 2012 @@ -27,6 +27,7 @@ import org.apache.giraph.examples.Minimu import org.apache.giraph.graph.BspUtils; import org.apache.giraph.graph.EdgeListVertex; import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.io.PseudoRandomVertexInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1371498&r1=1371497&r2=1371498&view=diff ============================================================================== --- 
giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Thu Aug 9 21:43:36 2012 @@ -29,6 +29,8 @@ import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.EdgeListVertex; import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.WorkerContext; +import org.apache.giraph.io.GeneratedVertexInputFormat; +import org.apache.giraph.io.IdWithValueTextOutputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.FloatWritable; @@ -212,7 +214,7 @@ public class SimpleCheckpointVertex exte GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName()); bspJob.setVertexClass(getClass()); bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class); - bspJob.setVertexOutputFormatClass(SimpleTextVertexOutputFormat.class); + bspJob.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class); bspJob.setMasterComputeClass(SimpleCheckpointVertexMasterCompute.class); int minWorkers = Integer.parseInt(cmd.getOptionValue('w')); Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java?rev=1371498&r1=1371497&r2=1371498&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java Thu Aug 9 21:43:36 2012 @@ -24,7 +24,7 @@ import org.apache.hadoop.io.LongWritable /** * A simple use of the Identity Vertex for taking care of Long, Double, * Double, Double type Inputformat 
Good for use with - * lib.LongDoubleDoubleAdjacencyListVertexInputFormat + * io.LongDoubleDoubleAdjacencyListVertexInputFormat */ public abstract class SimpleLongDoubleDoubleDoubleIdentityVertex extends Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1371498&r1=1371497&r2=1371498&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Thu Aug 9 21:43:36 2012 @@ -28,8 +28,9 @@ import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexReader; import org.apache.giraph.graph.VertexWriter; import org.apache.giraph.graph.WorkerContext; -import org.apache.giraph.lib.TextVertexOutputFormat; -import org.apache.giraph.lib.TextVertexOutputFormat.TextVertexWriter; +import org.apache.giraph.io.GeneratedVertexInputFormat; +import org.apache.giraph.io.TextVertexOutputFormat; +import org.apache.giraph.io.TextVertexOutputFormat.TextVertexWriter; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; @@ -221,8 +222,8 @@ public class SimplePageRankVertex extend * Simple VertexInputFormat that supports {@link SimplePageRankVertex} */ public static class SimplePageRankVertexInputFormat extends - GeneratedVertexInputFormat { + GeneratedVertexInputFormat { @Override public VertexReader createVertexReader(InputSplit split, Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1371498&r1=1371497&r2=1371498&view=diff 
============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Thu Aug 9 21:43:36 2012 @@ -23,8 +23,9 @@ import org.apache.giraph.graph.EdgeListV import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexReader; import org.apache.giraph.graph.VertexWriter; -import org.apache.giraph.lib.TextVertexOutputFormat; -import org.apache.giraph.lib.TextVertexOutputFormat.TextVertexWriter; +import org.apache.giraph.io.GeneratedVertexInputFormat; +import org.apache.giraph.io.TextVertexOutputFormat; +import org.apache.giraph.io.TextVertexOutputFormat.TextVertexWriter; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; @@ -110,8 +111,8 @@ public class SimpleSuperstepVertex exten * Simple VertexInputFormat that supports {@link SimpleSuperstepVertex} */ public static class SimpleSuperstepVertexInputFormat extends - GeneratedVertexInputFormat { + GeneratedVertexInputFormat { @Override public VertexReader createVertexReader(InputSplit split, TaskAttemptContext context) Added: giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io; + +import org.apache.giraph.graph.Edge; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexWriter; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * OutputFormat to write out the graph nodes as text, value-separated (by + * tabs, by default). With the default delimiter, a vertex is written out as: + * + * []+ + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + */ +@SuppressWarnings("rawtypes") +public class AdjacencyListTextVertexOutputFormat + extends TextVertexOutputFormat { + + /** + * Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}.
+ * + * @param Vertex id + * @param Vertex data + * @param Edge data + */ + static class AdjacencyListVertexWriter extends TextVertexWriter { + /** Split delimiter */ + public static final String LINE_TOKENIZE_VALUE = "output.delimiter"; + /** Default split delimiter */ + public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t"; + /** Cached split delimiter */ + private String delimiter; + + /** + * Constructor with writer. + * + * @param recordWriter Record writer used for writing. + */ + public AdjacencyListVertexWriter(RecordWriter recordWriter) { + super(recordWriter); + } + + @Override + public void writeVertex(Vertex vertex) throws IOException, + InterruptedException { + if (delimiter == null) { + delimiter = getContext().getConfiguration() + .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); + } + + StringBuffer sb = new StringBuffer(vertex.getId().toString()); + sb.append(delimiter); + sb.append(vertex.getValue()); + + for (Edge edge : vertex.getEdges()) { + sb.append(delimiter).append(edge.getTargetVertexId()); + sb.append(delimiter).append(edge.getValue()); + } + + getRecordWriter().write(new Text(sb.toString()), null); + } + } + + @Override + public VertexWriter createVertexWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new AdjacencyListVertexWriter + (textOutputFormat.getRecordWriter(context)); + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListVertexReader.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListVertexReader.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListVertexReader.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListVertexReader.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io; + +import org.apache.giraph.graph.BspUtils; +import org.apache.giraph.graph.Edge; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.TextVertexInputFormat.TextVertexReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.RecordReader; + +import com.google.common.collect.Maps; + +import java.io.IOException; +import java.util.Map; + +/** + * VertexReader that reads lines of text with vertices encoded as adjacency + * lists and converts each token to the correct type. For example, a graph + * with vertices as integers and values as doubles could be encoded as: + * 1 0.1 2 0.2 3 0.3 + * to represent a vertex named 1, with 0.1 as its value and two edges, to + * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.
+ * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message data + */ +@SuppressWarnings("rawtypes") +public abstract class AdjacencyListVertexReader extends + TextVertexInputFormat.TextVertexReader { + /** Delimiter for split */ + public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter"; + /** Default delimiter for split */ + public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t"; + /** Cached delimiter used for split */ + private String splitValue = null; + + /** + * Utility for doing any cleaning of each line before it is tokenized. + */ + public interface LineSanitizer { + /** + * Clean string s before attempting to tokenize it. + * + * @param s String to be cleaned. + * @return Sanitized string. + */ + String sanitize(String s); + } + + /** + * Sanitizer from constructor. + */ + private final LineSanitizer sanitizer; + + /** + * Constructor with line record reader. + * + * @param lineRecordReader Reader from {@link TextVertexReader}. + */ + public AdjacencyListVertexReader( + RecordReader lineRecordReader) { + super(lineRecordReader); + sanitizer = null; + } + + /** + * Constructor with line record reader. + * + * @param lineRecordReader Reader from {@link TextVertexReader}. + * @param sanitizer Sanitizer to be used. + */ + public AdjacencyListVertexReader( + RecordReader lineRecordReader, + LineSanitizer sanitizer) { + super(lineRecordReader); + this.sanitizer = sanitizer; + } + + /** + * Store the Id for this line in an instance of its correct type. + * + * @param s Id of vertex from line + * @param id Instance of Id's type, in which to store its value + */ + public abstract void decodeId(String s, I id); + + /** + * Store the value for this line in an instance of its correct type. 
+ * @param s Value from line + * @param value Instance of value's type, in which to store its value + */ + public abstract void decodeValue(String s, V value); + + /** + * Store an edge from the line into an instance of a correctly typed Edge + * @param id The edge's id from the line + * @param value The edge's value from the line + * @param edge Instance of edge in which to store the id and value + */ + public abstract void decodeEdge(String id, String value, Edge edge); + + + @Override + public boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + + @Override + public Vertex getCurrentVertex() + throws IOException, InterruptedException { + Configuration conf = getContext().getConfiguration(); + String line = getRecordReader().getCurrentValue().toString(); + Vertex vertex = BspUtils.createVertex(conf); + + if (sanitizer != null) { + line = sanitizer.sanitize(line); + } + + if (splitValue == null) { + splitValue = conf.get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); + } + + String [] values = line.split(splitValue); + + if ((values.length < 2) || (values.length % 2 != 0)) { + throw new IllegalArgumentException( + "Line did not split correctly: " + line); + } + + I vertexId = BspUtils.createVertexId(conf); + decodeId(values[0], vertexId); + + V value = BspUtils.createVertexValue(conf); + decodeValue(values[1], value); + + int i = 2; + Map edges = Maps.newHashMap(); + Edge edge = new Edge(); + while (i < values.length) { + decodeEdge(values[i], values[i + 1], edge); + edges.put(edge.getTargetVertexId(), edge.getValue()); + i += 2; + } + vertex.initialize(vertexId, value, edges, null); + return vertex; + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java?rev=1371498&view=auto 
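For readers who want to see the adjacency-list line encoding outside Hadoop, here is a minimal, JDK-only sketch of the round trip. AdjacencyListVertexReader parses lines of the form "id value [edgeId edgeValue]...", and AdjacencyListTextVertexOutputFormat writes them back out. The class AdjacencyListLineSketch and its parse/format methods are hypothetical illustrations, not Giraph API. The sketch also assumes long ids and double values.

The sketch mirrors the committed code in a few specific ways. It splits with String.split, so the delimiter is treated as a regex. It applies the same "fewer than two tokens or odd token count" rejection. It walks the remaining tokens in (target id, edge value) pairs. The writer in the diff reads its delimiter from "output.delimiter", and the reader reads "adj.list.input.delimiter"; both default to a tab.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical, JDK-only sketch (not Giraph API) of the adjacency-list line
 * encoding "id value [edgeId edgeValue]...". Long ids and double values are
 * assumed for illustration.
 */
public class AdjacencyListLineSketch {

  /** One decoded line: vertex id, vertex value, and target id -> edge value. */
  public static final class ParsedVertex {
    public final long id;
    public final double value;
    public final Map<Long, Double> edges;

    public ParsedVertex(long id, double value, Map<Long, Double> edges) {
      this.id = id;
      this.value = value;
      this.edges = edges;
    }
  }

  /**
   * Splits a line (the delimiter is a regex, as with String.split) and
   * rejects lines with fewer than two tokens or an odd token count.
   */
  public static ParsedVertex parse(String line, String delimiter) {
    String[] tokens = line.split(delimiter);
    if (tokens.length < 2 || tokens.length % 2 != 0) {
      throw new IllegalArgumentException(
          "Line did not split correctly: " + line);
    }
    long id = Long.parseLong(tokens[0]);
    double value = Double.parseDouble(tokens[1]);
    // Tokens after the vertex value come in (target id, edge value) pairs.
    // LinkedHashMap keeps input order so format() can round-trip the line.
    Map<Long, Double> edges = new LinkedHashMap<>();
    for (int i = 2; i < tokens.length; i += 2) {
      edges.put(Long.parseLong(tokens[i]), Double.parseDouble(tokens[i + 1]));
    }
    return new ParsedVertex(id, value, edges);
  }

  /** Writes id, value, then each edge's target id and value, delimited. */
  public static String format(ParsedVertex vertex, String delimiter) {
    StringBuilder sb = new StringBuilder();
    sb.append(vertex.id).append(delimiter).append(vertex.value);
    for (Map.Entry<Long, Double> edge : vertex.edges.entrySet()) {
      sb.append(delimiter).append(edge.getKey());
      sb.append(delimiter).append(edge.getValue());
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    ParsedVertex v = parse("1\t0.1\t2\t0.2\t3\t0.3", "\t");
    System.out.println(v.edges);          // {2=0.2, 3=0.3}
    System.out.println(format(v, "\t"));  // the input line, tab-separated
  }
}
```

One design note: the committed reader collects edges into a Guava HashMap, so the order in which the writer emits edges is not guaranteed. The sketch uses a LinkedHashMap only so that an exact textual round trip is predictable.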
============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import org.apache.giraph.bsp.BspInputSplit; +import org.apache.giraph.graph.VertexInputFormat; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * This VertexInputFormat is meant for testing/debugging. It simply generates + * some vertex data that can be consumed by test applications. 
+ * + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data + */ +@SuppressWarnings("rawtypes") +public abstract class GeneratedVertexInputFormat< + I extends WritableComparable, V extends Writable, E extends Writable, + M extends Writable> extends VertexInputFormat { + @Override + public List getSplits(JobContext context, int numWorkers) + throws IOException, InterruptedException { + // This is meaningless, the VertexReader will generate all the test + // data. + List inputSplitList = new ArrayList(); + for (int i = 0; i < numWorkers; ++i) { + inputSplitList.add(new BspInputSplit(i, numWorkers)); + } + return inputSplitList; + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + + +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexWriter; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * Write out Vertices' IDs and values, but not their edges nor edges' values. + * This is a useful output format when the final value of the vertex is + * all that's needed. The boolean configuration parameter reverse.id.and.value + * allows reversing the output of id and value. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + */ +@SuppressWarnings("rawtypes") +public class IdWithValueTextOutputFormat + extends TextVertexOutputFormat { + + /** + * Vertex writer used with {@link IdWithValueTextOutputFormat}. + * + * @param Vertex id + * @param Vertex data + * @param Edge data + */ + static class IdWithValueVertexWriter extends TextVertexWriter { + /** Specify the output delimiter */ + public static final String LINE_TOKENIZE_VALUE = "output.delimiter"; + /** Default output delimiter */ + public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t"; + /** Reverse id and value order? */ + public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value"; + /** Default is to not reverse id and value order. */ + public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false; + /** Saved delimiter */ + private String delimiter; + + /** + * Constructor with record writer. + * + * @param recordWriter Writer from LineRecordWriter. 
+ */ + public IdWithValueVertexWriter(RecordWriter recordWriter) { + super(recordWriter); + } + + @Override + public void writeVertex(Vertex vertex) throws IOException, + InterruptedException { + if (delimiter == null) { + delimiter = getContext().getConfiguration() + .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); + } + + String first; + String second; + boolean reverseOutput = getContext().getConfiguration() + .getBoolean(REVERSE_ID_AND_VALUE, REVERSE_ID_AND_VALUE_DEFAULT); + + if (reverseOutput) { + first = vertex.getValue().toString(); + second = vertex.getId().toString(); + } else { + first = vertex.getId().toString(); + second = vertex.getValue().toString(); + } + + Text line = new Text(first + delimiter + second); + + getRecordWriter().write(line, null); + } + } + + @Override + public VertexWriter createVertexWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new IdWithValueVertexWriter + (textOutputFormat.getRecordWriter(context)); + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import org.apache.giraph.graph.BspUtils; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexReader; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.io.IOException; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for + * unweighted graphs with int ids. + * + * Each line consists of: vertex neighbor1 neighbor2 ... + */ +public class IntIntNullIntTextInputFormat extends + TextVertexInputFormat { + + @Override + public VertexReader + createVertexReader(InputSplit split, TaskAttemptContext context) + throws IOException { + return new IntIntNullIntVertexReader( + textInputFormat.createRecordReader(split, context)); + } + + /** + * Vertex reader associated with {@link IntIntNullIntTextInputFormat}. + */ + public static class IntIntNullIntVertexReader extends + TextVertexInputFormat.TextVertexReader { + /** Separator of the vertex and neighbors */ + private static final Pattern SEPARATOR = Pattern.compile("[\t ]"); + + /** + * Constructor with the line reader. + * + * @param lineReader Internal line reader. 
+ */ + public IntIntNullIntVertexReader(RecordReader + lineReader) { + super(lineReader); + } + + @Override + public Vertex + getCurrentVertex() throws IOException, InterruptedException { + Vertex + vertex = BspUtils.createVertex(getContext().getConfiguration()); + + String[] tokens = SEPARATOR.split(getRecordReader() + .getCurrentValue().toString()); + Map edges = + Maps.newHashMapWithExpectedSize(tokens.length - 1); + for (int n = 1; n < tokens.length; n++) { + edges.put(new IntWritable(Integer.parseInt(tokens[n])), + NullWritable.get()); + } + + IntWritable vertexId = new IntWritable(Integer.parseInt(tokens[0])); + vertex.initialize(vertexId, vertexId, edges, + Lists.newArrayList()); + + return vertex; + } + + @Override + public boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
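`IntIntNullIntVertexReader` splits each line on a single tab or space. The first token becomes the vertex id, and every later token becomes a neighbor id stored as a key in an edge map, so duplicate neighbors collapse. The reader also reuses the id as the vertex value. The sketch below is a hypothetical plain-Java version of that parsing that uses `int` instead of `IntWritable`. The class and method names are illustrative only. As in the original, two consecutive separators produce an empty token and therefore a `NumberFormatException`.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical sketch of IntIntNullIntVertexReader's line parsing,
// using plain ints instead of IntWritable/NullWritable.
final class IntAdjacencyLine {
  /** Same separator as the reader: one tab or one space. */
  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");

  final int id;
  /** Neighbor ids; a set, because the reader keys its edges in a map. */
  final Set<Integer> neighbors;

  private IntAdjacencyLine(int id, Set<Integer> neighbors) {
    this.id = id;
    this.neighbors = neighbors;
  }

  /** Parses "vertex neighbor1 neighbor2 ...". */
  static IntAdjacencyLine parse(String line) {
    String[] tokens = SEPARATOR.split(line);
    int id = Integer.parseInt(tokens[0]);
    Set<Integer> neighbors = new LinkedHashSet<>();
    for (int n = 1; n < tokens.length; n++) {
      neighbors.add(Integer.parseInt(tokens[n]));
    }
    return new IntAdjacencyLine(id, neighbors);
  }

  public static void main(String[] args) {
    IntAdjacencyLine v = parse("1 2\t3");
    System.out.println(v.id + " -> " + v.neighbors); // 1 -> [2, 3]
  }
}
```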
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +/** + * Keeps the vertex keys for the input/output vertex format + */ +public class JsonBase64VertexFormat { + /** Vertex id key */ + public static final String VERTEX_ID_KEY = "vertexId"; + /** Vertex value key*/ + public static final String VERTEX_VALUE_KEY = "vertexValue"; + /** Edge value array key (all the edges are stored here) */ + public static final String EDGE_ARRAY_KEY = "edgeArray"; + + /** + * Don't construct. + */ + private JsonBase64VertexFormat() { } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import org.apache.giraph.graph.BspUtils; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +import com.google.common.collect.Maps; + +import net.iharder.Base64; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.Map; + +/** + * Simple way to represent the structure of the graph with a JSON object. + * The actual vertex ids, values, and edges are stored as + * Writable-serialized bytes that are Base64 encoded.
+ * Works with {@link JsonBase64VertexOutputFormat} + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message data + */ +@SuppressWarnings("rawtypes") +public class JsonBase64VertexInputFormat + extends TextVertexInputFormat { + /** + * Simple reader that supports {@link JsonBase64VertexInputFormat} + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message data + */ + private static class JsonBase64VertexReader + extends TextVertexReader { + /** + * Only constructor. Requires the LineRecordReader + * + * @param lineRecordReader Line record reader to read from + */ + public JsonBase64VertexReader( + RecordReader lineRecordReader) { + super(lineRecordReader); + } + + @Override + public boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + + @Override + public Vertex getCurrentVertex() + throws IOException, InterruptedException { + Configuration conf = getContext().getConfiguration(); + Vertex vertex = BspUtils.createVertex(conf); + + Text line = getRecordReader().getCurrentValue(); + JSONObject vertexObject; + try { + vertexObject = new JSONObject(line.toString()); + } catch (JSONException e) { + throw new IllegalArgumentException( + "next: Failed to get the vertex", e); + } + DataInput input = null; + byte[] decodedWritable = null; + I vertexId = null; + try { + decodedWritable = Base64.decode( + vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY)); + input = new DataInputStream( + new ByteArrayInputStream(decodedWritable)); + vertexId = BspUtils.createVertexId(conf); + vertexId.readFields(input); + } catch (JSONException e) { + throw new IllegalArgumentException( + "next: Failed to get vertex id", e); + } + V vertexValue = null; + try { + decodedWritable = Base64.decode( + vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY)); + input = new DataInputStream( + new ByteArrayInputStream(decodedWritable)); + 
vertexValue = BspUtils.createVertexValue(conf); + vertexValue.readFields(input); + } catch (JSONException e) { + throw new IllegalArgumentException( + "next: Failed to get vertex value", e); + } + JSONArray edgeArray = null; + try { + edgeArray = vertexObject.getJSONArray( + JsonBase64VertexFormat.EDGE_ARRAY_KEY); + } catch (JSONException e) { + throw new IllegalArgumentException( + "next: Failed to get edge array", e); + } + Map edgeMap = Maps.newHashMap(); + for (int i = 0; i < edgeArray.length(); ++i) { + try { + decodedWritable = Base64.decode(edgeArray.getString(i)); + } catch (JSONException e) { + throw new IllegalArgumentException( + "next: Failed to get edge value", e); + } + input = new DataInputStream( + new ByteArrayInputStream(decodedWritable)); + I targetVertexId = + BspUtils.createVertexId(getContext().getConfiguration()); + targetVertexId.readFields(input); + E edgeValue = + BspUtils.createEdgeValue(getContext().getConfiguration()); + edgeValue.readFields(input); + edgeMap.put(targetVertexId, edgeValue); + } + vertex.initialize(vertexId, vertexValue, edgeMap, null); + return vertex; + } + } + + @Override + public VertexReader createVertexReader( + InputSplit split, TaskAttemptContext context) throws IOException { + return new JsonBase64VertexReader( + textInputFormat.createRecordReader(split, context)); + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
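Each JSON field read by `JsonBase64VertexReader` is Base64 text. Underneath are the bytes produced by a Writable's `write()`. An edge entry is the target id's bytes followed directly by the edge value's bytes, which is why the reader calls `readFields` twice on one stream. The sketch below assumes `LongWritable` ids and `DoubleWritable` edge values, which serialize as one big-endian `long` and one `double`. It uses `java.util.Base64` in place of `net.iharder.Base64`. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Base64;

// Hypothetical sketch: decoding JsonBase64VertexFormat fields, assuming
// LongWritable ids and DoubleWritable edge values.
final class Base64WritableDecoder {
  private Base64WritableDecoder() { }

  /** A decoded (target id, edge value) pair. */
  static final class LongDoubleEdge {
    final long targetId;
    final double value;

    LongDoubleEdge(long targetId, double value) {
      this.targetId = targetId;
      this.value = value;
    }
  }

  private static DataInputStream open(String base64) {
    byte[] bytes = Base64.getDecoder().decode(base64);
    return new DataInputStream(new ByteArrayInputStream(bytes));
  }

  /** Equivalent of LongWritable.readFields: one big-endian long. */
  static long decodeLongId(String base64) {
    try (DataInputStream in = open(base64)) {
      return in.readLong();
    } catch (IOException e) {
      throw new IllegalArgumentException("Failed to decode vertex id", e);
    }
  }

  /** Target id bytes, then edge value bytes, read from one stream. */
  static LongDoubleEdge decodeEdge(String base64) {
    try (DataInputStream in = open(base64)) {
      long target = in.readLong();    // LongWritable.readFields
      double value = in.readDouble(); // DoubleWritable.readFields
      return new LongDoubleEdge(target, value);
    } catch (IOException e) {
      throw new IllegalArgumentException("Failed to decode edge", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(decodeLongId("AAAAAAAAAAE=")); // 1
    LongDoubleEdge edge = decodeEdge("AAAAAAAAAAI/4AAAAAAAAA==");
    System.out.println(edge.targetId + " " + edge.value); // 2 0.5
  }
}
```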
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import org.apache.giraph.graph.Edge; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexWriter; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +import net.iharder.Base64; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * Simple way to represent the structure of the graph with a JSON object. + * The actual vertex ids, values, and edges are stored as + * Writable-serialized bytes that are Base64 encoded.
+ * Works with {@link JsonBase64VertexInputFormat} + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + */ +@SuppressWarnings("rawtypes") +public class JsonBase64VertexOutputFormat extends + TextVertexOutputFormat { + /** + * Simple writer that supports {@link JsonBase64VertexOutputFormat} + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + */ + private static class JsonBase64VertexWriter extends + TextVertexWriter { + /** + * Only constructor. Requires the LineRecordWriter + * + * @param lineRecordWriter Line record writer to write to + */ + public JsonBase64VertexWriter( + RecordWriter lineRecordWriter) { + super(lineRecordWriter); + } + + @Override + public void writeVertex(Vertex vertex) + throws IOException, InterruptedException { + ByteArrayOutputStream outputStream = + new ByteArrayOutputStream(); + DataOutput output = new DataOutputStream(outputStream); + JSONObject vertexObject = new JSONObject(); + vertex.getId().write(output); + try { + vertexObject.put( + JsonBase64VertexFormat.VERTEX_ID_KEY, + Base64.encodeBytes(outputStream.toByteArray())); + } catch (JSONException e) { + throw new IllegalStateException( + "writeVertex: Failed to insert vertex id", e); + } + outputStream.reset(); + vertex.getValue().write(output); + try { + vertexObject.put( + JsonBase64VertexFormat.VERTEX_VALUE_KEY, + Base64.encodeBytes(outputStream.toByteArray())); + } catch (JSONException e) { + throw new IllegalStateException( + "writeVertex: Failed to insert vertex value", e); + } + JSONArray edgeArray = new JSONArray(); + for (Edge edge : vertex.getEdges()) { + outputStream.reset(); + edge.getTargetVertexId().write(output); + edge.getValue().write(output); + edgeArray.put(Base64.encodeBytes(outputStream.toByteArray())); + } + try { + vertexObject.put( + JsonBase64VertexFormat.EDGE_ARRAY_KEY, + edgeArray); + } catch (JSONException e) { + throw new IllegalStateException( + "writeVertex: Failed to insert edge array", e);
+ } + getRecordWriter().write(new Text(vertexObject.toString()), null); + } + } + + @Override + public VertexWriter createVertexWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new JsonBase64VertexWriter( + textOutputFormat.getRecordWriter(context)); + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
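`JsonBase64VertexWriter` does the reverse of the reader. It writes each Writable into a byte buffer, resets the buffer between fields, and Base64-encodes the bytes. For an edge it writes the target id and then the edge value into the same buffer before encoding. The sketch below assumes `LongWritable` ids and `DoubleWritable` edge values, and uses `java.util.Base64`. It assumes that `net.iharder.Base64.encodeBytes` yields the same standard-alphabet text for short fields like these. `Base64WritableEncoder` and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Base64;

// Hypothetical sketch of the per-field encoding in JsonBase64VertexWriter,
// assuming LongWritable ids and DoubleWritable edge values.
final class Base64WritableEncoder {
  private Base64WritableEncoder() { }

  /** Writes one field's bytes, like a Writable's write(DataOutput). */
  interface FieldWriter {
    void write(DataOutputStream out) throws IOException;
  }

  private static String encode(FieldWriter writer) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      writer.write(out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  /** Same bytes as LongWritable.write: one big-endian long. */
  static String encodeLongId(long id) {
    return encode(out -> out.writeLong(id));
  }

  /** Target id bytes followed directly by the edge value bytes. */
  static String encodeEdge(long targetId, double value) {
    return encode(out -> {
      out.writeLong(targetId);
      out.writeDouble(value);
    });
  }

  public static void main(String[] args) {
    System.out.println(encodeLongId(1L));    // AAAAAAAAAAE=
    System.out.println(encodeEdge(2L, 0.5)); // AAAAAAAAAAI/4AAAAAAAAA==
  }
}
```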
+ */ +package org.apache.giraph.io; + +import org.apache.giraph.graph.BspUtils; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexReader; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.json.JSONArray; +import org.json.JSONException; + +import com.google.common.collect.Maps; + +import java.io.IOException; +import java.util.Map; + +/** + * VertexInputFormat that features long vertex IDs, + * double vertex values and float + * out-edge weights, and double message types, + * specified in JSON format. + */ +public class JsonLongDoubleFloatDoubleVertexInputFormat extends + TextVertexInputFormat { + + @Override + public VertexReader createVertexReader(InputSplit split, + TaskAttemptContext context) throws IOException { + return new JsonLongDoubleFloatDoubleVertexReader( + textInputFormat.createRecordReader(split, context)); + } + + /** + * VertexReader that features double vertex + * values and float out-edge weights. The + * files should be in the following JSON format: + * JSONArray(<vertex id>, <vertex value>, + * JSONArray(JSONArray(<dest vertex id>, <edge value>), ...)) + * Here is an example with vertex id 1, vertex value 4.3, and two edges. + * First edge has a destination vertex 2, edge value 2.1. + * Second edge has a destination vertex 3, edge value 0.7. + * [1,4.3,[[2,2.1],[3,0.7]]] + */ + static class JsonLongDoubleFloatDoubleVertexReader extends + TextVertexReader { + + /** + * Constructor with the line record reader. + * + * @param lineRecordReader Will read from this line.
+ */ + public JsonLongDoubleFloatDoubleVertexReader( + RecordReader lineRecordReader) { + super(lineRecordReader); + } + + @Override + public Vertex getCurrentVertex() + throws IOException, InterruptedException { + Vertex + vertex = BspUtils.createVertex(getContext().getConfiguration()); + + Text line = getRecordReader().getCurrentValue(); + try { + JSONArray jsonVertex = new JSONArray(line.toString()); + LongWritable vertexId = new LongWritable(jsonVertex.getLong(0)); + DoubleWritable vertexValue = + new DoubleWritable(jsonVertex.getDouble(1)); + Map edges = Maps.newHashMap(); + JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2); + for (int i = 0; i < jsonEdgeArray.length(); ++i) { + JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i); + edges.put(new LongWritable(jsonEdge.getLong(0)), + new FloatWritable((float) jsonEdge.getDouble(1))); + } + vertex.initialize(vertexId, vertexValue, edges, null); + } catch (JSONException e) { + throw new IllegalArgumentException( + "next: Couldn't get vertex from line " + line, e); + } + return vertex; + } + + @Override + public boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import org.apache.giraph.graph.Edge; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexWriter; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.json.JSONArray; +import org.json.JSONException; + +import java.io.IOException; + +/** + * VertexOutputFormat that supports JSON encoded vertices featuring + * double values and float out-edge weights + */ +public class JsonLongDoubleFloatDoubleVertexOutputFormat extends + TextVertexOutputFormat { + @Override + public VertexWriter + createVertexWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + RecordWriter recordWriter = + textOutputFormat.getRecordWriter(context); + return new JsonLongDoubleFloatDoubleVertexWriter(recordWriter); + } + + /** + * VertexWriter that supports vertices with double + * values and float out-edge weights. + */ + static class JsonLongDoubleFloatDoubleVertexWriter extends + TextVertexWriter { + + /** + * Vertex writer with the internal line writer. 
+ * + * @param lineRecordWriter Will actually be written to. + */ + public JsonLongDoubleFloatDoubleVertexWriter( + RecordWriter lineRecordWriter) { + super(lineRecordWriter); + } + + @Override + public void writeVertex(Vertex vertex) throws IOException, InterruptedException { + JSONArray jsonVertex = new JSONArray(); + try { + jsonVertex.put(vertex.getId().get()); + jsonVertex.put(vertex.getValue().get()); + JSONArray jsonEdgeArray = new JSONArray(); + for (Edge edge : vertex.getEdges()) { + JSONArray jsonEdge = new JSONArray(); + jsonEdge.put(edge.getTargetVertexId().get()); + jsonEdge.put(edge.getValue().get()); + jsonEdgeArray.put(jsonEdge); + } + jsonVertex.put(jsonEdgeArray); + } catch (JSONException e) { + throw new IllegalArgumentException( + "writeVertex: Couldn't write vertex " + vertex, e); + } + getRecordWriter().write(new Text(jsonVertex.toString()), null); + } + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
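`JsonLongDoubleFloatDoubleVertexWriter` writes each vertex as a nested JSON array, `[id,value,[[target,weight],...]]`, which is the same shape its reader expects. The hypothetical sketch below builds that shape with a `StringBuilder` so the layout is visible without org.json. It is not the writer's actual code path. org.json uses its own number formatting, and a `float` weight widened to `double` when passed to `JSONArray.put` may print with more digits than shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the [id,value,[[target,weight],...]] line layout,
// built by hand (not with org.json).
final class JsonVertexLine {
  private JsonVertexLine() { }

  static String format(long id, double value, Map<Long, Float> edges) {
    StringBuilder sb = new StringBuilder();
    sb.append('[').append(id).append(',').append(value).append(",[");
    boolean first = true;
    for (Map.Entry<Long, Float> edge : edges.entrySet()) {
      if (!first) {
        sb.append(',');
      }
      first = false;
      sb.append('[').append(edge.getKey().longValue()).append(',')
          .append(edge.getValue().floatValue()).append(']');
    }
    return sb.append("]]").toString();
  }

  public static void main(String[] args) {
    Map<Long, Float> edges = new LinkedHashMap<>();
    edges.put(2L, 2.1f);
    edges.put(3L, 0.7f);
    System.out.println(format(1L, 4.3, edges)); // [1,4.3,[[2,2.1],[3,0.7]]]
  }
}
```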
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io; + +import org.apache.giraph.graph.Edge; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * InputFormat for reading graphs stored as (ordered) adjacency lists + * with long vertex ids and double vertex and edge values. + * For example: + * 22 0.1 45 0.3 99 0.44 + * to represent a vertex with id 22, value of 0.1 and edges to nodes 45 and 99, + * with values of 0.3 and 0.44, respectively. + * + * @param Message data + */ +public class LongDoubleDoubleAdjacencyListVertexInputFormat + extends TextVertexInputFormat { + + /** + * VertexReader associated with + * {@link LongDoubleDoubleAdjacencyListVertexInputFormat}. + * + * @param Message data. + */ + static class VertexReader extends + AdjacencyListVertexReader { + + /** + * Constructor with Line record reader. + * + * @param lineRecordReader Reader to internally use. + */ + VertexReader(RecordReader lineRecordReader) { + super(lineRecordReader); + } + + /** + * Constructor with Line record reader and sanitizer. + * + * @param lineRecordReader Reader to internally use. + * @param sanitizer Line sanitizer.
+ */ + VertexReader(RecordReader lineRecordReader, + LineSanitizer sanitizer) { + super(lineRecordReader, sanitizer); + } + + @Override + public void decodeId(String s, LongWritable id) { + id.set(Long.valueOf(s)); + } + + @Override + public void decodeValue(String s, DoubleWritable value) { + value.set(Double.valueOf(s)); + } + + @Override + public void decodeEdge( + String s1, + String s2, + Edge textIntWritableEdge) { + textIntWritableEdge.setTargetVertexId(new LongWritable(Long.valueOf(s1))); + textIntWritableEdge.setValue(new DoubleWritable(Double.valueOf(s2))); + } + } + + @Override + public org.apache.giraph.graph.VertexReader createVertexReader( + InputSplit split, + TaskAttemptContext context) throws IOException { + return new VertexReader(textInputFormat.createRecordReader( + split, context)); + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
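The `decodeId`, `decodeValue` and `decodeEdge` overrides above turn the javadoc example `22 0.1 45 0.3 99 0.44` into id 22, value 0.1, and edges 45→0.3 and 99→0.44. The sketch below is a hypothetical standalone version of that decoding. It splits on runs of whitespace, which is an assumption: `AdjacencyListVertexReader` uses its own configured delimiter. The even-token-count check is also this sketch's own addition.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: decoding "id value target1 weight1 target2 weight2 ..."
// into long/double fields. Whitespace splitting is an assumption.
final class LongDoubleAdjacencyLine {
  final long id;
  final double value;
  final Map<Long, Double> edges;

  private LongDoubleAdjacencyLine(long id, double value,
      Map<Long, Double> edges) {
    this.id = id;
    this.value = value;
    this.edges = edges;
  }

  static LongDoubleAdjacencyLine parse(String line) {
    String[] tokens = line.trim().split("\\s+");
    // id and value, then (target, weight) pairs: an even token count.
    if (tokens.length < 2 || tokens.length % 2 != 0) {
      throw new IllegalArgumentException("Malformed line: " + line);
    }
    long id = Long.parseLong(tokens[0]);             // like decodeId
    double value = Double.parseDouble(tokens[1]);    // like decodeValue
    Map<Long, Double> edges = new LinkedHashMap<>();
    for (int i = 2; i < tokens.length; i += 2) {     // like decodeEdge
      edges.put(Long.parseLong(tokens[i]), Double.parseDouble(tokens[i + 1]));
    }
    return new LongDoubleAdjacencyLine(id, value, edges);
  }

  public static void main(String[] args) {
    LongDoubleAdjacencyLine v = parse("22 0.1 45 0.3 99 0.44");
    System.out.println(v.id + " " + v.value + " " + v.edges);
    // 22 0.1 {45=0.3, 99=0.44}
  }
}
```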
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import org.apache.giraph.bsp.BspInputSplit; +import org.apache.giraph.graph.BspUtils; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexInputFormat; +import org.apache.giraph.graph.VertexReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.log4j.Logger; + +import com.google.common.collect.Maps; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + +/** + * This VertexInputFormat is meant for large scale testing. It allows the user + * to create an input data source with a variable number of aggregate vertices + * and edges per vertex that is repeatable for the exact same parameters + * (pseudo-random). + * + * @param Message data + */ +public class PseudoRandomVertexInputFormat extends + VertexInputFormat { + /** Set the number of aggregate vertices. */ + public static final String AGGREGATE_VERTICES = + "pseudoRandomVertexReader.aggregateVertices"; + /** Set the number of edges per vertex (pseudo-random destination).
*/ + public static final String EDGES_PER_VERTEX = + "pseudoRandomVertexReader.edgesPerVertex"; + + @Override + public final List getSplits(final JobContext context, + final int numWorkers) throws IOException, InterruptedException { + // This is meaningless, the PseudoRandomVertexReader will generate + // all the test data + List inputSplitList = new ArrayList(); + for (int i = 0; i < numWorkers; ++i) { + inputSplitList.add(new BspInputSplit(i, numWorkers)); + } + return inputSplitList; + } + + @Override + public VertexReader + createVertexReader(InputSplit split, TaskAttemptContext context) + throws IOException { + return new PseudoRandomVertexReader(); + } + + /** + * Used by {@link PseudoRandomVertexInputFormat} to read + * pseudo-randomly generated data. + */ + private static class PseudoRandomVertexReader implements + VertexReader { + /** Logger. */ + private static final Logger LOG = + Logger.getLogger(PseudoRandomVertexReader.class); + /** Starting vertex id. */ + private long startingVertexId = -1; + /** Vertices read so far. */ + private long verticesRead = 0; + /** Total vertices to read (on this split alone). */ + private long totalSplitVertices = -1; + /** Aggregate vertices (all input splits). */ + private long aggregateVertices = -1; + /** Edges per vertex. */ + private long edgesPerVertex = -1; + /** BspInputSplit (used only for index). */ + private BspInputSplit bspInputSplit; + /** Saved configuration */ + private Configuration configuration; + + /** + * Default constructor for reflection. 
+ */ + public PseudoRandomVertexReader() { + } + + @Override + public void initialize(InputSplit inputSplit, + TaskAttemptContext context) throws IOException { + configuration = context.getConfiguration(); + aggregateVertices = + configuration.getLong( + PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 0); + if (aggregateVertices <= 0) { + throw new IllegalArgumentException( + PseudoRandomVertexInputFormat.AGGREGATE_VERTICES + " <= 0"); + } + if (inputSplit instanceof BspInputSplit) { + bspInputSplit = (BspInputSplit) inputSplit; + long extraVertices = + aggregateVertices % bspInputSplit.getNumSplits(); + totalSplitVertices = + aggregateVertices / bspInputSplit.getNumSplits(); + if (bspInputSplit.getSplitIndex() < extraVertices) { + ++totalSplitVertices; + } + startingVertexId = (bspInputSplit.getSplitIndex() * + (aggregateVertices / bspInputSplit.getNumSplits())) + + Math.min(bspInputSplit.getSplitIndex(), + extraVertices); + } else { + throw new IllegalArgumentException( + "initialize: Got " + inputSplit.getClass() + + " instead of " + BspInputSplit.class); + } + edgesPerVertex = configuration.getLong( + PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 0); + if (edgesPerVertex <= 0) { + throw new IllegalArgumentException( + PseudoRandomVertexInputFormat.EDGES_PER_VERTEX + " <= 0"); + } + } + + @Override + public boolean nextVertex() throws IOException, InterruptedException { + return totalSplitVertices > verticesRead; + } + + @Override + public Vertex + getCurrentVertex() throws IOException, InterruptedException { + Vertex + vertex = BspUtils.createVertex(configuration); + long vertexId = startingVertexId + verticesRead; + // Seed on the vertex id to keep the vertex data the same when + // on different number of workers, but other parameters are the + // same. 
+ Random rand = new Random(vertexId); + DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble()); + Map edges = + Maps.newHashMapWithExpectedSize((int) edgesPerVertex); + for (long i = 0; i < edgesPerVertex; ++i) { + LongWritable destVertexId = null; + do { + destVertexId = + new LongWritable(Math.abs(rand.nextLong() % + aggregateVertices)); + } while (edges.containsKey(destVertexId)); + edges.put(destVertexId, new DoubleWritable(rand.nextDouble())); + } + vertex.initialize(new LongWritable(vertexId), vertexValue, edges, null); + ++verticesRead; + if (LOG.isDebugEnabled()) { + LOG.debug("next: Return vertexId=" + + vertex.getId().get() + + ", vertexValue=" + vertex.getValue() + + ", edges=" + vertex.getEdges()); + } + return vertex; + } + + @Override + public void close() throws IOException { + } + + @Override + public float getProgress() throws IOException { + return (float) verticesRead / totalSplitVertices; + } + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
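`PseudoRandomVertexReader` gives each split `aggregateVertices / numSplits` vertices, and the first `aggregateVertices % numSplits` splits take one extra. Split `i` starts at `i * (aggregateVertices / numSplits) + min(i, extra)`. For example, 10 vertices over 3 splits gives the ranges [0,4), [4,7) and [7,10). Each vertex's data comes from a `Random` seeded with its id, so it does not depend on how many workers there are. The hypothetical sketch below reproduces both pieces of logic. It draws from the generator in the same order as the reader: one `nextDouble()` for the vertex value, then a unique destination followed by a `nextDouble()` for each edge. Destinations are computed as `Math.abs(rand.nextLong() % aggregateVertices)`. This matches `Math.abs(rand.nextLong()) % aggregateVertices` for every input except `Long.MIN_VALUE`, where only the former stays non-negative.

```java
import java.util.LinkedHashSet;
import java.util.Random;
import java.util.Set;

// Hypothetical sketch of PseudoRandomVertexReader's split arithmetic and
// per-vertex seeding (names are illustrative, not Giraph API).
final class PseudoRandomSketch {
  private PseudoRandomSketch() { }

  /** Returns {startingVertexId, totalSplitVertices} for one split. */
  static long[] splitRange(long aggregateVertices, int numSplits,
      int splitIndex) {
    long base = aggregateVertices / numSplits;
    long extra = aggregateVertices % numSplits;
    // The first `extra` splits each take one additional vertex.
    long total = base + (splitIndex < extra ? 1 : 0);
    long start = splitIndex * base + Math.min(splitIndex, extra);
    return new long[] {start, total};
  }

  /**
   * Edge destinations for one vertex, drawn in the reader's order.
   * Requires edgesPerVertex <= aggregateVertices, or the loop never ends.
   */
  static Set<Long> destinations(long vertexId, long aggregateVertices,
      int edgesPerVertex) {
    Random rand = new Random(vertexId); // seed depends only on the id
    rand.nextDouble();                  // vertex value
    Set<Long> dests = new LinkedHashSet<>();
    while (dests.size() < edgesPerVertex) {
      long dest = Math.abs(rand.nextLong() % aggregateVertices);
      if (dests.add(dest)) {
        rand.nextDouble();              // edge value
      }
    }
    return dests;
  }

  public static void main(String[] args) {
    for (int s = 0; s < 3; s++) {
      long[] r = splitRange(10, 3, s);
      System.out.println("split " + s + ": start=" + r[0] + " count=" + r[1]);
    }
    System.out.println(destinations(5, 10, 3));
  }
}
```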
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io; + +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexInputFormat; +import org.apache.giraph.graph.VertexReader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; + +import java.io.IOException; +import java.util.List; + +/** + * Sequence file vertex input format based on {@link SequenceFileInputFormat}. + * + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data + * @param Value type + */ +@SuppressWarnings("rawtypes") +public class SequenceFileVertexInputFormat> + extends VertexInputFormat { + /** Internal input format */ + protected SequenceFileInputFormat sequenceFileInputFormat = + new SequenceFileInputFormat(); + + @Override + public List getSplits(JobContext context, int numWorkers) + throws IOException, InterruptedException { + return sequenceFileInputFormat.getSplits(context); + } + + @Override + public VertexReader createVertexReader(InputSplit split, + TaskAttemptContext context) throws IOException { + return new SequenceFileVertexReader( + sequenceFileInputFormat.createRecordReader(split, context)); + } + + /** + * Vertex reader used with {@link SequenceFileVertexInputFormat}. 
+ * + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data + * @param Value type + */ + public static class SequenceFileVertexReader> + implements VertexReader { + /** Internal record reader from {@link SequenceFileInputFormat} */ + private final RecordReader recordReader; + + /** + * Constructor with record reader. + * + * @param recordReader Reader from {@link SequenceFileInputFormat}. + */ + public SequenceFileVertexReader(RecordReader recordReader) { + this.recordReader = recordReader; + } + + @Override public void initialize(InputSplit inputSplit, + TaskAttemptContext context) throws IOException, InterruptedException { + recordReader.initialize(inputSplit, context); + } + + @Override public boolean nextVertex() throws IOException, + InterruptedException { + return recordReader.nextKeyValue(); + } + + @Override public Vertex getCurrentVertex() + throws IOException, InterruptedException { + return recordReader.getCurrentValue(); + } + + + @Override public void close() throws IOException { + recordReader.close(); + } + + @Override public float getProgress() throws IOException, + InterruptedException { + return recordReader.getProgress(); + } + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io; + +import org.apache.giraph.graph.Edge; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * Class to read graphs stored as adjacency lists with ids represented by + * Strings and values as doubles. This is a good input format for reading + * graphs where the id types do not matter and can be stashed in a String. + * + * @param Message type. + */ +public class TextDoubleDoubleAdjacencyListVertexInputFormat + extends TextVertexInputFormat { + + + /** + * Vertex reader used with + * {@link TextDoubleDoubleAdjacencyListVertexInputFormat} + * + * @param Message type. + */ + static class VertexReader extends + AdjacencyListVertexReader { + /** + * Constructor without sanitizer. + * + * @param lineRecordReader Internal reader. + */ + VertexReader(RecordReader lineRecordReader) { + super(lineRecordReader); + } + + /** + * Constructor with {@link LineRecordReader} + * + * @param lineRecordReader Internal reader.
+ * @param sanitizer Sanitizer of the lines. + */ + VertexReader(RecordReader lineRecordReader, + LineSanitizer sanitizer) { + super(lineRecordReader, sanitizer); + } + + @Override + public void decodeId(String s, Text id) { + id.set(s); + } + + @Override + public void decodeValue(String s, DoubleWritable value) { + value.set(Double.valueOf(s)); + } + + @Override + public void decodeEdge(String s1, String s2, + Edge textIntWritableEdge) { + textIntWritableEdge.setTargetVertexId(new Text(s1)); + textIntWritableEdge.setValue(new DoubleWritable(Double.valueOf(s2))); + } + } + + @Override + public org.apache.giraph.graph.VertexReader createVertexReader(InputSplit split, + TaskAttemptContext context) throws IOException { + return new VertexReader(textInputFormat.createRecordReader( + split, context)); + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import org.apache.giraph.graph.VertexInputFormat; +import org.apache.giraph.graph.VertexReader; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; + +import java.io.IOException; +import java.util.List; + +/** + * Abstract class that users should subclass to use their own text based + * vertex input format. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message value + */ +@SuppressWarnings("rawtypes") +public abstract class TextVertexInputFormat + extends VertexInputFormat { + /** Uses the TextInputFormat to do everything */ + protected TextInputFormat textInputFormat = new TextInputFormat(); + + /** + * Abstract class to be implemented by the user based on their specific + * vertex input. The key supplied by TextInputFormat is only the byte + * offset of the line, so it is easiest to ignore it and use the value + * (the line text) instead.
+ * + * @param Vertex index value + * @param Vertex value + * @param Edge value + */ + public abstract static class TextVertexReader + implements VertexReader { + /** Internal line record reader */ + private final RecordReader lineRecordReader; + /** Context passed to initialize */ + private TaskAttemptContext context; + + /** + * Initialize with the LineRecordReader. + * + * @param lineRecordReader Line record reader from TextInputFormat + */ + public TextVertexReader( + RecordReader lineRecordReader) { + this.lineRecordReader = lineRecordReader; + } + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + lineRecordReader.initialize(inputSplit, context); + this.context = context; + } + + @Override + public void close() throws IOException { + lineRecordReader.close(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return lineRecordReader.getProgress(); + } + + /** + * Get the line record reader. + * + * @return Record reader to be used for reading. + */ + protected RecordReader getRecordReader() { + return lineRecordReader; + } + + /** + * Get the context. + * + * @return Context passed to initialize. 
+ */ + protected TaskAttemptContext getContext() { + return context; + } + } + + @Override + public List getSplits(JobContext context, int numWorkers) + throws IOException, InterruptedException { + // Ignore the hint of numWorkers here since we are using TextInputFormat + // to do this for us + return textInputFormat.getSplits(context); + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.io; + +import java.io.IOException; + +import org.apache.giraph.graph.VertexOutputFormat; +import org.apache.giraph.graph.VertexWriter; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + +/** + * Abstract class that users should subclass to use their own text based + * vertex output format. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + */ +@SuppressWarnings("rawtypes") +public abstract class TextVertexOutputFormat + extends VertexOutputFormat { + /** Uses the TextOutputFormat to do everything */ + protected TextOutputFormat textOutputFormat = + new TextOutputFormat(); + + /** + * Abstract class to be implemented by the user based on their specific + * vertex output. Easiest to ignore the key value separator and only use + * key instead. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + */ + public abstract static class TextVertexWriter implements VertexWriter { + /** Context passed to initialize */ + private TaskAttemptContext context; + /** Internal line record writer */ + private final RecordWriter lineRecordWriter; + + /** + * Initialize with the LineRecordWriter. 
+ * + * @param lineRecordWriter Line record writer from TextOutputFormat + */ + public TextVertexWriter(RecordWriter lineRecordWriter) { + this.lineRecordWriter = lineRecordWriter; + } + + @Override + public void initialize(TaskAttemptContext context) throws IOException { + this.context = context; + } + + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + lineRecordWriter.close(context); + } + + /** + * Get the line record writer. + * + * @return Record writer to be used for writing. + */ + public RecordWriter getRecordWriter() { + return lineRecordWriter; + } + + /** + * Get the context. + * + * @return Context passed to initialize. + */ + public TaskAttemptContext getContext() { + return context; + } + } + + @Override + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + textOutputFormat.checkOutputSpecs(context); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return textOutputFormat.getOutputCommitter(context); + } +} Added: giraph/trunk/src/main/java/org/apache/giraph/io/package-info.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/package-info.java?rev=1371498&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/io/package-info.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/io/package-info.java Thu Aug 9 21:43:36 2012 @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package of reusable Giraph vertex input and output format classes. + */ +package org.apache.giraph.io;
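The split arithmetic in PseudoRandomVertexReader.initialize() and the per-vertex seeding in getCurrentVertex() can be illustrated without Hadoop. This is a minimal sketch, not Giraph code: the class PseudoRandomSketch and its methods are hypothetical, Writables are replaced with Long/Double, and destinations are reduced with Math.floorMod rather than Math.abs(...) % n (Math.abs(Long.MIN_VALUE) stays negative). The generated edges are therefore not bit-identical to what Giraph produces.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

/** Hypothetical sketch of PseudoRandomVertexReader's split and seeding logic. */
final class PseudoRandomSketch {
  private PseudoRandomSketch() {
  }

  /**
   * Returns {startingVertexId, totalSplitVertices} for one split. The
   * remainder is spread one extra vertex at a time over the lowest indices.
   */
  static long[] splitRange(long aggregateVertices, int numSplits, int splitIndex) {
    if (aggregateVertices <= 0 || numSplits <= 0
        || splitIndex < 0 || splitIndex >= numSplits) {
      throw new IllegalArgumentException("invalid split parameters");
    }
    long base = aggregateVertices / numSplits;
    long extraVertices = aggregateVertices % numSplits;
    long count = base + (splitIndex < extraVertices ? 1 : 0);
    // Earlier splits contributed `base` each, plus one per earlier split
    // that received an extra vertex.
    long start = splitIndex * base + Math.min(splitIndex, extraVertices);
    return new long[] {start, count};
  }

  /**
   * Generates one vertex's edges from a Random seeded on its id, so the
   * result does not depend on which worker or split produces the vertex.
   */
  static Map<Long, Double> edgesFor(long vertexId, long aggregateVertices,
      long edgesPerVertex) {
    if (edgesPerVertex > aggregateVertices) {
      // Otherwise the duplicate-avoiding loop below could never finish.
      throw new IllegalArgumentException("edgesPerVertex > aggregateVertices");
    }
    Random rand = new Random(vertexId);
    rand.nextDouble(); // the vertex value is drawn first, as in the reader
    Map<Long, Double> edges = new LinkedHashMap<>();
    while (edges.size() < edgesPerVertex) {
      long dest = Math.floorMod(rand.nextLong(), aggregateVertices);
      // Redraw on duplicates, mirroring the do/while in getCurrentVertex().
      if (!edges.containsKey(dest)) {
        edges.put(dest, rand.nextDouble());
      }
    }
    return edges;
  }
}
```

For 10 vertices over 3 splits this assigns ids 0-3, 4-6 and 7-9. Because each vertex seeds its own Random, changing the number of workers moves split boundaries but leaves every vertex's value and edges unchanged, which is what the reader's comment about "different number of workers" relies on.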
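TextDoubleDoubleAdjacencyListVertexInputFormat only supplies decodeId, decodeValue and decodeEdge. Splitting each line into tokens is left to AdjacencyListVertexReader, whose source is not part of this message. The sketch below assumes a tab-separated line of the form `id value dest1 weight1 dest2 weight2 ...` (the real reader's separator may be configurable) and reproduces the three decode steps on plain Java types. AdjacencyLineSketch and its fields are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical decoded adjacency-list line with String ids and double values. */
final class AdjacencyLineSketch {
  /** Assumed token separator. */
  private static final Pattern TAB = Pattern.compile("\t");

  final String id;
  final double value;
  final Map<String, Double> edges;

  private AdjacencyLineSketch(String id, double value, Map<String, Double> edges) {
    this.id = id;
    this.value = value;
    this.edges = edges;
  }

  static AdjacencyLineSketch parse(String line) {
    String[] tokens = TAB.split(line);
    // Need an id and a value, followed by complete (destination, weight) pairs.
    if (tokens.length < 2 || tokens.length % 2 != 0) {
      throw new IllegalArgumentException("malformed adjacency line: " + line);
    }
    String id = tokens[0];                        // like decodeId: id.set(s)
    double value = Double.parseDouble(tokens[1]); // like decodeValue
    Map<String, Double> edges = new LinkedHashMap<>();
    for (int i = 2; i < tokens.length; i += 2) {
      // like decodeEdge: target id kept as text, edge value parsed as double
      edges.put(tokens[i], Double.parseDouble(tokens[i + 1]));
    }
    return new AdjacencyLineSketch(id, value, edges);
  }
}
```

Keeping ids as strings is what makes this format convenient when, as the class comment says, "the id types do not matter": no numeric parsing is attempted on ids, only on values and edge weights.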