From: joshrosen@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-3266] Use intermediate abstract classes to fix type erasure issues in Java APIs
Date: Tue, 24 Mar 2015 20:55:55 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-1.2 8ef69957f -> 61c059a4a


[SPARK-3266] Use intermediate abstract classes to fix type erasure issues in Java APIs

This PR addresses a Scala compiler bug ([SI-8905](https://issues.scala-lang.org/browse/SI-8905)) that was breaking some of the Spark Java APIs. In a nutshell, it seems that methods whose implementations are inherited from generic traits sometimes have their type parameters erased to Object. This was causing methods like `DoubleRDD.min()` to throw confusing NoSuchMethodErrors at runtime.

The fix implemented here is to introduce an intermediate layer of abstract classes and inherit from those instead of directly extending the `Java*Like` traits. This should not break binary compatibility. I also improved the test coverage of the Java API, adding several new tests for methods that failed at runtime due to this bug.
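To make the failure mode and the workaround concrete, here is a minimal, self-contained sketch of the pattern; the names `ThingLike`, `AbstractThingLike`, and `IntThing` are hypothetical and are not part of the Spark sources:

```scala
// Hypothetical illustration of the SPARK-3266 workaround pattern.

// A self-typed generic trait, analogous to JavaRDDLike[T, This].
trait ThingLike[T, This <: ThingLike[T, This]] extends Serializable {
  def values: Seq[T]

  // Implemented in the trait. Under SI-8905, the forwarder generated for a
  // method like this in a class extending the trait directly could have T
  // erased to Object, so Java callers expecting `T first()` failed with
  // NoSuchMethodError at runtime.
  def first(): T = values.head
}

// The intermediate "dummy" abstract class: concrete wrappers extend this
// instead of the trait, so correctly typed bridge methods are emitted in a
// class that Java code can link against.
abstract class AbstractThingLike[T, This <: ThingLike[T, This]]
  extends ThingLike[T, This]

// A concrete implementation, analogous to JavaRDD[T] or JavaDoubleRDD.
class IntThing(val values: Seq[Int]) extends AbstractThingLike[Int, IntThing]

object Demo extends App {
  println(new IntThing(Seq(1, 2, 3)).first()) // prints 1
}
```

Because the abstract class declares no members of its own, inserting it into the hierarchy leaves the visible API unchanged, which is consistent with the claim above that the change should not break binary compatibility.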
Author: Josh Rosen

Closes #5050 from JoshRosen/javardd-si-8905-fix and squashes the following commits:

2feb068 [Josh Rosen] Use intermediate abstract classes to work around SPARK-3266
d5f3e5d [Josh Rosen] Add failing regression tests for SPARK-3266

(cherry picked from commit 0f673c21f68ee3d5df3c01ae405709d3c1f4909b)
Signed-off-by: Josh Rosen

Conflicts:
	core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
	core/src/test/java/org/apache/spark/JavaAPISuite.java


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61c059a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61c059a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61c059a4

Branch: refs/heads/branch-1.2
Commit: 61c059a4ace4007cccbb3ffcc2a382acdaf7196a
Parents: 8ef6995
Author: Josh Rosen
Authored: Tue Mar 17 09:18:57 2015 -0700
Committer: Josh Rosen
Committed: Tue Mar 24 13:47:19 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/api/java/JavaDoubleRDD.scala   |   3 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala |   2 +-
 .../org/apache/spark/api/java/JavaRDD.scala     |   2 +-
 .../org/apache/spark/api/java/JavaRDDLike.scala |  12 ++
 .../java/org/apache/spark/JavaAPISuite.java     | 129 +++++++++++++++++++
 .../spark/streaming/api/java/JavaDStream.scala  |   2 +-
 .../streaming/api/java/JavaDStreamLike.scala    |   9 ++
 .../streaming/api/java/JavaPairDStream.scala    |   2 +-
 8 files changed, 156 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/61c059a4/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index 8e8f7f6..79e4ebf 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -32,7 +32,8 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.StatCounter
 import org.apache.spark.util.Utils
 
-class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, JavaDoubleRDD] {
+class JavaDoubleRDD(val srdd: RDD[scala.Double])
+  extends AbstractJavaRDDLike[JDouble, JavaDoubleRDD] {
 
   override val classTag: ClassTag[JDouble] = implicitly[ClassTag[JDouble]]


http://git-wip-us.apache.org/repos/asf/spark/blob/61c059a4/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index e37f3ac..352bec1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -44,7 +44,7 @@ import org.apache.spark.util.Utils
 
 class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
                        (implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
-  extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+  extends AbstractJavaRDDLike[(K, V), JavaPairRDD[K, V]] {
 
   override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)


http://git-wip-us.apache.org/repos/asf/spark/blob/61c059a4/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 86fb374..645dc3b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
 class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
-  extends JavaRDDLike[T, JavaRDD[T]] {
+  extends AbstractJavaRDDLike[T, JavaRDD[T]] {
 
   override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)


http://git-wip-us.apache.org/repos/asf/spark/blob/61c059a4/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index fa2c1c2..38e9b77 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -39,6 +39,18 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
+/**
+ * As a workaround for https://issues.scala-lang.org/browse/SI-8905, implementations
+ * of JavaRDDLike should extend this dummy abstract class instead of directly inheriting
+ * from the trait. See SPARK-3266 for additional details.
+ */
+private[spark] abstract class AbstractJavaRDDLike[T, This <: JavaRDDLike[T, This]]
+  extends JavaRDDLike[T, This]
+
+/**
+ * Defines operations common to several Java RDD implementations.
+ * Note that this trait is not intended to be implemented by user code.
+ */
 trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def wrapRDD(rdd: RDD[T]): This


http://git-wip-us.apache.org/repos/asf/spark/blob/61c059a4/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index e5bdad6..a5e9b19 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -267,6 +267,22 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
+  public void foreachPartition() {
+    final Accumulator<Integer> accum = sc.accumulator(0);
+    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+    rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
+      @Override
+      public void call(Iterator<String> iter) throws IOException {
+        while (iter.hasNext()) {
+          iter.next();
+          accum.add(1);
+        }
+      }
+    });
+    Assert.assertEquals(2, accum.value().intValue());
+  }
+
+  @Test
   public void toLocalIterator() {
     List<Integer> correct = Arrays.asList(1, 2, 3, 4);
     JavaRDD<Integer> rdd = sc.parallelize(correct);
@@ -605,6 +621,13 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
+  public void toArray() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3));
+    List<Integer> list = rdd.toArray();
+    Assert.assertEquals(Arrays.asList(1, 2, 3), list);
+  }
+
+  @Test
   public void cartesian() {
     JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
     JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
@@ -657,6 +680,80 @@ public class JavaAPISuite implements Serializable {
     Assert.assertArrayEquals(expected_counts, histogram);
   }
 
+  private static class DoubleComparator implements Comparator<Double>, Serializable {
+    public int compare(Double o1, Double o2) {
+      return o1.compareTo(o2);
+    }
+  }
+
+  @Test
+  public void max() {
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    double max = rdd.max(new DoubleComparator());
+    Assert.assertEquals(4.0, max, 0.001);
+  }
+
+  @Test
+  public void min() {
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    double max = rdd.min(new DoubleComparator());
+    Assert.assertEquals(1.0, max, 0.001);
+  }
+
+  @Test
+  public void takeOrdered() {
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    Assert.assertEquals(Arrays.asList(1.0, 2.0), rdd.takeOrdered(2, new DoubleComparator()));
+    Assert.assertEquals(Arrays.asList(1.0, 2.0), rdd.takeOrdered(2));
+  }
+
+  @Test
+  public void top() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    List<Integer> top2 = rdd.top(2);
+    Assert.assertEquals(Arrays.asList(4, 3), top2);
+  }
+
+  private static class AddInts implements Function2<Integer, Integer, Integer> {
+    @Override
+    public Integer call(Integer a, Integer b) {
+      return a + b;
+    }
+  }
+
+  @Test
+  public void reduce() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    int sum = rdd.reduce(new AddInts());
+    Assert.assertEquals(10, sum);
+  }
+
+  @Test
+  public void reduceOnJavaDoubleRDD() {
+    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+    double sum = rdd.reduce(new Function2<Double, Double, Double>() {
+      @Override
+      public Double call(Double v1, Double v2) throws Exception {
+        return v1 + v2;
+      }
+    });
+    Assert.assertEquals(10.0, sum, 0.001);
+  }
+
+  @Test
+  public void fold() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    int sum = rdd.fold(0, new AddInts());
+    Assert.assertEquals(10, sum);
+  }
+
+  @Test
+  public void aggregate() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    int sum = rdd.aggregate(0, new AddInts(), new AddInts());
+    Assert.assertEquals(10, sum);
+  }
+
   @Test
   public void map() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
@@ -773,6 +870,25 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
   }
 
+  @Test
+  public void mapPartitionsWithIndex() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+    JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex(
+        new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() {
+          @Override
+          public Iterator<Integer> call(Integer index, Iterator<Integer> iter) throws Exception {
+            int sum = 0;
+            while (iter.hasNext()) {
+              sum += iter.next();
+            }
+            return Collections.singletonList(sum).iterator();
+          }
+        }, false);
+    Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
+  }
+
+
   @Test
   public void repartition() {
     // Shrinking number of partitions
@@ -1460,6 +1576,19 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
+  public void takeAsync() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<List<Integer>> future = rdd.takeAsync(1);
+    List<Integer> result = future.get();
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals((Integer) 1, result.get(0));
+    Assert.assertFalse(future.isCancelled());
+    Assert.assertTrue(future.isDone());
+    Assert.assertEquals(1, future.jobIds().size());
+  }
+
+  @Test
   public void foreachAsync() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);


http://git-wip-us.apache.org/repos/asf/spark/blob/61c059a4/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index 505e443..01cdcb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -36,7 +36,7 @@ import org.apache.spark.streaming.dstream.DStream
  * [[org.apache.spark.streaming.api.java.JavaPairDStream]].
  */
 class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T])
-  extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
+  extends AbstractJavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
 
   override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)


http://git-wip-us.apache.org/repos/asf/spark/blob/61c059a4/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 2a7004e..d485c73 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -34,6 +34,15 @@ import org.apache.spark.streaming._
 import org.apache.spark.streaming.api.java.JavaDStream._
 import org.apache.spark.streaming.dstream.DStream
 
+/**
+ * As a workaround for https://issues.scala-lang.org/browse/SI-8905, implementations
+ * of JavaDStreamLike should extend this dummy abstract class instead of directly inheriting
+ * from the trait. See SPARK-3266 for additional details.
+ */
+private[streaming]
+abstract class AbstractJavaDStreamLike[T, This <: JavaDStreamLike[T, This, R],
+    R <: JavaRDDLike[T, R]] extends JavaDStreamLike[T, This, R]
+
 trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
   extends Serializable {
   implicit val classTag: ClassTag[T]


http://git-wip-us.apache.org/repos/asf/spark/blob/61c059a4/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 59d4423..8c1949c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -46,7 +46,7 @@ import org.apache.spark.streaming.dstream.DStream
 class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     implicit val kManifest: ClassTag[K],
     implicit val vManifest: ClassTag[V])
-  extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
+  extends AbstractJavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
 
   override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org