giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgho...@apache.org
Subject svn commit: r1371498 [1/2] - in /giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/io/ src/main/java/org/apache/giraph/lib/ src/test/java/org/apache/giraph/ src/test/ja...
Date Thu, 09 Aug 2012 21:43:37 GMT
Author: jghoman
Date: Thu Aug  9 21:43:36 2012
New Revision: 1371498

URL: http://svn.apache.org/viewvc?rev=1371498&view=rev
Log:
GIRAPH-218: Consolidate all I/O Format classes under one roof in lib/ directory.  Contributed by Eli Reisman.

Added:
    giraph/trunk/src/main/java/org/apache/giraph/io/
    giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListVertexReader.java
    giraph/trunk/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/package-info.java
    giraph/trunk/src/test/java/org/apache/giraph/io/
    giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java
    giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java
    giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
    giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
Removed:
    giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/lib/
    giraph/trunk/src/test/java/org/apache/giraph/lib/
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
    giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
    giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
    giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
    giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java
    giraph/trunk/src/test/java/org/apache/giraph/TestVertexTypes.java
    giraph/trunk/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
    giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
    giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java
    giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1371498&r1=1371497&r2=1371498&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Aug  9 21:43:36 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-218: Consolidate all I/O Format classes under one roof in lib/ directory.
+  (Eli Reisman via jghoman)
+
   GIRAPH-259: TestBspBasic.testBspPageRank is broken (majakabiljo via apresta)
 
   GIRAPH-256: Partitioning outgoing graph data during INPUT_SUPERSTEP by # of 

Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java?rev=1371498&r1=1371497&r2=1371498&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java Thu Aug  9 21:43:36 2012
@@ -26,6 +26,7 @@ import org.apache.commons.cli.PosixParse
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.io.PseudoRandomVertexInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;

Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1371498&r1=1371497&r2=1371498&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Thu Aug  9 21:43:36 2012
@@ -28,6 +28,7 @@ import org.apache.giraph.graph.DefaultMa
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.WorkerContext;
+import org.apache.giraph.io.PseudoRandomVertexInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DoubleWritable;

Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java?rev=1371498&r1=1371497&r2=1371498&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java Thu Aug  9 21:43:36 2012
@@ -27,6 +27,7 @@ import org.apache.giraph.examples.Minimu
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.io.PseudoRandomVertexInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1371498&r1=1371497&r2=1371498&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Thu Aug  9 21:43:36 2012
@@ -29,6 +29,8 @@ import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.WorkerContext;
+import org.apache.giraph.io.GeneratedVertexInputFormat;
+import org.apache.giraph.io.IdWithValueTextOutputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.FloatWritable;
@@ -212,7 +214,7 @@ public class SimpleCheckpointVertex exte
     GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
     bspJob.setVertexClass(getClass());
     bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class);
-    bspJob.setVertexOutputFormatClass(SimpleTextVertexOutputFormat.class);
+    bspJob.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
     bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
     bspJob.setMasterComputeClass(SimpleCheckpointVertexMasterCompute.class);
     int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java?rev=1371498&r1=1371497&r2=1371498&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java Thu Aug  9 21:43:36 2012
@@ -24,7 +24,7 @@ import org.apache.hadoop.io.LongWritable
 /**
  * A simple use of the Identity Vertex for taking care of Long, Double,
  * Double, Double type Inputformat Good for use with
- * lib.LongDoubleDoubleAdjacencyListVertexInputFormat
+ * io.LongDoubleDoubleAdjacencyListVertexInputFormat
  */
 
 public abstract class SimpleLongDoubleDoubleDoubleIdentityVertex extends

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1371498&r1=1371497&r2=1371498&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Thu Aug  9 21:43:36 2012
@@ -28,8 +28,9 @@ import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.giraph.graph.VertexWriter;
 import org.apache.giraph.graph.WorkerContext;
-import org.apache.giraph.lib.TextVertexOutputFormat;
-import org.apache.giraph.lib.TextVertexOutputFormat.TextVertexWriter;
+import org.apache.giraph.io.GeneratedVertexInputFormat;
+import org.apache.giraph.io.TextVertexOutputFormat;
+import org.apache.giraph.io.TextVertexOutputFormat.TextVertexWriter;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -221,8 +222,8 @@ public class SimplePageRankVertex extend
    * Simple VertexInputFormat that supports {@link SimplePageRankVertex}
    */
   public static class SimplePageRankVertexInputFormat extends
-      GeneratedVertexInputFormat<LongWritable,
-        DoubleWritable, FloatWritable, DoubleWritable> {
+    GeneratedVertexInputFormat<LongWritable,
+            DoubleWritable, FloatWritable, DoubleWritable> {
     @Override
     public VertexReader<LongWritable, DoubleWritable,
     FloatWritable, DoubleWritable> createVertexReader(InputSplit split,

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1371498&r1=1371497&r2=1371498&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Thu Aug  9 21:43:36 2012
@@ -23,8 +23,9 @@ import org.apache.giraph.graph.EdgeListV
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.giraph.graph.VertexWriter;
-import org.apache.giraph.lib.TextVertexOutputFormat;
-import org.apache.giraph.lib.TextVertexOutputFormat.TextVertexWriter;
+import org.apache.giraph.io.GeneratedVertexInputFormat;
+import org.apache.giraph.io.TextVertexOutputFormat;
+import org.apache.giraph.io.TextVertexOutputFormat.TextVertexWriter;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -110,8 +111,8 @@ public class SimpleSuperstepVertex exten
    * Simple VertexInputFormat that supports {@link SimpleSuperstepVertex}
    */
   public static class SimpleSuperstepVertexInputFormat extends
-      GeneratedVertexInputFormat<LongWritable,
-      IntWritable, FloatWritable, IntWritable> {
+    GeneratedVertexInputFormat<LongWritable,
+          IntWritable, FloatWritable, IntWritable> {
     @Override
     public VertexReader<LongWritable, IntWritable, FloatWritable, IntWritable>
     createVertexReader(InputSplit split, TaskAttemptContext context)

Added: giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * OutputFormat to write out the graph nodes as text, value-separated (by
+ * tabs, by default).  With the default delimiter, a vertex is written out as:
+ *
+ * <VertexId><tab><Vertex Value><tab>[<EdgeId><tab><EdgeValue>]+
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class AdjacencyListTextVertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends TextVertexOutputFormat<I, V, E> {
+
+  /**
+   * Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   */
+  static class AdjacencyListVertexWriter<I extends WritableComparable, V extends
+      Writable, E extends Writable> extends TextVertexWriter<I, V, E> {
+    /** Split delimiter */
+    public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+    /** Default split delimiter */
+    public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+    /** Cached split delimiter */
+    private String delimiter;
+
+    /**
+     * Constructor with writer.
+     *
+     * @param recordWriter Record writer used for writing.
+     */
+    public AdjacencyListVertexWriter(RecordWriter<Text, Text> recordWriter) {
+      super(recordWriter);
+    }
+
+    @Override
+    public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
+    InterruptedException {
+      if (delimiter == null) {
+        delimiter = getContext().getConfiguration()
+            .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+      }
+
+      StringBuffer sb = new StringBuffer(vertex.getId().toString());
+      sb.append(delimiter);
+      sb.append(vertex.getValue());
+
+      for (Edge<I, E> edge : vertex.getEdges()) {
+        sb.append(delimiter).append(edge.getTargetVertexId());
+        sb.append(delimiter).append(edge.getValue());
+      }
+
+      getRecordWriter().write(new Text(sb.toString()), null);
+    }
+  }
+
+  @Override
+  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new AdjacencyListVertexWriter<I, V, E>
+    (textOutputFormat.getRecordWriter(context));
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListVertexReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListVertexReader.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListVertexReader.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListVertexReader.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.TextVertexInputFormat.TextVertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * VertexReader that reads lines of text with vertices encoded as adjacency
+ * lists and converts each token to the correct type.  For example, a graph
+ * with vertices as integers and values as doubles could be encoded as:
+ *   1 0.1 2 0.2 3 0.3
+ * to represent a vertex named 1, with 0.1 as its value and two edges, to
+ * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class AdjacencyListVertexReader<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> extends
+    TextVertexInputFormat.TextVertexReader<I, V, E, M> {
+  /** Delimiter for split */
+  public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
+  /** Default delimiter for split */
+  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+  /** Cached delimiter used for split */
+  private String splitValue = null;
+
+  /**
+   * Utility for doing any cleaning of each line before it is tokenized.
+   */
+  public interface LineSanitizer {
+    /**
+     * Clean string s before attempting to tokenize it.
+     *
+     * @param s String to be cleaned.
+     * @return Sanitized string.
+     */
+    String sanitize(String s);
+  }
+
+  /**
+   * Sanitizer from constructor.
+   */
+  private final LineSanitizer sanitizer;
+
+  /**
+   * Constructor with line record reader.
+   *
+   * @param lineRecordReader Reader from {@link TextVertexReader}.
+   */
+  public AdjacencyListVertexReader(
+      RecordReader<LongWritable, Text> lineRecordReader) {
+    super(lineRecordReader);
+    sanitizer = null;
+  }
+
+  /**
+   * Constructor with line record reader.
+   *
+   * @param lineRecordReader Reader from {@link TextVertexReader}.
+   * @param sanitizer Sanitizer to be used.
+   */
+  public AdjacencyListVertexReader(
+      RecordReader<LongWritable, Text> lineRecordReader,
+      LineSanitizer sanitizer) {
+    super(lineRecordReader);
+    this.sanitizer = sanitizer;
+  }
+
+  /**
+   * Store the Id for this line in an instance of its correct type.
+   *
+   * @param s Id of vertex from line
+   * @param id Instance of Id's type, in which to store its value
+   */
+  public abstract void decodeId(String s, I id);
+
+  /**
+   * Store the value for this line in an instance of its correct type.
+   * @param s Value from line
+   * @param value Instance of value's type, in which to store its value
+   */
+  public abstract void decodeValue(String s, V value);
+
+  /**
+   * Store an edge from the line into an instance of a correctly typed Edge
+   * @param id The edge's id from the line
+   * @param value The edge's value from the line
+   * @param edge Instance of edge in which to store the id and value
+   */
+  public abstract void decodeEdge(String id, String value, Edge<I, E> edge);
+
+
+  @Override
+  public boolean nextVertex() throws IOException, InterruptedException {
+    return getRecordReader().nextKeyValue();
+  }
+
+  @Override
+  public Vertex<I, V, E, M> getCurrentVertex()
+    throws IOException, InterruptedException {
+    Configuration conf = getContext().getConfiguration();
+    String line = getRecordReader().getCurrentValue().toString();
+    Vertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+
+    if (sanitizer != null) {
+      line = sanitizer.sanitize(line);
+    }
+
+    if (splitValue == null) {
+      splitValue = conf.get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+    }
+
+    String [] values = line.split(splitValue);
+
+    if ((values.length < 2) || (values.length % 2 != 0)) {
+      throw new IllegalArgumentException(
+        "Line did not split correctly: " + line);
+    }
+
+    I vertexId = BspUtils.<I>createVertexId(conf);
+    decodeId(values[0], vertexId);
+
+    V value = BspUtils.<V>createVertexValue(conf);
+    decodeValue(values[1], value);
+
+    int i = 2;
+    Map<I, E> edges = Maps.newHashMap();
+    Edge<I, E> edge = new Edge<I, E>();
+    while (i < values.length) {
+      decodeEdge(values[i], values[i + 1], edge);
+      edges.put(edge.getTargetVertexId(), edge.getValue());
+      i += 2;
+    }
+    vertex.initialize(vertexId, value, edges, null);
+    return vertex;
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This VertexInputFormat is meant for testing/debugging.  It simply generates
+ * some vertex data that can be consumed by test applications.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedVertexInputFormat<
+    I extends WritableComparable, V extends Writable, E extends Writable,
+    M extends Writable> extends VertexInputFormat<I, V, E, M> {
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    // This is meaningless, the VertexReader will generate all the test
+    // data.
+    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+    for (int i = 0; i < numWorkers; ++i) {
+      inputSplitList.add(new BspInputSplit(i, numWorkers));
+    }
+    return inputSplitList;
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Write out Vertices' IDs and values, but not their edges nor edges' values.
+ * This is a useful output format when the final value of the vertex is
+ * all that's needed. The boolean configuration parameter reverse.id.and.value
+ * allows reversing the output of id and value.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class IdWithValueTextOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends TextVertexOutputFormat<I, V, E> {
+
+  /**
+   * Vertex writer used with {@link IdWithValueTextOutputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   */
+  static class IdWithValueVertexWriter<I extends WritableComparable, V extends
+      Writable, E extends Writable> extends TextVertexWriter<I, V, E> {
+    /** Specify the output delimiter */
+    public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+    /** Default output delimiter */
+    public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+    /** Reverse id and value order? */
+    public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
+    /** Default is to not reverse id and value order. */
+    public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
+    /** Saved delimiter */
+    private String delimiter;
+
+    /**
+     * Constructor with record writer.
+     *
+     * @param recordWriter Writer from LineRecordWriter.
+     */
+    public IdWithValueVertexWriter(RecordWriter<Text, Text> recordWriter) {
+      super(recordWriter);
+    }
+
+    @Override
+    public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
+    InterruptedException {
+      if (delimiter == null) {
+        delimiter = getContext().getConfiguration()
+            .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+      }
+
+      String first;
+      String second;
+      boolean reverseOutput = getContext().getConfiguration()
+          .getBoolean(REVERSE_ID_AND_VALUE, REVERSE_ID_AND_VALUE_DEFAULT);
+
+      if (reverseOutput) {
+        first = vertex.getValue().toString();
+        second = vertex.getId().toString();
+      } else {
+        first = vertex.getId().toString();
+        second = vertex.getValue().toString();
+      }
+
+      Text line = new Text(first + delimiter + second);
+
+      getRecordWriter().write(line, null);
+    }
+  }
+
+  @Override
+  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new IdWithValueVertexWriter<I, V, E>
+    (textOutputFormat.getRecordWriter(context));
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * {@link org.apache.giraph.graph.VertexInputFormat} that reads an unweighted
+ * graph with int vertex ids from plain text, one vertex per line.
+ *
+ * Line layout (tab or space separated): vertex neighbor1 neighbor2 ...
+ */
+public class IntIntNullIntTextInputFormat extends
+    TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
+    IntWritable> {
+
+  @Override
+  public VertexReader<IntWritable, IntWritable, NullWritable, IntWritable>
+  createVertexReader(InputSplit split, TaskAttemptContext context)
+    throws IOException {
+    RecordReader<LongWritable, Text> lineReader =
+        textInputFormat.createRecordReader(split, context);
+    return new IntIntNullIntVertexReader(lineReader);
+  }
+
+  /**
+   * Reader that turns each text line into a vertex with its out-edges.
+   */
+  public static class IntIntNullIntVertexReader extends
+      TextVertexInputFormat.TextVertexReader<IntWritable, IntWritable,
+      NullWritable, IntWritable> {
+    /** A single tab or space separates the vertex id and each neighbor */
+    private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+    /**
+     * Wraps the given line reader.
+     *
+     * @param lineReader Reader supplying the text lines.
+     */
+    public IntIntNullIntVertexReader(RecordReader<LongWritable, Text>
+    lineReader) {
+      super(lineReader);
+    }
+
+    @Override
+    public Vertex<IntWritable, IntWritable, NullWritable, IntWritable>
+    getCurrentVertex() throws IOException, InterruptedException {
+      Vertex<IntWritable, IntWritable, NullWritable, IntWritable> result =
+          BspUtils.<IntWritable, IntWritable, NullWritable, IntWritable>
+          createVertex(getContext().getConfiguration());
+
+      String text = getRecordReader().getCurrentValue().toString();
+      String[] fields = SEPARATOR.split(text);
+      // Every field after the first is a neighbor id; edges carry no value.
+      Map<IntWritable, NullWritable> neighbors =
+          Maps.newHashMapWithExpectedSize(fields.length - 1);
+      for (int i = 1; i < fields.length; i++) {
+        int neighborId = Integer.parseInt(fields[i]);
+        neighbors.put(new IntWritable(neighborId), NullWritable.get());
+      }
+      // The vertex id doubles as the initial vertex value.
+      IntWritable id = new IntWritable(Integer.parseInt(fields[0]));
+      result.initialize(id, id, neighbors, Lists.<IntWritable>newArrayList());
+      return result;
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+/**
+ * JSON object keys shared by the JsonBase64 vertex input and output formats.
+ */
+public class JsonBase64VertexFormat {
+  /** Key under which the encoded vertex id is stored */
+  public static final String VERTEX_ID_KEY = "vertexId";
+  /** Key under which the encoded vertex value is stored */
+  public static final String VERTEX_VALUE_KEY = "vertexValue";
+  /** Key of the JSON array that holds all the encoded edges */
+  public static final String EDGE_ARRAY_KEY = "edgeArray";
+
+  /**
+   * Private so that this constants holder is never instantiated.
+   */
+  private JsonBase64VertexFormat() { }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Maps;
+
+import net.iharder.Base64;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Simple way to represent the structure of the graph with a JSON object.
+ * The actual vertex ids, values, edges are stored by the
+ * Writable serialized bytes that are Base64 encoded.
+ * Works with {@link JsonBase64VertexOutputFormat}
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class JsonBase64VertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends TextVertexInputFormat<I, V, E, M> {
+  /**
+   * Simple reader that supports {@link JsonBase64VertexInputFormat}
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   * @param <M> Message data
+   */
+  private static class JsonBase64VertexReader<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      extends TextVertexReader<I, V, E, M> {
+    /**
+     * Only constructor.  Requires the LineRecordReader
+     *
+     * @param lineRecordReader Line record reader to read from
+     */
+    public JsonBase64VertexReader(
+        RecordReader<LongWritable, Text> lineRecordReader) {
+      super(lineRecordReader);
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    @Override
+    public Vertex<I, V, E, M> getCurrentVertex()
+      throws IOException, InterruptedException {
+      Configuration conf = getContext().getConfiguration();
+      Vertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+      // Each input line holds one vertex encoded as a JSON object
+      Text line = getRecordReader().getCurrentValue();
+      JSONObject vertexObject;
+      try {
+        vertexObject = new JSONObject(line.toString());
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get the vertex", e);
+      }
+      DataInput input = null;
+      byte[] decodedWritable = null;
+      I vertexId = null;
+      try {
+        // The id is stored as Base64 text of the Writable's serialized bytes
+        decodedWritable = Base64.decode(
+          vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
+        input = new DataInputStream(new ByteArrayInputStream(decodedWritable));
+        vertexId = BspUtils.<I>createVertexId(conf);
+        vertexId.readFields(input);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get vertex id", e);
+      }
+      V vertexValue = null;
+      try {
+        // The value is decoded the same way from its own JSON field
+        decodedWritable = Base64.decode(
+          vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
+        input = new DataInputStream(new ByteArrayInputStream(decodedWritable));
+        vertexValue = BspUtils.<V>createVertexValue(conf);
+        vertexValue.readFields(input);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get vertex value", e);
+      }
+      JSONArray edgeArray = null;
+      try {
+        edgeArray = vertexObject.getJSONArray(
+          JsonBase64VertexFormat.EDGE_ARRAY_KEY);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get edge array", e);
+      }
+      Map<I, E> edgeMap = Maps.newHashMap();
+      for (int i = 0; i < edgeArray.length(); ++i) {
+        try {
+          decodedWritable = Base64.decode(edgeArray.getString(i));
+        } catch (JSONException e) {
+          throw new IllegalArgumentException(
+            "next: Failed to get edge value", e);
+        }
+        // Each entry packs the target vertex id followed by the edge value
+        input = new DataInputStream(new ByteArrayInputStream(decodedWritable));
+        // Reuse the cached conf rather than fetching it again for every edge
+        I targetVertexId = BspUtils.<I>createVertexId(conf);
+        targetVertexId.readFields(input);
+        E edgeValue = BspUtils.<E>createEdgeValue(conf);
+        edgeValue.readFields(input);
+        // Edges are handed to the vertex keyed by their target vertex id
+        edgeMap.put(targetVertexId, edgeValue);
+      }
+      vertex.initialize(vertexId, vertexValue, edgeMap, null);
+      return vertex;
+    }
+  }
+
+  @Override
+  public VertexReader<I, V, E, M> createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new JsonBase64VertexReader<I, V, E, M>(
+      textInputFormat.createRecordReader(split, context));
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import net.iharder.Base64;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Writes each vertex of the graph as one JSON object per line.
+ * The vertex id, the vertex value and every edge are stored as the
+ * Writable serialized bytes, Base64 encoded.
+ * Works with {@link JsonBase64VertexInputFormat}
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class JsonBase64VertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends
+    TextVertexOutputFormat<I, V, E> {
+  /**
+   * Writer that produces the lines of {@link JsonBase64VertexOutputFormat}
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   */
+  private static class JsonBase64VertexWriter<I extends WritableComparable,
+      V extends Writable, E extends Writable> extends
+      TextVertexWriter<I, V, E> {
+    /**
+     * Only constructor.  Requires the LineRecordWriter
+     *
+     * @param lineRecordWriter Line record writer that receives the output
+     */
+    public JsonBase64VertexWriter(
+        RecordWriter<Text, Text> lineRecordWriter) {
+      super(lineRecordWriter);
+    }
+
+    @Override
+    public void writeVertex(Vertex<I, V, E, ?> vertex)
+      throws IOException, InterruptedException {
+      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+      DataOutput out = new DataOutputStream(buffer);
+      JSONObject json = new JSONObject();
+      // Vertex id: Writable bytes, Base64 encoded.
+      vertex.getId().write(out);
+      String encodedId = Base64.encodeBytes(buffer.toByteArray());
+      try {
+        json.put(JsonBase64VertexFormat.VERTEX_ID_KEY, encodedId);
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "writerVertex: Failed to insert vertex id", e);
+      }
+
+      // Vertex value: the buffer is cleared first so it holds only the value.
+      buffer.reset();
+      vertex.getValue().write(out);
+      String encodedValue = Base64.encodeBytes(buffer.toByteArray());
+      try {
+        json.put(JsonBase64VertexFormat.VERTEX_VALUE_KEY, encodedValue);
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "writerVertex: Failed to insert vertex value", e);
+      }
+
+      // Edges: one Base64 string per edge, target id followed by edge value.
+      JSONArray encodedEdges = new JSONArray();
+      for (Edge<I, E> edge : vertex.getEdges()) {
+        buffer.reset();
+        edge.getTargetVertexId().write(out);
+        edge.getValue().write(out);
+        encodedEdges.put(Base64.encodeBytes(buffer.toByteArray()));
+      }
+      try {
+        json.put(JsonBase64VertexFormat.EDGE_ARRAY_KEY, encodedEdges);
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "writerVertex: Failed to insert edge array", e);
+      }
+      getRecordWriter().write(new Text(json.toString()), null);
+    }
+  }
+
+  @Override
+  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    RecordWriter<Text, Text> writer = textOutputFormat.getRecordWriter(context);
+    return new JsonBase64VertexWriter<I, V, E>(writer);
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONArray;
+import org.json.JSONException;
+
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * VertexInputFormat that features <code>long</code> vertex ids,
+ * <code>double</code> vertex values, <code>float</code> out-edge
+ * weights and <code>double</code> message types, read from text
+ * lines in JSON format.
+ */
+public class JsonLongDoubleFloatDoubleVertexInputFormat extends
+  TextVertexInputFormat<LongWritable, DoubleWritable,
+  FloatWritable, DoubleWritable> {
+
+  @Override
+  public VertexReader<LongWritable, DoubleWritable, FloatWritable,
+    DoubleWritable> createVertexReader(InputSplit split,
+    TaskAttemptContext context) throws IOException {
+    RecordReader<LongWritable, Text> lineReader =
+      textInputFormat.createRecordReader(split, context);
+    return new JsonLongDoubleFloatDoubleVertexReader(lineReader);
+  }
+
+  /**
+   * VertexReader that parses one JSON array per line:
+   * {@code [vertex id, vertex value, [[dest vertex id, edge value], ...]]}
+   * with <code>double</code> vertex values and <code>float</code>
+   * out-edge weights.
+   * Here is an example with vertex id 1, vertex value 4.3, and two edges.
+   * First edge has a destination vertex 2, edge value 2.1.
+   * Second edge has a destination vertex 3, edge value 0.7.
+   * [1,4.3,[[2,2.1],[3,0.7]]]
+   */
+  static class JsonLongDoubleFloatDoubleVertexReader extends
+    TextVertexReader<LongWritable, DoubleWritable,
+    FloatWritable, DoubleWritable> {
+
+    /**
+     * Constructor with the line record reader.
+     *
+     * @param lineRecordReader Source of the text lines to parse.
+     */
+    public JsonLongDoubleFloatDoubleVertexReader(
+      RecordReader<LongWritable, Text> lineRecordReader) {
+      super(lineRecordReader);
+    }
+
+    @Override
+    public Vertex<LongWritable, DoubleWritable, FloatWritable,
+          DoubleWritable> getCurrentVertex()
+      throws IOException, InterruptedException {
+      Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+          result = BspUtils.<LongWritable, DoubleWritable, FloatWritable,
+          DoubleWritable>createVertex(getContext().getConfiguration());
+      Text line = getRecordReader().getCurrentValue();
+      try {
+        // Layout: [id, value, [[target id, edge value], ...]]
+        JSONArray json = new JSONArray(line.toString());
+        LongWritable id = new LongWritable(json.getLong(0));
+        DoubleWritable value = new DoubleWritable(json.getDouble(1));
+        Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
+        JSONArray jsonEdges = json.getJSONArray(2);
+        for (int i = 0; i < jsonEdges.length(); ++i) {
+          JSONArray pair = jsonEdges.getJSONArray(i);
+          long target = pair.getLong(0);
+          float weight = (float) pair.getDouble(1);
+          edges.put(new LongWritable(target), new FloatWritable(weight));
+        }
+        result.initialize(id, value, edges, null);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Couldn't get vertex from line " + line, e);
+      }
+      return result;
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONArray;
+import org.json.JSONException;
+
+import java.io.IOException;
+
+/**
+ * VertexOutputFormat that writes JSON encoded vertices featuring
+ * <code>double</code> values and <code>float</code> out-edge weights
+ */
+public class JsonLongDoubleFloatDoubleVertexOutputFormat extends
+  TextVertexOutputFormat<LongWritable, DoubleWritable,
+  FloatWritable> {
+  @Override
+  public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
+  createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    RecordWriter<Text, Text> recordWriter =
+      textOutputFormat.getRecordWriter(context);
+    return new JsonLongDoubleFloatDoubleVertexWriter(recordWriter);
+  }
+
+ /**
+  * VertexWriter that writes each vertex as one line holding the JSON
+  * array [id, value, [[target id, edge value], ...]].
+  */
+  static class JsonLongDoubleFloatDoubleVertexWriter extends
+    TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
+
+   /**
+    * Vertex writer with the internal line writer.
+    *
+    * @param lineRecordWriter Will actually be written to.
+    */
+    public JsonLongDoubleFloatDoubleVertexWriter(
+      RecordWriter<Text, Text> lineRecordWriter) {
+      super(lineRecordWriter);
+    }
+
+    @Override
+    public void writeVertex(Vertex<LongWritable, DoubleWritable,
+          FloatWritable, ?> vertex) throws IOException, InterruptedException {
+      JSONArray jsonVertex = new JSONArray();
+      try {
+        jsonVertex.put(vertex.getId().get());
+        jsonVertex.put(vertex.getValue().get());
+        JSONArray jsonEdgeArray = new JSONArray();
+        for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
+          JSONArray jsonEdge = new JSONArray();
+          jsonEdge.put(edge.getTargetVertexId().get());
+          jsonEdge.put(edge.getValue().get());
+          jsonEdgeArray.put(jsonEdge);
+        }
+        jsonVertex.put(jsonEdgeArray);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "writeVertex: Couldn't write vertex " + vertex, e);
+      }
+      getRecordWriter().write(new Text(jsonVertex.toString()), null);
+    }
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * InputFormat for reading graphs stored as (ordered) adjacency lists
+ * with the vertex ids longs and the vertex values and edges doubles.
+ * For example:
+ * 22 0.1 45 0.3 99 0.44
+ * to represent a vertex with id 22, value of 0.1 and edges to nodes 45 and 99,
+ * with values of 0.3 and 0.44, respectively.
+ *
+ * @param <M> Message data
+ */
+public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
+    extends TextVertexInputFormat<LongWritable, DoubleWritable,
+    DoubleWritable, M> {
+
+  /**
+   * VertexReader associated with
+   * {@link LongDoubleDoubleAdjacencyListVertexInputFormat}.
+   *
+   * @param <M> Message data.
+   */
+  static class VertexReader<M extends Writable> extends
+      AdjacencyListVertexReader<LongWritable, DoubleWritable,
+      DoubleWritable, M> {
+
+    /**
+     * Constructor with Line record reader.
+     *
+     * @param lineRecordReader Reader to internally use.
+     */
+    VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
+      super(lineRecordReader);
+    }
+
+    /**
+     * Constructor with Line record reader and sanitizer.
+     *
+     * @param lineRecordReader Reader to internally use.
+     * @param sanitizer Line sanitizer.
+     */
+    VertexReader(RecordReader<LongWritable, Text> lineRecordReader,
+        LineSanitizer sanitizer) {
+      super(lineRecordReader, sanitizer);
+    }
+
+    @Override
+    public void decodeId(String s, LongWritable id) {
+      id.set(Long.parseLong(s));
+    }
+
+    @Override
+    public void decodeValue(String s, DoubleWritable value) {
+      value.set(Double.parseDouble(s));
+    }
+
+    @Override
+    public void decodeEdge(String s1, String s2,
+        Edge<LongWritable, DoubleWritable> textIntWritableEdge) {
+      // s1 is parsed as the target vertex id, s2 as the edge value
+      textIntWritableEdge.setTargetVertexId(
+          new LongWritable(Long.parseLong(s1)));
+      textIntWritableEdge.setValue(new DoubleWritable(Double.parseDouble(s2)));
+    }
+  }
+
+  @Override
+  public org.apache.giraph.graph.VertexReader<LongWritable,
+  DoubleWritable, DoubleWritable, M> createVertexReader(
+      InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    return new VertexReader<M>(textInputFormat.createRecordReader(
+        split, context));
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * This VertexInputFormat is meant for large scale testing.  It allows the user
+ * to create an input data source that a variable number of aggregate vertices
+ * and edges per vertex that is repeatable for the exact same parameter
+ * (pseudo-random).
+ *
+ * @param <M> Message data
+ */
+public class PseudoRandomVertexInputFormat<M extends Writable> extends
+    VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable, M> {
+  /** Set the number of aggregate vertices. */
+  public static final String AGGREGATE_VERTICES =
+      "pseudoRandomVertexReader.aggregateVertices";
+  /** Set the number of edges per vertex (pseudo-random destination). */
+  public static final String EDGES_PER_VERTEX =
+      "pseudoRandomVertexReader.edgesPerVertex";
+
+  @Override
+  public final List<InputSplit> getSplits(final JobContext context,
+      final int numWorkers) throws IOException, InterruptedException {
+    // This is meaningless, the PseudoRandomVertexReader will generate
+    // all the test data
+    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+    for (int i = 0; i < numWorkers; ++i) {
+      inputSplitList.add(new BspInputSplit(i, numWorkers));
+    }
+    return inputSplitList;
+  }
+
+  @Override
+  public VertexReader<LongWritable, DoubleWritable, DoubleWritable, M>
+  createVertexReader(InputSplit split, TaskAttemptContext context)
+    throws IOException {
+    return new PseudoRandomVertexReader<M>();
+  }
+
+  /**
+   * Used by {@link PseudoRandomVertexInputFormat} to read
+   * pseudo-randomly generated data.
+   */
+  private static class PseudoRandomVertexReader<M extends Writable> implements
+      VertexReader<LongWritable, DoubleWritable, DoubleWritable, M> {
+    /** Logger. */
+    private static final Logger LOG =
+        Logger.getLogger(PseudoRandomVertexReader.class);
+    /** Starting vertex id. */
+    private long startingVertexId = -1;
+    /** Vertices read so far. */
+    private long verticesRead = 0;
+    /** Total vertices to read (on this split alone). */
+    private long totalSplitVertices = -1;
+    /** Aggregate vertices (all input splits). */
+    private long aggregateVertices = -1;
+    /** Edges per vertex. */
+    private long edgesPerVertex = -1;
+    /** BspInputSplit (used only for index). */
+    private BspInputSplit bspInputSplit;
+    /** Saved configuration */
+    private Configuration configuration;
+
+    /**
+     * Default constructor for reflection.
+     */
+    public PseudoRandomVertexReader() {
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit,
+        TaskAttemptContext context) throws IOException {
+      configuration = context.getConfiguration();
+      aggregateVertices =
+        configuration.getLong(
+          PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 0);
+      if (aggregateVertices <= 0) {
+        throw new IllegalArgumentException(
+            PseudoRandomVertexInputFormat.AGGREGATE_VERTICES + " <= 0");
+      }
+      if (inputSplit instanceof BspInputSplit) {
+        bspInputSplit = (BspInputSplit) inputSplit;
+        long extraVertices =
+            aggregateVertices % bspInputSplit.getNumSplits();
+        totalSplitVertices =
+            aggregateVertices / bspInputSplit.getNumSplits();
+        if (bspInputSplit.getSplitIndex() < extraVertices) {
+          ++totalSplitVertices;
+        }
+        startingVertexId = (bspInputSplit.getSplitIndex() *
+            (aggregateVertices / bspInputSplit.getNumSplits())) +
+            Math.min(bspInputSplit.getSplitIndex(),
+                     extraVertices);
+      } else {
+        throw new IllegalArgumentException(
+            "initialize: Got " + inputSplit.getClass() +
+            " instead of " + BspInputSplit.class);
+      }
+      edgesPerVertex = configuration.getLong(
+        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 0);
+      if (edgesPerVertex <= 0) {
+        throw new IllegalArgumentException(
+          PseudoRandomVertexInputFormat.EDGES_PER_VERTEX + " <= 0");
+      }
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return totalSplitVertices > verticesRead;
+    }
+
+    @Override
+    public Vertex<LongWritable, DoubleWritable, DoubleWritable, M>
+    getCurrentVertex() throws IOException, InterruptedException {
+      Vertex<LongWritable, DoubleWritable, DoubleWritable, M>
+      vertex = BspUtils.createVertex(configuration);
+      long vertexId = startingVertexId + verticesRead;
+      // Seed on the vertex id to keep the vertex data the same when
+      // on different number of workers, but other parameters are the
+      // same.
+      Random rand = new Random(vertexId);
+      DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble());
+      Map<LongWritable, DoubleWritable> edges =
+          Maps.newHashMapWithExpectedSize((int) edgesPerVertex);
+      for (long i = 0; i < edgesPerVertex; ++i) {
+        LongWritable destVertexId = null;
+        do {
+          destVertexId =
+            new LongWritable(Math.abs(rand.nextLong()) %
+              aggregateVertices);
+        } while (edges.containsKey(destVertexId));
+        edges.put(destVertexId, new DoubleWritable(rand.nextDouble()));
+      }
+      vertex.initialize(new LongWritable(vertexId), vertexValue, edges, null);
+      ++verticesRead;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("next: Return vertexId=" +
+                  vertex.getId().get() +
+                  ", vertexValue=" + vertex.getValue() +
+                  ", edges=" + vertex.getEdges());
+      }
+      return vertex;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return verticesRead * 100.0f / totalSplitVertices;
+    }
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Sequence file vertex input format based on {@link SequenceFileInputFormat}.
+ * Splitting and record reading are delegated to an internal
+ * {@link SequenceFileInputFormat}; the value of each record is returned as
+ * the vertex.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param <X> Value type of the sequence file records (a {@link Vertex})
+ */
+@SuppressWarnings("rawtypes")
+public class SequenceFileVertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable,
+    X extends Vertex<I, V, E, M>>
+    extends VertexInputFormat<I, V, E, M> {
+  /** Internal input format */
+  protected SequenceFileInputFormat<I, X> sequenceFileInputFormat =
+    new SequenceFileInputFormat<I, X>();
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    // numWorkers is not used; the internal input format computes the splits.
+    return sequenceFileInputFormat.getSplits(context);
+  }
+
+  @Override
+  public VertexReader<I, V, E, M> createVertexReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    // Wrap the record reader of the internal input format.
+    return new SequenceFileVertexReader<I, V, E, M, X>(
+        sequenceFileInputFormat.createRecordReader(split, context));
+  }
+
+  /**
+   * Vertex reader used with {@link SequenceFileVertexInputFormat}.  Every
+   * method forwards to the wrapped record reader.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param <X> Value type of the wrapped record reader (a {@link Vertex})
+   */
+  public static class SequenceFileVertexReader<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable,
+      X extends Vertex<I, V, E, M>>
+      implements VertexReader<I, V, E, M> {
+    /** Internal record reader from {@link SequenceFileInputFormat} */
+    private final RecordReader<I, X> recordReader;
+
+    /**
+     * Constructor with record reader.
+     *
+     * @param recordReader Reader from {@link SequenceFileInputFormat}.
+     */
+    public SequenceFileVertexReader(RecordReader<I, X> recordReader) {
+      this.recordReader = recordReader;
+    }
+
+    @Override public void initialize(InputSplit inputSplit,
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      recordReader.initialize(inputSplit, context);
+    }
+
+    @Override public boolean nextVertex() throws IOException,
+        InterruptedException {
+      // Advance the wrapped reader; its result tells whether a record exists.
+      return recordReader.nextKeyValue();
+    }
+
+    @Override public Vertex<I, V, E, M> getCurrentVertex()
+      throws IOException, InterruptedException {
+      // The record value is the vertex; the record key is not consulted.
+      return recordReader.getCurrentValue();
+    }
+
+
+    @Override public void close() throws IOException {
+      recordReader.close();
+    }
+
+    @Override public float getProgress() throws IOException,
+        InterruptedException {
+      return recordReader.getProgress();
+    }
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Vertex input format for graphs stored as text adjacency lists in which
+ * vertex ids are kept as {@link Text} and both vertex values and edge values
+ * are doubles.  This is a good input format when the id types do not matter
+ * and the ids can simply be stashed in a String.
+ *
+ * @param <M> Message type.
+ */
+public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
+    extends TextVertexInputFormat<Text, DoubleWritable, DoubleWritable, M>  {
+
+
+  /**
+   * Vertex reader used with
+   * {@link TextDoubleDoubleAdjacencyListVertexInputFormat}
+   *
+   * @param <M> Message type.
+   */
+  static class VertexReader<M extends Writable> extends
+      AdjacencyListVertexReader<Text, DoubleWritable, DoubleWritable, M> {
+    /**
+     * Constructor without sanitizer.
+     *
+     * @param lineRecordReader Internal reader.
+     */
+    VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
+      super(lineRecordReader);
+    }
+
+    /**
+     * Constructor with line record reader and sanitizer.
+     *
+     * @param lineRecordReader Internal reader.
+     * @param sanitizer Sanitizer of the lines.
+     */
+    VertexReader(RecordReader<LongWritable, Text> lineRecordReader,
+        LineSanitizer sanitizer) {
+      super(lineRecordReader, sanitizer);
+    }
+
+    /**
+     * Stores the textual id unchanged into {@code id}.
+     *
+     * @param s Textual vertex id.
+     * @param id Text that receives the id.
+     */
+    @Override
+    public void decodeId(String s, Text id) {
+      id.set(s);
+    }
+
+    /**
+     * Parses the textual vertex value as a double and stores it into
+     * {@code value}.
+     *
+     * @param s Textual vertex value.
+     * @param value Writable that receives the parsed value.
+     */
+    @Override
+    public void decodeValue(String s, DoubleWritable value) {
+      final double parsedValue = Double.parseDouble(s);
+      value.set(parsedValue);
+    }
+
+    /**
+     * Fills the given edge from its textual representation.
+     *
+     * @param s1 Textual target vertex id, used as is.
+     * @param s2 Textual edge value, parsed as a double.
+     * @param textIntWritableEdge Edge that receives the target id and value.
+     */
+    @Override
+    public void decodeEdge(String s1, String s2,
+                           Edge<Text, DoubleWritable> textIntWritableEdge) {
+      // The target id is set before the value is parsed, so a malformed
+      // value leaves the edge with only its target id assigned.
+      final Text targetId = new Text(s1);
+      textIntWritableEdge.setTargetVertexId(targetId);
+      final double edgeValue = Double.parseDouble(s2);
+      textIntWritableEdge.setValue(new DoubleWritable(edgeValue));
+    }
+  }
+
+  /**
+   * Creates the adjacency-list vertex reader for a split, backed by a line
+   * record reader obtained from the internal text input format.
+   *
+   * @param split Input split to read.
+   * @param context Task attempt context.
+   * @return Vertex reader over the lines of the split.
+   * @throws IOException declared by the overridden method.
+   */
+  @Override
+  public org.apache.giraph.graph.VertexReader<Text, DoubleWritable,
+      DoubleWritable, M> createVertexReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    final RecordReader<LongWritable, Text> lineReader =
+        textInputFormat.createRecordReader(split, context);
+    return new VertexReader<M>(lineReader);
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Abstract class that users should subclass to use their own text based
+ * vertex input format.  Input splitting is delegated to an internal
+ * {@link TextInputFormat}.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends VertexInputFormat<I, V, E, M> {
+  /** Uses the TextInputFormat to do everything */
+  protected TextInputFormat textInputFormat = new TextInputFormat();
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * vertex input.  Easiest to ignore the key value separator and only use
+   * key instead.
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   * @param <M> Message value
+   */
+  public abstract static class TextVertexReader<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      implements VertexReader<I, V, E, M> {
+    /** Internal line record reader */
+    private final RecordReader<LongWritable, Text> lineRecordReader;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+
+    /**
+     * Initialize with the LineRecordReader.
+     *
+     * @param lineRecordReader Line record reader from TextInputFormat
+     */
+    public TextVertexReader(
+        RecordReader<LongWritable, Text> lineRecordReader) {
+      this.lineRecordReader = lineRecordReader;
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      // Initialize the wrapped reader first; the context is only remembered
+      // once that call has returned normally.
+      lineRecordReader.initialize(inputSplit, context);
+      this.context = context;
+    }
+
+    @Override
+    public void close() throws IOException {
+      lineRecordReader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return lineRecordReader.getProgress();
+    }
+
+    /**
+     * Get the line record reader.
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected RecordReader<LongWritable, Text> getRecordReader() {
+      return lineRecordReader;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize (null until initialize has been
+     *         called).
+     */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    // Ignore the hint of numWorkers here since we are using TextInputFormat
+    // to do this for us
+    return textInputFormat.getSplits(context);
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+/**
+ * Abstract class that users should subclass to use their own text based
+ * vertex output format.  Output spec checking and the output committer are
+ * delegated to an internal {@link TextOutputFormat}.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends VertexOutputFormat<I, V, E> {
+  /** Uses the TextOutputFormat to do everything */
+  protected TextOutputFormat<Text, Text> textOutputFormat =
+      new TextOutputFormat<Text, Text>();
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * vertex output.  Easiest to ignore the key value separator and only use
+   * key instead.
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   */
+  public abstract static class TextVertexWriter<I extends WritableComparable,
+      V extends Writable, E extends Writable> implements VertexWriter<I, V, E> {
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+    /** Internal line record writer */
+    private final RecordWriter<Text, Text> lineRecordWriter;
+
+    /**
+     * Initialize with the LineRecordWriter.
+     *
+     * @param lineRecordWriter Line record writer from TextOutputFormat
+     */
+    public TextVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+      this.lineRecordWriter = lineRecordWriter;
+    }
+
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException {
+      // Only remembers the context; the record writer is not touched here.
+      this.context = context;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      // Closes the record writer with the context given to this call, not
+      // the one saved by initialize.
+      lineRecordWriter.close(context);
+    }
+
+    /**
+     * Get the line record writer.
+     *
+     * @return Record writer to be used for writing.
+     */
+    public RecordWriter<Text, Text> getRecordWriter() {
+      return lineRecordWriter;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize (null until initialize has been
+     *         called).
+     */
+    public TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    textOutputFormat.checkOutputSpecs(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return textOutputFormat.getOutputCommitter(context);
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/io/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/package-info.java?rev=1371498&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/package-info.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/package-info.java Thu Aug  9 21:43:36 2012
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of reusable Giraph vertex input and output format classes.
+ */
+package org.apache.giraph.io;



Mime
View raw message