Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 44E841070B for ; Fri, 20 Feb 2015 23:37:47 +0000 (UTC) Received: (qmail 6545 invoked by uid 500); 20 Feb 2015 23:37:47 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 6511 invoked by uid 500); 20 Feb 2015 23:37:47 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 6502 invoked by uid 99); 20 Feb 2015 23:37:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Feb 2015 23:37:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0AFE0E0534; Fri, 20 Feb 2015 23:37:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: marmbrus@apache.org To: commits@spark.apache.org Message-Id: <723b6bce385149ceb5b6651054e15185@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-5909][SQL] Add a clearCache command to Spark SQL's cache manager Date: Fri, 20 Feb 2015 23:37:47 +0000 (UTC) Repository: spark Updated Branches: refs/heads/branch-1.3 913562ae7 -> b9a6c5c84 [SPARK-5909][SQL] Add a clearCache command to Spark SQL's cache manager JIRA: https://issues.apache.org/jira/browse/SPARK-5909 Author: Yin Huai Closes #4694 from yhuai/clearCache and squashes the following commits: 397ecc4 [Yin Huai] Address comments. a2702fc [Yin Huai] Update parser. 3a54506 [Yin Huai] add isEmpty to CacheManager. 6d14460 [Yin Huai] Python clearCache. f7b8dbd [Yin Huai] Add clear cache command. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9a6c5c8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9a6c5c8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9a6c5c8 Branch: refs/heads/branch-1.3 Commit: b9a6c5c840be1cb4ec4c256920424afbe09c9b37 Parents: 913562a Author: Yin Huai Authored: Fri Feb 20 16:20:02 2015 +0800 Committer: Michael Armbrust Committed: Fri Feb 20 15:37:38 2015 -0800 ---------------------------------------------------------------------- python/pyspark/sql/context.py | 4 ++++ .../scala/org/apache/spark/sql/CacheManager.scala | 6 ++++++ .../scala/org/apache/spark/sql/SQLContext.scala | 5 +++++ .../scala/org/apache/spark/sql/SparkSQLParser.scala | 11 +++++++---- .../org/apache/spark/sql/execution/commands.scala | 15 +++++++++++++++ .../org/apache/spark/sql/CachedTableSuite.scala | 16 ++++++++++++++++ 6 files changed, 53 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b9a6c5c8/python/pyspark/sql/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index c5b361a..313f15e 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -695,6 +695,10 @@ class SQLContext(object): """Removes the specified table from the in-memory cache.""" self._ssql_ctx.uncacheTable(tableName) + def clearCache(self): + """Removes all cached tables from the in-memory cache. """ + self._ssql_ctx.clearCache() + class HiveContext(SQLContext): http://git-wip-us.apache.org/repos/asf/spark/blob/b9a6c5c8/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala index f1949aa..ca4a127 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala @@ -71,11 +71,17 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging { } } + /** Clears all cached tables. */ private[sql] def clearCache(): Unit = writeLock { cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist()) cachedData.clear() } + /** Checks if the cache is empty. */ + private[sql] def isEmpty: Boolean = readLock { + cachedData.isEmpty + } + /** * Caches the data produced by the logical representation of the given schema rdd. Unlike * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing http://git-wip-us.apache.org/repos/asf/spark/blob/b9a6c5c8/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index a6cf3cd..4bdaa02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -217,6 +217,11 @@ class SQLContext(@transient val sparkContext: SparkContext) */ def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName) + /** + * Removes all cached tables from the in-memory cache. + */ + def clearCache(): Unit = cacheManager.clearCache() + // scalastyle:off // Disable style checker so "implicits" object can start with lowercase i /** http://git-wip-us.apache.org/repos/asf/spark/blob/b9a6c5c8/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala index 00e19da..5921eaf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala @@ -23,7 +23,7 @@ import scala.util.parsing.combinator.RegexParsers import org.apache.spark.sql.catalyst.AbstractSparkSQLParser import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.{ShowTablesCommand, UncacheTableCommand, CacheTableCommand, SetCommand} +import org.apache.spark.sql.execution._ import org.apache.spark.sql.types.StringType @@ -57,6 +57,7 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr protected val AS = Keyword("AS") protected val CACHE = Keyword("CACHE") + protected val CLEAR = Keyword("CLEAR") protected val IN = Keyword("IN") protected val LAZY = Keyword("LAZY") protected val SET = Keyword("SET") @@ -74,9 +75,11 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr } private lazy val uncache: Parser[LogicalPlan] = - UNCACHE ~ TABLE ~> ident ^^ { - case tableName => UncacheTableCommand(tableName) - } + ( UNCACHE ~ TABLE ~> ident ^^ { + case tableName => UncacheTableCommand(tableName) + } + | CLEAR ~ CACHE ^^^ ClearCacheCommand + ) private lazy val set: Parser[LogicalPlan] = SET ~> restInput ^^ { http://git-wip-us.apache.org/repos/asf/spark/blob/b9a6c5c8/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 7c92e9f..a112321 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -176,6 +176,21 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand { /** * :: DeveloperApi :: + * Clear all cached data from the in-memory cache. + */ +@DeveloperApi +case object ClearCacheCommand extends RunnableCommand { + + override def run(sqlContext: SQLContext) = { + sqlContext.clearCache() + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty +} + +/** + * :: DeveloperApi :: */ @DeveloperApi case class DescribeCommand( http://git-wip-us.apache.org/repos/asf/spark/blob/b9a6c5c8/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index e70e866..c240f2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -280,4 +280,20 @@ class CachedTableSuite extends QueryTest { assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found")) assert(!isCached("t2")) } + + test("Clear all cache") { + sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1") + sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2") + cacheTable("t1") + cacheTable("t2") + clearCache() + assert(cacheManager.isEmpty) + + sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1") + sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2") + cacheTable("t1") + cacheTable("t2") + sql("Clear CACHE") + assert(cacheManager.isEmpty) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org