spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-15870][SQL] DataFrame can't execute after uncacheTable.
Date Sun, 12 Jun 2016 23:38:02 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 3a1567a73 -> 619a11426


[SPARK-15870][SQL] DataFrame can't execute after uncacheTable.

## What changes were proposed in this pull request?

If a cached `DataFrame` executed more than once and then do `uncacheTable` like the following:

```
    val selectStar = sql("SELECT * FROM testData WHERE key = 1")
    selectStar.createOrReplaceTempView("selectStar")

    spark.catalog.cacheTable("selectStar")
    checkAnswer(
      selectStar,
      Seq(Row(1, "1")))

    spark.catalog.uncacheTable("selectStar")
    checkAnswer(
      selectStar,
      Seq(Row(1, "1")))
```

, then the uncached `DataFrame` can't execute because of `Task not serializable` exception
like:

```
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2038)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:884)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:883)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
...
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before
send to executor
	at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:153)
	at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
...
```

Notice that `DataFrame` uncached with `DataFrame.unpersist()` works, but with `spark.catalog.uncacheTable`
doesn't work.

This pr reverts a part of cf38fe0 not to unregister `batchStats` accumulator, which is not
needed to be unregistered here because it will be done by `ContextCleaner` after it is collected
by GC.

## How was this patch tested?

Added a test to check if DataFrame can execute after uncacheTable and other existing tests.
But I made a test to check if the accumulator was cleared as `ignore` because the test would
be flaky.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #13596 from ueshin/issues/SPARK-15870.

(cherry picked from commit caebd7f2622340fc081bb9a2ea6a0b246f1e3a3f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/619a1142
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/619a1142
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/619a1142

Branch: refs/heads/branch-2.0
Commit: 619a11426aa6a76f88b4c0b4cc45ced8b126cbb7
Parents: 3a1567a
Author: Takuya UESHIN <ueshin@happy-camper.st>
Authored: Sun Jun 12 16:37:44 2016 -0700
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Sun Jun 12 16:37:59 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/CacheManager.scala      |  2 +-
 .../columnar/InMemoryTableScanExec.scala        |  6 ---
 .../org/apache/spark/sql/CachedTableSuite.scala | 41 +++++++++++++++++++-
 3 files changed, 41 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/619a1142/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b584cf4..4e95754 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -109,7 +109,7 @@ private[sql] class CacheManager extends Logging {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     require(dataIndex >= 0, s"Table $query is not cached.")
-    cachedData(dataIndex).cachedRepresentation.uncache(blocking)
+    cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
     cachedData.remove(dataIndex)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/619a1142/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index ff07331..ce630bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -210,12 +210,6 @@ private[sql] case class InMemoryRelation(
 
   override protected def otherCopyArgs: Seq[AnyRef] =
     Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats)
-
-  private[sql] def uncache(blocking: Boolean): Unit = {
-    AccumulatorContext.remove(batchStats.id)
-    cachedColumnBuffers.unpersist(blocking)
-    _cachedColumnBuffers = null
-  }
 }
 
 private[sql] case class InMemoryTableScanExec(

http://git-wip-us.apache.org/repos/asf/spark/blob/619a1142/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index e08a9ab..44bafa5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql
 
+import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.scalatest.concurrent.Eventually._
 
+import org.apache.spark.CleanerListener
 import org.apache.spark.sql.execution.RDDScanExec
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
@@ -321,7 +323,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assert(spark.sharedState.cacheManager.isEmpty)
   }
 
-  test("Clear accumulators when uncacheTable to prevent memory leaking") {
+  test("Ensure accumulators to be cleared after GC when uncacheTable") {
     sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
     sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")
 
@@ -333,17 +335,39 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     sql("SELECT * FROM t1").count()
     sql("SELECT * FROM t2").count()
 
+    val toBeCleanedAccIds = new HashSet[Long]
+
     val accId1 = spark.table("t1").queryExecution.withCachedData.collect {
       case i: InMemoryRelation => i.batchStats.id
     }.head
+    toBeCleanedAccIds += accId1
 
     val accId2 = spark.table("t1").queryExecution.withCachedData.collect {
       case i: InMemoryRelation => i.batchStats.id
     }.head
+    toBeCleanedAccIds += accId2
+
+    val cleanerListener = new CleanerListener {
+      def rddCleaned(rddId: Int): Unit = {}
+      def shuffleCleaned(shuffleId: Int): Unit = {}
+      def broadcastCleaned(broadcastId: Long): Unit = {}
+      def accumCleaned(accId: Long): Unit = {
+        toBeCleanedAccIds.synchronized { toBeCleanedAccIds -= accId }
+      }
+      def checkpointCleaned(rddId: Long): Unit = {}
+    }
+    spark.sparkContext.cleaner.get.attachListener(cleanerListener)
 
     spark.catalog.uncacheTable("t1")
     spark.catalog.uncacheTable("t2")
 
+    System.gc()
+
+    eventually(timeout(10 seconds)) {
+      assert(toBeCleanedAccIds.synchronized { toBeCleanedAccIds.isEmpty },
+        "batchStats accumulators should be cleared after GC when uncacheTable")
+    }
+
     assert(AccumulatorContext.get(accId1).isEmpty)
     assert(AccumulatorContext.get(accId2).isEmpty)
   }
@@ -513,4 +537,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       spark.catalog.uncacheTable("t2")
     }
   }
+
+  test("SPARK-15870 DataFrame can't execute after uncacheTable") {
+    val selectStar = sql("SELECT * FROM testData WHERE key = 1")
+    selectStar.createOrReplaceTempView("selectStar")
+
+    spark.catalog.cacheTable("selectStar")
+    checkAnswer(
+      selectStar,
+      Seq(Row(1, "1")))
+
+    spark.catalog.uncacheTable("selectStar")
+    checkAnswer(
+      selectStar,
+      Seq(Row(1, "1")))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message