spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewo...@apache.org
Subject spark git commit: [SPARK-7318] [STREAMING] DStream cleans objects that are not closures
Date Tue, 05 May 2015 16:37:59 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 01d402298 -> acc877a98


[SPARK-7318] [STREAMING] DStream cleans objects that are not closures

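I added a check in `ClosureCleaner#clean` to catch this case in the future: if the object passed in is not a closure, it logs a warning and returns without attempting to clean it (rather than throwing an exception).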
tdas

Author: Andrew Or <andrew@databricks.com>

Closes #5860 from andrewor14/streaming-closure-cleaner and squashes the following commits:

8e971d7 [Andrew Or] Do not throw exception if object to clean is not closure
5ee4e25 [Andrew Or] Fix tests
eed3390 [Andrew Or] Merge branch 'master' of github.com:apache/spark into streaming-closure-cleaner
67eeff4 [Andrew Or] Add tests
a4fa768 [Andrew Or] Clean the closure, not the RDD

(cherry picked from commit 57e9f29e17d97ed9d0f110fb2ce5a075b854a841)
Signed-off-by: Andrew Or <andrew@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acc877a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acc877a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acc877a9

Branch: refs/heads/branch-1.4
Commit: acc877a989207789ad4bfec3bae43f484486f7a2
Parents: 01d4022
Author: Andrew Or <andrew@databricks.com>
Authored: Tue May 5 09:37:49 2015 -0700
Committer: Andrew Or <andrew@databricks.com>
Committed: Tue May 5 09:37:56 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala  | 5 +++++
 .../main/scala/org/apache/spark/streaming/dstream/DStream.scala | 3 ++-
 .../test/scala/org/apache/spark/streaming/ReceiverSuite.scala   | 4 ++--
 3 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/acc877a9/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 19fe6cb..6fe32e4 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -179,6 +179,11 @@ private[spark] object ClosureCleaner extends Logging {
       cleanTransitively: Boolean,
       accessedFields: Map[Class[_], Set[String]]): Unit = {
 
+    if (!isClosure(func.getClass)) {
+      logWarning("Expected a closure; got " + func.getClass.getName)
+      return
+    }
+
     // TODO: clean all inner closures first. This requires us to find the inner objects.
     // TODO: cache outerClasses / innerClasses / accessedFields
 

http://git-wip-us.apache.org/repos/asf/spark/blob/acc877a9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 83d41f5..f1f8a70 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -553,7 +553,8 @@ abstract class DStream[T: ClassTag] (
     // because the DStream is reachable from the outer object here, and because 
     // DStreams can't be serialized with closures, we can't proactively check 
     // it for serializability and so we pass the optional false to SparkContext.clean
-    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
+    val cleanedF = context.sparkContext.clean(transformFunc, false)
+    transform((r: RDD[T], t: Time) => cleanedF(r))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/acc877a9/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 393a360..5d71276 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -256,8 +256,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     }
 
     withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc =>
-      val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
-      val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
+      val receiver1 = new FakeReceiver(sendData = true)
+      val receiver2 = new FakeReceiver(sendData = true)
       val receiverStream1 = ssc.receiverStream(receiver1)
       val receiverStream2 = ssc.receiverStream(receiver2)
       receiverStream1.register()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message