spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject git commit: SPARK-2028: Expose mapPartitionsWithInputSplit in HadoopRDD
Date Thu, 31 Jul 2014 18:35:43 GMT
Repository: spark
Updated Branches:
  refs/heads/master 72cfb1398 -> f19331235


SPARK-2028: Expose mapPartitionsWithInputSplit in HadoopRDD

This allows users to gain access to the InputSplit which backs each partition.

An alternative solution would have been to have a .withInputSplit() method which returns a
new RDD[(InputSplit, (K, V))], but this is confusing because you could not cache this RDD
or shuffle it, as InputSplit is not inherently serializable.

Author: Aaron Davidson <aaron@databricks.com>

Closes #973 from aarondav/hadoop and squashes the following commits:

9c9112b [Aaron Davidson] Add JavaAPISuite test
9942cd7 [Aaron Davidson] Add Java API
1284a3a [Aaron Davidson] SPARK-2028: Expose mapPartitionsWithInputSplit in HadoopRDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1933123
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1933123
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1933123

Branch: refs/heads/master
Commit: f1933123525e7c806f5fc0b0a46a78a7546f8b61
Parents: 72cfb13
Author: Aaron Davidson <aaron@databricks.com>
Authored: Thu Jul 31 11:35:38 2014 -0700
Committer: Matei Zaharia <matei@databricks.com>
Committed: Thu Jul 31 11:35:38 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/api/java/JavaHadoopRDD.scala   | 43 ++++++++++++++++++++
 .../spark/api/java/JavaNewHadoopRDD.scala       | 43 ++++++++++++++++++++
 .../spark/api/java/JavaSparkContext.scala       | 21 ++++++----
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 32 +++++++++++++++
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 34 ++++++++++++++++
 .../java/org/apache/spark/JavaAPISuite.java     | 26 +++++++++++-
 .../test/scala/org/apache/spark/FileSuite.scala | 34 +++++++++++++++-
 7 files changed, 222 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f1933123/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
new file mode 100644
index 0000000..0ae0b4e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.mapred.InputSplit
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.java.JavaSparkContext._
+import org.apache.spark.api.java.function.{Function2 => JFunction2}
+import org.apache.spark.rdd.HadoopRDD
+
+@DeveloperApi
+class JavaHadoopRDD[K, V](rdd: HadoopRDD[K, V])
+    (implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
+  extends JavaPairRDD[K, V](rdd) {
+
+  /**
+   * Maps over a partition, providing the InputSplit that was used as the base of the partition.
+   *
+   * Delegates to the wrapped HadoopRDD's mapPartitionsWithInputSplit, converting each
+   * partition's Scala iterator to a java.util.Iterator before calling `f`.
+   *
+   * @param f function invoked with a partition's InputSplit and an iterator over its records
+   * @param preservesPartitioning forwarded unchanged to the underlying RDD call
+   * @return a JavaRDD wrapping the resulting RDD
+   */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[R](
+      f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
+      preservesPartitioning: Boolean = false): JavaRDD[R] = {
+    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+      preservesPartitioning)(fakeClassTag))(fakeClassTag)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f1933123/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
new file mode 100644
index 0000000..ec4f396
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.mapreduce.InputSplit
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.java.JavaSparkContext._
+import org.apache.spark.api.java.function.{Function2 => JFunction2}
+import org.apache.spark.rdd.NewHadoopRDD
+
+@DeveloperApi
+class JavaNewHadoopRDD[K, V](rdd: NewHadoopRDD[K, V])
+    (implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
+  extends JavaPairRDD[K, V](rdd) {
+
+  /**
+   * Maps over a partition, providing the InputSplit that was used as the base of the partition.
+   *
+   * Delegates to the wrapped NewHadoopRDD's mapPartitionsWithInputSplit, converting each
+   * partition's Scala iterator to a java.util.Iterator before calling `f`.
+   *
+   * @param f function invoked with a partition's InputSplit and an iterator over its records
+   * @param preservesPartitioning forwarded unchanged to the underlying RDD call
+   * @return a JavaRDD wrapping the resulting RDD
+   */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[R](
+      f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
+      preservesPartitioning: Boolean = false): JavaRDD[R] = {
+    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+      preservesPartitioning)(fakeClassTag))(fakeClassTag)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f1933123/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 8a5f808..d9d1c59 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -34,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
 
 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -294,7 +294,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions))
+    val rdd = sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }
 
   /**
@@ -314,7 +315,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
+    val rdd = sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
@@ -333,7 +335,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions))
+    val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -351,8 +354,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path,
-      inputFormatClass, keyClass, valueClass))
+    val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }
 
   /**
@@ -372,7 +375,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     conf: Configuration): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(kClass)
     implicit val ctagV: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
+    val rdd = sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf)
+    new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
   }
 
   /**
@@ -391,7 +395,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     vClass: Class[V]): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(kClass)
     implicit val ctagV: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
+    val rdd = sc.newAPIHadoopRDD(conf, fClass, kClass, vClass)
+    new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
   }
 
   /** Build the union of two or more RDDs. */

http://git-wip-us.apache.org/repos/asf/spark/blob/f1933123/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e521612..8d92ea0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -20,7 +20,9 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 import java.io.EOFException
+
 import scala.collection.immutable.Map
+import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapred.FileSplit
@@ -39,6 +41,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.NextIterator
 
 /**
@@ -232,6 +235,14 @@ class HadoopRDD[K, V](
     new InterruptibleIterator[(K, V)](context, iter)
   }
 
+  /**
+   * Maps over a partition, providing the InputSplit that was used as the base of the partition.
+   *
+   * @param f function applied to each partition's InputSplit and its iterator of records
+   * @param preservesPartitioning passed through to the HadoopMapPartitionsWithSplitRDD that
+   *                              is returned
+   * @return a new HadoopMapPartitionsWithSplitRDD built over this RDD
+   */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[U: ClassTag](
+      f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = {
+    new HadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+  }
+
   override def getPreferredLocations(split: Partition): Seq[String] = {
     // TODO: Filtering out "localhost" in case of file:// URLs
     val hadoopSplit = split.asInstanceOf[HadoopPartition]
@@ -272,4 +283,25 @@ private[spark] object HadoopRDD {
     conf.setInt("mapred.task.partition", splitId)
     conf.set("mapred.job.id", jobID.toString)
   }
+
+  /**
+   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
+   * the given function rather than the index of the partition.
+   *
+   * Every partition handed to `compute` is cast to HadoopPartition, so `prev` must be an RDD
+   * whose partitions are HadoopPartitions.
+   *
+   * @param prev parent RDD whose partitions and records are reused
+   * @param f function applied to each partition's InputSplit and record iterator
+   * @param preservesPartitioning if true the parent's partitioner is kept, otherwise it is None
+   */
+  private[spark] class HadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
+      prev: RDD[T],
+      f: (InputSplit, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean = false)
+    extends RDD[U](prev) {
+
+    override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+
+    override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+    // Explicit result type: this is a public override, so do not leave it to inference.
+    override def compute(split: Partition, context: TaskContext): Iterator[U] = {
+      val partition = split.asInstanceOf[HadoopPartition]
+      val inputSplit = partition.inputSplit.value
+      f(inputSplit, firstParent[T].iterator(split, context))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f1933123/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index f2b3a64..7dfec9a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,6 +20,8 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.reflect.ClassTag
+
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -32,6 +34,7 @@ import org.apache.spark.Partition
 import org.apache.spark.SerializableWritable
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 
 private[spark] class NewHadoopPartition(
     rddId: Int,
@@ -157,6 +160,14 @@ class NewHadoopRDD[K, V](
     new InterruptibleIterator(context, iter)
   }
 
+  /**
+   * Maps over a partition, providing the InputSplit that was used as the base of the partition.
+   *
+   * @param f function applied to each partition's InputSplit and its iterator of records
+   * @param preservesPartitioning passed through to the NewHadoopMapPartitionsWithSplitRDD
+   *                              that is returned
+   * @return a new NewHadoopMapPartitionsWithSplitRDD built over this RDD
+   */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[U: ClassTag](
+      f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = {
+    new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+  }
+
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val theSplit = split.asInstanceOf[NewHadoopPartition]
     theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
@@ -165,6 +176,29 @@ class NewHadoopRDD[K, V](
   def getConf: Configuration = confBroadcast.value.value
 }
 
+private[spark] object NewHadoopRDD {
+  /**
+   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
+   * the given function rather than the index of the partition.
+   *
+   * Every partition handed to `compute` is cast to NewHadoopPartition, so `prev` must be an
+   * RDD whose partitions are NewHadoopPartitions.
+   *
+   * @param prev parent RDD whose partitions and records are reused
+   * @param f function applied to each partition's InputSplit and record iterator
+   * @param preservesPartitioning if true the parent's partitioner is kept, otherwise it is None
+   */
+  private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
+      prev: RDD[T],
+      f: (InputSplit, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean = false)
+    extends RDD[U](prev) {
+
+    override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+
+    override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+    // Explicit result type: this is a public override, so do not leave it to inference.
+    override def compute(split: Partition, context: TaskContext): Iterator[U] = {
+      val partition = split.asInstanceOf[NewHadoopPartition]
+      val inputSplit = partition.serializableHadoopSplit.value
+      f(inputSplit, firstParent[T].iterator(split, context))
+    }
+  }
+}
+
 private[spark] class WholeTextFileRDD(
     sc : SparkContext,
     inputFormatClass: Class[_ <: WholeTextFileInputFormat],

http://git-wip-us.apache.org/repos/asf/spark/blob/f1933123/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index fab64a5..56150ca 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -25,19 +25,23 @@ import scala.Tuple2;
 import scala.Tuple3;
 import scala.Tuple4;
 
-
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.After;
 import org.junit.Assert;
@@ -45,6 +49,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaHadoopRDD;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -1262,4 +1267,23 @@ public class JavaAPISuite implements Serializable {
     SomeCustomClass[] collected = (SomeCustomClass[]) rdd.rdd().retag(SomeCustomClass.class).collect();
     Assert.assertEquals(data.size(), collected.length);
   }
+
+  /**
+   * Checks that mapPartitionsWithInputSplit on a JavaHadoopRDD exposes the FileSplit backing
+   * each partition, by collecting the path of every split of a two-part text file.
+   */
+  @Test
+  public void getHadoopInputSplits() {
+    String outDir = new File(tempDir, "output").getAbsolutePath();
+    sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2).saveAsTextFile(outDir);
+
+    JavaHadoopRDD<LongWritable, Text> hadoopRDD = (JavaHadoopRDD<LongWritable, Text>)
+        sc.hadoopFile(outDir, TextInputFormat.class, LongWritable.class, Text.class);
+    List<String> inputPaths = hadoopRDD.mapPartitionsWithInputSplit(
+        new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>>() {
+      @Override
+      public Iterator<String> call(InputSplit split, Iterator<Tuple2<LongWritable, Text>> it)
+          throws Exception {
+        FileSplit fileSplit = (FileSplit) split;
+        return Lists.newArrayList(fileSplit.getPath().toUri().getPath()).iterator();
+      }
+    }, true).collect();
+    // JUnit's assertEquals takes (expected, actual); keep that order so failure messages read
+    // correctly.
+    Assert.assertEquals(Sets.newHashSet(outDir + "/part-00000", outDir + "/part-00001"),
+        Sets.newHashSet(inputPaths));
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f1933123/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index c70e22c..4a53d25 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -24,12 +24,14 @@ import scala.io.Source
 import com.google.common.io.Files
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
-import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat}
-import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, FileSplit, TextInputFormat, TextOutputFormat}
 import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.{NewHadoopRDD, HadoopRDD}
 import org.apache.spark.util.Utils
 
 class FileSuite extends FunSuite with LocalSparkContext {
@@ -318,4 +320,32 @@ class FileSuite extends FunSuite with LocalSparkContext {
     randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
     assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
   }
+
+  // Each partition of a text-file HadoopRDD should report the part file that backs it.
+  test("Get input files via old Hadoop API") {
+    sc = new SparkContext("local", "test")
+    val outputPath = new File(tempDir, "output").getAbsolutePath
+    sc.makeRDD(1 to 4, 2).saveAsTextFile(outputPath)
+
+    val hadoopRdd =
+      sc.hadoopFile(outputPath, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
+        .asInstanceOf[HadoopRDD[_, _]]
+    // The record iterator is irrelevant here; only the split's file path is emitted.
+    val splitPaths = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, _) =>
+      Iterator(inputSplit.asInstanceOf[FileSplit].getPath.toUri.getPath)
+    }
+    val expectedPaths = Set(s"$outputPath/part-00000", s"$outputPath/part-00001")
+    assert(splitPaths.collect().toSet === expectedPaths)
+  }
+
+  // Each partition of a text-file NewHadoopRDD should report the part file that backs it.
+  test("Get input files via new Hadoop API") {
+    sc = new SparkContext("local", "test")
+    val outputPath = new File(tempDir, "output").getAbsolutePath
+    sc.makeRDD(1 to 4, 2).saveAsTextFile(outputPath)
+
+    val newHadoopRdd =
+      sc.newAPIHadoopFile(
+          outputPath, classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text])
+        .asInstanceOf[NewHadoopRDD[_, _]]
+    // The record iterator is irrelevant here; only the split's file path is emitted.
+    val splitPaths = newHadoopRdd.mapPartitionsWithInputSplit { (inputSplit, _) =>
+      Iterator(inputSplit.asInstanceOf[NewFileSplit].getPath.toUri.getPath)
+    }
+    val expectedPaths = Set(s"$outputPath/part-00000", s"$outputPath/part-00001")
+    assert(splitPaths.collect().toSet === expectedPaths)
+  }
 }


Mime
View raw message