giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [12/23] GIRAPH-409: Refactor / cleanups (nitay)
Date Fri, 04 Jan 2013 20:52:39 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
deleted file mode 100644
index bac0a39..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.BspInputSplit;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * This VertexInputFormat is meant for large scale testing.  It allows the user
- * to create an input data source that a variable number of aggregate vertices
- * and edges per vertex that is repeatable for the exact same parameter
- * (pseudo-random).
- *
- * @param <M> Message data
- */
-public class PseudoRandomVertexInputFormat<M extends Writable> extends
-    VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable, M> {
-  /** Set the number of aggregate vertices. */
-  public static final String AGGREGATE_VERTICES =
-      "pseudoRandomVertexInputFormat.aggregateVertices";
-  /** Set the number of edges per vertex (pseudo-random destination). */
-  public static final String EDGES_PER_VERTEX =
-      "pseudoRandomVertexInputFormat.edgesPerVertex";
-
-  @Override
-  public final List<InputSplit> getSplits(final JobContext context,
-      final int numWorkers) throws IOException, InterruptedException {
-    // This is meaningless, the PseudoRandomVertexReader will generate
-    // all the test data
-    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
-    for (int i = 0; i < numWorkers; ++i) {
-      inputSplitList.add(new BspInputSplit(i, numWorkers));
-    }
-    return inputSplitList;
-  }
-
-  @Override
-  public VertexReader<LongWritable, DoubleWritable, DoubleWritable, M>
-  createVertexReader(InputSplit split, TaskAttemptContext context)
-    throws IOException {
-    return new PseudoRandomVertexReader<M>();
-  }
-
-  /**
-   * Used by {@link PseudoRandomVertexInputFormat} to read
-   * pseudo-randomly generated data.
-   */
-  private static class PseudoRandomVertexReader<M extends Writable> implements
-      VertexReader<LongWritable, DoubleWritable, DoubleWritable, M> {
-    /** Logger. */
-    private static final Logger LOG =
-        Logger.getLogger(PseudoRandomVertexReader.class);
-    /** Starting vertex id. */
-    private long startingVertexId = -1;
-    /** Vertices read so far. */
-    private long verticesRead = 0;
-    /** Total vertices to read (on this split alone). */
-    private long totalSplitVertices = -1;
-    /** Aggregate vertices (all input splits). */
-    private long aggregateVertices = -1;
-    /** Edges per vertex. */
-    private long edgesPerVertex = -1;
-    /** BspInputSplit (used only for index). */
-    private BspInputSplit bspInputSplit;
-    /** Saved configuration */
-    private ImmutableClassesGiraphConfiguration configuration;
-
-    /**
-     * Default constructor for reflection.
-     */
-    public PseudoRandomVertexReader() {
-    }
-
-    @Override
-    public void initialize(InputSplit inputSplit,
-        TaskAttemptContext context) throws IOException {
-      configuration = new ImmutableClassesGiraphConfiguration(
-          context.getConfiguration());
-      aggregateVertices =
-        configuration.getLong(
-          PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 0);
-      if (aggregateVertices <= 0) {
-        throw new IllegalArgumentException(
-            PseudoRandomVertexInputFormat.AGGREGATE_VERTICES + " <= 0");
-      }
-      if (inputSplit instanceof BspInputSplit) {
-        bspInputSplit = (BspInputSplit) inputSplit;
-        long extraVertices =
-            aggregateVertices % bspInputSplit.getNumSplits();
-        totalSplitVertices =
-            aggregateVertices / bspInputSplit.getNumSplits();
-        if (bspInputSplit.getSplitIndex() < extraVertices) {
-          ++totalSplitVertices;
-        }
-        startingVertexId = (bspInputSplit.getSplitIndex() *
-            (aggregateVertices / bspInputSplit.getNumSplits())) +
-            Math.min(bspInputSplit.getSplitIndex(),
-                     extraVertices);
-      } else {
-        throw new IllegalArgumentException(
-            "initialize: Got " + inputSplit.getClass() +
-            " instead of " + BspInputSplit.class);
-      }
-      edgesPerVertex = configuration.getLong(
-        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 0);
-      if (edgesPerVertex <= 0) {
-        throw new IllegalArgumentException(
-          PseudoRandomVertexInputFormat.EDGES_PER_VERTEX + " <= 0");
-      }
-    }
-
-    @Override
-    public boolean nextVertex() throws IOException, InterruptedException {
-      return totalSplitVertices > verticesRead;
-    }
-
-    @Override
-    public Vertex<LongWritable, DoubleWritable, DoubleWritable, M>
-    getCurrentVertex() throws IOException, InterruptedException {
-      Vertex<LongWritable, DoubleWritable, DoubleWritable, M>
-      vertex = configuration.createVertex();
-      long vertexId = startingVertexId + verticesRead;
-      // Seed on the vertex id to keep the vertex data the same when
-      // on different number of workers, but other parameters are the
-      // same.
-      Random rand = new Random(vertexId);
-      DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble());
-      List<Edge<LongWritable, DoubleWritable>> edges =
-          Lists.newArrayListWithCapacity((int) edgesPerVertex);
-      Set<LongWritable> destVertices = Sets.newHashSet();
-      for (long i = 0; i < edgesPerVertex; ++i) {
-        LongWritable destVertexId = null;
-        do {
-          destVertexId =
-            new LongWritable(Math.abs(rand.nextLong()) %
-              aggregateVertices);
-        } while (destVertices.contains(destVertexId));
-        edges.add(new Edge<LongWritable, DoubleWritable>(
-            destVertexId, new DoubleWritable(rand.nextDouble())));
-        destVertices.add(destVertexId);
-      }
-      vertex.initialize(new LongWritable(vertexId), vertexValue, edges);
-      ++verticesRead;
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("next: Return vertexId=" +
-            vertex.getId().get() +
-            ", vertexValue=" + vertex.getValue() +
-            ", edges=" + vertex.getEdges());
-      }
-      return vertex;
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-      return verticesRead * 100.0f / totalSplitVertices;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
deleted file mode 100644
index a984089..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Sequence file vertex input format based on {@link SequenceFileInputFormat}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @param <X> Value type
- */
-@SuppressWarnings("rawtypes")
-public class SequenceFileVertexInputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable,
-    X extends Vertex<I, V, E, M>>
-    extends VertexInputFormat<I, V, E, M> {
-  /** Internal input format */
-  protected SequenceFileInputFormat<I, X> sequenceFileInputFormat =
-    new SequenceFileInputFormat<I, X>();
-
-  @Override
-  public List<InputSplit> getSplits(JobContext context, int numWorkers)
-    throws IOException, InterruptedException {
-    return sequenceFileInputFormat.getSplits(context);
-  }
-
-  @Override
-  public VertexReader<I, V, E, M> createVertexReader(InputSplit split,
-      TaskAttemptContext context) throws IOException {
-    return new SequenceFileVertexReader<I, V, E, M, X>(
-        sequenceFileInputFormat.createRecordReader(split, context));
-  }
-
-  /**
-   * Vertex reader used with {@link SequenceFileVertexInputFormat}.
-   *
-   * @param <I> Vertex id
-   * @param <V> Vertex data
-   * @param <E> Edge data
-   * @param <M> Message data
-   * @param <X> Value type
-   */
-  public static class SequenceFileVertexReader<I extends WritableComparable,
-      V extends Writable, E extends Writable, M extends Writable,
-      X extends Vertex<I, V, E, M>>
-      implements VertexReader<I, V, E, M> {
-    /** Internal record reader from {@link SequenceFileInputFormat} */
-    private final RecordReader<I, X> recordReader;
-
-    /**
-     * Constructor with record reader.
-     *
-     * @param recordReader Reader from {@link SequenceFileInputFormat}.
-     */
-    public SequenceFileVertexReader(RecordReader<I, X> recordReader) {
-      this.recordReader = recordReader;
-    }
-
-    @Override public void initialize(InputSplit inputSplit,
-        TaskAttemptContext context) throws IOException, InterruptedException {
-      recordReader.initialize(inputSplit, context);
-    }
-
-    @Override public boolean nextVertex() throws IOException,
-        InterruptedException {
-      return recordReader.nextKeyValue();
-    }
-
-    @Override public Vertex<I, V, E, M> getCurrentVertex()
-      throws IOException, InterruptedException {
-      return recordReader.getCurrentValue();
-    }
-
-
-    @Override public void close() throws IOException {
-      recordReader.close();
-    }
-
-    @Override public float getProgress() throws IOException,
-        InterruptedException {
-      return recordReader.getProgress();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
deleted file mode 100644
index a009000..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Class to read graphs stored as adjacency lists with ids represented by
- * Strings and values as doubles.  This is a good inputformat for reading
- * graphs where the id types do not matter and can be stashed in a String.
- *
- * @param <M> Message type.
- */
-public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
-    extends AdjacencyListTextVertexInputFormat<Text, DoubleWritable,
-            DoubleWritable, M>  {
-
-  @Override
-  public AdjacencyListTextVertexReader createVertexReader(InputSplit split,
-      TaskAttemptContext context) {
-    return new TextDoubleDoubleAdjacencyListVertexReader(null);
-  }
-
-  /**
-   * Vertex reader used with
-   * {@link TextDoubleDoubleAdjacencyListVertexInputFormat}
-   */
-  protected class TextDoubleDoubleAdjacencyListVertexReader extends
-      AdjacencyListTextVertexReader {
-
-    /**
-     * Constructor with {@link LineSanitizer}.
-     *
-     * @param lineSanitizer the sanitizer to use for reading
-     */
-    public TextDoubleDoubleAdjacencyListVertexReader(LineSanitizer
-        lineSanitizer) {
-      super(lineSanitizer);
-    }
-
-    @Override
-    public Text decodeId(String s) {
-      return new Text(s);
-    }
-
-    @Override
-    public DoubleWritable decodeValue(String s) {
-      return new DoubleWritable(Double.valueOf(s));
-    }
-
-    @Override
-    public Edge<Text, DoubleWritable> decodeEdge(String s1, String s2) {
-      return new Edge<Text, DoubleWritable>(new Text(s1),
-          new DoubleWritable(Double.valueOf(s2)));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
deleted file mode 100644
index e4cbf94..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeInputFormat;
-import org.apache.giraph.graph.EdgeReader;
-import org.apache.giraph.graph.EdgeWithSource;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Abstract class that users should subclass to use their own text based
- * edge output format.
- *
- * @param <I> Vertex id
- * @param <E> Edge data
- */
-@SuppressWarnings("rawtypes")
-public abstract class TextEdgeInputFormat<I extends WritableComparable,
-    E extends Writable> extends EdgeInputFormat<I, E> {
-  /** Underlying GiraphTextInputFormat. */
-  protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
-
-  @Override
-  public List<InputSplit> getSplits(
-      JobContext context, int numWorkers) throws IOException,
-      InterruptedException {
-    // Ignore the hint of numWorkers here since we are using
-    // GiraphTextInputFormat to do this for us
-    return textInputFormat.getEdgeSplits(context);
-  }
-
-  @Override
-  public abstract TextEdgeReader createEdgeReader(
-      InputSplit split, TaskAttemptContext context) throws IOException;
-
-  /**
-   * {@link EdgeReader} for {@link TextEdgeInputFormat}.
-   */
-  protected abstract class TextEdgeReader implements EdgeReader<I, E> {
-    /** Internal line record reader */
-    private RecordReader<LongWritable, Text> lineRecordReader;
-    /** Context passed to initialize */
-    private TaskAttemptContext context;
-    /**
-     * Cached configuration. We don't care about vertex value and message type.
-     */
-    private ImmutableClassesGiraphConfiguration<I, NullWritable, E,
-        NullWritable> conf;
-
-    @Override
-    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      this.context = context;
-      conf = new ImmutableClassesGiraphConfiguration<I, NullWritable, E,
-          NullWritable>(context.getConfiguration());
-      lineRecordReader = createLineRecordReader(inputSplit, context);
-      lineRecordReader.initialize(inputSplit, context);
-    }
-
-    /**
-     * Create the line record reader. Override this to use a different
-     * underlying record reader (useful for testing).
-     *
-     * @param inputSplit
-     *          the split to read
-     * @param context
-     *          the context passed to initialize
-     * @return
-     *         the record reader to be used
-     * @throws IOException
-     *           exception that can be thrown during creation
-     * @throws InterruptedException
-     *           exception that can be thrown during creation
-     */
-    protected RecordReader<LongWritable, Text>
-    createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      return textInputFormat.createRecordReader(inputSplit, context);
-    }
-
-    @Override
-    public void close() throws IOException {
-      lineRecordReader.close();
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      return lineRecordReader.getProgress();
-    }
-
-    /**
-     * Get the line record reader.
-     *
-     * @return Record reader to be used for reading.
-     */
-    protected RecordReader<LongWritable, Text> getRecordReader() {
-      return lineRecordReader;
-    }
-
-    /**
-     * Get the context.
-     *
-     * @return Context passed to initialize.
-     */
-    protected TaskAttemptContext getContext() {
-      return context;
-    }
-
-    /**
-     * Get the configuration.
-     *
-     * @return Configuration for this reader
-     */
-    protected ImmutableClassesGiraphConfiguration<I, NullWritable, E,
-        NullWritable> getConf() {
-      return conf;
-    }
-  }
-
-  /**
-   * Abstract class to be implemented by the user to read an edge from each
-   * text line.
-   */
-  protected abstract class TextEdgeReaderFromEachLine extends TextEdgeReader {
-    @Override
-    public final EdgeWithSource<I, E> getCurrentEdge() throws IOException,
-        InterruptedException {
-      Text line = getRecordReader().getCurrentValue();
-      I sourceVertexId = getSourceVertexId(line);
-      I targetVertexId = getTargetVertexId(line);
-      E edgeValue = getValue(line);
-      return new EdgeWithSource<I, E>(sourceVertexId,
-          new Edge<I, E>(targetVertexId, edgeValue));
-    }
-
-    @Override
-    public final boolean nextEdge() throws IOException, InterruptedException {
-      return getRecordReader().nextKeyValue();
-    }
-
-    /**
-     * Reads source vertex id from the current line.
-     *
-     * @param line
-     *          the current line
-     * @return
-     *         the source vertex id corresponding to the line
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract I getSourceVertexId(Text line) throws IOException;
-
-
-    /**
-     * Reads target vertex id from the current line.
-     *
-     * @param line
-     *          the current line
-     * @return
-     *         the target vertex id corresponding to the line
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract I getTargetVertexId(Text line) throws IOException;
-
-    /**
-     * Reads edge value from the current line.
-     *
-     * @param line
-     *          the current line
-     * @return
-     *         the edge value corresponding to the line
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract E getValue(Text line) throws IOException;
-  }
-
-  /**
-   * Abstract class to be implemented by the user to read an edge from each
-   * text line after preprocessing it.
-   *
-   * @param <T>
-   *          The resulting type of preprocessing.
-   */
-  protected abstract class TextEdgeReaderFromEachLineProcessed<T> extends
-      TextEdgeReader {
-    @Override
-    public final EdgeWithSource<I, E> getCurrentEdge() throws IOException,
-        InterruptedException {
-      Text line = getRecordReader().getCurrentValue();
-      T processed = preprocessLine(line);
-      I sourceVertexId = getSourceVertexId(processed);
-      I targetVertexId = getTargetVertexId(processed);
-      E edgeValue = getValue(processed);
-      return new EdgeWithSource<I, E>(sourceVertexId,
-          new Edge<I, E>(targetVertexId, edgeValue));
-    }
-
-    @Override
-    public final boolean nextEdge() throws IOException, InterruptedException {
-      return getRecordReader().nextKeyValue();
-    }
-
-    /**
-     * Preprocess the line so other methods can easily read necessary
-     * information for creating edge
-     *
-     * @param line
-     *          the current line to be read
-     * @return
-     *         the preprocessed object
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract T preprocessLine(Text line) throws IOException;
-
-    /**
-     * Reads target vertex id from the preprocessed line.
-     *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the target vertex id
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract I getTargetVertexId(T line) throws IOException;
-
-    /**
-     * Reads source vertex id from the preprocessed line.
-     *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the source vertex id
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract I getSourceVertexId(T line) throws IOException;
-
-    /**
-     * Reads edge value from the preprocessed line.
-     *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the edge value
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract E getValue(T line) throws IOException;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
deleted file mode 100644
index e085473..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Abstract class that users should subclass to use their own text based
- * vertex input format.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public abstract class TextVertexInputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends VertexInputFormat<I, V, E, M> {
-
-  /** Uses the GiraphTextInputFormat to do everything */
-  protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
-
-  @Override
-  public List<InputSplit> getSplits(JobContext context, int numWorkers)
-    throws IOException, InterruptedException {
-    // Ignore the hint of numWorkers here since we are using
-    // GiraphTextInputFormat to do this for us
-    return textInputFormat.getVertexSplits(context);
-  }
-
-  /**
-   * The factory method which produces the {@link TextVertexReader} used by this
-   * input format.
-   *
-   * @param split
-   *          the split to be read
-   * @param context
-   *          the information about the task
-   * @return
-   *         the text vertex reader to be used
-   */
-  @Override
-  public abstract TextVertexReader createVertexReader(InputSplit split,
-      TaskAttemptContext context) throws IOException;
-
-  /**
-   * Abstract class to be implemented by the user based on their specific
-   * vertex input. Easiest to ignore the key value separator and only use
-   * key instead.
-   *
-   * When reading a vertex from each line, extend
-   * {@link TextVertexReaderFromEachLine}. If you need to preprocess each line
-   * first, then extend {@link TextVertexReaderFromEachLineProcessed}. If you
-   * need common exception handling while preprocessing, then extend
-   * {@link TextVertexReaderFromEachLineProcessedHandlingExceptions}.
-   */
-  protected abstract class TextVertexReader implements
-    VertexReader<I, V, E, M> {
-    /** Internal line record reader */
-    private RecordReader<LongWritable, Text> lineRecordReader;
-    /** Context passed to initialize */
-    private TaskAttemptContext context;
-    /** Cached configuration */
-    private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-
-    @Override
-    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      this.context = context;
-      conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
-          context.getConfiguration());
-      lineRecordReader = createLineRecordReader(inputSplit, context);
-      lineRecordReader.initialize(inputSplit, context);
-    }
-
-    /**
-     * Create the line record reader. Override this to use a different
-     * underlying record reader (useful for testing).
-     *
-     * @param inputSplit
-     *          the split to read
-     * @param context
-     *          the context passed to initialize
-     * @return
-     *         the record reader to be used
-     * @throws IOException
-     *           exception that can be thrown during creation
-     * @throws InterruptedException
-     *           exception that can be thrown during creation
-     */
-    protected RecordReader<LongWritable, Text>
-    createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      return textInputFormat.createRecordReader(inputSplit, context);
-    }
-
-    @Override
-    public void close() throws IOException {
-      lineRecordReader.close();
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      return lineRecordReader.getProgress();
-    }
-
-    /**
-     * Get the line record reader.
-     *
-     * @return Record reader to be used for reading.
-     */
-    protected RecordReader<LongWritable, Text> getRecordReader() {
-      return lineRecordReader;
-    }
-
-    /**
-     * Get the context.
-     *
-     * @return Context passed to initialize.
-     */
-    protected TaskAttemptContext getContext() {
-      return context;
-    }
-
-    /**
-     * Get the configuration.
-     *
-     * @return Configuration for this reader
-     */
-    protected ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
-      return conf;
-    }
-  }
-
-  /**
-   * Abstract class to be implemented by the user to read a vertex from each
-   * text line.
-   */
-  protected abstract class TextVertexReaderFromEachLine extends
-    TextVertexReader {
-
-    @Override
-    public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
-    InterruptedException {
-      Text line = getRecordReader().getCurrentValue();
-      Vertex<I, V, E, M> vertex = getConf().createVertex();
-      vertex.initialize(getId(line), getValue(line), getEdges(line));
-      return vertex;
-    }
-
-    @Override
-    public final boolean nextVertex() throws IOException, InterruptedException {
-      return getRecordReader().nextKeyValue();
-    }
-
-    /**
-     * Reads vertex id from the current line.
-     *
-     * @param line
-     *          the current line
-     * @return
-     *         the vertex id corresponding to the line
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract I getId(Text line) throws IOException;
-
-    /**
-     * Reads vertex value from the current line.
-     *
-     * @param line
-     *          the current line
-     * @return
-     *         the vertex value corresponding to the line
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract V getValue(Text line) throws IOException;
-
-    /**
-     * Reads edges value from the current line.
-     *
-     * @param line
-     *          the current line
-     * @return
-     *         the edges
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract Iterable<Edge<I, E>> getEdges(Text line) throws
-        IOException;
-
-  }
-
-  /**
-   * Abstract class to be implemented by the user to read a vertex from each
-   * text line after preprocessing it.
-   *
-   * @param <T>
-   *          The resulting type of preprocessing.
-   */
-  protected abstract class TextVertexReaderFromEachLineProcessed<T> extends
-      TextVertexReader {
-
-    @Override
-    public final boolean nextVertex() throws IOException, InterruptedException {
-      return getRecordReader().nextKeyValue();
-    }
-
-    @Override
-    public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
-    InterruptedException {
-      Text line = getRecordReader().getCurrentValue();
-      Vertex<I, V, E, M> vertex;
-      T processed = preprocessLine(line);
-      vertex = getConf().createVertex();
-      vertex.initialize(getId(processed), getValue(processed),
-          getEdges(processed));
-      return vertex;
-    }
-
-    /**
-     * Preprocess the line so other methods can easily read necessary
-     * information for creating vertex.
-     *
-     * @param line
-     *          the current line to be read
-     * @return
-     *         the preprocessed object
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract T preprocessLine(Text line) throws IOException;
-
-    /**
-     * Reads vertex id from the preprocessed line.
-     *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the vertex id
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract I getId(T line) throws IOException;
-
-    /**
-     * Reads vertex value from the preprocessed line.
-     *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the vertex value
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract V getValue(T line) throws IOException;
-
-    /**
-     * Reads edges from the preprocessed line.
-     *
-     *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the edges
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract Iterable<Edge<I, E>> getEdges(T line) throws
-        IOException;
-
-  }
-
-  // CHECKSTYLE: stop RedundantThrows
-  /**
-   * Abstract class to be implemented by the user to read a vertex from each
-   * text line after preprocessing it with exception handling.
-   *
-   * @param <T>
-   *          The resulting type of preprocessing.
-   * @param <X>
-   *          The exception type that can be thrown due to preprocessing.
-   */
-  protected abstract class
-  TextVertexReaderFromEachLineProcessedHandlingExceptions<T, X extends
-    Throwable> extends TextVertexReader {
-
-    @Override
-    public final boolean nextVertex() throws IOException, InterruptedException {
-      return getRecordReader().nextKeyValue();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
-        InterruptedException {
-      // Note we are reading from value only since key is the line number
-      Text line = getRecordReader().getCurrentValue();
-      Vertex<I, V, E, M> vertex;
-      T processed = null;
-      try {
-        processed = preprocessLine(line);
-        Configuration conf = getContext().getConfiguration();
-        vertex = getConf().createVertex();
-        vertex.initialize(getId(processed), getValue(processed),
-            getEdges(processed));
-      } catch (IOException e) {
-        throw e;
-      // CHECKSTYLE: stop IllegalCatch
-      } catch (Throwable t) {
-        return handleException(line, processed, (X) t);
-      // CHECKSTYLE: resume IllegalCatch
-      }
-      return vertex;
-    }
-
-    /**
-     * Preprocess the line so other methods can easily read necessary
-     * information for creating vertex.
-     *
-     * @param line
-     *          the current line to be read
-     * @return
-     *         the preprocessed object
-     * @throws X
-     *           exception that can be thrown while preprocessing the line
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract T preprocessLine(Text line) throws X, IOException;
-
-    /**
-     * Reads vertex id from the preprocessed line.
-     *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the vertex id
-     * @throws X
-     *           exception that can be thrown while reading the preprocessed
-     *           object
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract I getId(T line) throws X, IOException;
-
-    /**
-     * Reads vertex value from the preprocessed line.
-     *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the vertex value
-     * @throws X
-     *           exception that can be thrown while reading the preprocessed
-     *           object
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract V getValue(T line) throws X, IOException;
-
-    /**
-     * Reads edges from the preprocessed line.
-     *
-     *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the edges
-     * @throws X
-     *           exception that can be thrown while reading the preprocessed
-     *           object
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract Iterable<Edge<I, E>> getEdges(T line) throws X,
-        IOException;
-
-    /**
-     * Handles exceptions while reading vertex from each line.
-     *
-     * @param line
-     *          the line that was being read when the exception was thrown
-     * @param processed
-     *          the object obtained by preprocessing the line. Can be null if
-     *          exception was thrown during preprocessing.
-     * @param e
-     *          the exception thrown while reading the line
-     * @return the recovered/alternative vertex to be used
-     */
-    protected Vertex<I, V, E, M> handleException(Text line, T processed, X e) {
-      throw new IllegalArgumentException(e);
-    }
-
-  }
-  // CHECKSTYLE: resume RedundantThrows
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java
deleted file mode 100644
index bce1210..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexOutputFormat;
-import org.apache.giraph.graph.VertexWriter;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
-import java.io.IOException;
-
-/**
- * Abstract class that users should subclass to use their own text based
- * vertex output format.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public abstract class TextVertexOutputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends VertexOutputFormat<I, V, E> {
-
-  /** Uses the TextOutputFormat to do everything */
-  protected TextOutputFormat<Text, Text> textOutputFormat =
-      new TextOutputFormat<Text, Text>();
-
-  @Override
-  public void checkOutputSpecs(JobContext context)
-    throws IOException, InterruptedException {
-    textOutputFormat.checkOutputSpecs(context);
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-    throws IOException, InterruptedException {
-    return textOutputFormat.getOutputCommitter(context);
-  }
-
-  /**
-   * The factory method which produces the {@link TextVertexWriter} used by this
-   * output format.
-   *
-   * @param context
-   *          the information about the task
-   * @return
-   *         the text vertex writer to be used
-   */
-  @Override
-  public abstract TextVertexWriter createVertexWriter(TaskAttemptContext
-      context) throws IOException, InterruptedException;
-
-  /**
-   * Abstract class to be implemented by the user based on their specific
-   * vertex output.  Easiest to ignore the key value separator and only use
-   * key instead.
-   */
-  protected abstract class TextVertexWriter implements VertexWriter<I, V, E> {
-    /** Internal line record writer */
-    private RecordWriter<Text, Text> lineRecordWriter;
-    /** Context passed to initialize */
-    private TaskAttemptContext context;
-
-    @Override
-    public void initialize(TaskAttemptContext context) throws IOException,
-           InterruptedException {
-      lineRecordWriter = createLineRecordWriter(context);
-      this.context = context;
-    }
-
-    /**
-     * Create the line record writer. Override this to use a different
-     * underlying record writer (useful for testing).
-     *
-     * @param context
-     *          the context passed to initialize
-     * @return
-     *         the record writer to be used
-     * @throws IOException
-     *           exception that can be thrown during creation
-     * @throws InterruptedException
-     *           exception that can be thrown during creation
-     */
-    protected RecordWriter<Text, Text> createLineRecordWriter(
-        TaskAttemptContext context) throws IOException, InterruptedException {
-      return textOutputFormat.getRecordWriter(context);
-    }
-
-    @Override
-    public void close(TaskAttemptContext context) throws IOException,
-        InterruptedException {
-      lineRecordWriter.close(context);
-    }
-
-    /**
-     * Get the line record writer.
-     *
-     * @return Record writer to be used for writing.
-     */
-    public RecordWriter<Text, Text> getRecordWriter() {
-      return lineRecordWriter;
-    }
-
-    /**
-     * Get the context.
-     *
-     * @return Context passed to initialize.
-     */
-    public TaskAttemptContext getContext() {
-      return context;
-    }
-  }
-
-  /**
-   * Abstract class to be implemented by the user to write a line for each
-   * vertex.
-   */
-  protected abstract class TextVertexWriterToEachLine extends TextVertexWriter {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public final void writeVertex(Vertex vertex) throws
-      IOException, InterruptedException {
-      // Note we are writing line as key with null value
-      getRecordWriter().write(convertVertexToLine(vertex), null);
-    }
-
-    /**
-     * Writes a line for the given vertex.
-     *
-     * @param vertex
-     *          the current vertex for writing
-     * @return the text line to be written
-     * @throws IOException
-     *           exception that can be thrown while writing
-     */
-    protected abstract Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
-      throws IOException;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java
deleted file mode 100644
index c557251..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.graph.VertexValueInputFormat;
-import org.apache.giraph.graph.VertexValueReader;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Abstract class that users should subclass to use their own text based
- * vertex value input format.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public abstract class TextVertexValueInputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends VertexValueInputFormat<I, V, E, M> {
-  /** Uses the GiraphTextInputFormat to do everything */
-  protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
-
-  @Override
-  public List<InputSplit> getSplits(JobContext context, int numWorkers)
-    throws IOException, InterruptedException {
-    // Ignore the hint of numWorkers here since we are using
-    // GiraphTextInputFormat to do this for us
-    return textInputFormat.getVertexSplits(context);
-  }
-
-  @Override
-  public abstract TextVertexValueReader createVertexValueReader(
-      InputSplit split, TaskAttemptContext context) throws IOException;
-
-  /**
-   * {@link VertexValueReader} for {@link VertexValueInputFormat}.
-   */
-  protected abstract class TextVertexValueReader extends
-      VertexValueReader<I, V, E, M> {
-    /** Internal line record reader */
-    private RecordReader<LongWritable, Text> lineRecordReader;
-    /** Context passed to initialize */
-    private TaskAttemptContext context;
-
-    @Override
-    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      super.initialize(inputSplit, context);
-      this.context = context;
-      lineRecordReader = createLineRecordReader(inputSplit, context);
-      lineRecordReader.initialize(inputSplit, context);
-    }
-
-    /**
-     * Create the line record reader. Override this to use a different
-     * underlying record reader (useful for testing).
-     *
-     * @param inputSplit
-     *          the split to read
-     * @param context
-     *          the context passed to initialize
-     * @return
-     *         the record reader to be used
-     * @throws IOException
-     *           exception that can be thrown during creation
-     * @throws InterruptedException
-     *           exception that can be thrown during creation
-     */
-    protected RecordReader<LongWritable, Text>
-    createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      return textInputFormat.createRecordReader(inputSplit, context);
-    }
-
-    @Override
-    public void close() throws IOException {
-      lineRecordReader.close();
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      return lineRecordReader.getProgress();
-    }
-
-    /**
-     * Get the line record reader.
-     *
-     * @return Record reader to be used for reading.
-     */
-    protected RecordReader<LongWritable, Text> getRecordReader() {
-      return lineRecordReader;
-    }
-
-    /**
-     * Get the context.
-     *
-     * @return Context passed to initialize.
-     */
-    protected TaskAttemptContext getContext() {
-      return context;
-    }
-  }
-
-  /**
-   * Abstract class to be implemented by the user to read a vertex value from
-   * each text line.
-   */
-  protected abstract class TextVertexValueReaderFromEachLine extends
-      TextVertexValueReader {
-    @Override
-    public final I getCurrentVertexId() throws IOException,
-        InterruptedException {
-      return getId(getRecordReader().getCurrentValue());
-    }
-
-    @Override
-    public final V getCurrentVertexValue() throws IOException,
-        InterruptedException {
-      return getValue(getRecordReader().getCurrentValue());
-    }
-
-    @Override
-    public final boolean nextVertex() throws IOException, InterruptedException {
-      return getRecordReader().nextKeyValue();
-    }
-
-    /**
-     * Reads vertex id from the current line.
-     *
-     * @param line
-     *          the current line
-     * @return
-     *         the vertex id corresponding to the line
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract I getId(Text line) throws IOException;
-
-    /**
-     * Reads vertex value from the current line.
-     *
-     * @param line
-     *          the current line
-     * @return
-     *         the vertex value corresponding to the line
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract V getValue(Text line) throws IOException;
-  }
-
-  /**
-   * Abstract class to be implemented by the user to read a vertex value from
-   * each text line after preprocessing it.
-   *
-   * @param <T>
-   *          The resulting type of preprocessing.
-   */
-  protected abstract class TextVertexValueReaderFromEachLineProcessed<T>
-      extends TextVertexValueReader {
-    /** Last preprocessed line. */
-    private T processedLine = null;
-
-    /** Get last preprocessed line. Generate it if missing.
-     *
-     * @return The last preprocessed line
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    private T getProcessedLine() throws IOException, InterruptedException {
-      if (processedLine == null) {
-        processedLine = preprocessLine(getRecordReader().getCurrentValue());
-      }
-      return processedLine;
-    }
-
-    @Override
-    public I getCurrentVertexId() throws IOException,
-        InterruptedException {
-      return getId(getProcessedLine());
-    }
-
-    @Override
-    public V getCurrentVertexValue() throws IOException,
-        InterruptedException {
-      return getValue(getProcessedLine());
-    }
-
-    @Override
-    public final boolean nextVertex() throws IOException, InterruptedException {
-      processedLine = null;
-      return getRecordReader().nextKeyValue();
-    }
-
-    /**
-     * Preprocess the line so other methods can easily read necessary
-     * information for creating vertex.
-     *
-     * @param line
-     *          the current line to be read
-     * @return
-     *         the preprocessed object
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract T preprocessLine(Text line) throws IOException;
-
-    /**
-     * Reads vertex id from the preprocessed line.
-     *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the vertex id
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract I getId(T line) throws IOException;
-
-    /**
-     * Reads vertex value from the preprocessed line.
-     *
-     * @param line
-     *          the object obtained by preprocessing the line
-     * @return
-     *         the vertex value
-     * @throws IOException
-     *           exception that can be thrown while reading
-     */
-    protected abstract V getValue(T line) throws IOException;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
new file mode 100644
index 0000000..0f73b8d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexInputFormat.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Use this to load data for a BSP application.  Note that the InputSplit must
+ * also implement Writable.  The InputSplits will determine the partitioning of
+ * vertices across the mappers, so keep that in consideration when implementing
+ * getSplits().
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements GiraphInputFormat {
+  /**
+   * Logically split the vertices for a graph processing application.
+   *
+   * Each {@link InputSplit} is then assigned to a worker for processing.
+   *
+   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+   * input files are not physically split into chunks. For e.g. a split could
+   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
+   * also creates the {@link VertexReader} to read the {@link InputSplit}.
+   *
+   * Also, the number of workers is a hint given to the developer to try to
+   * intelligently determine how many splits to create (if this is
+   * adjustable) at runtime.
+   *
+   * @param context Context of the job
+   * @param numWorkers Number of workers used for this job
+   * @return an array of {@link InputSplit}s for the job.
+   */
+  @Override
+  public abstract List<InputSplit> getSplits(
+    JobContext context, int numWorkers)
+    throws IOException, InterruptedException;
+
+  /**
+   * Create a vertex reader for a given split. The framework will call
+   * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
+   * the split is used.
+   *
+   * @param split the split to be read
+   * @param context the information about the task
+   * @return a new record reader
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract VertexReader<I, V, E, M> createVertexReader(
+      InputSplit split,
+      TaskAttemptContext context) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
new file mode 100644
index 0000000..270d040
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Implement to output the graph after the computation.  It is modeled
+ * directly after the Hadoop OutputFormat.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexOutputFormat<
+    I extends WritableComparable, V extends Writable, E extends Writable> {
+  /**
+   * Create a vertex writer for the given task. The framework will call
+   * {@link VertexWriter#initialize(TaskAttemptContext)} before the writer
+   * is used.
+   *
+   * @param context the information about the task
+   * @return a new vertex writer
+   * @throws IOException if the writer cannot be created
+   * @throws InterruptedException if interrupted while creating the writer
+   */
+  public abstract VertexWriter<I, V, E> createVertexWriter(
+    TaskAttemptContext context) throws IOException, InterruptedException;
+
+  /**
+   * Check for validity of the output-specification for the job.
+   * (Copied from Hadoop OutputFormat)
+   *
+   * <p>This validates the output specification when the job is submitted.
+   * Typically it checks that the output does not already exist, throwing an
+   * exception if it does, so that existing output is not overwritten.</p>
+   *
+   * @param context information about the job
+   * @throws IOException when output should not be attempted
+   * @throws InterruptedException if interrupted while checking
+   */
+  public abstract void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException;
+
+  /**
+   * Get the output committer for this output format. This is responsible
+   * for ensuring the output is committed correctly.
+   * (Copied from Hadoop OutputFormat)
+   *
+   * @param context the task context
+   * @return an output committer
+   * @throws IOException if the committer cannot be created
+   * @throws InterruptedException if interrupted while creating it
+   */
+  public abstract OutputCommitter getOutputCommitter(
+    TaskAttemptContext context) throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
new file mode 100644
index 0000000..1b1c896
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Analogous to {@link org.apache.hadoop.mapreduce.RecordReader} for
+ * vertices.  Reads the vertices from an input split.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexReader<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Use the input split and context to set up reading the vertices.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param inputSplit Input split to be used for reading vertices.
+   * @param context Context from the task.
+   * @throws IOException if setting up the read fails
+   * @throws InterruptedException if interrupted during setup
+   */
+  void initialize(InputSplit inputSplit, TaskAttemptContext context)
+    throws IOException, InterruptedException;
+
+  /**
+   * Advance to the next vertex in the split.
+   *
+   * @return false iff there are no more vertices
+   * @throws IOException on a read error
+   * @throws InterruptedException if interrupted while reading
+   */
+  boolean nextVertex() throws IOException, InterruptedException;
+
+  /**
+   * Get the current vertex.
+   *
+   * @return the current vertex which has been read.
+   *         {@link #nextVertex()} should be called first.
+   * @throws IOException on a read error
+   * @throws InterruptedException if interrupted while reading
+   */
+  Vertex<I, V, E, M> getCurrentVertex()
+    throws IOException, InterruptedException;
+
+  /**
+   * Close this {@link VertexReader} to future operations.
+   *
+   * @throws IOException if closing the underlying input fails
+   */
+  void close() throws IOException;
+
+  /**
+   * How much of the input this {@link VertexReader} has consumed so far.
+   *
+   * @return Progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException on a read error
+   * @throws InterruptedException if interrupted while computing progress
+   */
+  float getProgress() throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/VertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueInputFormat.java
new file mode 100644
index 0000000..1ebda13
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueInputFormat.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Vertex input format that only allows setting vertex id and value. It can
+ * be used in conjunction with {@link EdgeInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexValueInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends VertexInputFormat<I, V, E, M> {
+  /**
+   * Create a {@link VertexValueReader} for a given split. The framework
+   * calls {@link VertexValueReader#initialize} before the split is used.
+   *
+   * @param split The split to be read
+   * @param context The information about the task
+   * @return A new vertex value reader
+   * @throws IOException if the reader cannot be created
+   */
+  public abstract VertexValueReader<I, V, E, M> createVertexValueReader(
+      InputSplit split, TaskAttemptContext context) throws IOException;
+
+  @Override
+  public final VertexReader<I, V, E, M> createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return createVertexValueReader(split, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
new file mode 100644
index 0000000..923ca5c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Reader used by {@link org.apache.giraph.io.VertexValueInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+public abstract class VertexValueReader<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends BasicVertexValueReader<I, V, E, M> {
+  /** Giraph configuration built from the task context in initialize(). */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+        context.getConfiguration());
+  }
+
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return configuration;
+  }
+
+  @Override
+  public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+      InterruptedException {
+    Vertex<I, V, E, M> created = getConf().createVertex();
+    created.initialize(getCurrentVertexId(), getCurrentVertexValue());
+    return created;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
new file mode 100644
index 0000000..82a19bb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import java.io.IOException;
+
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Implement to output a vertex range of the graph after the computation.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexWriter<I extends WritableComparable, V extends Writable,
+    E extends Writable> {
+  /**
+   * Use the context to set up writing the vertices.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param context Context used to write the vertices.
+   * @throws IOException if setting up the output fails
+   * @throws InterruptedException if interrupted during setup
+   */
+  void initialize(TaskAttemptContext context) throws IOException,
+    InterruptedException;
+
+  /**
+   * Write the given vertex and its associated data.
+   *
+   * @param vertex the vertex to write
+   * @throws IOException on a write error
+   * @throws InterruptedException if interrupted while writing
+   */
+  void writeVertex(Vertex<I, V, E, ?> vertex)
+    throws IOException, InterruptedException;
+
+  /**
+   * Close this {@link VertexWriter} to future operations.
+   *
+   * @param context the context of the task
+   * @throws IOException if closing the output fails
+   * @throws InterruptedException if interrupted while closing
+   */
+  void close(TaskAttemptContext context)
+    throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java
new file mode 100644
index 0000000..5092352
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * VertexReader that reads lines of text with vertices encoded as adjacency
+ * lists and converts each token to the correct type.  For example, a graph
+ * with vertices as integers and values as doubles could be encoded as:
+ * <pre>
+ *   1 0.1 2 0.2 3 0.3
+ * </pre>
+ * to represent a vertex named 1, with 0.1 as its value and two edges, to
+ * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.  Tokens
+ * are separated by the delimiter configured under
+ * {@link #LINE_TOKENIZE_VALUE} (a tab by default); a line that does not
+ * split into an id, a value and whole edge pairs is rejected.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class AdjacencyListTextVertexInputFormat<I extends
+    WritableComparable, V extends Writable, E extends Writable, M extends
+    Writable> extends TextVertexInputFormat<I, V, E, M> {
+  /**
+   * Configuration key for the token delimiter.  The value is passed to
+   * {@link String#split(String)}, so it is interpreted as a regular
+   * expression.
+   */
+  public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
+  /** Default delimiter for split */
+  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+
+  /**
+   * Utility for doing any cleaning of each line before it is tokenized.
+   */
+  public interface LineSanitizer {
+    /**
+     * Clean string s before attempting to tokenize it.
+     *
+     * @param s String to be cleaned.
+     * @return Sanitized string.
+     */
+    String sanitize(String s);
+  }
+
+  @Override
+  public abstract AdjacencyListTextVertexReader createVertexReader(
+      InputSplit split, TaskAttemptContext context);
+
+  /**
+   * Vertex reader associated with {@link AdjacencyListTextVertexInputFormat}.
+   */
+  protected abstract class AdjacencyListTextVertexReader extends
+    TextVertexReaderFromEachLineProcessed<String[]> {
+    /** Delimiter used for split, read from the configuration */
+    private String splitValue;
+
+    /** Sanitizer applied to each line before splitting; may be null. */
+    private final LineSanitizer sanitizer;
+
+    /**
+     * Constructor without line sanitizer.
+     */
+    public AdjacencyListTextVertexReader() {
+      this(null);
+    }
+
+    /**
+     * Constructor with line sanitizer.
+     *
+     * @param sanitizer Sanitizer to be used, or null for none.
+     */
+    public AdjacencyListTextVertexReader(LineSanitizer sanitizer) {
+      this.sanitizer = sanitizer;
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      super.initialize(inputSplit, context);
+      Configuration conf = context.getConfiguration();
+      splitValue = conf.get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+    }
+
+    @Override
+    protected String[] preprocessLine(Text line) throws IOException {
+      String sanitizedLine = line.toString();
+      if (sanitizer != null) {
+        sanitizedLine = sanitizer.sanitize(sanitizedLine);
+      }
+      // Expect the vertex id, the vertex value, and then zero or more
+      // (target id, edge value) pairs, so the token count must be even
+      // and at least 2.
+      String[] values = sanitizedLine.split(splitValue);
+      if ((values.length < 2) || (values.length % 2 != 0)) {
+        throw new IllegalArgumentException(
+          "Line did not split correctly: " + line);
+      }
+      return values;
+    }
+
+    @Override
+    protected I getId(String[] values) throws IOException {
+      return decodeId(values[0]);
+    }
+
+    /**
+     * Decode the id for this line into an instance of its correct type.
+     *
+     * @param s Id of vertex from line
+     * @return Vertex id
+     */
+    public abstract I decodeId(String s);
+
+    @Override
+    protected V getValue(String[] values) throws IOException {
+      return decodeValue(values[1]);
+    }
+
+    /**
+     * Decode the value for this line into an instance of its correct type.
+     *
+     * @param s Value from line
+     * @return Vertex value
+     */
+    public abstract V decodeValue(String s);
+
+    @Override
+    protected Iterable<Edge<I, E>> getEdges(String[] values) throws
+        IOException {
+      // values are assumed to come from preprocessLine, which rejects odd
+      // token counts, so after the id and value they form complete
+      // (target id, edge value) pairs and values[i + 1] is in bounds.
+      List<Edge<I, E>> edges = Lists.newArrayList();
+      for (int i = 2; i < values.length; i += 2) {
+        edges.add(decodeEdge(values[i], values[i + 1]));
+      }
+      return edges;
+    }
+
+    /**
+     * Decode an edge from the line into a correctly typed Edge.
+     *
+     * @param id The edge's id from the line
+     * @param value The edge's value from the line
+     * @return Edge with given target id and value
+     */
+    public abstract Edge<I, E> decodeEdge(String id, String value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
new file mode 100644
index 0000000..934663e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * OutputFormat to write out the graph nodes as text, value-separated (by
+ * tabs, by default).  With the default delimiter, a vertex is written out as:
+ *
+ * {@code <VertexId><tab><VertexValue>[<tab><EdgeId><tab><EdgeValue>]*}
+ *
+ * <p>The delimiter can be changed with {@link #LINE_TOKENIZE_VALUE}.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class AdjacencyListTextVertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends TextVertexOutputFormat<I, V, E> {
+  /** Configuration key for the output delimiter */
+  public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+  /** Default split delimiter */
+  public static final String LINE_TOKENIZE_VALUE_DEFAULT =
+    AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE_DEFAULT;
+
+  @Override
+  public AdjacencyListTextVertexWriter createVertexWriter(
+      TaskAttemptContext context) {
+    return new AdjacencyListTextVertexWriter();
+  }
+
+  /**
+   * Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}.
+   */
+  protected class AdjacencyListTextVertexWriter extends
+    TextVertexWriterToEachLine {
+    /** Cached split delimiter */
+    private String delimiter;
+
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      super.initialize(context);
+      delimiter = context.getConfiguration()
+          .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+    }
+
+    @Override
+    public Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+      throws IOException {
+      StringBuilder sb = new StringBuilder(vertex.getId().toString());
+      sb.append(delimiter);
+      sb.append(vertex.getValue());
+
+      for (Edge<I, E> edge : vertex.getEdges()) {
+        sb.append(delimiter).append(edge.getTargetVertexId());
+        sb.append(delimiter).append(edge.getValue());
+      }
+
+      return new Text(sb.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java
new file mode 100644
index 0000000..f308169
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/GeneratedVertexInputFormat.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A VertexInputFormat for testing/debugging.  It simply generates some
+ * vertex data that test applications can consume.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedVertexInputFormat<
+    I extends WritableComparable, V extends Writable, E extends Writable,
+    M extends Writable> extends VertexInputFormat<I, V, E, M> {
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    // The splits carry no real input: the VertexReader generates all of the
+    // test data itself, so just hand out one placeholder split per worker.
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    for (int splitIndex = 0; splitIndex < numWorkers; splitIndex++) {
+      splits.add(new BspInputSplit(splitIndex, numWorkers));
+    }
+    return splits;
+  }
+}


Mime
View raw message