giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [11/23] GIRAPH-409: Refactor / cleanups (nitay)
Date Fri, 04 Jan 2013 20:52:39 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphFileInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphFileInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphFileInputFormat.java
new file mode 100644
index 0000000..9c160fe
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphFileInputFormat.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]
+import org.apache.hadoop.mapreduce.security.TokenCache;
+end[HADOOP_NON_SECURE]*/
+
+/**
+ * Provides functionality similar to {@link FileInputFormat},
+ * but allows for different data sources (vertex and edge data).
+ *
+ * @param <K> Key
+ * @param <V> Value
+ */
+public abstract class GiraphFileInputFormat<K, V>
+    extends FileInputFormat<K, V> {
+  /** Configuration key holding the comma-separated vertex input paths. */
+  public static final String VERTEX_INPUT_DIR = "giraph.vertex.input.dir";
+  /** Configuration key holding the comma-separated edge input paths. */
+  public static final String EDGE_INPUT_DIR = "giraph.edge.input.dir";
+  /** Configuration key recording the number of vertex input files. */
+  public static final String NUM_VERTEX_INPUT_FILES =
+      "giraph.input.vertex.num.files";
+  /** Configuration key recording the number of edge input files. */
+  public static final String NUM_EDGE_INPUT_FILES =
+      "giraph.input.edge.num.files";
+
+  /**
+   * Split slop: a file's remaining bytes are only cut into a further
+   * full-size split while they exceed the split size by this factor.
+   */
+  private static final double SPLIT_SLOP = 1.1; // 10% slop
+
+  /** Filter rejecting hidden files: names starting with "_" or ".". */
+  private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(GiraphFileInputFormat.class);
+
+  /**
+   * Append a {@link org.apache.hadoop.fs.Path} to the job's vertex inputs.
+   * The path is qualified against its file system and escaped, then joined
+   * with a comma onto any value already stored under VERTEX_INPUT_DIR.
+   *
+   * @param job The {@link org.apache.hadoop.mapreduce.Job} to modify
+   * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
+   *             vertex inputs
+   * @throws IOException propagated from the file system lookup
+   */
+  public static void addVertexInputPath(Job job, Path path) throws IOException {
+    Configuration jobConf = job.getConfiguration();
+    Path qualified = path.getFileSystem(jobConf).makeQualified(path);
+    String escaped = StringUtils.escapeString(qualified.toString());
+    String existing = jobConf.get(VERTEX_INPUT_DIR);
+    if (existing == null) {
+      // First vertex input path registered for this job.
+      jobConf.set(VERTEX_INPUT_DIR, escaped);
+    } else {
+      jobConf.set(VERTEX_INPUT_DIR, existing + "," + escaped);
+    }
+  }
+
+  /**
+   * Append a {@link org.apache.hadoop.fs.Path} to the job's edge inputs.
+   * The path is qualified against its file system and escaped, then joined
+   * with a comma onto any value already stored under EDGE_INPUT_DIR.
+   *
+   * @param job The {@link org.apache.hadoop.mapreduce.Job} to modify
+   * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
+   *             edge inputs
+   * @throws IOException propagated from the file system lookup
+   */
+  public static void addEdgeInputPath(Job job, Path path) throws IOException {
+    Configuration jobConf = job.getConfiguration();
+    Path qualified = path.getFileSystem(jobConf).makeQualified(path);
+    String escaped = StringUtils.escapeString(qualified.toString());
+    String existing = jobConf.get(EDGE_INPUT_DIR);
+    if (existing == null) {
+      // First edge input path registered for this job.
+      jobConf.set(EDGE_INPUT_DIR, escaped);
+    } else {
+      jobConf.set(EDGE_INPUT_DIR, existing + "," + escaped);
+    }
+  }
+
+  /**
+   * Read back the vertex input {@link Path}s stored in the configuration.
+   * The VERTEX_INPUT_DIR value (empty string when unset) is split and each
+   * entry is unescaped before being turned into a {@link Path}.
+   *
+   * @param context The job
+   * @return The vertex input {@link Path}s, one per split entry
+   */
+  public static Path[] getVertexInputPaths(JobContext context) {
+    Configuration conf = context.getConfiguration();
+    String[] escapedDirs = StringUtils.split(conf.get(VERTEX_INPUT_DIR, ""));
+    Path[] paths = new Path[escapedDirs.length];
+    int next = 0;
+    for (String escapedDir : escapedDirs) {
+      paths[next++] = new Path(StringUtils.unEscapeString(escapedDir));
+    }
+    return paths;
+  }
+
+  /**
+   * Read back the edge input {@link Path}s stored in the configuration.
+   * The EDGE_INPUT_DIR value (empty string when unset) is split and each
+   * entry is unescaped before being turned into a {@link Path}.
+   *
+   * @param context The job
+   * @return The edge input {@link Path}s, one per split entry
+   */
+  public static Path[] getEdgeInputPaths(JobContext context) {
+    Configuration conf = context.getConfiguration();
+    String[] escapedDirs = StringUtils.split(conf.get(EDGE_INPUT_DIR, ""));
+    Path[] paths = new Path[escapedDirs.length];
+    int next = 0;
+    for (String escapedDir : escapedDirs) {
+      paths[next++] = new Path(StringUtils.unEscapeString(escapedDir));
+    }
+    return paths;
+  }
+
+  /**
+   * {@link PathFilter} that combines several filters: a path is accepted only
+   * when every wrapped filter accepts it. listStatus() uses it to pair the
+   * built-in HIDDEN_FILE_FILTER with the user provided filter (if any).
+   */
+  private static class MultiPathFilter implements PathFilter {
+    /** Filters that must all accept a path. */
+    private List<PathFilter> filters;
+
+    /**
+     * Constructor.
+     *
+     * @param filters The filters to combine
+     */
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    /**
+     * Check a path against every filter, in list order.
+     *
+     * @param path The path to check
+     * @return True iff all filters accept the path
+     */
+    public boolean accept(Path path) {
+      boolean accepted = true;
+      // Stop at the first rejection; later filters are not consulted.
+      for (int i = 0; accepted && i < filters.size(); i++) {
+        accepted = filters.get(i).accept(path);
+      }
+      return accepted;
+    }
+  }
+
+  /**
+   * Common method for listing vertex/edge input directories.
+   *
+   * Each input path is expanded as a glob; matched directories are listed
+   * one level deep and matched files are taken as is. Both steps go through
+   * the same filter (HIDDEN_FILE_FILTER plus the job's filter, if any), so
+   * hidden entries inside a matched directory are skipped as well.
+   *
+   * @param job The job
+   * @param dirs list of vertex/edge input paths
+   * @return List of FileStatus objects
+   * @throws IOException if no input paths are given, or (as an
+   *         InvalidInputException) if any path is missing or matches nothing
+   */
+  private List<FileStatus> listStatus(JobContext job, Path[] dirs)
+    throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]
+    // get tokens for all the required FileSystems..
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
+        job.getConfiguration());
+end[HADOOP_NON_SECURE]*/
+
+    // Problems are collected per path and reported together at the end.
+    List<IOException> errors = new ArrayList<IOException>();
+
+    // creates a MultiPathFilter with the HIDDEN_FILE_FILTER and the
+    // user provided one (if any).
+    List<PathFilter> filters = new ArrayList<PathFilter>();
+    filters.add(HIDDEN_FILE_FILTER);
+    PathFilter jobFilter = getInputPathFilter(job);
+    if (jobFilter != null) {
+      filters.add(jobFilter);
+    }
+    PathFilter inputFilter = new MultiPathFilter(filters);
+
+    for (Path p : dirs) {
+      FileSystem fs = p.getFileSystem(job.getConfiguration());
+      FileStatus[] matches = fs.globStatus(p, inputFilter);
+      if (matches == null) {
+        errors.add(new IOException("Input path does not exist: " + p));
+      } else if (matches.length == 0) {
+        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+      } else {
+        for (FileStatus globStat: matches) {
+          if (globStat.isDir()) {
+            // Filter the directory's children too; an unfiltered listing
+            // would return hidden entries ("_..." / ".") as input files.
+            Collections.addAll(result,
+                fs.listStatus(globStat.getPath(), inputFilter));
+          } else {
+            result.add(globStat);
+          }
+        }
+      }
+    }
+
+    if (!errors.isEmpty()) {
+      throw new InvalidInputException(errors);
+    }
+    LOG.info("Total input paths to process : " + result.size());
+    return result;
+  }
+
+  /**
+   * List the files found under the configured vertex input paths.
+   *
+   * @param job the job to list vertex input paths for
+   * @return list of FileStatus objects
+   * @throws IOException if zero items.
+   */
+  protected List<FileStatus> listVertexStatus(JobContext job)
+    throws IOException {
+    Path[] vertexDirs = getVertexInputPaths(job);
+    return listStatus(job, vertexDirs);
+  }
+
+  /**
+   * List the files found under the configured edge input paths.
+   *
+   * @param job the job to list edge input paths for
+   * @return list of FileStatus objects
+   * @throws IOException if zero items.
+   */
+  protected List<FileStatus> listEdgeStatus(JobContext job)
+    throws IOException {
+    Path[] edgeDirs = getEdgeInputPaths(job);
+    return listStatus(job, edgeDirs);
+  }
+
+  /**
+   * Common method for generating the list of vertex/edge input splits.
+   *
+   * Zero length files yield one empty split without hosts, files that are
+   * not splitable yield one split covering the whole file, and every other
+   * file is cut into pieces of the computed split size plus a final piece
+   * for whatever remains.
+   *
+   * @param job The job
+   * @param files List of FileStatus objects for vertex/edge input files
+   * @return The list of vertex/edge input splits
+   * @throws IOException
+   */
+  private List<InputSplit> getSplits(JobContext job, List<FileStatus> files)
+    throws IOException {
+    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
+    long maxSize = getMaxSplitSize(job);
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    for (FileStatus status : files) {
+      Path filePath = status.getPath();
+      FileSystem fileSystem = filePath.getFileSystem(job.getConfiguration());
+      long fileLength = status.getLen();
+      BlockLocation[] blocks =
+          fileSystem.getFileBlockLocations(status, 0, fileLength);
+
+      if (fileLength == 0) {
+        //Create empty hosts array for zero length files
+        splits.add(new FileSplit(filePath, 0, fileLength, new String[0]));
+        continue;
+      }
+
+      if (!isSplitable(job, filePath)) {
+        // Whole file in one split, placed on the hosts of its first block.
+        splits.add(
+            new FileSplit(filePath, 0, fileLength, blocks[0].getHosts()));
+        continue;
+      }
+
+      long splitSize =
+          computeSplitSize(status.getBlockSize(), minSize, maxSize);
+
+      // offset + bytesLeft always equals fileLength.
+      long offset = 0;
+      long bytesLeft = fileLength;
+      while (((double) bytesLeft) / splitSize > SPLIT_SLOP) {
+        int blockIndex = getBlockIndex(blocks, offset);
+        splits.add(new FileSplit(filePath, offset, splitSize,
+            blocks[blockIndex].getHosts()));
+        offset += splitSize;
+        bytesLeft -= splitSize;
+      }
+
+      if (bytesLeft != 0) {
+        // Tail split, placed on the hosts of the last block.
+        splits.add(new FileSplit(filePath, offset, bytesLeft,
+            blocks[blocks.length - 1].getHosts()));
+      }
+    }
+    return splits;
+  }
+
+  /**
+   * Generate the list of vertex input splits. The number of vertex input
+   * files is stored in the job configuration as a side effect.
+   *
+   * @param job The job
+   * @return The list of vertex input splits
+   * @throws IOException
+   */
+  public List<InputSplit> getVertexSplits(JobContext job) throws IOException {
+    List<FileStatus> vertexFiles = listVertexStatus(job);
+    List<InputSplit> vertexSplits = getSplits(job, vertexFiles);
+    // Save the number of input files in the job-conf
+    int numFiles = vertexFiles.size();
+    job.getConfiguration().setLong(NUM_VERTEX_INPUT_FILES, numFiles);
+    LOG.debug("Total # of vertex splits: " + vertexSplits.size());
+    return vertexSplits;
+  }
+
+  /**
+   * Generate the list of edge input splits. The number of edge input
+   * files is stored in the job configuration as a side effect.
+   *
+   * @param job The job
+   * @return The list of edge input splits
+   * @throws IOException
+   */
+  public List<InputSplit> getEdgeSplits(JobContext job) throws IOException {
+    List<FileStatus> edgeFiles = listEdgeStatus(job);
+    List<InputSplit> edgeSplits = getSplits(job, edgeFiles);
+    // Save the number of input files in the job-conf
+    int numFiles = edgeFiles.size();
+    job.getConfiguration().setLong(NUM_EDGE_INPUT_FILES, numFiles);
+    LOG.debug("Total # of edge splits: " + edgeSplits.size());
+    return edgeSplits;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextInputFormat.java
new file mode 100644
index 0000000..978116d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/GiraphTextInputFormat.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+
+/**
+ * Provides functionality similar to
+ * {@link org.apache.hadoop.mapreduce.lib.input.TextInputFormat},
+ * but allows for different data sources (vertex and edge data).
+ */
+public class GiraphTextInputFormat
+    extends GiraphFileInputFormat<LongWritable, Text> {
+  @Override
+  public RecordReader<LongWritable, Text>
+  createRecordReader(InputSplit split, TaskAttemptContext context) {
+    return new LineRecordReader();
+  }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    CompressionCodecFactory codecFactory =
+        new CompressionCodecFactory(context.getConfiguration());
+    CompressionCodec matchedCodec = codecFactory.getCodec(file);
+    // Only files for which no compression codec is found are split.
+    return matchedCodec == null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
new file mode 100644
index 0000000..352f054
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Text output format writing one line per vertex that holds only the
+ * vertex id and the vertex value; edges and edge values are left out.
+ * Useful when the final value of each vertex is all that's needed. The
+ * boolean configuration parameter reverse.id.and.value switches the order
+ * to value first, id second.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class IdWithValueTextOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends TextVertexOutputFormat<I, V, E> {
+
+  /** Configuration key for the delimiter placed between id and value */
+  public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+  /** Default output delimiter */
+  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+  /** Configuration key: reverse id and value order? */
+  public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
+  /** Default is to not reverse id and value order. */
+  public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
+
+  @Override
+  public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
+    return new IdWithValueVertexWriter();
+  }
+
+  /**
+   * Vertex writer used with {@link IdWithValueTextOutputFormat}.
+   */
+  protected class IdWithValueVertexWriter extends TextVertexWriterToEachLine {
+    /** Delimiter read from the configuration in initialize() */
+    private String delimiter;
+    /** Cached reverse option read from the configuration in initialize() */
+    private boolean reverseOutput;
+
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      super.initialize(context);
+      Configuration jobConf = context.getConfiguration();
+      reverseOutput = jobConf.getBoolean(
+          REVERSE_ID_AND_VALUE, REVERSE_ID_AND_VALUE_DEFAULT);
+      delimiter =
+          jobConf.get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+    }
+
+    @Override
+    protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+      throws IOException {
+      StringBuilder line = new StringBuilder();
+      if (reverseOutput) {
+        // value <delimiter> id
+        line.append(vertex.getValue().toString());
+        line.append(delimiter);
+        line.append(vertex.getId().toString());
+      } else {
+        // id <delimiter> value
+        line.append(vertex.getId().toString());
+        line.append(delimiter);
+        line.append(vertex.getValue().toString());
+      }
+      return new Text(line.toString());
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java
new file mode 100644
index 0000000..755cf17
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.io.VertexInputFormat} for
+ * unweighted graphs with int ids.
+ *
+ * Each line consists of: vertex neighbor1 neighbor2 ...
+ * Tokens are separated by a single tab or space. The vertex value is
+ * initialised to the vertex id.
+ */
+public class IntIntNullIntTextInputFormat extends
+    TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
+    IntWritable> {
+  /** Separator of the vertex and neighbors */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+  @Override
+  public TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context)
+    throws IOException {
+    return new IntIntNullIntVertexReader();
+  }
+
+  /**
+   * Vertex reader associated with {@link IntIntNullIntTextInputFormat}.
+   */
+  public class IntIntNullIntVertexReader extends
+    TextVertexReaderFromEachLineProcessed<String[]> {
+    /**
+     * Cached vertex id for the current line, set by preprocessLine()
+     */
+    private IntWritable id;
+
+    @Override
+    protected String[] preprocessLine(Text line) throws IOException {
+      String[] tokens = SEPARATOR.split(line.toString());
+      // The first token is the vertex id; the rest are neighbor ids.
+      id = new IntWritable(Integer.parseInt(tokens[0]));
+      return tokens;
+    }
+
+    @Override
+    protected IntWritable getId(String[] tokens) throws IOException {
+      return id;
+    }
+
+    @Override
+    protected IntWritable getValue(String[] tokens) throws IOException {
+      // Hand out a separate IntWritable: IntWritable is mutable, so sharing
+      // one instance between id and value would let an in-place change of
+      // the value silently change the id as well.
+      return new IntWritable(id.get());
+    }
+
+    @Override
+    protected Iterable<Edge<IntWritable, NullWritable>> getEdges(
+        String[] tokens) throws IOException {
+      List<Edge<IntWritable, NullWritable>> edges =
+          Lists.newArrayListWithCapacity(tokens.length - 1);
+      // Skip token 0 (the vertex id); every other token is a target id.
+      for (int n = 1; n < tokens.length; n++) {
+        edges.add(new Edge<IntWritable, NullWritable>(
+            new IntWritable(Integer.parseInt(tokens[n])),
+            NullWritable.get()));
+      }
+      return edges;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java
new file mode 100644
index 0000000..a7dbef8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.utils.IntPair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.io.VertexValueInputFormat}
+ * for integer ids and values.
+ *
+ * Each line consists of: id, value
+ * (the two fields are separated by a single tab or space).
+ *
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class IntIntTextVertexValueInputFormat<E extends Writable,
+    M extends Writable> extends
+    TextVertexValueInputFormat<IntWritable, IntWritable, E, M> {
+  /** Separator for id and value */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+  @Override
+  public TextVertexValueReader createVertexValueReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new IntIntTextVertexValueReader();
+  }
+
+  /**
+   * {@link org.apache.giraph.io.VertexValueReader} associated with
+   * {@link IntIntTextVertexValueInputFormat}.
+   */
+  public class IntIntTextVertexValueReader extends
+      TextVertexValueReaderFromEachLineProcessed<IntPair> {
+
+    @Override
+    protected IntPair preprocessLine(Text line) throws IOException {
+      String[] fields = SEPARATOR.split(line.toString());
+      int vertexId = Integer.parseInt(fields[0]);
+      int vertexValue = Integer.parseInt(fields[1]);
+      return new IntPair(vertexId, vertexValue);
+    }
+
+    @Override
+    protected IntWritable getId(IntPair data) throws IOException {
+      int vertexId = data.getFirst();
+      return new IntWritable(vertexId);
+    }
+
+    @Override
+    protected IntWritable getValue(IntPair data) throws IOException {
+      int vertexValue = data.getSecond();
+      return new IntWritable(vertexValue);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java
new file mode 100644
index 0000000..dda3f2f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+
+/**
+ * Simple text-based {@link org.apache.giraph.io.VertexInputFormat} for
+ * unweighted graphs without edges or values, just vertices with ids.
+ *
+ * Each line holds nothing but the vertex id.
+ */
+public class IntNullNullNullTextInputFormat extends TextVertexInputFormat<
+    IntWritable, NullWritable, NullWritable, NullWritable> {
+  @Override
+  public TextVertexReader createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new IntNullNullNullVertexReader();
+  }
+
+  /**
+   * Reader for this InputFormat.
+   */
+  public class IntNullNullNullVertexReader extends
+      TextVertexReaderFromEachLineProcessed<String> {
+    /** Vertex id parsed from the current line by preprocessLine() */
+    private IntWritable id;
+
+    @Override
+    protected String preprocessLine(Text line) throws IOException {
+      String text = line.toString();
+      id = new IntWritable(Integer.parseInt(text));
+      return text;
+    }
+
+    @Override
+    protected IntWritable getId(String line) throws IOException {
+      return id;
+    }
+
+    @Override
+    protected NullWritable getValue(String line) throws IOException {
+      return NullWritable.get();
+    }
+
+    @Override
+    protected Iterable<Edge<IntWritable, NullWritable>> getEdges(String line)
+      throws IOException {
+      // Vertices read by this format never carry edges.
+      return ImmutableList.of();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullTextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullTextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullTextEdgeInputFormat.java
new file mode 100644
index 0000000..108f96c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullTextEdgeInputFormat.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.utils.IntPair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.io.EdgeInputFormat} for
+ * unweighted graphs with int ids.
+ *
+ * Each line consists of: source_vertex, target_vertex
+ * (the two fields are separated by a single tab or space).
+ */
+public class IntNullTextEdgeInputFormat extends
+    TextEdgeInputFormat<IntWritable, NullWritable> {
+  /** Splitter for endpoints */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+  @Override
+  public TextEdgeReader createEdgeReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new IntNullTextEdgeReader();
+  }
+
+  /**
+   * {@link org.apache.giraph.io.EdgeReader} associated with
+   * {@link IntNullTextEdgeInputFormat}.
+   */
+  public class IntNullTextEdgeReader extends
+      TextEdgeReaderFromEachLineProcessed<IntPair> {
+    @Override
+    protected IntPair preprocessLine(Text line) throws IOException {
+      String[] fields = SEPARATOR.split(line.toString());
+      int sourceId = Integer.parseInt(fields[0]);
+      int targetId = Integer.parseInt(fields[1]);
+      return new IntPair(sourceId, targetId);
+    }
+
+    @Override
+    protected IntWritable getSourceVertexId(IntPair endpoints)
+      throws IOException {
+      int sourceId = endpoints.getFirst();
+      return new IntWritable(sourceId);
+    }
+
+    @Override
+    protected IntWritable getTargetVertexId(IntPair endpoints)
+      throws IOException {
+      int targetId = endpoints.getSecond();
+      return new IntWritable(targetId);
+    }
+
+    @Override
+    protected NullWritable getValue(IntPair endpoints) throws IOException {
+      // Edges are unweighted, so there is no value to report.
+      return NullWritable.get();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexFormat.java
new file mode 100644
index 0000000..90cc444
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexFormat.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+/**
+ * JSON keys shared by the JsonBase64 vertex input and output formats.
+ */
+public class JsonBase64VertexFormat {
+  /** JSON key under which the encoded vertex id is stored */
+  public static final String VERTEX_ID_KEY = "vertexId";
+  /** JSON key under which the encoded vertex value is stored */
+  public static final String VERTEX_VALUE_KEY = "vertexValue";
+  /** JSON key of the array that holds all the encoded edges of a vertex */
+  public static final String EDGE_ARRAY_KEY = "edgeArray";
+
+  /**
+   * Constants holder only, never instantiated.
+   */
+  private JsonBase64VertexFormat() { }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java
new file mode 100644
index 0000000..a6e17ec
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import net.iharder.Base64;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Vertex input format that reads one JSON object per line.  The vertex id,
+ * the vertex value and every edge are kept as Writable serialized bytes
+ * that are Base64 encoded.
+ * Works with {@link JsonBase64VertexOutputFormat}
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class JsonBase64VertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends TextVertexInputFormat<I, V, E, M> {
+
+  @Override
+  public TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context) {
+    return new JsonBase64VertexReader();
+  }
+
+  /**
+   * Reader for {@link JsonBase64VertexInputFormat}, one vertex per JSON line
+   */
+  protected class JsonBase64VertexReader extends
+    TextVertexReaderFromEachLineProcessed<JSONObject> {
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      super.initialize(inputSplit, context);
+    }
+
+    @Override
+    protected JSONObject preprocessLine(Text line) {
+      try {
+        return new JSONObject(line.toString());
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get the vertex", e);
+      }
+    }
+
+    @Override
+    protected I getId(JSONObject vertexObject) throws IOException {
+      try {
+        String encodedId =
+            vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY);
+        DataInput in = new DataInputStream(
+            new ByteArrayInputStream(Base64.decode(encodedId)));
+        I id = getConf().createVertexId();
+        id.readFields(in);
+        return id;
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get vertex id", e);
+      }
+    }
+
+    @Override
+    protected V getValue(JSONObject vertexObject) throws IOException {
+      try {
+        String encodedValue =
+            vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY);
+        DataInput in = new DataInputStream(
+            new ByteArrayInputStream(Base64.decode(encodedValue)));
+        V value = getConf().createVertexValue();
+        value.readFields(in);
+        return value;
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get vertex value", e);
+      }
+    }
+
+    @Override
+    protected Iterable<Edge<I, E>> getEdges(JSONObject vertexObject) throws
+    IOException {
+      JSONArray encodedEdges;
+      try {
+        encodedEdges = vertexObject.getJSONArray(
+          JsonBase64VertexFormat.EDGE_ARRAY_KEY);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get edge array", e);
+      }
+      int edgeCount = encodedEdges.length();
+      List<Edge<I, E>> edges = Lists.newArrayListWithCapacity(edgeCount);
+      for (int i = 0; i < edgeCount; ++i) {
+        byte[] edgeBytes;
+        try {
+          edgeBytes = Base64.decode(encodedEdges.getString(i));
+        } catch (JSONException e) {
+          throw new IllegalArgumentException(
+            "next: Failed to get edge value", e);
+        }
+        // Each entry holds the target vertex id followed by the edge value.
+        DataInput in = new DataInputStream(
+            new ByteArrayInputStream(edgeBytes));
+        I targetId = getConf().createVertexId();
+        targetId.readFields(in);
+        E edgeValue = getConf().createEdgeValue();
+        edgeValue.readFields(in);
+        edges.add(new Edge<I, E>(targetId, edgeValue));
+      }
+      return edges;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
new file mode 100644
index 0000000..0599742
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import net.iharder.Base64;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Vertex output format that writes one JSON object per line.  The vertex id,
+ * the vertex value and every edge are kept as Writable serialized bytes
+ * that are Base64 encoded.
+ * Works with {@link JsonBase64VertexInputFormat}
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class JsonBase64VertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends
+    TextVertexOutputFormat<I, V, E> {
+
+  @Override
+  public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
+    return new JsonBase64VertexWriter();
+  }
+
+  /**
+   * Writer for {@link JsonBase64VertexOutputFormat}, one JSON line per vertex
+   */
+  protected class JsonBase64VertexWriter extends TextVertexWriterToEachLine {
+
+    @Override
+    protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+      throws IOException {
+      // One buffer serves every Writable; it is reset between uses.
+      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+      DataOutput out = new DataOutputStream(buffer);
+      JSONObject json = new JSONObject();
+      vertex.getId().write(out);
+      try {
+        json.put(
+          JsonBase64VertexFormat.VERTEX_ID_KEY,
+          Base64.encodeBytes(buffer.toByteArray()));
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "writerVertex: Failed to insert vertex id", e);
+      }
+      buffer.reset();
+      vertex.getValue().write(out);
+      try {
+        json.put(
+          JsonBase64VertexFormat.VERTEX_VALUE_KEY,
+          Base64.encodeBytes(buffer.toByteArray()));
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "writerVertex: Failed to insert vertex value", e);
+      }
+      // Every edge becomes one string: target vertex id, then edge value.
+      JSONArray edges = new JSONArray();
+      for (Edge<I, E> edge : vertex.getEdges()) {
+        buffer.reset();
+        edge.getTargetVertexId().write(out);
+        edge.getValue().write(out);
+        edges.put(Base64.encodeBytes(buffer.toByteArray()));
+      }
+      try {
+        // The whole edge list is stored under a single key.
+        json.put(JsonBase64VertexFormat.EDGE_ARRAY_KEY, edges);
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "writerVertex: Failed to insert edge array", e);
+      }
+      return new Text(json.toString());
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
new file mode 100644
index 0000000..443c79a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONArray;
+import org.json.JSONException;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * VertexInputFormat that features <code>long</code> vertex ID's,
+ * <code>double</code> vertex values and <code>float</code>
+ * out-edge weights, and <code>double</code> message types,
+ * specified in JSON format.
+ */
+public class JsonLongDoubleFloatDoubleVertexInputFormat extends
+  TextVertexInputFormat<LongWritable, DoubleWritable,
+  FloatWritable, DoubleWritable> {
+
+  @Override
+  public TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context) {
+    return new JsonLongDoubleFloatDoubleVertexReader();
+  }
+
+  /**
+   * VertexReader that features <code>double</code> vertex
+   * values and <code>float</code> out-edge weights. Each input
+   * line should hold one vertex in the following JSON format:
+   * JSONArray(&lt;vertex id&gt;, &lt;vertex value&gt;,
+   *   JSONArray(JSONArray(&lt;dest vertex id&gt;, &lt;edge value&gt;), ...))
+   * Here is an example with vertex id 1, vertex value 4.3, and two edges.
+   * First edge has a destination vertex 2, edge value 2.1.
+   * Second edge has a destination vertex 3, edge value 0.7.
+   * [1,4.3,[[2,2.1],[3,0.7]]]
+   */
+  class JsonLongDoubleFloatDoubleVertexReader extends
+    TextVertexReaderFromEachLineProcessedHandlingExceptions<JSONArray,
+    JSONException> {
+
+    @Override
+    protected JSONArray preprocessLine(Text line) throws JSONException {
+      return new JSONArray(line.toString());
+    }
+
+    @Override
+    protected LongWritable getId(JSONArray jsonVertex)
+      throws JSONException, IOException {
+      return new LongWritable(jsonVertex.getLong(0));
+    }
+
+    @Override
+    protected DoubleWritable getValue(JSONArray jsonVertex)
+      throws JSONException, IOException {
+      return new DoubleWritable(jsonVertex.getDouble(1));
+    }
+
+    @Override
+    protected Iterable<Edge<LongWritable, FloatWritable>> getEdges(
+        JSONArray jsonVertex) throws JSONException, IOException {
+      JSONArray jsonEdges = jsonVertex.getJSONArray(2);
+      List<Edge<LongWritable, FloatWritable>> edges =
+          Lists.newArrayListWithCapacity(jsonEdges.length());
+      for (int i = 0; i < jsonEdges.length(); ++i) {
+        JSONArray pair = jsonEdges.getJSONArray(i);
+        LongWritable target = new LongWritable(pair.getLong(0));
+        FloatWritable weight = new FloatWritable((float) pair.getDouble(1));
+        edges.add(new Edge<LongWritable, FloatWritable>(target, weight));
+      }
+      return edges;
+    }
+
+    @Override
+    protected Vertex<LongWritable, DoubleWritable, FloatWritable,
+        DoubleWritable> handleException(Text line, JSONArray jsonVertex,
+        JSONException e) {
+      String message = "Couldn't get vertex from line " + line;
+      throw new IllegalArgumentException(message, e);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
new file mode 100644
index 0000000..9a751ae
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONArray;
+import org.json.JSONException;
+
+import java.io.IOException;
+
+/**
+ * VertexOutputFormat that supports JSON encoded vertices featuring
+ * <code>double</code> values and <code>float</code> out-edge weights
+ */
+public class JsonLongDoubleFloatDoubleVertexOutputFormat extends
+  TextVertexOutputFormat<LongWritable, DoubleWritable,
+  FloatWritable> {
+
+  @Override
+  public TextVertexWriter createVertexWriter(
+      TaskAttemptContext context) {
+    return new JsonLongDoubleFloatDoubleVertexWriter();
+  }
+
+  /**
+   * VertexWriter that emits each vertex as a JSON array of the form
+   * [vertex id, vertex value, [[target vertex id, edge value], ...]].
+   */
+  private class JsonLongDoubleFloatDoubleVertexWriter extends
+    TextVertexWriterToEachLine {
+    @Override
+    public Text convertVertexToLine(Vertex<LongWritable, DoubleWritable,
+        FloatWritable, ?> vertex) throws IOException {
+      JSONArray jsonVertex = new JSONArray();
+      try {
+        jsonVertex.put(vertex.getId().get());
+        jsonVertex.put(vertex.getValue().get());
+        JSONArray jsonEdgeArray = new JSONArray();
+        for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
+          JSONArray jsonEdge = new JSONArray();
+          jsonEdge.put(edge.getTargetVertexId().get());
+          jsonEdge.put(edge.getValue().get());
+          jsonEdgeArray.put(jsonEdge);
+        }
+        jsonVertex.put(jsonEdgeArray);
+      } catch (JSONException e) {
+        // Keep the JSONException as the cause, otherwise the reason the
+        // vertex could not be encoded is lost to whoever sees this failure.
+        throw new IllegalArgumentException(
+          "writeVertex: Couldn't write vertex " + vertex, e);
+      }
+      return new Text(jsonVertex.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java
new file mode 100644
index 0000000..ee3dc38
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * InputFormat for reading graphs stored as (ordered) adjacency lists
+ * with the vertex ids longs and the vertex values and edges doubles.
+ * For example:
+ * 22 0.1 45 0.3 99 0.44
+ * to repesent a vertex with id 22, value of 0.1 and edges to nodes 45 and 99,
+ * with values of 0.3 and 0.44, respectively.
+ *
+ * @param <M> Message data
+ */
+public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
+    extends AdjacencyListTextVertexInputFormat<LongWritable, DoubleWritable,
+    DoubleWritable, M> {
+
+  @Override
+  public AdjacencyListTextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context) {
+    return new LongDoubleDoubleAdjacencyListVertexReader(null);
+  }
+
+  /**
+   * VertexReader associated with
+   * {@link LongDoubleDoubleAdjacencyListVertexInputFormat}.
+   */
+  protected class LongDoubleDoubleAdjacencyListVertexReader extends
+      AdjacencyListTextVertexReader {
+
+    /**
+     * Constructor with {@link LineSanitizer}.
+     *
+     * @param lineSanitizer the sanitizer to use for reading
+     */
+    public LongDoubleDoubleAdjacencyListVertexReader(LineSanitizer
+        lineSanitizer) {
+      super(lineSanitizer);
+    }
+
+    @Override
+    public LongWritable decodeId(String s) {
+      return new LongWritable(Long.valueOf(s));
+    }
+
+    @Override
+    public DoubleWritable decodeValue(String s) {
+      return new DoubleWritable(Double.valueOf(s));
+    }
+
+    @Override
+    public Edge<LongWritable, DoubleWritable> decodeEdge(
+        String s1,
+        String s2) {
+      return new Edge<LongWritable, DoubleWritable>(
+          new LongWritable(Long.valueOf(s1)),
+          new DoubleWritable(Double.valueOf(s2)));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
new file mode 100644
index 0000000..4428688
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import com.google.common.collect.Sets;
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * This {@link EdgeInputFormat} generates pseudo-random edges on the fly.
+ * As with {@link PseudoRandomVertexInputFormat}, the user specifies the
+ * number of vertices and the number of edges per vertex.
+ */
+public class PseudoRandomEdgeInputFormat
+    extends EdgeInputFormat<LongWritable, DoubleWritable> {
+  /** Config key: number of vertices over all splits, must be > 0. */
+  public static final String AGGREGATE_VERTICES =
+      "pseudoRandomEdgeInputFormat.aggregateVertices";
+  /** Config key: edges per vertex, from 1 up to the aggregate vertices. */
+  public static final String EDGES_PER_VERTEX =
+      "pseudoRandomEdgeInputFormat.edgesPerVertex";
+
+  @Override
+  public final List<InputSplit> getSplits(final JobContext context,
+                                          final int numWorkers)
+    throws IOException, InterruptedException {
+    // The splits carry no input data, just their index and the split count;
+    // the PseudoRandomEdgeReader generates all the test data from those.
+    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+    for (int i = 0; i < numWorkers; ++i) {
+      inputSplitList.add(new BspInputSplit(i, numWorkers));
+    }
+    return inputSplitList;
+  }
+
+  @Override
+  public EdgeReader<LongWritable, DoubleWritable> createEdgeReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new PseudoRandomEdgeReader();
+  }
+
+  /**
+   * {@link EdgeReader} that generates pseudo-random edges.
+   */
+  private static class PseudoRandomEdgeReader
+      implements EdgeReader<LongWritable, DoubleWritable> {
+    /** Logger. */
+    private static final Logger LOG =
+        Logger.getLogger(PseudoRandomEdgeReader.class);
+    /** Starting vertex id. */
+    private long startingVertexId = -1;
+    /** Vertices read so far. */
+    private long verticesRead = 0;
+    /** Total vertices to read (on this split alone). */
+    private long totalSplitVertices = -1;
+    /** Current vertex id (-1 until the first edge of a vertex is made). */
+    private LongWritable currentVertexId = new LongWritable(-1);
+    /** Edges read for the current vertex. */
+    private int currentVertexEdgesRead = 0;
+    /** Target vertices of edges for current vertex. */
+    private Set<LongWritable> currentVertexDestVertices = Sets.newHashSet();
+    /** Random number generator for the current vertex (for consistency
+     * across runs on different numbers of workers). */
+    private Random random = new Random();
+    /** Aggregate vertices (all input splits). */
+    private long aggregateVertices = -1;
+    /** Edges per vertex. */
+    private long edgesPerVertex = -1;
+    /** BspInputSplit (used only for index). */
+    private BspInputSplit bspInputSplit;
+    /** Saved configuration */
+    private ImmutableClassesGiraphConfiguration configuration;
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      configuration = new ImmutableClassesGiraphConfiguration(
+          context.getConfiguration());
+      aggregateVertices = configuration.getLong(
+          PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES, 0);
+      if (aggregateVertices <= 0) {
+        throw new IllegalArgumentException(
+            PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES + " <= 0");
+      }
+      if (inputSplit instanceof BspInputSplit) {
+        bspInputSplit = (BspInputSplit) inputSplit;
+        // The first (aggregate % splits) splits each take one extra vertex.
+        long extraVertices = aggregateVertices % bspInputSplit.getNumSplits();
+        totalSplitVertices = aggregateVertices / bspInputSplit.getNumSplits();
+        if (bspInputSplit.getSplitIndex() < extraVertices) {
+          ++totalSplitVertices;
+        }
+        startingVertexId = (bspInputSplit.getSplitIndex() *
+            (aggregateVertices / bspInputSplit.getNumSplits())) +
+            Math.min(bspInputSplit.getSplitIndex(), extraVertices);
+      } else {
+        throw new IllegalArgumentException(
+            "initialize: Got " + inputSplit.getClass() +
+                " instead of " + BspInputSplit.class);
+      }
+      edgesPerVertex = configuration.getLong(
+          PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX, 0);
+      if (edgesPerVertex <= 0) {
+        throw new IllegalArgumentException(
+            PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX + " <= 0");
+      }
+      // Destinations are unique per vertex, so this could never terminate.
+      if (edgesPerVertex > aggregateVertices) {
+        throw new IllegalArgumentException(
+            PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX + " > " +
+                PseudoRandomEdgeInputFormat.AGGREGATE_VERTICES);
+      }
+    }
+
+    @Override
+    public boolean nextEdge() throws IOException, InterruptedException {
+      return totalSplitVertices > verticesRead + 1 ||
+          (totalSplitVertices == verticesRead + 1 &&
+              edgesPerVertex > currentVertexEdgesRead);
+    }
+
+    @Override
+    public EdgeWithSource<LongWritable, DoubleWritable> getCurrentEdge()
+      throws IOException, InterruptedException {
+      if (currentVertexEdgesRead == edgesPerVertex) {
+        ++verticesRead;
+        currentVertexId = new LongWritable(-1);
+      }
+
+      if (currentVertexId.get() == -1) {
+        currentVertexId.set(startingVertexId + verticesRead);
+        currentVertexEdgesRead = 0;
+        // Seed on the vertex id so a vertex gets the same edges no matter
+        // how many workers are used (other parameters being equal).
+        random.setSeed(currentVertexId.get());
+        currentVertexDestVertices.clear();
+      }
+
+      LongWritable destVertexId;
+      do {
+        // Reduce first: Math.abs(Long.MIN_VALUE) would still be negative.
+        destVertexId = new LongWritable(
+            Math.abs(random.nextLong() % aggregateVertices));
+      } while (currentVertexDestVertices.contains(destVertexId));
+      ++currentVertexEdgesRead;
+      currentVertexDestVertices.add(destVertexId);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("getCurrentEdge: Return edge (" + currentVertexId + ", " +
+            "" + destVertexId + ")");
+      }
+      return new EdgeWithSource<LongWritable, DoubleWritable>(currentVertexId,
+          new Edge<LongWritable, DoubleWritable>(destVertexId,
+              new DoubleWritable(random.nextDouble())));
+    }
+
+    @Override
+    public void close() throws IOException { }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return (verticesRead * edgesPerVertex + currentVertexEdgesRead) *
+          100.0f / (totalSplitVertices * edgesPerVertex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
new file mode 100644
index 0000000..db1b38a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * This VertexInputFormat is meant for large scale testing.  It allows the user
+ * to create an input data source that has a configurable number of aggregate
+ * vertices and edges per vertex, and that is repeatable for the exact same
+ * parameters (pseudo-random).
+ *
+ * @param <M> Message data
+ */
+public class PseudoRandomVertexInputFormat<M extends Writable> extends
+    VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable, M> {
+  /** Set the number of aggregate vertices. */
+  public static final String AGGREGATE_VERTICES =
+      "pseudoRandomVertexInputFormat.aggregateVertices";
+  /** Set the number of edges per vertex (pseudo-random destination). */
+  public static final String EDGES_PER_VERTEX =
+      "pseudoRandomVertexInputFormat.edgesPerVertex";
+
+  /**
+   * Create one placeholder split per worker.  The splits carry only their
+   * index and the total split count; the reader derives its vertex range
+   * from those two numbers.
+   *
+   * @param context Job context (not used)
+   * @param numWorkers Number of splits to create
+   * @return List with {@code numWorkers} {@link BspInputSplit} entries
+   */
+  @Override
+  public final List<InputSplit> getSplits(final JobContext context,
+      final int numWorkers) throws IOException, InterruptedException {
+    // This is meaningless, the PseudoRandomVertexReader will generate
+    // all the test data
+    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+    for (int i = 0; i < numWorkers; ++i) {
+      inputSplitList.add(new BspInputSplit(i, numWorkers));
+    }
+    return inputSplitList;
+  }
+
+  /**
+   * Create an uninitialized reader; the split is handed to it later via
+   * {@code initialize}.
+   *
+   * @param split Input split (not used here)
+   * @param context Task attempt context (not used here)
+   * @return New {@link PseudoRandomVertexReader}
+   */
+  @Override
+  public VertexReader<LongWritable, DoubleWritable, DoubleWritable, M>
+  createVertexReader(InputSplit split, TaskAttemptContext context)
+    throws IOException {
+    return new PseudoRandomVertexReader<M>();
+  }
+
+  /**
+   * Used by {@link PseudoRandomVertexInputFormat} to read
+   * pseudo-randomly generated data.
+   *
+   * @param <M> Message data
+   */
+  private static class PseudoRandomVertexReader<M extends Writable> implements
+      VertexReader<LongWritable, DoubleWritable, DoubleWritable, M> {
+    /** Logger. */
+    private static final Logger LOG =
+        Logger.getLogger(PseudoRandomVertexReader.class);
+    /** Starting vertex id. */
+    private long startingVertexId = -1;
+    /** Vertices read so far. */
+    private long verticesRead = 0;
+    /** Total vertices to read (on this split alone). */
+    private long totalSplitVertices = -1;
+    /** Aggregate vertices (all input splits). */
+    private long aggregateVertices = -1;
+    /** Edges per vertex. */
+    private long edgesPerVertex = -1;
+    /** BspInputSplit (used only for index). */
+    private BspInputSplit bspInputSplit;
+    /** Saved configuration */
+    private ImmutableClassesGiraphConfiguration configuration;
+
+    /**
+     * Default constructor for reflection.
+     */
+    public PseudoRandomVertexReader() {
+    }
+
+    /**
+     * Read the generator parameters from the configuration and compute the
+     * contiguous range of vertex ids this split is responsible for.  The
+     * aggregate vertices are divided evenly between the splits; the first
+     * {@code aggregateVertices % numSplits} splits take one extra vertex.
+     *
+     * @param inputSplit Must be a {@link BspInputSplit}
+     * @param context Task attempt context providing the configuration
+     * @throws IllegalArgumentException if either parameter is missing or
+     *         not positive, if the split is not a {@link BspInputSplit},
+     *         or if more edges per vertex are requested than there are
+     *         distinct destination vertices
+     */
+    @Override
+    public void initialize(InputSplit inputSplit,
+        TaskAttemptContext context) throws IOException {
+      configuration = new ImmutableClassesGiraphConfiguration(
+          context.getConfiguration());
+      aggregateVertices =
+        configuration.getLong(
+          PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 0);
+      if (aggregateVertices <= 0) {
+        throw new IllegalArgumentException(
+            PseudoRandomVertexInputFormat.AGGREGATE_VERTICES + " <= 0");
+      }
+      if (!(inputSplit instanceof BspInputSplit)) {
+        throw new IllegalArgumentException(
+            "initialize: Got " + inputSplit.getClass() +
+            " instead of " + BspInputSplit.class);
+      }
+      bspInputSplit = (BspInputSplit) inputSplit;
+      long numSplits = bspInputSplit.getNumSplits();
+      long splitIndex = bspInputSplit.getSplitIndex();
+      long verticesPerSplit = aggregateVertices / numSplits;
+      long extraVertices = aggregateVertices % numSplits;
+      totalSplitVertices = verticesPerSplit;
+      if (splitIndex < extraVertices) {
+        ++totalSplitVertices;
+      }
+      // Every earlier split holds verticesPerSplit vertices, plus one more
+      // for each earlier split that received an extra vertex.
+      startingVertexId = (splitIndex * verticesPerSplit) +
+          Math.min(splitIndex, extraVertices);
+      edgesPerVertex = configuration.getLong(
+        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 0);
+      if (edgesPerVertex <= 0) {
+        throw new IllegalArgumentException(
+          PseudoRandomVertexInputFormat.EDGES_PER_VERTEX + " <= 0");
+      }
+      if (edgesPerVertex > aggregateVertices) {
+        // Destinations of one vertex must be distinct ids drawn from
+        // [0, aggregateVertices), so asking for more edges than that would
+        // make the generation loop spin forever.
+        throw new IllegalArgumentException(
+            PseudoRandomVertexInputFormat.EDGES_PER_VERTEX + " (" +
+            edgesPerVertex + ") > " +
+            PseudoRandomVertexInputFormat.AGGREGATE_VERTICES + " (" +
+            aggregateVertices + ")");
+      }
+    }
+
+    /**
+     * Check whether this split still has vertices left to generate.
+     *
+     * @return True if fewer vertices were produced than the split owns
+     */
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return totalSplitVertices > verticesRead;
+    }
+
+    /**
+     * Generate the next vertex of this split: a pseudo-random value and
+     * {@code edgesPerVertex} edges with distinct pseudo-random destinations
+     * and pseudo-random edge values.  Each call advances the reader by one
+     * vertex.
+     *
+     * @return Newly created and initialized vertex
+     */
+    @Override
+    public Vertex<LongWritable, DoubleWritable, DoubleWritable, M>
+    getCurrentVertex() throws IOException, InterruptedException {
+      Vertex<LongWritable, DoubleWritable, DoubleWritable, M>
+      vertex = configuration.createVertex();
+      long vertexId = startingVertexId + verticesRead;
+      // Seed on the vertex id to keep the vertex data the same when
+      // on different number of workers, but other parameters are the
+      // same.
+      Random rand = new Random(vertexId);
+      DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble());
+      List<Edge<LongWritable, DoubleWritable>> edges =
+          Lists.newArrayListWithCapacity((int) edgesPerVertex);
+      Set<LongWritable> destVertices = Sets.newHashSet();
+      for (long i = 0; i < edgesPerVertex; ++i) {
+        LongWritable destVertexId;
+        do {
+          // Take the remainder before the absolute value:
+          // Math.abs(Long.MIN_VALUE) is still negative, whereas the
+          // remainder always has a magnitude below aggregateVertices.
+          destVertexId = new LongWritable(
+              Math.abs(rand.nextLong() % aggregateVertices));
+          // add() returns false for an already chosen destination, in
+          // which case another id is drawn.
+        } while (!destVertices.add(destVertexId));
+        edges.add(new Edge<LongWritable, DoubleWritable>(
+            destVertexId, new DoubleWritable(rand.nextDouble())));
+      }
+      vertex.initialize(new LongWritable(vertexId), vertexValue, edges);
+      ++verticesRead;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("next: Return vertexId=" +
+            vertex.getId().get() +
+            ", vertexValue=" + vertex.getValue() +
+            ", edges=" + vertex.getEdges());
+      }
+      return vertex;
+    }
+
+    /**
+     * No-op: this reader performs no work on close.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Report how much of this split has been generated.
+     *
+     * @return Percentage (0 to 100) of the split's vertices produced so
+     *         far; 100 when the split owns no vertices at all
+     */
+    @Override
+    public float getProgress() throws IOException {
+      if (totalSplitVertices == 0) {
+        // More splits than vertices leaves some splits empty; without this
+        // guard the division below would be 0 / 0 and report NaN.
+        return 100.0f;
+      }
+      return verticesRead * 100.0f / totalSplitVertices;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
new file mode 100644
index 0000000..9428b87
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Sequence file vertex input format based on {@link SequenceFileInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param <X> Value type
+ */
+@SuppressWarnings("rawtypes")
+public class SequenceFileVertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable,
+    X extends Vertex<I, V, E, M>>
+    extends VertexInputFormat<I, V, E, M> {
+  /** Internal input format */
+  protected SequenceFileInputFormat<I, X> sequenceFileInputFormat =
+    new SequenceFileInputFormat<I, X>();
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    return sequenceFileInputFormat.getSplits(context);
+  }
+
+  @Override
+  public VertexReader<I, V, E, M> createVertexReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    return new SequenceFileVertexReader<I, V, E, M, X>(
+        sequenceFileInputFormat.createRecordReader(split, context));
+  }
+
+  /**
+   * Vertex reader used with {@link SequenceFileVertexInputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param <X> Value type
+   */
+  public static class SequenceFileVertexReader<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable,
+      X extends Vertex<I, V, E, M>>
+      implements VertexReader<I, V, E, M> {
+    /** Internal record reader from {@link SequenceFileInputFormat} */
+    private final RecordReader<I, X> recordReader;
+
+    /**
+     * Constructor with record reader.
+     *
+     * @param recordReader Reader from {@link SequenceFileInputFormat}.
+     */
+    public SequenceFileVertexReader(RecordReader<I, X> recordReader) {
+      this.recordReader = recordReader;
+    }
+
+    @Override public void initialize(InputSplit inputSplit,
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      recordReader.initialize(inputSplit, context);
+    }
+
+    @Override public boolean nextVertex() throws IOException,
+        InterruptedException {
+      return recordReader.nextKeyValue();
+    }
+
+    @Override public Vertex<I, V, E, M> getCurrentVertex()
+      throws IOException, InterruptedException {
+      return recordReader.getCurrentValue();
+    }
+
+
+    @Override public void close() throws IOException {
+      recordReader.close();
+    }
+
+    @Override public float getProgress() throws IOException,
+        InterruptedException {
+      return recordReader.getProgress();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
new file mode 100644
index 0000000..2b7abd8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Class to read graphs stored as adjacency lists with ids represented by
+ * Strings and values as doubles.  This is a good inputformat for reading
+ * graphs where the id types do not matter and can be stashed in a String.
+ *
+ * @param <M> Message type.
+ */
+public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
+    extends AdjacencyListTextVertexInputFormat<Text, DoubleWritable,
+            DoubleWritable, M>  {
+
+  /**
+   * Create a reader that is given no line sanitizer ({@code null} is passed
+   * to its constructor).
+   *
+   * @param split Input split (not used here)
+   * @param context Task attempt context (not used here)
+   * @return New {@link TextDoubleDoubleAdjacencyListVertexReader}
+   */
+  @Override
+  public AdjacencyListTextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context) {
+    return new TextDoubleDoubleAdjacencyListVertexReader(null);
+  }
+
+  /**
+   * Vertex reader used with
+   * {@link TextDoubleDoubleAdjacencyListVertexInputFormat}
+   */
+  protected class TextDoubleDoubleAdjacencyListVertexReader extends
+      AdjacencyListTextVertexReader {
+
+    /**
+     * Constructor with {@link LineSanitizer}.
+     *
+     * @param lineSanitizer the sanitizer to use for reading
+     */
+    public TextDoubleDoubleAdjacencyListVertexReader(LineSanitizer
+        lineSanitizer) {
+      super(lineSanitizer);
+    }
+
+    /**
+     * Use the textual id as is.
+     *
+     * @param s Id as text
+     * @return {@link Text} wrapping {@code s}
+     */
+    @Override
+    public Text decodeId(String s) {
+      return new Text(s);
+    }
+
+    /**
+     * Parse a vertex value.
+     *
+     * @param s Value as text
+     * @return Parsed value
+     * @throws NumberFormatException if {@code s} is not a parsable double
+     */
+    @Override
+    public DoubleWritable decodeValue(String s) {
+      // parseDouble yields the primitive directly; Double.valueOf would box
+      // a Double only for it to be unboxed again by the constructor call.
+      return new DoubleWritable(Double.parseDouble(s));
+    }
+
+    /**
+     * Build an edge from its two textual parts.
+     *
+     * @param s1 Id passed as the first {@link Edge} constructor argument
+     * @param s2 Edge value as text
+     * @return Edge built from {@code s1} and the parsed {@code s2}
+     * @throws NumberFormatException if {@code s2} is not a parsable double
+     */
+    @Override
+    public Edge<Text, DoubleWritable> decodeEdge(String s1, String s2) {
+      return new Edge<Text, DoubleWritable>(new Text(s1),
+          new DoubleWritable(Double.parseDouble(s2)));
+    }
+  }
+
+}


Mime
View raw message