giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject git commit: GIRAPH-: GiraphInputSplit (nitay)
Date Thu, 21 Mar 2013 17:15:30 GMT
Updated Branches:
  refs/heads/input-split-483 [created] f38d2b885


GIRAPH-: GiraphInputSplit (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f38d2b88
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f38d2b88
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f38d2b88

Branch: refs/heads/input-split-483
Commit: f38d2b8853ab3f24e560be5a1c104ca501ba55f4
Parents: 4585139
Author: Nitay Joffe <nitay@apache.org>
Authored: Wed Dec 19 14:53:51 2012 -0800
Committer: Nitay Joffe <nitay@apache.org>
Committed: Wed Dec 19 20:48:21 2012 -0800

----------------------------------------------------------------------
 .../LongDoubleFloatDoubleTextInputFormat.java      |    4 +-
 ...lizingLongDoubleFloatDoubleTextInputFormat.java |    4 +-
 .../giraph/examples/SimplePageRankVertex.java      |    4 +-
 .../giraph/examples/SimpleSuperstepVertex.java     |    4 +-
 .../org/apache/giraph/graph/BspServiceMaster.java  |   26 +++++++-------
 .../org/apache/giraph/graph/EdgeInputFormat.java   |   27 ++------------
 .../org/apache/giraph/graph/GiraphInputFormat.java |   24 +++++++++----
 .../org/apache/giraph/graph/VertexInputFormat.java |   29 ++-------------
 .../java/org/apache/giraph/graph/VertexReader.java |   10 -----
 .../giraph/graph/VertexValueInputFormat.java       |    3 +-
 .../io/AdjacencyListTextVertexInputFormat.java     |    3 +-
 .../apache/giraph/io/GiraphFileInputFormat.java    |   27 ++++++++------
 .../giraph/io/IntIntNullIntTextInputFormat.java    |    3 +-
 .../giraph/io/IntNullNullNullTextInputFormat.java  |    4 +-
 .../giraph/io/IntNullTextEdgeInputFormat.java      |    4 +-
 .../giraph/io/JsonBase64VertexInputFormat.java     |    3 +-
 ...JsonLongDoubleFloatDoubleVertexInputFormat.java |    4 +-
 ...DoubleDoubleAdjacencyListVertexInputFormat.java |    4 +-
 .../giraph/io/PseudoRandomVertexInputFormat.java   |    3 +-
 .../giraph/io/SequenceFileVertexInputFormat.java   |    3 +-
 ...DoubleDoubleAdjacencyListVertexInputFormat.java |    4 +-
 .../org/apache/giraph/io/TextEdgeInputFormat.java  |    5 ++-
 .../apache/giraph/io/TextVertexInputFormat.java    |    4 ++-
 .../io/accumulo/AccumuloVertexInputFormat.java     |    5 ++-
 .../giraph/io/hcatalog/GiraphHCatInputFormat.java  |   17 +++++----
 .../apache/giraph/io/hcatalog/GiraphHCatSplit.java |   18 +++++++++
 .../io/hcatalog/HCatalogEdgeInputFormat.java       |    8 +++--
 .../io/hcatalog/HCatalogVertexInputFormat.java     |    3 +-
 .../edgemarker/AccumuloEdgeInputFormat.java        |    5 ++-
 .../io/hbase/edgemarker/TableEdgeInputFormat.java  |    6 +++-
 .../org/apache/giraph/input/GiraphInputSplit.java  |   29 +++++++++++++++
 .../java/org/apache/giraph/io/GiraphFileSplit.java |   11 ++++++
 pom.xml                                            |    3 +-
 33 files changed, 175 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
index b06f07f..a4ca5bf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
@@ -21,11 +21,11 @@ package org.apache.giraph.examples;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.giraph.io.TextVertexInputFormat;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import com.google.common.collect.Lists;
@@ -45,7 +45,7 @@ public class LongDoubleFloatDoubleTextInputFormat
     FloatWritable, DoubleWritable> {
 
   @Override
-  public TextVertexReader createVertexReader(InputSplit split,
+  public TextVertexReader createVertexReader(GiraphInputSplit split,
       TaskAttemptContext context)
     throws IOException {
     return new LongDoubleFloatDoubleVertexReader();

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
index 300fbb6..a2b8808 100644
--- a/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
@@ -21,11 +21,11 @@ package org.apache.giraph.examples;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.giraph.io.TextVertexInputFormat;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import com.google.common.collect.Lists;
@@ -47,7 +47,7 @@ public class NormalizingLongDoubleFloatDoubleTextInputFormat
 
   @Override
   public TextVertexReader createVertexReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
+      GiraphInputSplit split, TaskAttemptContext context) throws IOException {
     return new NormalizingLongDoubleFloatDoubleVertexReader();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
index 5fd834b..fa0f88e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
@@ -27,13 +27,13 @@ import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.giraph.graph.WorkerContext;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.giraph.io.GeneratedVertexInputFormat;
 import org.apache.giraph.io.TextVertexOutputFormat;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
@@ -217,7 +217,7 @@ public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex {
             DoubleWritable, FloatWritable, DoubleWritable> {
     @Override
     public VertexReader<LongWritable, DoubleWritable,
-    FloatWritable, DoubleWritable> createVertexReader(InputSplit split,
+    FloatWritable, DoubleWritable> createVertexReader(GiraphInputSplit split,
       TaskAttemptContext context)
       throws IOException {
       return new SimplePageRankVertexReader();

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
index b166ce0..d65a211 100644
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
@@ -22,13 +22,13 @@ import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.giraph.io.GeneratedVertexInputFormat;
 import org.apache.giraph.io.TextVertexOutputFormat;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
@@ -120,7 +120,7 @@ public class SimpleSuperstepVertex extends
         IntWritable, FloatWritable, IntWritable> {
     @Override
     public VertexReader<LongWritable, IntWritable, FloatWritable, IntWritable>
-    createVertexReader(InputSplit split, TaskAttemptContext context)
+    createVertexReader(GiraphInputSplit split, TaskAttemptContext context)
       throws IOException {
       return new SimpleSuperstepVertexReader();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
index ee64a46..91f1abc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
@@ -35,6 +35,7 @@ import org.apache.giraph.graph.partition.MasterGraphPartitioner;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.graph.partition.PartitionUtils;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.metrics.AggregatedMetrics;
 import org.apache.giraph.metrics.GiraphMetrics;
@@ -58,7 +59,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
@@ -257,11 +257,12 @@ public class BspServiceMaster<I extends WritableComparable,
    * @param inputSplitType Type of input splits (for logging purposes)
    * @return List of input splits for the given format
    */
-  private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
-                                               int numWorkers,
-                                               String inputSplitType) {
+  private List<GiraphInputSplit> generateInputSplits(
+      GiraphInputFormat inputFormat,
+      int numWorkers,
+      String inputSplitType) {
     String logPrefix = "generate" + inputSplitType + "InputSplits";
-    List<InputSplit> splits;
+    List<GiraphInputSplit> splits;
     try {
       splits = inputFormat.getSplits(getContext(), numWorkers);
     } catch (IOException e) {
@@ -277,7 +278,7 @@ public class BspServiceMaster<I extends WritableComparable,
     if (samplePercent !=
         GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
       int lastIndex = (int) (samplePercent * splits.size() / 100f);
-      List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
+      List<GiraphInputSplit> sampleSplits = splits.subList(0, lastIndex);
       LOG.warn(logPrefix + ": Using sampling - Processing only " +
           sampleSplits.size() + " instead of " + splits.size() +
           " expected splits.");
@@ -565,7 +566,7 @@ public class BspServiceMaster<I extends WritableComparable,
 
     // Note that the input splits may only be a sample if
     // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
-    List<InputSplit> splitList = generateInputSplits(inputFormat,
+    List<GiraphInputSplit> splitList = generateInputSplits(inputFormat,
         healthyWorkerInfoList.size(), inputSplitType);
 
     if (splitList.isEmpty()) {
@@ -594,7 +595,7 @@ public class BspServiceMaster<I extends WritableComparable,
     ExecutorService taskExecutor =
         Executors.newFixedThreadPool(inputSplitThreadCount);
     for (int i = 0; i < splitList.size(); ++i) {
-      InputSplit inputSplit = splitList.get(i);
+      GiraphInputSplit inputSplit = splitList.get(i);
       taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i));
     }
     taskExecutor.shutdown();
@@ -1792,7 +1793,7 @@ public class BspServiceMaster<I extends WritableComparable,
    */
   private class WriteInputSplit implements Callable<Void> {
     /** Input split which we are going to write */
-    private final InputSplit inputSplit;
+    private final GiraphInputSplit inputSplit;
     /** Input splits path */
     private final String inputSplitsPath;
     /** Index of the input split */
@@ -1805,7 +1806,7 @@ public class BspServiceMaster<I extends WritableComparable,
      * @param inputSplitsPath Input splits path
      * @param index Index of the input split
      */
-    public WriteInputSplit(InputSplit inputSplit,
+    public WriteInputSplit(GiraphInputSplit inputSplit,
                            String inputSplitsPath,
                            int index) {
       this.inputSplit = inputSplit;
@@ -1835,9 +1836,8 @@ public class BspServiceMaster<I extends WritableComparable,
         }
         Text.writeString(outputStream,
             locations == null ? "" : locations.toString());
-        Text.writeString(outputStream,
-            inputSplit.getClass().getName());
-        ((Writable) inputSplit).write(outputStream);
+        Text.writeString(outputStream, inputSplit.getClass().getName());
+        inputSplit.write(outputStream);
         inputSplitPath = inputSplitsPath + "/" + index;
         getZkExt().createExt(inputSplitPath,
             byteArrayOutputStream.toByteArray(),

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
index cdc1891..f32a3e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,33 +37,11 @@ import java.util.List;
 public abstract class EdgeInputFormat<I extends WritableComparable,
     E extends Writable> implements GiraphInputFormat {
   /**
-   * Logically split the vertices for a graph processing application.
-   *
-   * Each {@link InputSplit} is then assigned to a worker for processing.
-   *
-   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
-   * input files are not physically split into chunks. For e.g. a split could
-   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
-   * also creates the {@link VertexReader} to read the {@link InputSplit}.
-   *
-   * Also, the number of workers is a hint given to the developer to try to
-   * intelligently determine how many splits to create (if this is
-   * adjustable) at runtime.
-   *
-   * @param context Context of the job
-   * @param numWorkers Number of workers used for this job
-   * @return an array of {@link InputSplit}s for the job.
-   */
-  @Override
-  public abstract List<InputSplit> getSplits(
-      JobContext context, int numWorkers) throws IOException,
-      InterruptedException;
-
-  /**
    * Create an edge reader for a given split. The framework will call
    * {@link EdgeReader#initialize(InputSplit, TaskAttemptContext)} before
    * the split is used.
    *
+   *
    * @param split the split to be read
    * @param context the information about the task
    * @return a new record reader
@@ -70,6 +49,6 @@ public abstract class EdgeInputFormat<I extends WritableComparable,
    * @throws InterruptedException
    */
   public abstract EdgeReader<I, E> createEdgeReader(
-      InputSplit split,
+      GiraphInputSplit split,
       TaskAttemptContext context) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java
index e1cc844..0a7223a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 
@@ -29,14 +30,23 @@ import java.util.List;
  */
 public interface GiraphInputFormat {
   /**
-   * Get the list of input splits for the format.
+   * Logically split the vertices for a graph processing application.
    *
-   * @param context The job context
-   * @param numWorkers Number of workers
-   * @return The list of input splits
-   * @throws IOException
-   * @throws InterruptedException
+   * Each {@link InputSplit} is then assigned to a worker for processing.
+   *
+   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+   * input files are not physically split into chunks. For e.g. a split could
+   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
+   * also creates the {@link VertexReader} to read the {@link InputSplit}.
+   *
+   * Also, the number of workers is a hint given to the developer to try to
+   * intelligently determine how many splits to create (if this is
+   * adjustable) at runtime.
+   *
+   * @param context Context of the job
+   * @param numWorkers Number of workers used for this job
+   * @return a list of {@link GiraphInputSplit}s for the job.
    */
-  List<InputSplit> getSplits(JobContext context, int numWorkers)
+  List<GiraphInputSplit> getSplits(JobContext context, int numWorkers)
     throws IOException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
index 9824b69..78bc292 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
@@ -18,14 +18,13 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * Use this to load data for a BSP application.  Note that the InputSplit must
@@ -43,33 +42,11 @@ public abstract class VertexInputFormat<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements GiraphInputFormat {
   /**
-   * Logically split the vertices for a graph processing application.
-   *
-   * Each {@link InputSplit} is then assigned to a worker for processing.
-   *
-   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
-   * input files are not physically split into chunks. For e.g. a split could
-   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
-   * also creates the {@link VertexReader} to read the {@link InputSplit}.
-   *
-   * Also, the number of workers is a hint given to the developer to try to
-   * intelligently determine how many splits to create (if this is
-   * adjustable) at runtime.
-   *
-   * @param context Context of the job
-   * @param numWorkers Number of workers used for this job
-   * @return an array of {@link InputSplit}s for the job.
-   */
-  @Override
-  public abstract List<InputSplit> getSplits(
-    JobContext context, int numWorkers)
-    throws IOException, InterruptedException;
-
-  /**
    * Create a vertex reader for a given split. The framework will call
    * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
    * the split is used.
    *
+   *
    * @param split the split to be read
    * @param context the information about the task
    * @return a new record reader
@@ -77,6 +54,6 @@ public abstract class VertexInputFormat<I extends WritableComparable,
    * @throws InterruptedException
    */
   public abstract VertexReader<I, V, E, M> createVertexReader(
-      InputSplit split,
+      GiraphInputSplit split,
       TaskAttemptContext context) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
index 77801cc..f8b59b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
@@ -74,14 +74,4 @@ public interface VertexReader<I extends WritableComparable,
    * @throws IOException
    */
   void close() throws IOException;
-
-  /**
-   * How much of the input has the {@link VertexReader} consumed i.e.
-   * has been processed by?
-   *
-   * @return Progress from <code>0.0</code> to <code>1.0</code>.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  float getProgress() throws IOException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
index 804d23e..deab07e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -52,7 +53,7 @@ public abstract class VertexValueInputFormat<I extends WritableComparable,
 
   @Override
   public final VertexReader<I, V, E, M> createVertexReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
+      GiraphInputSplit split, TaskAttemptContext context) throws IOException {
     return createVertexValueReader(split, context);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
index 3b047f3..802d30f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.giraph.io;
 
 import org.apache.giraph.graph.Edge;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -67,7 +68,7 @@ public abstract class AdjacencyListTextVertexInputFormat<I extends
 
   @Override
   public abstract AdjacencyListTextVertexReader createVertexReader(
-      InputSplit split, TaskAttemptContext context);
+      GiraphInputSplit split, TaskAttemptContext context);
 
   /**
    * Vertex reader associated with {@link AdjacencyListTextVertexInputFormat}.

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
index 114e75f..4b2a9a0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
@@ -18,21 +18,22 @@
 
 package org.apache.giraph.io;
 
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -265,13 +266,13 @@ end[HADOOP_NON_SECURE]*/
    * @return The list of vertex/edge input splits
    * @throws IOException
    */
-  private List<InputSplit> getSplits(JobContext job, List<FileStatus> files)
+  private List<GiraphInputSplit> getSplits(JobContext job, List<FileStatus> files)
     throws IOException {
     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
     long maxSize = getMaxSplitSize(job);
 
     // generate splits
-    List<InputSplit> splits = new ArrayList<InputSplit>();
+    List<GiraphInputSplit> splits = Lists.newArrayList();
 
     for (FileStatus file: files) {
       Path path = file.getPath();
@@ -285,21 +286,22 @@ end[HADOOP_NON_SECURE]*/
         long bytesRemaining = length;
         while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
           int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-          splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
+          splits.add(new GiraphFileSplit(path, length - bytesRemaining, splitSize,
               blkLocations[blkIndex].getHosts()));
           bytesRemaining -= splitSize;
         }
 
         if (bytesRemaining != 0) {
-          splits.add(new FileSplit(path, length - bytesRemaining,
+          splits.add(new GiraphFileSplit(path, length - bytesRemaining,
               bytesRemaining,
               blkLocations[blkLocations.length - 1].getHosts()));
         }
       } else if (length != 0) {
-        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+        splits.add(new GiraphFileSplit(path, 0, length,
+            blkLocations[0].getHosts()));
       } else {
         //Create empty hosts array for zero length files
-        splits.add(new FileSplit(path, 0, length, new String[0]));
+        splits.add(new GiraphFileSplit(path, 0, length, new String[0]));
       }
     }
     return splits;
@@ -312,9 +314,10 @@ end[HADOOP_NON_SECURE]*/
    * @return The list of vertex input splits
    * @throws IOException
    */
-  public List<InputSplit> getVertexSplits(JobContext job) throws IOException {
+  public List<GiraphInputSplit> getVertexSplits(JobContext job)
+      throws IOException {
     List<FileStatus> files = listVertexStatus(job);
-    List<InputSplit> splits = getSplits(job, files);
+    List<GiraphInputSplit> splits = getSplits(job, files);
     // Save the number of input files in the job-conf
     job.getConfiguration().setLong(NUM_VERTEX_INPUT_FILES, files.size());
     LOG.debug("Total # of vertex splits: " + splits.size());
@@ -328,9 +331,9 @@ end[HADOOP_NON_SECURE]*/
    * @return The list of edge input splits
    * @throws IOException
    */
-  public List<InputSplit> getEdgeSplits(JobContext job) throws IOException {
+  public List<GiraphInputSplit> getEdgeSplits(JobContext job) throws IOException {
     List<FileStatus> files = listEdgeStatus(job);
-    List<InputSplit> splits = getSplits(job, files);
+    List<GiraphInputSplit> splits = getSplits(job, files);
     // Save the number of input files in the job-conf
     job.getConfiguration().setLong(NUM_EDGE_INPUT_FILES, files.size());
     LOG.debug("Total # of edge splits: " + splits.size());

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
index 9aa21c7..90a5b50 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.io;
 
 import org.apache.giraph.graph.Edge;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -44,7 +45,7 @@ public class IntIntNullIntTextInputFormat extends
   private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
 
   @Override
-  public TextVertexReader createVertexReader(InputSplit split,
+  public TextVertexReader createVertexReader(GiraphInputSplit split,
       TaskAttemptContext context)
     throws IOException {
     return new IntIntNullIntVertexReader();

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
index 4d98657..e03dc6c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
@@ -17,11 +17,11 @@
  */
 package org.apache.giraph.io;
 
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.giraph.graph.Edge;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import com.google.common.collect.ImmutableList;
@@ -38,7 +38,7 @@ public class IntNullNullNullTextInputFormat extends TextVertexInputFormat<
     IntWritable, NullWritable, NullWritable, NullWritable> {
   @Override
   public TextVertexReader createVertexReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
+      GiraphInputSplit split, TaskAttemptContext context) throws IOException {
     return new IntNullNullNullVertexReader();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
index ed13b45..6805e45 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
@@ -18,11 +18,11 @@
 
 package org.apache.giraph.io;
 
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.giraph.utils.IntPair;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
@@ -41,7 +41,7 @@ public class IntNullTextEdgeInputFormat extends
 
   @Override
   public TextEdgeReader createEdgeReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
+      GiraphInputSplit split, TaskAttemptContext context) throws IOException {
     return new IntNullTextEdgeReader();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
index cc5872c..74604f9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.io;
 
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.giraph.graph.Edge;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -54,7 +55,7 @@ public class JsonBase64VertexInputFormat<I extends WritableComparable,
     extends TextVertexInputFormat<I, V, E, M> {
 
   @Override
-  public TextVertexReader createVertexReader(InputSplit split,
+  public TextVertexReader createVertexReader(GiraphInputSplit split,
       TaskAttemptContext context) {
     return new JsonBase64VertexReader();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
index c01d442..3f98807 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
@@ -19,11 +19,11 @@ package org.apache.giraph.io;
 
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.json.JSONArray;
 import org.json.JSONException;
@@ -44,7 +44,7 @@ public class JsonLongDoubleFloatDoubleVertexInputFormat extends
   FloatWritable, DoubleWritable> {
 
   @Override
-  public TextVertexReader createVertexReader(InputSplit split,
+  public TextVertexReader createVertexReader(GiraphInputSplit split,
       TaskAttemptContext context) {
     return new JsonLongDoubleFloatDoubleVertexReader();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
index acae10b..bd79b61 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -18,10 +18,10 @@
 package org.apache.giraph.io;
 
 import org.apache.giraph.graph.Edge;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
@@ -39,7 +39,7 @@ public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
     DoubleWritable, M> {
 
   @Override
-  public AdjacencyListTextVertexReader createVertexReader(InputSplit split,
+  public AdjacencyListTextVertexReader createVertexReader(GiraphInputSplit split,
       TaskAttemptContext context) {
     return new LongDoubleDoubleAdjacencyListVertexReader(null);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
index bac0a39..a2715e0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -72,7 +73,7 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
 
   @Override
   public VertexReader<LongWritable, DoubleWritable, DoubleWritable, M>
-  createVertexReader(InputSplit split, TaskAttemptContext context)
+  createVertexReader(GiraphInputSplit split, TaskAttemptContext context)
     throws IOException {
     return new PseudoRandomVertexReader<M>();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
index a984089..c8e575f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -56,7 +57,7 @@ public class SequenceFileVertexInputFormat<I extends WritableComparable,
   }
 
   @Override
-  public VertexReader<I, V, E, M> createVertexReader(InputSplit split,
+  public VertexReader<I, V, E, M> createVertexReader(GiraphInputSplit split,
       TaskAttemptContext context) throws IOException {
     return new SequenceFileVertexReader<I, V, E, M, X>(
         sequenceFileInputFormat.createRecordReader(split, context));

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
index a009000..8307dc8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -18,10 +18,10 @@
 package org.apache.giraph.io;
 
 import org.apache.giraph.graph.Edge;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
@@ -36,7 +36,7 @@ public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
             DoubleWritable, M>  {
 
   @Override
-  public AdjacencyListTextVertexReader createVertexReader(InputSplit split,
+  public AdjacencyListTextVertexReader createVertexReader(GiraphInputSplit split,
       TaskAttemptContext context) {
     return new TextDoubleDoubleAdjacencyListVertexReader(null);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
index e4cbf94..6dd059c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeInputFormat;
 import org.apache.giraph.graph.EdgeReader;
 import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -50,7 +51,7 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable,
   protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
 
   @Override
-  public List<InputSplit> getSplits(
+  public List<GiraphInputSplit> getSplits(
       JobContext context, int numWorkers) throws IOException,
       InterruptedException {
     // Ignore the hint of numWorkers here since we are using
@@ -60,7 +61,7 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable,
 
   @Override
   public abstract TextEdgeReader createEdgeReader(
-      InputSplit split, TaskAttemptContext context) throws IOException;
+      GiraphInputSplit split, TaskAttemptContext context) throws IOException;
 
   /**
    * {@link EdgeReader} for {@link TextEdgeInputFormat}.

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
index e085473..0c1aaca 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -65,6 +66,7 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
    * The factory method which produces the {@link TextVertexReader} used by this
    * input format.
    *
+   *
    * @param split
    *          the split to be read
    * @param context
@@ -73,7 +75,7 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
    *         the text vertex reader to be used
    */
   @Override
-  public abstract TextVertexReader createVertexReader(InputSplit split,
+  public abstract TextVertexReader createVertexReader(GiraphInputSplit split,
       TaskAttemptContext context) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
index 92328b7..90cfd51 100644
--- a/giraph-formats/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
+++ b/giraph-formats/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -160,10 +161,10 @@ public abstract class AccumuloVertexInputFormat<
    * @throws InterruptedException
    */
   @Override
-  public List<InputSplit> getSplits(
+  public List<GiraphInputSplit> getSplits(
     JobContext context, int numWorkers)
     throws IOException, InterruptedException {
-    List<InputSplit> splits = null;
+    List<GiraphInputSplit> splits = null;
     try {
       splits = accumuloInputFormat.getSplits(context);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
index 2e91cba..ea1733a 100644
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
+++ b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.io.hcatalog;
 
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,6 +42,8 @@ import org.apache.hcatalog.mapreduce.HCatUtils;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hcatalog.mapreduce.PartInfo;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -211,12 +214,12 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat {
    * @throws IOException
    * @throws InterruptedException
    */
-  private List<InputSplit> getSplits(JobContext jobContext,
+  private List<GiraphInputSplit> getSplits(JobContext jobContext,
                                      InputJobInfo inputJobInfo)
     throws IOException, InterruptedException {
     Configuration conf = jobContext.getConfiguration();
 
-    List<InputSplit> splits = new ArrayList<InputSplit>();
+    List<GiraphInputSplit> splits = Lists.newArrayList();
     List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
     if (partitionInfoList == null) {
       //No partitions match the specified partition filter
@@ -264,7 +267,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat {
           inputFormat.getSplits(jobConf, desiredNumSplits);
 
       for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
-        splits.add(new HCatSplit(partitionInfo, split, allCols));
+        splits.add(new GiraphHCatSplit(partitionInfo, split, allCols));
       }
     }
 
@@ -313,7 +316,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat {
    * @throws IOException
    * @throws InterruptedException
    */
-  public List<InputSplit> getVertexSplits(JobContext jobContext)
+  public List<GiraphInputSplit> getVertexSplits(JobContext jobContext)
     throws IOException, InterruptedException {
     return getSplits(jobContext,
         getVertexJobInfo(jobContext.getConfiguration()));
@@ -327,7 +330,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat {
    * @throws IOException
    * @throws InterruptedException
    */
-  public List<InputSplit> getEdgeSplits(JobContext jobContext)
+  public List<GiraphInputSplit> getEdgeSplits(JobContext jobContext)
     throws IOException, InterruptedException {
     return getSplits(jobContext,
         getEdgeJobInfo(jobContext.getConfiguration()));
@@ -344,7 +347,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat {
    * @throws InterruptedException
    */
   private RecordReader<WritableComparable, HCatRecord>
-  createRecordReader(InputSplit split,
+  createRecordReader(GiraphInputSplit split,
                      HCatSchema schema,
                      TaskAttemptContext taskContext)
     throws IOException, InterruptedException {
@@ -392,7 +395,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat {
    * @throws InterruptedException
    */
   public RecordReader<WritableComparable, HCatRecord>
-  createEdgeRecordReader(InputSplit split, TaskAttemptContext taskContext)
+  createEdgeRecordReader(GiraphInputSplit split, TaskAttemptContext taskContext)
     throws IOException, InterruptedException {
     return createRecordReader(split, getEdgeTableSchema(
         taskContext.getConfiguration()), taskContext);

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatSplit.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatSplit.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatSplit.java
new file mode 100644
index 0000000..1f2515e
--- /dev/null
+++ b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatSplit.java
@@ -0,0 +1,18 @@
+package org.apache.giraph.io.hcatalog;
+
+import org.apache.giraph.input.GiraphInputSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+import org.apache.hcatalog.mapreduce.PartInfo;
+
+public class GiraphHCatSplit extends HCatSplit implements GiraphInputSplit {
+  public GiraphHCatSplit() {
+    super();
+  }
+
+  public GiraphHCatSplit(PartInfo partitionInfo, InputSplit baseMapRedSplit,
+                         HCatSchema tableSchema) {
+    super(partitionInfo, baseMapRedSplit, tableSchema);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
index 2112df3..4f52405 100644
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
+++ b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeInputFormat;
 import org.apache.giraph.graph.EdgeReader;
 import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -49,7 +50,8 @@ public abstract class HCatalogEdgeInputFormat<
   private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
 
   @Override
-  public final List<InputSplit> getSplits(JobContext context, int numWorkers)
+  public final List<GiraphInputSplit> getSplits(
+      JobContext context, int numWorkers)
     throws IOException, InterruptedException {
     return hCatInputFormat.getEdgeSplits(context);
   }
@@ -64,7 +66,7 @@ public abstract class HCatalogEdgeInputFormat<
     private TaskAttemptContext context;
 
     @Override
-    public final void initialize(InputSplit inputSplit,
+    public final void initialize(GiraphInputSplit inputSplit,
                                  TaskAttemptContext context)
       throws IOException, InterruptedException {
       hCatRecordReader =
@@ -117,7 +119,7 @@ public abstract class HCatalogEdgeInputFormat<
 
   @Override
   public EdgeReader<I, E>
-  createEdgeReader(InputSplit split, TaskAttemptContext context)
+  createEdgeReader(GiraphInputSplit split, TaskAttemptContext context)
     throws IOException {
     try {
       HCatalogEdgeReader reader = createEdgeReader();

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
index ec49137..2c2d78c 100644
--- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
+++ b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.input.GiraphInputSplit;
 import org.apache.giraph.utils.TimedLogger;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -169,7 +170,7 @@ public abstract class HCatalogVertexInputFormat<
 
   @Override
   public final VertexReader<I, V, E, M>
-  createVertexReader(final InputSplit split,
+  createVertexReader(final GiraphInputSplit split,
                      final TaskAttemptContext context)
     throws IOException {
     try {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java b/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
index b670144..7e05697 100644
--- a/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
+++ b/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
@@ -20,12 +20,13 @@ package org.apache.giraph.io.accumulo.edgemarker;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.giraph.graph.Edge;
+import org.apache.giraph.input.GiraphInputSplit;
+import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -44,7 +45,7 @@ public class AccumuloEdgeInputFormat
     private static final Text uselessEdgeValue = new Text();
     private Configuration conf;
     public VertexReader<Text, Text, Text, Text>
-    createVertexReader(InputSplit split, TaskAttemptContext context)
+    createVertexReader(GiraphInputSplit split, TaskAttemptContext context)
             throws IOException {
         try {
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java b/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
index e4e08d6..f766ac0 100644
--- a/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
+++ b/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
@@ -18,6 +18,10 @@
 package org.apache.giraph.io.hbase.edgemarker;
 
 import org.apache.giraph.graph.Edge;
+import com.google.common.collect.Maps;
+
+import org.apache.giraph.input.GiraphInputSplit;
+import org.apache.giraph.io.hbase.HBaseVertexInputFormat;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.giraph.io.hbase.HBaseVertexInputFormat;
@@ -45,7 +49,7 @@ public class TableEdgeInputFormat extends
     private static final Text uselessEdgeValue = new Text();
 
     public VertexReader<Text, Text, Text, Text>
-            createVertexReader(InputSplit split,
+            createVertexReader(GiraphInputSplit split,
                                TaskAttemptContext context) throws IOException {
 
         return new TableEdgeVertexReader(split, context);

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph/src/main/java/org/apache/giraph/input/GiraphInputSplit.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/input/GiraphInputSplit.java b/giraph/src/main/java/org/apache/giraph/input/GiraphInputSplit.java
new file mode 100644
index 0000000..f6c709b
--- /dev/null
+++ b/giraph/src/main/java/org/apache/giraph/input/GiraphInputSplit.java
@@ -0,0 +1,29 @@
+package org.apache.giraph.input;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.IOException;
+
+/**
+ * @see org.apache.hadoop.mapreduce.InputSplit
+ */
+public interface GiraphInputSplit extends Writable {
+  /**
+   * Get the list of nodes by name where the data for the split would be local.
+   * The locations do not need to be serialized.
+   * @return a new array of the node names.
+   * @throws java.io.IOException
+   * @throws InterruptedException
+   */
+  String[] getLocations() throws IOException, InterruptedException;
+
+  /**
+   * Get the Hadoop compatible InputSplit. Ideally we shouldn't need this, but
+   * we deal with a lot of input formats that strictly require that class, and
+   * since it is an abstract class (not an interface) we cannot implement it.
+   *
+   * @return InputSplit, for most implementations will be just "this"
+   */
+  InputSplit getInputSplit();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph/src/main/java/org/apache/giraph/io/GiraphFileSplit.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/io/GiraphFileSplit.java b/giraph/src/main/java/org/apache/giraph/io/GiraphFileSplit.java
new file mode 100644
index 0000000..2e00ebb
--- /dev/null
+++ b/giraph/src/main/java/org/apache/giraph/io/GiraphFileSplit.java
@@ -0,0 +1,11 @@
+package org.apache.giraph.io;
+
+import org.apache.giraph.input.GiraphInputSplit;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+public class GiraphFileSplit extends FileSplit implements GiraphInputSplit {
+  public GiraphFileSplit(Path file, long start, long length, String[] hosts) {
+    super(file, start, length, hosts);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0be32c5..41eab1a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -810,7 +810,8 @@ under the License.
       </dependency>
       <dependency>
         <groupId>org.apache.mahout</groupId>
-        <artifactId>mahout-collections</artifactId>
+        <artifactId>maho
+          ut-collections</artifactId>
         <version>1.0</version>
       </dependency>
       <dependency>


Mime
View raw message