From: vanzin@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-21928][CORE] Set classloader on SerializerManager's private kryo
Date: Thu, 21 Sep 2017 22:47:28 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-2.1 56865a1e9 -> 1a4b6eea8


[SPARK-21928][CORE] Set classloader on SerializerManager's private kryo

## What changes were proposed in this pull request?

We have to make sure that SerializerManager's private instance of kryo also uses the right classloader, regardless of the current thread classloader. In particular, this fixes serde during remote cache fetches, as those occur in netty threads.

## How was this patch tested?

Manual tests & existing suite via Jenkins. I haven't been able to reproduce this in a unit test, because when a remote RDD partition cannot be fetched, there is a warning message and then the partition is just recomputed locally. I manually verified the warning message is no longer present.

(cherry picked from commit b75bd1777496ce0354458bf85603a8087a6a0ff8)

Author: Imran Rashid

Closes #19313 from squito/SPARK-21928_2.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a4b6eea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a4b6eea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a4b6eea

Branch: refs/heads/branch-2.1
Commit: 1a4b6eea86d27c37ccd7e0101f45e8d4c9f51263
Parents: 56865a1
Author: Imran Rashid
Authored: Thu Sep 21 15:47:23 2017 -0700
Committer: Marcelo Vanzin
Committed: Thu Sep 21 15:47:23 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/executor/Executor.scala     | 3 +++
 .../scala/org/apache/spark/serializer/SerializerManager.scala    | 4 ++++
 .../src/test/scala/org/apache/spark/executor/ExecutorSuite.scala | 3 ++-
 3 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1a4b6eea/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3346f6d..bc32436 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -113,6 +113,9 @@ private[spark] class Executor(
 
   // Set the classloader for serializer
   env.serializer.setDefaultClassLoader(replClassLoader)
+  // SPARK-21928. SerializerManager's internal instance of Kryo might get used in netty threads
+  // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
+  env.serializerManager.setDefaultClassLoader(replClassLoader)
 
   // Max size of direct result. If task result is bigger than this, we use the block manager
   // to send the result back.

http://git-wip-us.apache.org/repos/asf/spark/blob/1a4b6eea/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index dd98ea2..5832048 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -42,6 +42,10 @@ private[spark] class SerializerManager(
 
   private[this] val kryoSerializer = new KryoSerializer(conf)
 
+  def setDefaultClassLoader(classLoader: ClassLoader): Unit = {
+    kryoSerializer.setDefaultClassLoader(classLoader)
+  }
+
   private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]
   private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
     val primitiveClassTags = Set[ClassTag[_]](

http://git-wip-us.apache.org/repos/asf/spark/blob/1a4b6eea/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 683eeee..6b486fa 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.memory.MemoryManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.{FakeTask, Task}
-import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 
 class ExecutorSuite extends SparkFunSuite {
 
@@ -47,6 +47,7 @@ class ExecutorSuite extends SparkFunSuite {
     val mockMemoryManager = mock(classOf[MemoryManager])
     when(mockEnv.conf).thenReturn(conf)
     when(mockEnv.serializer).thenReturn(serializer)
+    when(mockEnv.serializerManager).thenReturn(mock(classOf[SerializerManager]))
     when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
     when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
     when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
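
The classloader problem described in the commit message can be illustrated without Spark or Kryo. The sketch below is a simplified model, not Spark's implementation: `Resolver`, `HidingLoader`, `onThreadWith`, and `demo` are hypothetical names, and `java.util.ArrayList` merely stands in for a user-defined class. It assumes a component that resolves classes by name through the thread context classloader unless a default loader has been set explicitly, and shows that lookups on a thread whose context loader cannot see the class succeed only once a loader is pinned.

```scala
import scala.util.{Failure, Try}

object PinnedClassLoaderSketch {

  // Stand-in for a user-defined class; chosen only for illustration.
  val UserClass = "java.util.ArrayList"

  /** Simulates a context loader (e.g. on an I/O thread) that cannot see one class. */
  final class HidingLoader(hidden: String, parent: ClassLoader) extends ClassLoader(parent) {
    override protected def loadClass(name: String, resolve: Boolean): Class[_] =
      if (name == hidden) throw new ClassNotFoundException(name)
      else super.loadClass(name, resolve)
  }

  /** Hypothetical stand-in for a serializer that resolves classes by name. */
  final class Resolver {
    @volatile private var defaultClassLoader: Option[ClassLoader] = None

    def setDefaultClassLoader(classLoader: ClassLoader): Unit =
      defaultClassLoader = Some(classLoader)

    def resolve(name: String): Class[_] = {
      // Prefer the pinned loader; otherwise fall back to the calling thread's context loader.
      val loader = defaultClassLoader.getOrElse(Thread.currentThread().getContextClassLoader)
      Class.forName(name, false, loader)
    }
  }

  /** Runs `body` on a fresh thread whose context classloader is `loader`. */
  def onThreadWith[A](loader: ClassLoader)(body: => A): Try[A] = {
    var result: Try[A] = Failure(new IllegalStateException("body did not run"))
    val thread = new Thread(new Runnable {
      override def run(): Unit = { result = Try(body) }
    })
    thread.setContextClassLoader(loader)
    thread.start()
    thread.join() // join() makes the write to `result` visible to the caller
    result
  }

  /** Resolves UserClass on a thread that cannot see it, with or without a pinned loader. */
  def demo(pin: Boolean): Try[Class[_]] = {
    val visible = ClassLoader.getSystemClassLoader
    val resolver = new Resolver
    if (pin) resolver.setDefaultClassLoader(visible)
    onThreadWith[Class[_]](new HidingLoader(UserClass, visible))(resolver.resolve(UserClass))
  }

  def main(args: Array[String]): Unit = {
    println(s"without pinned loader: ${demo(pin = false)}")
    println(s"with pinned loader:    ${demo(pin = true)}")
  }
}
```

In this model, pinning the loader once at setup time plays the role that the `setDefaultClassLoader` call added in `Executor.scala` plays in the patch: the outcome no longer depends on which thread performs the lookup.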