From: wenchen@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-15870][SQL] DataFrame can't execute after uncacheTable.
Date: Sun, 12 Jun 2016 23:38:02 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 3a1567a73 -> 619a11426


[SPARK-15870][SQL] DataFrame can't execute after uncacheTable.

## What changes were proposed in this pull request?

If a cached `DataFrame` is executed more than once and then uncached via `uncacheTable`, like the following:

```
val selectStar = sql("SELECT * FROM testData WHERE key = 1")
selectStar.createOrReplaceTempView("selectStar")

spark.catalog.cacheTable("selectStar")
checkAnswer(
  selectStar,
  Seq(Row(1, "1")))

spark.catalog.uncacheTable("selectStar")
checkAnswer(
  selectStar,
  Seq(Row(1, "1")))
```

then the uncached `DataFrame` can no longer execute, failing with a `Task not serializable` exception:

```
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2038)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:884)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:883)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
	...
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
	at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:153)
	at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	...
```

Notice that a `DataFrame` uncached with `DataFrame.unpersist()` still works, but one uncached with `spark.catalog.uncacheTable` does not.
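On builds affected by this bug, that `unpersist()` behavior doubles as a workaround. A minimal sketch, assuming the same `testData` temp view and `spark` session as the reproduction above (not part of the patch itself):

```scala
// Workaround sketch for affected builds: drop the cache through the
// Dataset handle instead of spark.catalog.uncacheTable.
val selectStar = spark.sql("SELECT * FROM testData WHERE key = 1")
selectStar.createOrReplaceTempView("selectStar")

spark.catalog.cacheTable("selectStar")
selectStar.count()      // runs against the cached data

// Per the report above, this path does not leave the DataFrame broken,
// whereas spark.catalog.uncacheTable("selectStar") would.
selectStar.unpersist()
selectStar.count()      // still executes
```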
This PR reverts part of cf38fe0 so that the `batchStats` accumulator is no longer unregistered in `uncache`; it does not need to be unregistered there, because `ContextCleaner` will unregister it after it is collected by GC.

## How was this patch tested?

Added a test that checks whether a `DataFrame` can execute after `uncacheTable`, in addition to the existing tests. The test that checks whether the accumulator was actually cleared is marked `ignore` because it would be flaky.

Author: Takuya UESHIN

Closes #13596 from ueshin/issues/SPARK-15870.

(cherry picked from commit caebd7f2622340fc081bb9a2ea6a0b246f1e3a3f)
Signed-off-by: Wenchen Fan


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/619a1142
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/619a1142
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/619a1142

Branch: refs/heads/branch-2.0
Commit: 619a11426aa6a76f88b4c0b4cc45ced8b126cbb7
Parents: 3a1567a
Author: Takuya UESHIN
Authored: Sun Jun 12 16:37:44 2016 -0700
Committer: Wenchen Fan
Committed: Sun Jun 12 16:37:59 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/CacheManager.scala      |  2 +-
 .../columnar/InMemoryTableScanExec.scala        |  6 ---
 .../org/apache/spark/sql/CachedTableSuite.scala | 41 +++++++++++++++++++-
 3 files changed, 41 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/619a1142/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b584cf4..4e95754 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -109,7 +109,7 @@ private[sql] class CacheManager extends Logging {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     require(dataIndex >= 0, s"Table $query is not cached.")
-    cachedData(dataIndex).cachedRepresentation.uncache(blocking)
+    cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
     cachedData.remove(dataIndex)
   }
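For context on why this one-line change fixes the bug: the removed call went through `InMemoryRelation.uncache` (deleted in the next hunk), which eagerly removed the `batchStats` accumulator from `AccumulatorContext`. An `AccumulatorV2` whose id is no longer registered throws from `writeReplace` the next time a task closure capturing it is serialized, which is exactly the `Caused by` in the trace above. A minimal sketch of that failure mode under Spark 2.0 APIs; `AccumulatorContext` is `private[spark]`, so the sketch lives in an `org.apache.spark` subpackage purely to compile, and the object name is made up:

```scala
package org.apache.spark.sketch  // makes the private[spark] AccumulatorContext visible

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.AccumulatorContext

object UnregisteredAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("SPARK-15870 sketch").getOrCreate()
    val sc = spark.sparkContext

    val acc = sc.longAccumulator("batchStats-like")
    sc.parallelize(1 to 10).foreach(n => acc.add(n))  // fine: acc is registered

    // Simulate what the removed uncache() did:
    AccumulatorContext.remove(acc.id)

    // Now any job whose closure captures acc fails during closure serialization
    // with "Task not serializable", caused by AccumulatorV2.writeReplace throwing
    // "Accumulator must be registered before send to executor":
    // sc.parallelize(1 to 10).foreach(n => acc.add(n))

    spark.stop()
  }
}
```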
http://git-wip-us.apache.org/repos/asf/spark/blob/619a1142/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index ff07331..ce630bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -210,12 +210,6 @@ private[sql] case class InMemoryRelation(
 
   override protected def otherCopyArgs: Seq[AnyRef] =
     Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats)
-
-  private[sql] def uncache(blocking: Boolean): Unit = {
-    AccumulatorContext.remove(batchStats.id)
-    cachedColumnBuffers.unpersist(blocking)
-    _cachedColumnBuffers = null
-  }
 }
 
 private[sql] case class InMemoryTableScanExec(


http://git-wip-us.apache.org/repos/asf/spark/blob/619a1142/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index e08a9ab..44bafa5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql
 
+import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.scalatest.concurrent.Eventually._
 
+import org.apache.spark.CleanerListener
 import org.apache.spark.sql.execution.RDDScanExec
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
@@ -321,7 +323,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assert(spark.sharedState.cacheManager.isEmpty)
   }
 
-  test("Clear accumulators when uncacheTable to prevent memory leaking") {
+  test("Ensure accumulators to be cleared after GC when uncacheTable") {
     sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
     sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")
 
@@ -333,17 +335,39 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     sql("SELECT * FROM t1").count()
     sql("SELECT * FROM t2").count()
 
+    val toBeCleanedAccIds = new HashSet[Long]
+
     val accId1 = spark.table("t1").queryExecution.withCachedData.collect {
       case i: InMemoryRelation => i.batchStats.id
     }.head
+    toBeCleanedAccIds += accId1
 
     val accId2 = spark.table("t1").queryExecution.withCachedData.collect {
       case i: InMemoryRelation => i.batchStats.id
     }.head
+    toBeCleanedAccIds += accId2
+
+    val cleanerListener = new CleanerListener {
+      def rddCleaned(rddId: Int): Unit = {}
+      def shuffleCleaned(shuffleId: Int): Unit = {}
+      def broadcastCleaned(broadcastId: Long): Unit = {}
+      def accumCleaned(accId: Long): Unit = {
+        toBeCleanedAccIds.synchronized { toBeCleanedAccIds -= accId }
+      }
+      def checkpointCleaned(rddId: Long): Unit = {}
+    }
+    spark.sparkContext.cleaner.get.attachListener(cleanerListener)
 
     spark.catalog.uncacheTable("t1")
     spark.catalog.uncacheTable("t2")
 
+    System.gc()
+
+    eventually(timeout(10 seconds)) {
+      assert(toBeCleanedAccIds.synchronized { toBeCleanedAccIds.isEmpty },
+        "batchStats accumulators should be cleared after GC when uncacheTable")
+    }
+
     assert(AccumulatorContext.get(accId1).isEmpty)
     assert(AccumulatorContext.get(accId2).isEmpty)
   }
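The test above leans on the mechanism the fix delegates to: `ContextCleaner` tracks registered objects only weakly and runs cleanup (here, unregistering the accumulator id) once GC collects the owner, which is why the test must call `System.gc()` and then poll with `eventually`. Conceptually this is the standard weak-reference/reference-queue pattern; below is a minimal, self-contained sketch of that pattern, not Spark's actual cleaner code, with all names invented. The final hunk after it adds the direct regression test for SPARK-15870.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// Sketch of the weak-reference cleanup pattern ContextCleaner is built on:
// track objects weakly; when GC enqueues a reference, run the cleanup
// (in Spark's case, unregister the accumulator id).
object WeakCleanupSketch {
  final class Tracked(val id: Long)

  private val queue = new ReferenceQueue[Tracked]
  private val pending = mutable.Map.empty[WeakReference[Tracked], Long]

  def register(t: Tracked): Unit =
    pending(new WeakReference(t, queue)) = t.id

  /** Drain the queue; returns the ids whose objects were garbage collected. */
  def drainCleaned(): Seq[Long] =
    Iterator.continually(queue.poll())
      .takeWhile(_ != null)
      .map(ref => pending.remove(ref.asInstanceOf[WeakReference[Tracked]]).get)
      .toSeq

  def main(args: Array[String]): Unit = {
    register(new Tracked(42L))  // no strong reference kept to the Tracked
    System.gc()                 // a hint only; a real cleaner polls in a loop
    Thread.sleep(100)
    println(s"cleaned ids: ${drainCleaned()}")  // likely Seq(42)
  }
}
```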
@@ -513,4 +537,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       spark.catalog.uncacheTable("t2")
     }
   }
+
+  test("SPARK-15870 DataFrame can't execute after uncacheTable") {
+    val selectStar = sql("SELECT * FROM testData WHERE key = 1")
+    selectStar.createOrReplaceTempView("selectStar")
+
+    spark.catalog.cacheTable("selectStar")
+    checkAnswer(
+      selectStar,
+      Seq(Row(1, "1")))
+
+    spark.catalog.uncacheTable("selectStar")
+    checkAnswer(
+      selectStar,
+      Seq(Row(1, "1")))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org