spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From van...@apache.org
Subject spark git commit: [SPARK-21928][CORE] Set classloader on SerializerManager's private kryo
Date Thu, 21 Sep 2017 22:47:28 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 56865a1e9 -> 1a4b6eea8


[SPARK-21928][CORE] Set classloader on SerializerManager's private kryo

## What changes were proposed in this pull request?

We have to make sure that SerializerManager's private Kryo instance
also uses the right classloader, regardless of the current thread's context
classloader.  In particular, this fixes serialization/deserialization (serde)
during remote cache fetches, since those run in Netty threads.

## How was this patch tested?

Manual tests & existing suite via Jenkins.  I haven't been able to reproduce this in
a unit test, because when a remote RDD partition cannot be fetched, a warning message is
logged and the partition is simply recomputed locally.  I manually verified the warning
message is no longer present.

(cherry picked from commit b75bd1777496ce0354458bf85603a8087a6a0ff8)

Author: Imran Rashid <irashid@cloudera.com>

Closes #19313 from squito/SPARK-21928_2.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a4b6eea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a4b6eea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a4b6eea

Branch: refs/heads/branch-2.1
Commit: 1a4b6eea86d27c37ccd7e0101f45e8d4c9f51263
Parents: 56865a1
Author: Imran Rashid <irashid@cloudera.com>
Authored: Thu Sep 21 15:47:23 2017 -0700
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Thu Sep 21 15:47:23 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/executor/Executor.scala     | 3 +++
 .../scala/org/apache/spark/serializer/SerializerManager.scala    | 4 ++++
 .../src/test/scala/org/apache/spark/executor/ExecutorSuite.scala | 3 ++-
 3 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1a4b6eea/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3346f6d..bc32436 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -113,6 +113,9 @@ private[spark] class Executor(
 
   // Set the classloader for serializer
   env.serializer.setDefaultClassLoader(replClassLoader)
+  // SPARK-21928.  SerializerManager's internal instance of Kryo might get used in netty threads
+  // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
+  env.serializerManager.setDefaultClassLoader(replClassLoader)
 
   // Max size of direct result. If task result is bigger than this, we use the block manager
   // to send the result back.

http://git-wip-us.apache.org/repos/asf/spark/blob/1a4b6eea/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index dd98ea2..5832048 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -42,6 +42,10 @@ private[spark] class SerializerManager(
 
   private[this] val kryoSerializer = new KryoSerializer(conf)
 
+  def setDefaultClassLoader(classLoader: ClassLoader): Unit = {
+    kryoSerializer.setDefaultClassLoader(classLoader)
+  }
+
   private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]
   private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
     val primitiveClassTags = Set[ClassTag[_]](

http://git-wip-us.apache.org/repos/asf/spark/blob/1a4b6eea/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 683eeee..6b486fa 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.memory.MemoryManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.{FakeTask, Task}
-import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 
 class ExecutorSuite extends SparkFunSuite {
 
@@ -47,6 +47,7 @@ class ExecutorSuite extends SparkFunSuite {
     val mockMemoryManager = mock(classOf[MemoryManager])
     when(mockEnv.conf).thenReturn(conf)
     when(mockEnv.serializer).thenReturn(serializer)
+    when(mockEnv.serializerManager).thenReturn(mock(classOf[SerializerManager]))
     when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
     when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
     when(mockEnv.memoryManager).thenReturn(mockMemoryManager)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message