spark-commits mailing list archives

From sro...@apache.org
Subject spark git commit: [SPARK-25737][CORE] Remove JavaSparkContextVarargsWorkaround
Date Wed, 24 Oct 2018 19:43:55 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7251be0c0 -> f83fedc9f


[SPARK-25737][CORE] Remove JavaSparkContextVarargsWorkaround

## What changes were proposed in this pull request?

Remove `JavaSparkContextVarargsWorkaround`, the abstract Java base class that `JavaSparkContext` extended solely to expose varargs `union` overloads. Scala's `@varargs` annotation now lets the `union` methods be declared directly on `JavaSparkContext` and `JavaStreamingContext` as varargs, so the workaround class and the companion "first element plus `List` of the rest" overloads are removed. The PySpark wrappers are updated to build a Java array and call the varargs methods instead.
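
To illustrate the caller-facing effect, here is a minimal Java sketch (class, app and RDD names are hypothetical, for illustration only): the varargs form of `union` continues to work, while the list-based `(first, rest)` overload exercised by the removed JavaAPISuite lines no longer compiles.

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class UnionAfterSpark25737 {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("union-sketch").setMaster("local[*]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
          JavaRDD<String> s1 = jsc.parallelize(Arrays.asList("a", "b"));
          JavaRDD<String> s2 = jsc.parallelize(Arrays.asList("c", "d"));

          // Varargs union: previously inherited from JavaSparkContextVarargsWorkaround,
          // now declared directly on JavaSparkContext via Scala's @varargs.
          JavaRDD<String> union = jsc.union(s1, s2);
          System.out.println(union.count());  // prints 4

          // Removed by this commit; no longer compiles:
          // JavaRDD<String> old = jsc.union(s1, java.util.Collections.singletonList(s2));
        }
      }
    }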

## How was this patch tested?

Existing tests.

Closes #22729 from srowen/SPARK-25737.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f83fedc9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f83fedc9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f83fedc9

Branch: refs/heads/master
Commit: f83fedc9f20869ab4c62bb07bac50113d921207f
Parents: 7251be0
Author: Sean Owen <sean.owen@databricks.com>
Authored: Wed Oct 24 14:43:51 2018 -0500
Committer: Sean Owen <sean.owen@databricks.com>
Committed: Wed Oct 24 14:43:51 2018 -0500

----------------------------------------------------------------------
 .../java/JavaSparkContextVarargsWorkaround.java | 67 --------------------
 .../spark/api/java/JavaSparkContext.scala       | 42 ++++++------
 .../test/org/apache/spark/JavaAPISuite.java     |  5 --
 .../streaming/JavaKinesisWordCountASL.java      |  2 +-
 project/MimaExcludes.scala                      |  7 ++
 python/pyspark/context.py                       |  8 ++-
 python/pyspark/streaming/context.py             |  8 ++-
 .../api/java/JavaStreamingContext.scala         | 27 ++++----
 8 files changed, 53 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
deleted file mode 100644
index 0dd8faf..0000000
--- a/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java;
-
-import java.util.ArrayList;
-import java.util.List;
-
-// See
-// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
-abstract class JavaSparkContextVarargsWorkaround {
-
-  @SafeVarargs
-  public final <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
-    if (rdds.length == 0) {
-      throw new IllegalArgumentException("Union called on empty list");
-    }
-    List<JavaRDD<T>> rest = new ArrayList<>(rdds.length - 1);
-    for (int i = 1; i < rdds.length; i++) {
-      rest.add(rdds[i]);
-    }
-    return union(rdds[0], rest);
-  }
-
-  public JavaDoubleRDD union(JavaDoubleRDD... rdds) {
-    if (rdds.length == 0) {
-      throw new IllegalArgumentException("Union called on empty list");
-    }
-    List<JavaDoubleRDD> rest = new ArrayList<>(rdds.length - 1);
-    for (int i = 1; i < rdds.length; i++) {
-      rest.add(rdds[i]);
-    }
-    return union(rdds[0], rest);
-  }
-
-  @SafeVarargs
-  public final <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
-    if (rdds.length == 0) {
-      throw new IllegalArgumentException("Union called on empty list");
-    }
-    List<JavaPairRDD<K, V>> rest = new ArrayList<>(rdds.length - 1);
-    for (int i = 1; i < rdds.length; i++) {
-      rest.add(rdds[i]);
-    }
-    return union(rdds[0], rest);
-  }
-
-  // These methods take separate "first" and "rest" elements to avoid having the same type erasure
-  public abstract <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
-  public abstract JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
-  public abstract <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>>
-    rest);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index ef15f95..03f259d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -21,6 +21,7 @@ import java.io.Closeable
 import java.util
 import java.util.{Map => JMap}
 
+import scala.annotation.varargs
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
@@ -33,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.input.PortableDataStream
-import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
+import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD}
 
 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -42,8 +43,7 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
  * Only one SparkContext may be active per JVM.  You must `stop()` the active SparkContext before
  * creating a new one.  This limitation may eventually be removed; see SPARK-2243 for more details.
  */
-class JavaSparkContext(val sc: SparkContext)
-  extends JavaSparkContextVarargsWorkaround with Closeable {
+class JavaSparkContext(val sc: SparkContext) extends Closeable {
 
   /**
    * Create a JavaSparkContext that loads settings from system properties (for instance, when
@@ -506,27 +506,29 @@ class JavaSparkContext(val sc: SparkContext)
     new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
   }
 
-  /** Build the union of two or more RDDs. */
-  override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
-    val rdds: Seq[RDD[T]] = (Seq(first) ++ rest.asScala).map(_.rdd)
-    implicit val ctag: ClassTag[T] = first.classTag
-    sc.union(rdds)
+  /** Build the union of JavaRDDs. */
+  @varargs
+  def union[T](rdds: JavaRDD[T]*): JavaRDD[T] = {
+    require(rdds.nonEmpty, "Union called on no RDDs")
+    implicit val ctag: ClassTag[T] = rdds.head.classTag
+    sc.union(rdds.map(_.rdd))
   }
 
-  /** Build the union of two or more RDDs. */
-  override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
-      : JavaPairRDD[K, V] = {
-    val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ rest.asScala).map(_.rdd)
-    implicit val ctag: ClassTag[(K, V)] = first.classTag
-    implicit val ctagK: ClassTag[K] = first.kClassTag
-    implicit val ctagV: ClassTag[V] = first.vClassTag
-    new JavaPairRDD(sc.union(rdds))
+  /** Build the union of JavaPairRDDs. */
+  @varargs
+  def union[K, V](rdds: JavaPairRDD[K, V]*): JavaPairRDD[K, V] = {
+    require(rdds.nonEmpty, "Union called on no RDDs")
+    implicit val ctag: ClassTag[(K, V)] = rdds.head.classTag
+    implicit val ctagK: ClassTag[K] = rdds.head.kClassTag
+    implicit val ctagV: ClassTag[V] = rdds.head.vClassTag
+    new JavaPairRDD(sc.union(rdds.map(_.rdd)))
   }
 
-  /** Build the union of two or more RDDs. */
-  override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
-    val rdds: Seq[RDD[Double]] = (Seq(first) ++ rest.asScala).map(_.srdd)
-    new JavaDoubleRDD(sc.union(rdds))
+  /** Build the union of JavaDoubleRDDs. */
+  @varargs
+  def union(rdds: JavaDoubleRDD*): JavaDoubleRDD = {
+    require(rdds.nonEmpty, "Union called on no RDDs")
+    new JavaDoubleRDD(sc.union(rdds.map(_.srdd)))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 365a93d..f979f9e 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -106,11 +106,6 @@ public class JavaAPISuite implements Serializable {
     // Varargs
     JavaRDD<String> sUnion = sc.union(s1, s2);
     assertEquals(4, sUnion.count());
-    // List
-    List<JavaRDD<String>> list = new ArrayList<>();
-    list.add(s2);
-    sUnion = sc.union(s1, list);
-    assertEquals(4, sUnion.count());
 
     // Union of JavaDoubleRDDs
     List<Double> doubles = Arrays.asList(1.0, 2.0);

http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
index 626bde4..86c42df 100644
--- a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
+++ b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -145,7 +145,7 @@ public final class JavaKinesisWordCountASL { // needs to be public for access fr
     // Union all the streams if there is more than 1 stream
     JavaDStream<byte[]> unionStreams;
     if (streamsList.size() > 1) {
-      unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
+      unionStreams = jssc.union(streamsList.toArray(new JavaDStream[0]));
     } else {
       // Otherwise, just use the 1 stream
       unionStreams = streamsList.get(0);

http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index d6beac1..350d8ad 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,10 @@ object MimaExcludes {
 
   // Exclude rules for 3.0.x
   lazy val v30excludes = v24excludes ++ Seq(
+    // [SPARK-25737] Remove JavaSparkContextVarargsWorkaround
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.api.java.JavaSparkContext"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.union"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.union"),
     // [SPARK-16775] Remove deprecated accumulator v1 APIs
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Accumulable"),
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.AccumulatorParam"),
@@ -55,9 +59,12 @@ object MimaExcludes {
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.accumulable"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.doubleAccumulator"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.accumulator"),
+    // [SPARK-24109] Remove class SnappyOutputStreamWrapper
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version"),
+    // [SPARK-19287] JavaPairRDD flatMapValues requires function returning Iterable, not Iterator
     ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.api.java.JavaPairRDD.flatMapValues"),
     ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.api.java.JavaPairDStream.flatMapValues"),
+    // [SPARK-25680] SQL execution listener shouldn't happen on execution thread
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.util.ExecutionListenerManager.clone"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.util.ExecutionListenerManager.this")
   )

http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 0924d3d..1180bf9 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -834,9 +834,11 @@ class SparkContext(object):
         first_jrdd_deserializer = rdds[0]._jrdd_deserializer
         if any(x._jrdd_deserializer != first_jrdd_deserializer for x in rdds):
             rdds = [x._reserialize() for x in rdds]
-        first = rdds[0]._jrdd
-        rest = [x._jrdd for x in rdds[1:]]
-        return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer)
+        cls = SparkContext._jvm.org.apache.spark.api.java.JavaRDD
+        jrdds = SparkContext._gateway.new_array(cls, len(rdds))
+        for i in range(0, len(rdds)):
+            jrdds[i] = rdds[i]._jrdd
+        return RDD(self._jsc.union(jrdds), self, rdds[0]._jrdd_deserializer)
 
     def broadcast(self, value):
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/python/pyspark/streaming/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 3fa57ca..e1c194b 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -343,9 +343,11 @@ class StreamingContext(object):
             raise ValueError("All DStreams should have same serializer")
         if len(set(s._slideDuration for s in dstreams)) > 1:
             raise ValueError("All DStreams should have same slide duration")
-        first = dstreams[0]
-        jrest = [d._jdstream for d in dstreams[1:]]
-        return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer)
+        cls = SparkContext._jvm.org.apache.spark.streaming.api.java.JavaDStream
+        jdstreams = SparkContext._gateway.new_array(cls, len(dstreams))
+        for i in range(0, len(dstreams)):
+            jdstreams[i] = dstreams[i]._jdstream
+        return DStream(self._jssc.union(jdstreams), self, dstreams[0]._jrdd_deserializer)
 
     def addStreamingListener(self, streamingListener):
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 982e72c..e61c0d4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -21,6 +21,7 @@ import java.io.{Closeable, InputStream}
 import java.lang.{Boolean => JBoolean}
 import java.util.{List => JList, Map => JMap}
 
+import scala.annotation.varargs
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
@@ -36,7 +37,6 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.streaming.scheduler.StreamingListener
 
@@ -431,24 +431,23 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
   /**
    * Create a unified DStream from multiple DStreams of the same type and same slide duration.
    */
-  def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = {
-    val dstreams: Seq[DStream[T]] = (Seq(first) ++ rest.asScala).map(_.dstream)
-    implicit val cm: ClassTag[T] = first.classTag
-    ssc.union(dstreams)(cm)
+  @varargs
+  def union[T](jdstreams: JavaDStream[T]*): JavaDStream[T] = {
+    require(jdstreams.nonEmpty, "Union called on no streams")
+    implicit val cm: ClassTag[T] = jdstreams.head.classTag
+    ssc.union(jdstreams.map(_.dstream))(cm)
   }
 
   /**
    * Create a unified DStream from multiple DStreams of the same type and same slide duration.
    */
-  def union[K, V](
-      first: JavaPairDStream[K, V],
-      rest: JList[JavaPairDStream[K, V]]
-    ): JavaPairDStream[K, V] = {
-    val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ rest.asScala).map(_.dstream)
-    implicit val cm: ClassTag[(K, V)] = first.classTag
-    implicit val kcm: ClassTag[K] = first.kManifest
-    implicit val vcm: ClassTag[V] = first.vManifest
-    new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm)
+  @varargs
+  def union[K, V](jdstreams: JavaPairDStream[K, V]*): JavaPairDStream[K, V] = {
+    require(jdstreams.nonEmpty, "Union called on no streams")
+    implicit val cm: ClassTag[(K, V)] = jdstreams.head.classTag
+    implicit val kcm: ClassTag[K] = jdstreams.head.kManifest
+    implicit val vcm: ClassTag[V] = jdstreams.head.vManifest
+    new JavaPairDStream[K, V](ssc.union(jdstreams.map(_.dstream))(cm))(kcm, vcm)
   }
 
   /**
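
As with JavaSparkContext, JavaStreamingContext.union is now a @varargs method, so Java callers can pass streams directly or spread an existing JavaDStream[] (which is what JavaKinesisWordCountASL does above via streamsList.toArray). A minimal hedged sketch, using queue-backed streams purely as placeholders and hypothetical names:

    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.Queue;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class StreamingUnionSketch {
      public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("streaming-union-sketch").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Two queue-backed input streams, just to have something to union.
        Queue<JavaRDD<String>> q1 = new LinkedList<>();
        Queue<JavaRDD<String>> q2 = new LinkedList<>();
        q1.add(jssc.sparkContext().parallelize(Arrays.asList("a", "b")));
        q2.add(jssc.sparkContext().parallelize(Arrays.asList("c", "d")));
        JavaDStream<String> s1 = jssc.queueStream(q1);
        JavaDStream<String> s2 = jssc.queueStream(q2);

        // Varargs union introduced by this change; a JavaDStream[] argument also works.
        JavaDStream<String> union = jssc.union(s1, s2);
        union.print();

        jssc.start();
        jssc.awaitTerminationOrTimeout(3000);
        jssc.stop();
      }
    }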



