Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 62B7DFEE2 for ; Thu, 21 Mar 2013 17:20:20 +0000 (UTC) Received: (qmail 53739 invoked by uid 500); 21 Mar 2013 17:20:20 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 53713 invoked by uid 500); 21 Mar 2013 17:20:20 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 53706 invoked by uid 99); 21 Mar 2013 17:20:20 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Mar 2013 17:20:20 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 084C3522F8; Thu, 21 Mar 2013 17:20:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: nitay@apache.org To: commits@giraph.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: GIRAPH-: GiraphInputSplit (nitay) Date: Thu, 21 Mar 2013 17:20:19 +0000 (UTC) Updated Branches: refs/heads/input-split-483 [created] f38d2b885 GIRAPH-: GiraphInputSplit (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f38d2b88 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f38d2b88 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f38d2b88 Branch: refs/heads/input-split-483 Commit: f38d2b8853ab3f24e560be5a1c104ca501ba55f4 Parents: 4585139 Author: Nitay Joffe Authored: Wed Dec 19 14:53:51 2012 -0800 Committer: Nitay Joffe Committed: Wed Dec 19 20:48:21 2012 -0800 ---------------------------------------------------------------------- .../LongDoubleFloatDoubleTextInputFormat.java | 4 +- ...lizingLongDoubleFloatDoubleTextInputFormat.java | 4 +- .../giraph/examples/SimplePageRankVertex.java | 4 +- .../giraph/examples/SimpleSuperstepVertex.java | 4 +- .../org/apache/giraph/graph/BspServiceMaster.java | 26 +++++++------- .../org/apache/giraph/graph/EdgeInputFormat.java | 27 ++------------ .../org/apache/giraph/graph/GiraphInputFormat.java | 24 +++++++++---- .../org/apache/giraph/graph/VertexInputFormat.java | 29 ++------------- .../java/org/apache/giraph/graph/VertexReader.java | 10 ----- .../giraph/graph/VertexValueInputFormat.java | 3 +- .../io/AdjacencyListTextVertexInputFormat.java | 3 +- .../apache/giraph/io/GiraphFileInputFormat.java | 27 ++++++++------ .../giraph/io/IntIntNullIntTextInputFormat.java | 3 +- .../giraph/io/IntNullNullNullTextInputFormat.java | 4 +- .../giraph/io/IntNullTextEdgeInputFormat.java | 4 +- .../giraph/io/JsonBase64VertexInputFormat.java | 3 +- ...JsonLongDoubleFloatDoubleVertexInputFormat.java | 4 +- ...DoubleDoubleAdjacencyListVertexInputFormat.java | 4 +- .../giraph/io/PseudoRandomVertexInputFormat.java | 3 +- .../giraph/io/SequenceFileVertexInputFormat.java | 3 +- ...DoubleDoubleAdjacencyListVertexInputFormat.java | 4 +- .../org/apache/giraph/io/TextEdgeInputFormat.java | 5 ++- .../apache/giraph/io/TextVertexInputFormat.java | 4 ++- .../io/accumulo/AccumuloVertexInputFormat.java | 5 ++- .../giraph/io/hcatalog/GiraphHCatInputFormat.java | 17 +++++---- .../apache/giraph/io/hcatalog/GiraphHCatSplit.java | 18 +++++++++ .../io/hcatalog/HCatalogEdgeInputFormat.java | 8 +++-- .../io/hcatalog/HCatalogVertexInputFormat.java | 3 +- .../edgemarker/AccumuloEdgeInputFormat.java | 5 ++- .../io/hbase/edgemarker/TableEdgeInputFormat.java | 6 +++- .../org/apache/giraph/input/GiraphInputSplit.java | 29 +++++++++++++++ .../java/org/apache/giraph/io/GiraphFileSplit.java | 11 ++++++ pom.xml | 3 +- 33 files changed, 175 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java index b06f07f..a4ca5bf 100644 --- a/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java @@ -21,11 +21,11 @@ package org.apache.giraph.examples; import org.apache.giraph.graph.BspUtils; import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.Vertex; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.giraph.io.TextVertexInputFormat; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import com.google.common.collect.Lists; @@ -45,7 +45,7 @@ public class LongDoubleFloatDoubleTextInputFormat FloatWritable, DoubleWritable> { @Override - public TextVertexReader createVertexReader(InputSplit split, + public TextVertexReader createVertexReader(GiraphInputSplit split, TaskAttemptContext context) throws IOException { return new LongDoubleFloatDoubleVertexReader(); http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java index 300fbb6..a2b8808 100644 --- a/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java @@ -21,11 +21,11 @@ package org.apache.giraph.examples; import org.apache.giraph.graph.BspUtils; import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.Vertex; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.giraph.io.TextVertexInputFormat; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import com.google.common.collect.Lists; @@ -47,7 +47,7 @@ public class NormalizingLongDoubleFloatDoubleTextInputFormat @Override public TextVertexReader createVertexReader( - InputSplit split, TaskAttemptContext context) throws IOException { + GiraphInputSplit split, TaskAttemptContext context) throws IOException { return new NormalizingLongDoubleFloatDoubleVertexReader(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java index 5fd834b..fa0f88e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java +++ b/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java @@ -27,13 +27,13 @@ import org.apache.giraph.graph.LongDoubleFloatDoubleVertex; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexReader; import org.apache.giraph.graph.WorkerContext; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.giraph.io.GeneratedVertexInputFormat; import org.apache.giraph.io.TextVertexOutputFormat; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; @@ -217,7 +217,7 @@ public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex { DoubleWritable, FloatWritable, DoubleWritable> { @Override public VertexReader createVertexReader(InputSplit split, + FloatWritable, DoubleWritable> createVertexReader(GiraphInputSplit split, TaskAttemptContext context) throws IOException { return new SimplePageRankVertexReader(); http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java index b166ce0..d65a211 100644 --- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java +++ b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java @@ -22,13 +22,13 @@ import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.EdgeListVertex; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexReader; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.giraph.io.GeneratedVertexInputFormat; import org.apache.giraph.io.TextVertexOutputFormat; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; @@ -120,7 +120,7 @@ public class SimpleSuperstepVertex extends IntWritable, FloatWritable, IntWritable> { @Override public VertexReader - createVertexReader(InputSplit split, TaskAttemptContext context) + createVertexReader(GiraphInputSplit split, TaskAttemptContext context) throws IOException { return new SimpleSuperstepVertexReader(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java index ee64a46..91f1abc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java @@ -35,6 +35,7 @@ import org.apache.giraph.graph.partition.MasterGraphPartitioner; import org.apache.giraph.graph.partition.PartitionOwner; import org.apache.giraph.graph.partition.PartitionStats; import org.apache.giraph.graph.partition.PartitionUtils; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.giraph.master.MasterObserver; import org.apache.giraph.metrics.AggregatedMetrics; import org.apache.giraph.metrics.GiraphMetrics; @@ -58,7 +59,6 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; @@ -257,11 +257,12 @@ public class BspServiceMaster generateInputSplits(GiraphInputFormat inputFormat, - int numWorkers, - String inputSplitType) { + private List generateInputSplits( + GiraphInputFormat inputFormat, + int numWorkers, + String inputSplitType) { String logPrefix = "generate" + inputSplitType + "InputSplits"; - List splits; + List splits; try { splits = inputFormat.getSplits(getContext(), numWorkers); } catch (IOException e) { @@ -277,7 +278,7 @@ public class BspServiceMaster sampleSplits = splits.subList(0, lastIndex); + List sampleSplits = splits.subList(0, lastIndex); LOG.warn(logPrefix + ": Using sampling - Processing only " + sampleSplits.size() + " instead of " + splits.size() + " expected splits."); @@ -565,7 +566,7 @@ public class BspServiceMaster splitList = generateInputSplits(inputFormat, + List splitList = generateInputSplits(inputFormat, healthyWorkerInfoList.size(), inputSplitType); if (splitList.isEmpty()) { @@ -594,7 +595,7 @@ public class BspServiceMaster { /** Input split which we are going to write */ - private final InputSplit inputSplit; + private final GiraphInputSplit inputSplit; /** Input splits path */ private final String inputSplitsPath; /** Index of the input split */ @@ -1805,7 +1806,7 @@ public class BspServiceMaster implements GiraphInputFormat { /** - * Logically split the vertices for a graph processing application. - * - * Each {@link InputSplit} is then assigned to a worker for processing. - * - *

Note: The split is a logical split of the inputs and the - * input files are not physically split into chunks. For e.g. a split could - * be <input-file-path, start, offset> tuple. The InputFormat - * also creates the {@link VertexReader} to read the {@link InputSplit}. - * - * Also, the number of workers is a hint given to the developer to try to - * intelligently determine how many splits to create (if this is - * adjustable) at runtime. - * - * @param context Context of the job - * @param numWorkers Number of workers used for this job - * @return an array of {@link InputSplit}s for the job. - */ - @Override - public abstract List getSplits( - JobContext context, int numWorkers) throws IOException, - InterruptedException; - - /** * Create an edge reader for a given split. The framework will call * {@link EdgeReader#initialize(InputSplit, TaskAttemptContext)} before * the split is used. * + * * @param split the split to be read * @param context the information about the task * @return a new record reader @@ -70,6 +49,6 @@ public abstract class EdgeInputFormat createEdgeReader( - InputSplit split, + GiraphInputSplit split, TaskAttemptContext context) throws IOException; } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java index e1cc844..0a7223a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java @@ -18,6 +18,7 @@ package org.apache.giraph.graph; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; @@ -29,14 +30,23 @@ import java.util.List; */ public interface GiraphInputFormat { /** - * Get the list of input splits for the format. + * Logically split the vertices for a graph processing application. * - * @param context The job context - * @param numWorkers Number of workers - * @return The list of input splits - * @throws IOException - * @throws InterruptedException + * Each {@link InputSplit} is then assigned to a worker for processing. + * + *

Note: The split is a logical split of the inputs and the + * input files are not physically split into chunks. For e.g. a split could + * be <input-file-path, start, offset> tuple. The InputFormat + * also creates the {@link VertexReader} to read the {@link InputSplit}. + * + * Also, the number of workers is a hint given to the developer to try to + * intelligently determine how many splits to create (if this is + * adjustable) at runtime. + * + * @param context Context of the job + * @param numWorkers Number of workers used for this job + * @return an array of {@link InputSplit}s for the job. */ - List getSplits(JobContext context, int numWorkers) + List getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java index 9824b69..78bc292 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputFormat.java @@ -18,14 +18,13 @@ package org.apache.giraph.graph; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; -import java.util.List; /** * Use this to load data for a BSP application. Note that the InputSplit must @@ -43,33 +42,11 @@ public abstract class VertexInputFormat implements GiraphInputFormat { /** - * Logically split the vertices for a graph processing application. - * - * Each {@link InputSplit} is then assigned to a worker for processing. - * - *

Note: The split is a logical split of the inputs and the - * input files are not physically split into chunks. For e.g. a split could - * be <input-file-path, start, offset> tuple. The InputFormat - * also creates the {@link VertexReader} to read the {@link InputSplit}. - * - * Also, the number of workers is a hint given to the developer to try to - * intelligently determine how many splits to create (if this is - * adjustable) at runtime. - * - * @param context Context of the job - * @param numWorkers Number of workers used for this job - * @return an array of {@link InputSplit}s for the job. - */ - @Override - public abstract List getSplits( - JobContext context, int numWorkers) - throws IOException, InterruptedException; - - /** * Create a vertex reader for a given split. The framework will call * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before * the split is used. * + * * @param split the split to be read * @param context the information about the task * @return a new record reader @@ -77,6 +54,6 @@ public abstract class VertexInputFormat createVertexReader( - InputSplit split, + GiraphInputSplit split, TaskAttemptContext context) throws IOException; } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java index 77801cc..f8b59b2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java @@ -74,14 +74,4 @@ public interface VertexReader0.0 to 1.0. - * @throws IOException - * @throws InterruptedException - */ - float getProgress() throws IOException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java index 804d23e..deab07e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java @@ -18,6 +18,7 @@ package org.apache.giraph.graph; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -52,7 +53,7 @@ public abstract class VertexValueInputFormat createVertexReader( - InputSplit split, TaskAttemptContext context) throws IOException { + GiraphInputSplit split, TaskAttemptContext context) throws IOException { return createVertexValueReader(split, context); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java index 3b047f3..802d30f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java @@ -18,6 +18,7 @@ package org.apache.giraph.io; import org.apache.giraph.graph.Edge; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -67,7 +68,7 @@ public abstract class AdjacencyListTextVertexInputFormat getSplits(JobContext job, List files) + private List getSplits(JobContext job, List files) throws IOException { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits - List splits = new ArrayList(); + List splits = Lists.newArrayList(); for (FileStatus file: files) { Path path = file.getPath(); @@ -285,21 +286,22 @@ end[HADOOP_NON_SECURE]*/ long bytesRemaining = length; while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(new FileSplit(path, length - bytesRemaining, splitSize, + splits.add(new GiraphFileSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { - splits.add(new FileSplit(path, length - bytesRemaining, + splits.add(new GiraphFileSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkLocations.length - 1].getHosts())); } } else if (length != 0) { - splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts())); + splits.add(new GiraphFileSplit(path, 0, length, + blkLocations[0].getHosts())); } else { //Create empty hosts array for zero length files - splits.add(new FileSplit(path, 0, length, new String[0])); + splits.add(new GiraphFileSplit(path, 0, length, new String[0])); } } return splits; @@ -312,9 +314,10 @@ end[HADOOP_NON_SECURE]*/ * @return The list of vertex input splits * @throws IOException */ - public List getVertexSplits(JobContext job) throws IOException { + public List getVertexSplits(JobContext job) + throws IOException { List files = listVertexStatus(job); - List splits = getSplits(job, files); + List splits = getSplits(job, files); // Save the number of input files in the job-conf job.getConfiguration().setLong(NUM_VERTEX_INPUT_FILES, files.size()); LOG.debug("Total # of vertex splits: " + splits.size()); @@ -328,9 +331,9 @@ end[HADOOP_NON_SECURE]*/ * @return The list of edge input splits * @throws IOException */ - public List getEdgeSplits(JobContext job) throws IOException { + public List getEdgeSplits(JobContext job) throws IOException { List files = listEdgeStatus(job); - List splits = getSplits(job, files); + List splits = getSplits(job, files); // Save the number of input files in the job-conf job.getConfiguration().setLong(NUM_EDGE_INPUT_FILES, files.size()); LOG.debug("Total # of edge splits: " + splits.size()); http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java index 9aa21c7..90a5b50 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java @@ -19,6 +19,7 @@ package org.apache.giraph.io; import org.apache.giraph.graph.Edge; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; @@ -44,7 +45,7 @@ public class IntIntNullIntTextInputFormat extends private static final Pattern SEPARATOR = Pattern.compile("[\t ]"); @Override - public TextVertexReader createVertexReader(InputSplit split, + public TextVertexReader createVertexReader(GiraphInputSplit split, TaskAttemptContext context) throws IOException { return new IntIntNullIntVertexReader(); http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java index 4d98657..e03dc6c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java @@ -17,11 +17,11 @@ */ package org.apache.giraph.io; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.giraph.graph.Edge; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import com.google.common.collect.ImmutableList; @@ -38,7 +38,7 @@ public class IntNullNullNullTextInputFormat extends TextVertexInputFormat< IntWritable, NullWritable, NullWritable, NullWritable> { @Override public TextVertexReader createVertexReader( - InputSplit split, TaskAttemptContext context) throws IOException { + GiraphInputSplit split, TaskAttemptContext context) throws IOException { return new IntNullNullNullVertexReader(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java index ed13b45..6805e45 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java @@ -18,11 +18,11 @@ package org.apache.giraph.io; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.giraph.utils.IntPair; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; @@ -41,7 +41,7 @@ public class IntNullTextEdgeInputFormat extends @Override public TextEdgeReader createEdgeReader( - InputSplit split, TaskAttemptContext context) throws IOException { + GiraphInputSplit split, TaskAttemptContext context) throws IOException { return new IntNullTextEdgeReader(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java index cc5872c..74604f9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java @@ -18,6 +18,7 @@ package org.apache.giraph.io; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.giraph.graph.Edge; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -54,7 +55,7 @@ public class JsonBase64VertexInputFormat { @Override - public TextVertexReader createVertexReader(InputSplit split, + public TextVertexReader createVertexReader(GiraphInputSplit split, TaskAttemptContext context) { return new JsonBase64VertexReader(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java index c01d442..3f98807 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java @@ -19,11 +19,11 @@ package org.apache.giraph.io; import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.Vertex; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.json.JSONArray; import org.json.JSONException; @@ -44,7 +44,7 @@ public class JsonLongDoubleFloatDoubleVertexInputFormat extends FloatWritable, DoubleWritable> { @Override - public TextVertexReader createVertexReader(InputSplit split, + public TextVertexReader createVertexReader(GiraphInputSplit split, TaskAttemptContext context) { return new JsonLongDoubleFloatDoubleVertexReader(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java index acae10b..bd79b61 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java @@ -18,10 +18,10 @@ package org.apache.giraph.io; import org.apache.giraph.graph.Edge; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; /** @@ -39,7 +39,7 @@ public class LongDoubleDoubleAdjacencyListVertexInputFormat DoubleWritable, M> { @Override - public AdjacencyListTextVertexReader createVertexReader(InputSplit split, + public AdjacencyListTextVertexReader createVertexReader(GiraphInputSplit split, TaskAttemptContext context) { return new LongDoubleDoubleAdjacencyListVertexReader(null); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java index bac0a39..a2715e0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java @@ -24,6 +24,7 @@ import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexInputFormat; import org.apache.giraph.graph.VertexReader; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; @@ -72,7 +73,7 @@ public class PseudoRandomVertexInputFormat extends @Override public VertexReader - createVertexReader(InputSplit split, TaskAttemptContext context) + createVertexReader(GiraphInputSplit split, TaskAttemptContext context) throws IOException { return new PseudoRandomVertexReader(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java index a984089..c8e575f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/SequenceFileVertexInputFormat.java @@ -20,6 +20,7 @@ package org.apache.giraph.io; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexInputFormat; import org.apache.giraph.graph.VertexReader; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -56,7 +57,7 @@ public class SequenceFileVertexInputFormat createVertexReader(InputSplit split, + public VertexReader createVertexReader(GiraphInputSplit split, TaskAttemptContext context) throws IOException { return new SequenceFileVertexReader( sequenceFileInputFormat.createRecordReader(split, context)); http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java index a009000..8307dc8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java @@ -18,10 +18,10 @@ package org.apache.giraph.io; import org.apache.giraph.graph.Edge; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; /** @@ -36,7 +36,7 @@ public class TextDoubleDoubleAdjacencyListVertexInputFormat DoubleWritable, M> { @Override - public AdjacencyListTextVertexReader createVertexReader(InputSplit split, + public AdjacencyListTextVertexReader createVertexReader(GiraphInputSplit split, TaskAttemptContext context) { return new TextDoubleDoubleAdjacencyListVertexReader(null); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java index e4cbf94..6dd059c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java @@ -23,6 +23,7 @@ import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.EdgeInputFormat; import org.apache.giraph.graph.EdgeReader; import org.apache.giraph.graph.EdgeWithSource; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; @@ -50,7 +51,7 @@ public abstract class TextEdgeInputFormat getSplits( + public List getSplits( JobContext context, int numWorkers) throws IOException, InterruptedException { // Ignore the hint of numWorkers here since we are using @@ -60,7 +61,7 @@ public abstract class TextEdgeInputFormat getSplits( + public List getSplits( JobContext context, int numWorkers) throws IOException, InterruptedException { - List splits = null; + List splits = null; try { splits = accumuloInputFormat.getSplits(context); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java index 2e91cba..ea1733a 100644 --- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java +++ b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java @@ -18,6 +18,7 @@ package org.apache.giraph.io.hcatalog; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -41,6 +42,8 @@ import org.apache.hcatalog.mapreduce.HCatUtils; import org.apache.hcatalog.mapreduce.InputJobInfo; import org.apache.hcatalog.mapreduce.PartInfo; +import com.google.common.collect.Lists; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -211,12 +214,12 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat { * @throws IOException * @throws InterruptedException */ - private List getSplits(JobContext jobContext, + private List getSplits(JobContext jobContext, InputJobInfo inputJobInfo) throws IOException, InterruptedException { Configuration conf = jobContext.getConfiguration(); - List splits = new ArrayList(); + List splits = Lists.newArrayList(); List partitionInfoList = inputJobInfo.getPartitions(); if (partitionInfoList == null) { //No partitions match the specified partition filter @@ -264,7 +267,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat { inputFormat.getSplits(jobConf, desiredNumSplits); for (org.apache.hadoop.mapred.InputSplit split : baseSplits) { - splits.add(new HCatSplit(partitionInfo, split, allCols)); + splits.add(new GiraphHCatSplit(partitionInfo, split, allCols)); } } @@ -313,7 +316,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat { * @throws IOException * @throws InterruptedException */ - public List getVertexSplits(JobContext jobContext) + public List getVertexSplits(JobContext jobContext) throws IOException, InterruptedException { return getSplits(jobContext, getVertexJobInfo(jobContext.getConfiguration())); @@ -327,7 +330,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat { * @throws IOException * @throws InterruptedException */ - public List getEdgeSplits(JobContext jobContext) + public List getEdgeSplits(JobContext jobContext) throws IOException, InterruptedException { return getSplits(jobContext, getEdgeJobInfo(jobContext.getConfiguration())); @@ -344,7 +347,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat { * @throws InterruptedException */ private RecordReader - createRecordReader(InputSplit split, + createRecordReader(GiraphInputSplit split, HCatSchema schema, TaskAttemptContext taskContext) throws IOException, InterruptedException { @@ -392,7 +395,7 @@ public class GiraphHCatInputFormat extends HCatBaseInputFormat { * @throws InterruptedException */ public RecordReader - createEdgeRecordReader(InputSplit split, TaskAttemptContext taskContext) + createEdgeRecordReader(GiraphInputSplit split, TaskAttemptContext taskContext) throws IOException, InterruptedException { return createRecordReader(split, getEdgeTableSchema( taskContext.getConfiguration()), taskContext); http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatSplit.java ---------------------------------------------------------------------- diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatSplit.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatSplit.java new file mode 100644 index 0000000..1f2515e --- /dev/null +++ b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatSplit.java @@ -0,0 +1,18 @@ +package org.apache.giraph.io.hcatalog; + +import org.apache.giraph.input.GiraphInputSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.mapreduce.HCatSplit; +import org.apache.hcatalog.mapreduce.PartInfo; + +public class GiraphHCatSplit extends HCatSplit implements GiraphInputSplit { + public GiraphHCatSplit() { + super(); + } + + public GiraphHCatSplit(PartInfo partitionInfo, InputSplit baseMapRedSplit, + HCatSchema tableSchema) { + super(partitionInfo, baseMapRedSplit, tableSchema); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java index 2112df3..4f52405 100644 --- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java +++ b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java @@ -22,6 +22,7 @@ import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.EdgeInputFormat; import org.apache.giraph.graph.EdgeReader; import org.apache.giraph.graph.EdgeWithSource; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -49,7 +50,8 @@ public abstract class HCatalogEdgeInputFormat< private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat(); @Override - public final List getSplits(JobContext context, int numWorkers) + public final List getSplits( + JobContext context, int numWorkers) throws IOException, InterruptedException { return hCatInputFormat.getEdgeSplits(context); } @@ -64,7 +66,7 @@ public abstract class HCatalogEdgeInputFormat< private TaskAttemptContext context; @Override - public final void initialize(InputSplit inputSplit, + public final void initialize(GiraphInputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { hCatRecordReader = @@ -117,7 +119,7 @@ public abstract class HCatalogEdgeInputFormat< @Override public EdgeReader - createEdgeReader(InputSplit split, TaskAttemptContext context) + createEdgeReader(GiraphInputSplit split, TaskAttemptContext context) throws IOException { try { HCatalogEdgeReader reader = createEdgeReader(); http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java index ec49137..2c2d78c 100644 --- a/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java +++ b/giraph-formats/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java @@ -24,6 +24,7 @@ import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexInputFormat; import org.apache.giraph.graph.VertexReader; +import org.apache.giraph.input.GiraphInputSplit; import org.apache.giraph.utils.TimedLogger; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -169,7 +170,7 @@ public abstract class HCatalogVertexInputFormat< @Override public final VertexReader - createVertexReader(final InputSplit split, + createVertexReader(final GiraphInputSplit split, final TaskAttemptContext context) throws IOException { try { http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java b/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java index b670144..7e05697 100644 --- a/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java +++ b/giraph-formats/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java @@ -20,12 +20,13 @@ package org.apache.giraph.io.accumulo.edgemarker; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.giraph.graph.Edge; +import org.apache.giraph.input.GiraphInputSplit; +import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexReader; import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -44,7 +45,7 @@ public class AccumuloEdgeInputFormat private static final Text uselessEdgeValue = new Text(); private Configuration conf; public VertexReader - createVertexReader(InputSplit split, TaskAttemptContext context) + createVertexReader(GiraphInputSplit split, TaskAttemptContext context) throws IOException { try { http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java b/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java index e4e08d6..f766ac0 100644 --- a/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java +++ b/giraph-formats/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java @@ -18,6 +18,10 @@ package org.apache.giraph.io.hbase.edgemarker; import org.apache.giraph.graph.Edge; +import com.google.common.collect.Maps; + +import org.apache.giraph.input.GiraphInputSplit; +import org.apache.giraph.io.hbase.HBaseVertexInputFormat; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexReader; import org.apache.giraph.io.hbase.HBaseVertexInputFormat; @@ -45,7 +49,7 @@ public class TableEdgeInputFormat extends private static final Text uselessEdgeValue = new Text(); public VertexReader - createVertexReader(InputSplit split, + createVertexReader(GiraphInputSplit split, TaskAttemptContext context) throws IOException { return new TableEdgeVertexReader(split, context); http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph/src/main/java/org/apache/giraph/input/GiraphInputSplit.java ---------------------------------------------------------------------- diff --git a/giraph/src/main/java/org/apache/giraph/input/GiraphInputSplit.java b/giraph/src/main/java/org/apache/giraph/input/GiraphInputSplit.java new file mode 100644 index 0000000..f6c709b --- /dev/null +++ b/giraph/src/main/java/org/apache/giraph/input/GiraphInputSplit.java @@ -0,0 +1,29 @@ +package org.apache.giraph.input; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + +import java.io.IOException; + +/** + * @see org.apache.hadoop.mapreduce.InputSplit + */ +public interface GiraphInputSplit extends Writable { + /** + * Get the list of nodes by name where the data for the split would be local. + * The locations do not need to be serialized. + * @return a new array of the node nodes. + * @throws java.io.IOException + * @throws InterruptedException + */ + String[] getLocations() throws IOException, InterruptedException; + + /** + * Get the Hadoop compatible InputSplit. Ideally we shouldn't need this, but + * unfortunately we deal with a lot of input formats that strictly require + * this class, and unfortunately since it is abstract class (not an interface) + * + * @return InputSplit, for most implementations will be just "this" + */ + InputSplit getInputSplit(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/giraph/src/main/java/org/apache/giraph/io/GiraphFileSplit.java ---------------------------------------------------------------------- diff --git a/giraph/src/main/java/org/apache/giraph/io/GiraphFileSplit.java b/giraph/src/main/java/org/apache/giraph/io/GiraphFileSplit.java new file mode 100644 index 0000000..2e00ebb --- /dev/null +++ b/giraph/src/main/java/org/apache/giraph/io/GiraphFileSplit.java @@ -0,0 +1,11 @@ +package org.apache.giraph.io; + +import org.apache.giraph.input.GiraphInputSplit; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +public class GiraphFileSplit extends FileSplit implements GiraphInputSplit { + public GiraphFileSplit(Path file, long start, long length, String[] hosts) { + super(file, start, length, hosts); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f38d2b88/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0be32c5..41eab1a 100644 --- a/pom.xml +++ b/pom.xml @@ -810,7 +810,8 @@ under the License. org.apache.mahout - mahout-collections + maho + ut-collections 1.0