spark-commits mailing list archives

From joshro...@apache.org
Subject spark git commit: [SPARK-3266] Use intermediate abstract classes to fix type erasure issues in Java APIs
Date Tue, 17 Mar 2015 16:23:09 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 95f8d1c51 -> 29e39e178


[SPARK-3266] Use intermediate abstract classes to fix type erasure issues in Java APIs

This PR addresses a Scala compiler bug ([SI-8905](https://issues.scala-lang.org/browse/SI-8905))
that was breaking some of the Spark Java APIs.  In a nutshell, it seems that methods whose
implementations are inherited from generic traits sometimes have their type parameters erased
to Object.  This was causing methods like `DoubleRDD.min()` to throw confusing NoSuchMethodErrors
at runtime.
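
A minimal sketch of the failure mode (the `NumberOps`/`DoubleOps` names are hypothetical, not Spark classes, and whether a given hierarchy triggers the bug depends on the Scala compiler version):

```scala
import java.util.{Collections, Comparator, List => JList}

trait NumberOps[T] {
  // The implementation lives in the generic trait, where T erases to Object.
  def min(items: JList[T], comp: Comparator[T]): T = Collections.min(items, comp)
}

// Mixing the trait in directly: under SI-8905, the forwarder that scalac
// emits into this class can end up exposing only the erased signature
// min(List, Comparator): Object, so Java code compiled against the
// specialized min(List<Double>, Comparator<Double>): Double signature
// can fail at runtime with a NoSuchMethodError.
class DoubleOps extends NumberOps[java.lang.Double]
```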

The fix implemented here is to introduce an intermediate layer of abstract classes and inherit
from those instead of extending the `Java*Like` traits directly.  This should not break binary
compatibility.
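
Continuing the hypothetical `NumberOps` sketch above, the workaround reduces to the pattern below; the real equivalents are `AbstractJavaRDDLike` and `AbstractJavaDStreamLike` in the diff that follows:

```scala
// An empty abstract class sits between the trait and its implementations;
// the trait's method forwarders are generated once here rather than in
// every concrete subclass, sidestepping the bad signatures seen from Java.
abstract class AbstractNumberOps[T] extends NumberOps[T]

// DoubleOps now extends the abstract class instead of the trait directly.
// It changes only its `extends` clause and still conforms to the NumberOps
// trait type, which is why the change is binary compatible.
class DoubleOps extends AbstractNumberOps[java.lang.Double]
```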

I also improved the test coverage of the Java API, adding several new tests for methods that
failed at runtime due to this bug.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #5050 from JoshRosen/javardd-si-8905-fix and squashes the following commits:

2feb068 [Josh Rosen] Use intermediate abstract classes to work around SPARK-3266
d5f3e5d [Josh Rosen] Add failing regression tests for SPARK-3266

(cherry picked from commit 0f673c21f68ee3d5df3c01ae405709d3c1f4909b)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29e39e17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29e39e17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29e39e17

Branch: refs/heads/branch-1.3
Commit: 29e39e178e9e21a038cf1aef1d110b368b6d64f7
Parents: 95f8d1c
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Tue Mar 17 09:18:57 2015 -0700
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Tue Mar 17 09:19:21 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/api/java/JavaDoubleRDD.scala   |   3 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala |   2 +-
 .../org/apache/spark/api/java/JavaRDD.scala     |   2 +-
 .../org/apache/spark/api/java/JavaRDDLike.scala |   8 ++
 .../java/org/apache/spark/JavaAPISuite.java     | 129 +++++++++++++++++++
 .../spark/streaming/api/java/JavaDStream.scala  |   2 +-
 .../streaming/api/java/JavaDStreamLike.scala    |   9 ++
 .../streaming/api/java/JavaPairDStream.scala    |   2 +-
 8 files changed, 152 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/29e39e17/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index 8e8f7f6..79e4ebf 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -32,7 +32,8 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.StatCounter
 import org.apache.spark.util.Utils
 
-class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, JavaDoubleRDD] {
+class JavaDoubleRDD(val srdd: RDD[scala.Double])
+  extends AbstractJavaRDDLike[JDouble, JavaDoubleRDD] {
 
   override val classTag: ClassTag[JDouble] = implicitly[ClassTag[JDouble]]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/29e39e17/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 7af3538..4eadc9a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -44,7 +44,7 @@ import org.apache.spark.util.Utils
 
 class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
                        (implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
-  extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+  extends AbstractJavaRDDLike[(K, V), JavaPairRDD[K, V]] {
 
   override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/29e39e17/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 86fb374..645dc3b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
 class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
-  extends JavaRDDLike[T, JavaRDD[T]] {
+  extends AbstractJavaRDDLike[T, JavaRDD[T]] {
 
   override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/29e39e17/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 0f91c94..8da4293 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -39,6 +39,14 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
 /**
+ * As a workaround for https://issues.scala-lang.org/browse/SI-8905, implementations
+ * of JavaRDDLike should extend this dummy abstract class instead of directly inheriting
+ * from the trait. See SPARK-3266 for additional details.
+ */
+private[spark] abstract class AbstractJavaRDDLike[T, This <: JavaRDDLike[T, This]]
+  extends JavaRDDLike[T, This]
+
+/**
  * Defines operations common to several Java RDD implementations.
  * Note that this trait is not intended to be implemented by user code.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/29e39e17/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index b16a1e9..dc9f773 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -268,6 +268,22 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
+  public void foreachPartition() {
+    final Accumulator<Integer> accum = sc.accumulator(0);
+    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+    rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
+      @Override
+      public void call(Iterator<String> iter) throws IOException {
+        while (iter.hasNext()) {
+          iter.next();
+          accum.add(1);
+        }
+      }
+    });
+    Assert.assertEquals(2, accum.value().intValue());
+  }
+
+  @Test
   public void toLocalIterator() {
     List<Integer> correct = Arrays.asList(1, 2, 3, 4);
     JavaRDD<Integer> rdd = sc.parallelize(correct);
@@ -658,6 +674,13 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
+  public void toArray() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3));
+    List<Integer> list = rdd.toArray();
+    Assert.assertEquals(Arrays.asList(1, 2, 3), list);
+  }
+
+  @Test
   public void cartesian() {
     JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
     JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
@@ -710,6 +733,80 @@ public class JavaAPISuite implements Serializable {
     Assert.assertArrayEquals(expected_counts, histogram);
   }
 
+  private static class DoubleComparator implements Comparator<Double>, Serializable {
+    public int compare(Double o1, Double o2) {
+      return o1.compareTo(o2);
+    }
+  }
+
+  @Test
+  public void max() {
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    double max = rdd.max(new DoubleComparator());
+    Assert.assertEquals(4.0, max, 0.001);
+  }
+
+  @Test
+  public void min() {
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    double max = rdd.min(new DoubleComparator());
+    Assert.assertEquals(1.0, max, 0.001);
+  }
+
+  @Test
+  public void takeOrdered() {
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    Assert.assertEquals(Arrays.asList(1.0, 2.0), rdd.takeOrdered(2, new DoubleComparator()));
+    Assert.assertEquals(Arrays.asList(1.0, 2.0), rdd.takeOrdered(2));
+  }
+
+  @Test
+  public void top() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    List<Integer> top2 = rdd.top(2);
+    Assert.assertEquals(Arrays.asList(4, 3), top2);
+  }
+
+  private static class AddInts implements Function2<Integer, Integer, Integer> {
+    @Override
+    public Integer call(Integer a, Integer b) {
+      return a + b;
+    }
+  }
+
+  @Test
+  public void reduce() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    int sum = rdd.reduce(new AddInts());
+    Assert.assertEquals(10, sum);
+  }
+
+  @Test
+  public void reduceOnJavaDoubleRDD() {
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    double sum = rdd.reduce(new Function2<Double, Double, Double>() {
+      @Override
+      public Double call(Double v1, Double v2) throws Exception {
+        return v1 + v2;
+      }
+    });
+    Assert.assertEquals(10.0, sum, 0.001);
+  }
+
+  @Test
+  public void fold() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    int sum = rdd.fold(0, new AddInts());
+    Assert.assertEquals(10, sum);
+  }
+
+  @Test
+  public void aggregate() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    int sum = rdd.aggregate(0, new AddInts(), new AddInts());
+    Assert.assertEquals(10, sum);
+  }
+
   @Test
   public void map() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
@@ -826,6 +923,25 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
   }
 
+
+  @Test
+  public void mapPartitionsWithIndex() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+    JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex(
+      new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() {
+        @Override
+        public Iterator<Integer> call(Integer index, Iterator<Integer> iter) throws Exception {
+          int sum = 0;
+          while (iter.hasNext()) {
+            sum += iter.next();
+          }
+          return Collections.singletonList(sum).iterator();
+        }
+    }, false);
+    Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
+  }
+
+
   @Test
   public void repartition() {
     // Shrinking number of partitions
@@ -1513,6 +1629,19 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
+  public void takeAsync() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<List<Integer>> future = rdd.takeAsync(1);
+    List<Integer> result = future.get();
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals((Integer) 1, result.get(0));
+    Assert.assertFalse(future.isCancelled());
+    Assert.assertTrue(future.isDone());
+    Assert.assertEquals(1, future.jobIds().size());
+  }
+
+  @Test
   public void foreachAsync() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);

http://git-wip-us.apache.org/repos/asf/spark/blob/29e39e17/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index 505e443..01cdcb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -36,7 +36,7 @@ import org.apache.spark.streaming.dstream.DStream
  * [[org.apache.spark.streaming.api.java.JavaPairDStream]].
  */
 class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T])
-    extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
+    extends AbstractJavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
 
   override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/29e39e17/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index c382a12..2eabdd9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -34,6 +34,15 @@ import org.apache.spark.streaming._
 import org.apache.spark.streaming.api.java.JavaDStream._
 import org.apache.spark.streaming.dstream.DStream
 
+/**
+ * As a workaround for https://issues.scala-lang.org/browse/SI-8905, implementations
+ * of JavaDStreamLike should extend this dummy abstract class instead of directly inheriting
+ * from the trait. See SPARK-3266 for additional details.
+ */
+private[streaming]
+abstract class AbstractJavaDStreamLike[T, This <: JavaDStreamLike[T, This, R],
+  R <: JavaRDDLike[T, R]] extends JavaDStreamLike[T, This, R]
+
 trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
     extends Serializable {
   implicit val classTag: ClassTag[T]

http://git-wip-us.apache.org/repos/asf/spark/blob/29e39e17/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index bd01789..f94f2d0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -45,7 +45,7 @@ import org.apache.spark.streaming.dstream.DStream
 class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     implicit val kManifest: ClassTag[K],
     implicit val vManifest: ClassTag[V])
-    extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
+    extends AbstractJavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
 
   override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
 

