From: andrewor14@apache.org
To: commits@spark.apache.org
Date: Thu, 24 Mar 2016 05:21:44 -0000
Subject: [2/2] spark git commit: Revert "[SPARK-14014][SQL] Replace existing catalog with SessionCatalog"

Revert "[SPARK-14014][SQL] Replace existing catalog with SessionCatalog"

This reverts commit 5dfc01976bb0d72489620b4f32cc12d620bb6260.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c44d140c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c44d140c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c44d140c

Branch: refs/heads/master
Commit: c44d140cae99d0b880e6d25f158125ad3adc6a05
Parents: cf823be
Author: Andrew Or
Authored: Wed Mar 23 22:21:15 2016 -0700
Committer: Andrew Or
Committed: Wed Mar 23 22:21:15 2016 -0700

----------------------------------------------------------------------
 R/pkg/inst/tests/testthat/test_sparkSQL.R       |   3 +-
 project/MimaExcludes.scala                      |   3 -
 python/pyspark/sql/context.py                   |   2 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  20 +-
 .../spark/sql/catalyst/analysis/Catalog.scala   | 218 ++++++++
 .../sql/catalyst/analysis/unresolved.scala      |   2 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  35 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   | 123 ++---
 .../spark/sql/catalyst/catalog/interface.scala  |   2 -
 .../sql/catalyst/analysis/AnalysisSuite.scala   |   6 +-
 .../sql/catalyst/analysis/AnalysisTest.scala    |  23 +-
 .../analysis/DecimalPrecisionSuite.scala        |  25 +-
 .../sql/catalyst/catalog/CatalogTestCases.scala |   3 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  |  20 +-
 .../optimizer/BooleanSimplificationSuite.scala  |  11 +-
 .../optimizer/EliminateSortsSuite.scala         |   5 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  73 +--
 .../spark/sql/execution/command/commands.scala  |   8 +-
 .../spark/sql/execution/datasources/ddl.scala   |  24 +-
 .../spark/sql/execution/datasources/rules.scala |  10 +-
 .../spark/sql/internal/SessionState.scala       |   7 +-
 .../org/apache/spark/sql/ListTablesSuite.scala  |  15 +-
 .../org/apache/spark/sql/SQLContextSuite.scala  |   9 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  22 +-
 .../datasources/parquet/ParquetQuerySuite.scala |   6 +-
 .../apache/spark/sql/test/SQLTestUtils.scala    |   4 +-
 .../hive/thriftserver/SparkSQLCLIDriver.scala   |   3 +-
 .../spark/sql/hive/thriftserver/CliSuite.scala  |   5 +-
 .../org/apache/spark/sql/hive/HiveCatalog.scala |   5 +-
 .../org/apache/spark/sql/hive/HiveContext.scala | 498 +++++++++----------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  60 ++-
 .../spark/sql/hive/HiveSessionCatalog.scala     | 104 ----
 .../spark/sql/hive/HiveSessionState.scala       |  10 +-
 .../spark/sql/hive/client/HiveClient.scala      |   3 +
 .../spark/sql/hive/client/HiveClientImpl.scala  |   4 +
 .../hive/execution/CreateTableAsSelect.scala    |   4 +-
 .../sql/hive/execution/CreateViewAsSelect.scala |   4 +-
 .../hive/execution/InsertIntoHiveTable.scala    |  14 +-
 .../spark/sql/hive/execution/commands.scala     |   9 +-
 .../apache/spark/sql/hive/test/TestHive.scala   | 151 ++----
 .../sql/hive/JavaMetastoreDataSourcesSuite.java |   5 +-
 .../spark/sql/hive/HiveContextSuite.scala       |  38 --
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |   9 +-
 .../apache/spark/sql/hive/ListTablesSuite.scala |   6 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  31 +-
 .../spark/sql/hive/MultiDatabaseSuite.scala     |   5 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala |   3 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |   4 +
 .../sql/hive/execution/HiveQuerySuite.scala     |  16 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   4 +-
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |   4 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  24 +-
 52 files changed, 783 insertions(+), 919 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index eef365b..63acbad 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1817,8 +1817,7 @@ test_that("approxQuantile() on a DataFrame", {
 
 test_that("SQL error message is returned from JVM", {
   retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
-  expect_equal(grepl("Table not found", retError), TRUE)
-  expect_equal(grepl("blah", retError), TRUE)
+  expect_equal(grepl("Table not found: blah", retError), TRUE)
 })
 
 irisDF <- suppressWarnings(createDataFrame(sqlContext, iris))

http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9158983..42eafcb 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -563,9 +563,6 @@ object MimaExcludes {
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerEvent.logEvent"),
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.OutputWriterFactory.newInstance")
     ) ++ Seq(
-      // [SPARK-14014] Replace existing analysis.Catalog with SessionCatalog
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.this")
-    ) ++ Seq(
       // [SPARK-13928] Move org.apache.spark.Logging into org.apache.spark.internal.Logging
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Logging"),
       (problem: Problem) => problem match {

http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py
b/python/pyspark/sql/context.py index 4008332..9c2f6a3 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -554,7 +554,7 @@ class SQLContext(object): >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> "table1" in sqlContext.tableNames() True - >>> "table1" in sqlContext.tableNames("default") + >>> "table1" in sqlContext.tableNames("db") True """ if dbName is None: http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 54543ee..07b0f5e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -24,7 +24,6 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} -import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -37,22 +36,23 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.types._ /** - * A trivial [[Analyzer]] with an dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]]. - * Used for testing when all relations are already filled in and the analyzer needs only - * to resolve attribute references. + * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing + * when all relations are already filled in and the analyzer needs only to resolve attribute + * references. */ object SimpleAnalyzer - extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true)) -class SimpleAnalyzer(conf: CatalystConf) - extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf) + extends Analyzer( + EmptyCatalog, + EmptyFunctionRegistry, + new SimpleCatalystConf(caseSensitiveAnalysis = true)) /** * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and - * [[UnresolvedRelation]]s into fully typed objects using information in a - * [[SessionCatalog]] and a [[FunctionRegistry]]. + * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and + * a [[FunctionRegistry]]. */ class Analyzer( - catalog: SessionCatalog, + catalog: Catalog, registry: FunctionRegistry, conf: CatalystConf, maxIterations: Int = 100) http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala new file mode 100644 index 0000000..2f0a4db --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} + + +/** + * An interface for looking up relations by name. Used by an [[Analyzer]]. + */ +trait Catalog { + + val conf: CatalystConf + + def tableExists(tableIdent: TableIdentifier): Boolean + + def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan + + def setCurrentDatabase(databaseName: String): Unit = { + throw new UnsupportedOperationException + } + + /** + * Returns tuples of (tableName, isTemporary) for all tables in the given database. + * isTemporary is a Boolean value indicates if a table is a temporary or not. + */ + def getTables(databaseName: Option[String]): Seq[(String, Boolean)] + + def refreshTable(tableIdent: TableIdentifier): Unit + + def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit + + def unregisterTable(tableIdent: TableIdentifier): Unit + + def unregisterAllTables(): Unit + + /** + * Get the table name of TableIdentifier for temporary tables. + */ + protected def getTableName(tableIdent: TableIdentifier): String = { + // It is not allowed to specify database name for temporary tables. + // We check it here and throw exception if database is defined. + if (tableIdent.database.isDefined) { + throw new AnalysisException("Specifying database name or other qualifiers are not allowed " + + "for temporary tables. If the table name has dots (.) 
in it, please quote the " + + "table name with backticks (`).") + } + if (conf.caseSensitiveAnalysis) { + tableIdent.table + } else { + tableIdent.table.toLowerCase + } + } +} + +class SimpleCatalog(val conf: CatalystConf) extends Catalog { + private[this] val tables = new ConcurrentHashMap[String, LogicalPlan] + + override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { + tables.put(getTableName(tableIdent), plan) + } + + override def unregisterTable(tableIdent: TableIdentifier): Unit = { + tables.remove(getTableName(tableIdent)) + } + + override def unregisterAllTables(): Unit = { + tables.clear() + } + + override def tableExists(tableIdent: TableIdentifier): Boolean = { + tables.containsKey(getTableName(tableIdent)) + } + + override def lookupRelation( + tableIdent: TableIdentifier, + alias: Option[String] = None): LogicalPlan = { + val tableName = getTableName(tableIdent) + val table = tables.get(tableName) + if (table == null) { + throw new AnalysisException("Table not found: " + tableName) + } + val qualifiedTable = SubqueryAlias(tableName, table) + + // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are + // properly qualified with this alias. + alias + .map(a => SubqueryAlias(a, qualifiedTable)) + .getOrElse(qualifiedTable) + } + + override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { + tables.keySet().asScala.map(_ -> true).toSeq + } + + override def refreshTable(tableIdent: TableIdentifier): Unit = { + throw new UnsupportedOperationException + } +} + +/** + * A trait that can be mixed in with other Catalogs allowing specific tables to be overridden with + * new logical plans. This can be used to bind query result to virtual tables, or replace tables + * with in-memory cached versions. Note that the set of overrides is stored in memory and thus + * lost when the JVM exits. + */ +trait OverrideCatalog extends Catalog { + private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan] + + private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = { + if (tableIdent.database.isDefined) { + None + } else { + Option(overrides.get(getTableName(tableIdent))) + } + } + + abstract override def tableExists(tableIdent: TableIdentifier): Boolean = { + getOverriddenTable(tableIdent) match { + case Some(_) => true + case None => super.tableExists(tableIdent) + } + } + + abstract override def lookupRelation( + tableIdent: TableIdentifier, + alias: Option[String] = None): LogicalPlan = { + getOverriddenTable(tableIdent) match { + case Some(table) => + val tableName = getTableName(tableIdent) + val qualifiedTable = SubqueryAlias(tableName, table) + + // If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes + // are properly qualified with this alias. 
+ alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) + + case None => super.lookupRelation(tableIdent, alias) + } + } + + abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { + overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName) + } + + override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { + overrides.put(getTableName(tableIdent), plan) + } + + override def unregisterTable(tableIdent: TableIdentifier): Unit = { + if (tableIdent.database.isEmpty) { + overrides.remove(getTableName(tableIdent)) + } + } + + override def unregisterAllTables(): Unit = { + overrides.clear() + } +} + +/** + * A trivial catalog that returns an error when a relation is requested. Used for testing when all + * relations are already filled in and the analyzer needs only to resolve attribute references. + */ +object EmptyCatalog extends Catalog { + + override val conf: CatalystConf = EmptyConf + + override def tableExists(tableIdent: TableIdentifier): Boolean = { + throw new UnsupportedOperationException + } + + override def lookupRelation( + tableIdent: TableIdentifier, + alias: Option[String] = None): LogicalPlan = { + throw new UnsupportedOperationException + } + + override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { + throw new UnsupportedOperationException + } + + override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { + throw new UnsupportedOperationException + } + + override def unregisterTable(tableIdent: TableIdentifier): Unit = { + throw new UnsupportedOperationException + } + + override def unregisterAllTables(): Unit = { + throw new UnsupportedOperationException + } + + override def refreshTable(tableIdent: TableIdentifier): Unit = { + throw new UnsupportedOperationException + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index e73d367..9518309 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null) /** - * Holds the name of a relation that has yet to be looked up in a catalog. + * Holds the name of a relation that has yet to be looked up in a [[Catalog]]. 
*/ case class UnresolvedRelation( tableIdentifier: TableIdentifier, http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index e216fa5..7ead1dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -52,34 +52,37 @@ class InMemoryCatalog extends ExternalCatalog { names.filter { funcName => regex.pattern.matcher(funcName).matches() } } - private def functionExists(db: String, funcName: String): Boolean = { + private def existsFunction(db: String, funcName: String): Boolean = { requireDbExists(db) catalog(db).functions.contains(funcName) } - private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = { + private def existsTable(db: String, table: String): Boolean = { + requireDbExists(db) + catalog(db).tables.contains(table) + } + + private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = { requireTableExists(db, table) catalog(db).tables(table).partitions.contains(spec) } private def requireFunctionExists(db: String, funcName: String): Unit = { - if (!functionExists(db, funcName)) { - throw new AnalysisException( - s"Function not found: '$funcName' does not exist in database '$db'") + if (!existsFunction(db, funcName)) { + throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'") } } private def requireTableExists(db: String, table: String): Unit = { - if (!tableExists(db, table)) { - throw new AnalysisException( - s"Table not found: '$table' does not exist in database '$db'") + if (!existsTable(db, table)) { + throw new AnalysisException(s"Table '$table' does not exist in database '$db'") } } private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = { - if (!partitionExists(db, table, spec)) { + if (!existsPartition(db, table, spec)) { throw new AnalysisException( - s"Partition not found: database '$db' table '$table' does not contain: '$spec'") + s"Partition does not exist in database '$db' table '$table': '$spec'") } } @@ -156,7 +159,7 @@ class InMemoryCatalog extends ExternalCatalog { ignoreIfExists: Boolean): Unit = synchronized { requireDbExists(db) val table = tableDefinition.name.table - if (tableExists(db, table)) { + if (existsTable(db, table)) { if (!ignoreIfExists) { throw new AnalysisException(s"Table '$table' already exists in database '$db'") } @@ -170,7 +173,7 @@ class InMemoryCatalog extends ExternalCatalog { table: String, ignoreIfNotExists: Boolean): Unit = synchronized { requireDbExists(db) - if (tableExists(db, table)) { + if (existsTable(db, table)) { catalog(db).tables.remove(table) } else { if (!ignoreIfNotExists) { @@ -197,17 +200,13 @@ class InMemoryCatalog extends ExternalCatalog { catalog(db).tables(table).table } - override def tableExists(db: String, table: String): Boolean = synchronized { - requireDbExists(db) - catalog(db).tables.contains(table) - } - override def listTables(db: String): Seq[String] = synchronized { requireDbExists(db) catalog(db).tables.keySet.toSeq } override def listTables(db: String, pattern: String): Seq[String] = 
synchronized { + requireDbExists(db) filterPattern(listTables(db), pattern) } @@ -296,7 +295,7 @@ class InMemoryCatalog extends ExternalCatalog { override def createFunction(db: String, func: CatalogFunction): Unit = synchronized { requireDbExists(db) - if (functionExists(db, func.name.funcName)) { + if (existsFunction(db, func.name.funcName)) { throw new AnalysisException(s"Function '$func' already exists in '$db' database") } else { catalog(db).functions.put(func.name.funcName, func) http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 34265fa..3ac2bcf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} @@ -32,34 +31,17 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary * tables and functions of the Spark Session that it belongs to. */ -class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { +class SessionCatalog(externalCatalog: ExternalCatalog) { import ExternalCatalog._ - def this(externalCatalog: ExternalCatalog) { - this(externalCatalog, new SimpleCatalystConf(true)) - } - - protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan] - protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction] + private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan] + private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction] // Note: we track current database here because certain operations do not explicitly // specify the database (e.g. DROP TABLE my_table). In these cases we must first // check whether the temporary table or function exists, then, if not, operate on // the corresponding item in the current database. - protected[this] var currentDb = { - val defaultName = "default" - val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map()) - // Initialize default database if it doesn't already exist - createDatabase(defaultDbDefinition, ignoreIfExists = true) - defaultName - } - - /** - * Format table name, taking into account case sensitivity. 
- */ - protected[this] def formatTableName(name: String): String = { - if (conf.caseSensitiveAnalysis) name else name.toLowerCase - } + private[this] var currentDb = "default" // ---------------------------------------------------------------------------- // Databases @@ -123,8 +105,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { */ def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { val db = tableDefinition.name.database.getOrElse(currentDb) - val table = formatTableName(tableDefinition.name.table) - val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db))) + val newTableDefinition = tableDefinition.copy( + name = TableIdentifier(tableDefinition.name.table, Some(db))) externalCatalog.createTable(db, newTableDefinition, ignoreIfExists) } @@ -139,8 +121,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { */ def alterTable(tableDefinition: CatalogTable): Unit = { val db = tableDefinition.name.database.getOrElse(currentDb) - val table = formatTableName(tableDefinition.name.table) - val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db))) + val newTableDefinition = tableDefinition.copy( + name = TableIdentifier(tableDefinition.name.table, Some(db))) externalCatalog.alterTable(db, newTableDefinition) } @@ -150,8 +132,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { */ def getTable(name: TableIdentifier): CatalogTable = { val db = name.database.getOrElse(currentDb) - val table = formatTableName(name.table) - externalCatalog.getTable(db, table) + externalCatalog.getTable(db, name.table) } // ------------------------------------------------------------- @@ -165,11 +146,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { name: String, tableDefinition: LogicalPlan, ignoreIfExists: Boolean): Unit = { - val table = formatTableName(name) - if (tempTables.containsKey(table) && !ignoreIfExists) { + if (tempTables.containsKey(name) && !ignoreIfExists) { throw new AnalysisException(s"Temporary table '$name' already exists.") } - tempTables.put(table, tableDefinition) + tempTables.put(name, tableDefinition) } /** @@ -186,13 +166,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { throw new AnalysisException("rename does not support moving tables across databases") } val db = oldName.database.getOrElse(currentDb) - val oldTableName = formatTableName(oldName.table) - val newTableName = formatTableName(newName.table) - if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) { - externalCatalog.renameTable(db, oldTableName, newTableName) + if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) { + externalCatalog.renameTable(db, oldName.table, newName.table) } else { - val table = tempTables.remove(oldTableName) - tempTables.put(newTableName, table) + val table = tempTables.remove(oldName.table) + tempTables.put(newName.table, table) } } @@ -205,11 +183,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { */ def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = { val db = name.database.getOrElse(currentDb) - val table = formatTableName(name.table) - if (name.database.isDefined || !tempTables.containsKey(table)) { - externalCatalog.dropTable(db, table, ignoreIfNotExists) + if (name.database.isDefined || !tempTables.containsKey(name.table)) { + externalCatalog.dropTable(db, name.table, 
ignoreIfNotExists) } else { - tempTables.remove(table) + tempTables.remove(name.table) } } @@ -222,42 +199,27 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { */ def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = { val db = name.database.getOrElse(currentDb) - val table = formatTableName(name.table) val relation = - if (name.database.isDefined || !tempTables.containsKey(table)) { - val metadata = externalCatalog.getTable(db, table) + if (name.database.isDefined || !tempTables.containsKey(name.table)) { + val metadata = externalCatalog.getTable(db, name.table) CatalogRelation(db, metadata, alias) } else { - tempTables.get(table) + tempTables.get(name.table) } - val qualifiedTable = SubqueryAlias(table, relation) + val qualifiedTable = SubqueryAlias(name.table, relation) // If an alias was specified by the lookup, wrap the plan in a subquery so that // attributes are properly qualified with this alias. alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) } /** - * Return whether a table with the specified name exists. - * - * Note: If a database is explicitly specified, then this will return whether the table - * exists in that particular database instead. In that case, even if there is a temporary - * table with the same name, we will return false if the specified database does not - * contain the table. - */ - def tableExists(name: TableIdentifier): Boolean = { - val db = name.database.getOrElse(currentDb) - val table = formatTableName(name.table) - if (name.database.isDefined || !tempTables.containsKey(table)) { - externalCatalog.tableExists(db, table) - } else { - true // it's a temporary table - } - } - - /** * List all tables in the specified database, including temporary tables. */ - def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*") + def listTables(db: String): Seq[TableIdentifier] = { + val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) } + val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) } + dbTables ++ _tempTables + } /** * List all matching tables in the specified database, including temporary tables. @@ -273,19 +235,6 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { } /** - * Refresh the cache entry for a metastore table, if any. - */ - def refreshTable(name: TableIdentifier): Unit = { /* no-op */ } - - /** - * Drop all existing temporary tables. - * For testing only. - */ - def clearTempTables(): Unit = { - tempTables.clear() - } - - /** * Return a temporary table exactly as it was stored. * For testing only. 
*/ @@ -314,8 +263,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit = { val db = tableName.database.getOrElse(currentDb) - val table = formatTableName(tableName.table) - externalCatalog.createPartitions(db, table, parts, ignoreIfExists) + externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists) } /** @@ -327,8 +275,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { parts: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean): Unit = { val db = tableName.database.getOrElse(currentDb) - val table = formatTableName(tableName.table) - externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists) + externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists) } /** @@ -342,8 +289,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { specs: Seq[TablePartitionSpec], newSpecs: Seq[TablePartitionSpec]): Unit = { val db = tableName.database.getOrElse(currentDb) - val table = formatTableName(tableName.table) - externalCatalog.renamePartitions(db, table, specs, newSpecs) + externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs) } /** @@ -357,8 +303,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { */ def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = { val db = tableName.database.getOrElse(currentDb) - val table = formatTableName(tableName.table) - externalCatalog.alterPartitions(db, table, parts) + externalCatalog.alterPartitions(db, tableName.table, parts) } /** @@ -367,8 +312,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { */ def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = { val db = tableName.database.getOrElse(currentDb) - val table = formatTableName(tableName.table) - externalCatalog.getPartition(db, table, spec) + externalCatalog.getPartition(db, tableName.table, spec) } /** @@ -377,8 +321,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { */ def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = { val db = tableName.database.getOrElse(currentDb) - val table = formatTableName(tableName.table) - externalCatalog.listPartitions(db, table) + externalCatalog.listPartitions(db, tableName.table) } // ---------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 3480313..c4e4961 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -91,8 +91,6 @@ abstract class ExternalCatalog { def getTable(db: String, table: String): CatalogTable - def tableExists(db: String, table: String): Boolean - def listTables(db: String): Seq[String] def listTables(db: String, pattern: String): Seq[String] http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala 
---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index afc2f32..8b568b6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -161,10 +161,14 @@ class AnalysisSuite extends AnalysisTest { } test("resolve relations") { - assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq()) + assertAnalysisError( + UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe")) + checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation) + checkAnalysis( UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false) + checkAnalysis( UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false) } http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index 6fa4bee..39166c4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -18,21 +18,26 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.SimpleCatalystConf -import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ trait AnalysisTest extends PlanTest { - protected val caseSensitiveAnalyzer = makeAnalyzer(caseSensitive = true) - protected val caseInsensitiveAnalyzer = makeAnalyzer(caseSensitive = false) + val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = { + val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true) + val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false) - private def makeAnalyzer(caseSensitive: Boolean): Analyzer = { - val conf = new SimpleCatalystConf(caseSensitive) - val catalog = new SessionCatalog(new InMemoryCatalog, conf) - catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true) - new Analyzer(catalog, EmptyFunctionRegistry, conf) { + val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf) + val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf) + + caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation) + caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation) + + new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) { + override val extendedResolutionRules = EliminateSubqueryAliases :: Nil + } -> + new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf) { override val extendedResolutionRules = EliminateSubqueryAliases :: Nil } } 
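
The AnalysisTest change above shows how the catalog API restored by this revert is wired up: a SimpleCatalog is built from a CatalystConf, relations are registered under a TableIdentifier, and the Analyzer resolves UnresolvedRelation nodes against it. The following standalone sketch is illustrative only and is not part of this commit; it reuses the classes shown in this diff (SimpleCatalystConf, SimpleCatalog, Analyzer, EmptyFunctionRegistry, UnresolvedRelation, LocalRelation), while the CatalogRoundTrip driver object itself is hypothetical:

import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.IntegerType

// Illustrative sketch only; not part of the commit shown in this diff.
object CatalogRoundTrip {
  def main(args: Array[String]): Unit = {
    // A case-insensitive conf, as used by the caseInsensitiveAnalyzer above.
    val conf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
    val catalog = new SimpleCatalog(conf)

    // Register a logical plan under a name; with a case-insensitive conf the
    // catalog stores the lower-cased table name (see getTableName in Catalog.scala).
    val relation = LocalRelation(AttributeReference("i", IntegerType)())
    catalog.registerTable(TableIdentifier("TaBlE"), relation)

    // The analyzer resolves UnresolvedRelation nodes by looking them up in the catalog.
    val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
    val resolved = analyzer.execute(UnresolvedRelation(TableIdentifier("table"), None))
    println(resolved)
  }
}
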
http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index 3150186..9aa685e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -19,8 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.catalyst.SimpleCatalystConf -import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -31,11 +30,11 @@ import org.apache.spark.sql.types._ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { - private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true) - private val catalog = new SessionCatalog(new InMemoryCatalog, conf) - private val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) + val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true) + val catalog = new SimpleCatalog(conf) + val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) - private val relation = LocalRelation( + val relation = LocalRelation( AttributeReference("i", IntegerType)(), AttributeReference("d1", DecimalType(2, 1))(), AttributeReference("d2", DecimalType(5, 2))(), @@ -44,15 +43,15 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { AttributeReference("b", DoubleType)() ) - private val i: Expression = UnresolvedAttribute("i") - private val d1: Expression = UnresolvedAttribute("d1") - private val d2: Expression = UnresolvedAttribute("d2") - private val u: Expression = UnresolvedAttribute("u") - private val f: Expression = UnresolvedAttribute("f") - private val b: Expression = UnresolvedAttribute("b") + val i: Expression = UnresolvedAttribute("i") + val d1: Expression = UnresolvedAttribute("d1") + val d2: Expression = UnresolvedAttribute("d2") + val u: Expression = UnresolvedAttribute("u") + val f: Expression = UnresolvedAttribute("f") + val b: Expression = UnresolvedAttribute("b") before { - catalog.createTempTable("table", relation, ignoreIfExists = true) + catalog.registerTable(TableIdentifier("table"), relation) } private def checkType(expression: Expression, expectedType: DataType): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala index 277c2d7..a1ea619 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala @@ -225,14 +225,13 @@ abstract class CatalogTestCases extends SparkFunSuite with 
BeforeAndAfterEach { test("list tables without pattern") { val catalog = newBasicCatalog() - intercept[AnalysisException] { catalog.listTables("unknown_db") } assert(catalog.listTables("db1").toSet == Set.empty) assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) } test("list tables with pattern") { val catalog = newBasicCatalog() - intercept[AnalysisException] { catalog.listTables("unknown_db", "*") } + intercept[AnalysisException] { catalog.listTables("unknown_db") } assert(catalog.listTables("db1", "*").toSet == Set.empty) assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2")) assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2")) http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 74e995c..e1973ee 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -397,24 +397,6 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias) } - test("table exists") { - val catalog = new SessionCatalog(newBasicCatalog()) - assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2")))) - assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2")))) - assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) - assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1")))) - assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1")))) - // If database is explicitly specified, do not check temporary tables - val tempTable = Range(1, 10, 1, 10, Seq()) - catalog.createTempTable("tbl3", tempTable, ignoreIfExists = false) - assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) - // If database is not explicitly specified, check the current database - catalog.setCurrentDatabase("db2") - assert(catalog.tableExists(TableIdentifier("tbl1"))) - assert(catalog.tableExists(TableIdentifier("tbl2"))) - assert(catalog.tableExists(TableIdentifier("tbl3"))) - } - test("list tables without pattern") { val catalog = new SessionCatalog(newBasicCatalog()) val tempTable = Range(1, 10, 2, 10, Seq()) @@ -447,7 +429,7 @@ class SessionCatalogSuite extends SparkFunSuite { assert(catalog.listTables("db2", "*1").toSet == Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2")))) intercept[AnalysisException] { - catalog.listTables("unknown_db", "*") + catalog.listTables("unknown_db") } } http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index e2c76b7..2ab31ee 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -138,11 +137,11 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d)) } - private val caseInsensitiveConf = new SimpleCatalystConf(false) - private val caseInsensitiveAnalyzer = new Analyzer( - new SessionCatalog(new InMemoryCatalog, caseInsensitiveConf), - EmptyFunctionRegistry, - caseInsensitiveConf) + private val caseInsensitiveAnalyzer = + new Analyzer( + EmptyCatalog, + EmptyFunctionRegistry, + new SimpleCatalystConf(caseSensitiveAnalysis = false)) test("(a && b) || (a && c) => a && (b || c) when case insensitive") { val plan = caseInsensitiveAnalyzer.execute( http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala index 3824c67..a4c8d1c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.SimpleCatalystConf -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry} -import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -29,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules._ class EliminateSortsSuite extends PlanTest { val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false) - val catalog = new SessionCatalog(new InMemoryCatalog, conf) + val catalog = new SimpleCatalog(conf) val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) object Optimize extends RuleExecutor[LogicalPlan] { http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index e413e77..853a74c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -25,14 +25,13 @@ import scala.collection.JavaConverters._ import scala.collection.immutable import scala.reflect.runtime.universe.TypeTag -import org.apache.spark.{SparkConf, SparkContext, SparkException} +import org.apache.spark.{SparkContext, SparkException} import 
org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.catalyst._ -import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog} import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range} @@ -66,14 +65,13 @@ class SQLContext private[sql]( @transient val sparkContext: SparkContext, @transient protected[sql] val cacheManager: CacheManager, @transient private[sql] val listener: SQLListener, - val isRootContext: Boolean, - @transient private[sql] val externalCatalog: ExternalCatalog) + val isRootContext: Boolean) extends Logging with Serializable { self => - def this(sc: SparkContext) = { - this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog) + def this(sparkContext: SparkContext) = { + this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true) } def this(sparkContext: JavaSparkContext) = this(sparkContext.sc) @@ -111,8 +109,7 @@ class SQLContext private[sql]( sparkContext = sparkContext, cacheManager = cacheManager, listener = listener, - isRootContext = false, - externalCatalog = externalCatalog) + isRootContext = false) } /** @@ -189,12 +186,6 @@ class SQLContext private[sql]( */ def getAllConfs: immutable.Map[String, String] = conf.getAllConfs - // Extract `spark.sql.*` entries and put it in our SQLConf. - // Subclasses may additionally set these entries in other confs. - SQLContext.getSQLProperties(sparkContext.getConf).asScala.foreach { case (k, v) => - setConf(k, v) - } - protected[sql] def parseSql(sql: String): LogicalPlan = sessionState.sqlParser.parsePlan(sql) protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql)) @@ -208,6 +199,30 @@ class SQLContext private[sql]( sparkContext.addJar(path) } + { + // We extract spark sql settings from SparkContext's conf and put them to + // Spark SQL's conf. + // First, we populate the SQLConf (conf). So, we can make sure that other values using + // those settings in their construction can get the correct settings. + // For example, metadataHive in HiveContext may need both spark.sql.hive.metastore.version + // and spark.sql.hive.metastore.jars to get correctly constructed. + val properties = new Properties + sparkContext.getConf.getAll.foreach { + case (key, value) if key.startsWith("spark.sql") => properties.setProperty(key, value) + case _ => + } + // We directly put those settings to conf to avoid of calling setConf, which may have + // side-effects. For example, in HiveContext, setConf may cause executionHive and metadataHive + // get constructed. If we call setConf directly, the constructed metadataHive may have + // wrong settings, or the construction may fail. + conf.setConf(properties) + // After we have populated SQLConf, we call setConf to populate other confs in the subclass + // (e.g. hiveconf in HiveContext). + properties.asScala.foreach { + case (key, value) => setConf(key, value) + } + } + /** * :: Experimental :: * A collection of methods that are considered experimental, but can be used to hook into @@ -668,10 +683,8 @@ class SQLContext private[sql]( * only during the lifetime of this instance of SQLContext. 
*/ private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = { - sessionState.catalog.createTempTable( - sessionState.sqlParser.parseTableIdentifier(tableName).table, - df.logicalPlan, - ignoreIfExists = true) + sessionState.catalog.registerTable( + sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan) } /** @@ -684,7 +697,7 @@ class SQLContext private[sql]( */ def dropTempTable(tableName: String): Unit = { cacheManager.tryUncacheQuery(table(tableName)) - sessionState.catalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true) + sessionState.catalog.unregisterTable(TableIdentifier(tableName)) } /** @@ -811,7 +824,9 @@ class SQLContext private[sql]( * @since 1.3.0 */ def tableNames(): Array[String] = { - tableNames(sessionState.catalog.getCurrentDatabase) + sessionState.catalog.getTables(None).map { + case (tableName, _) => tableName + }.toArray } /** @@ -821,7 +836,9 @@ class SQLContext private[sql]( * @since 1.3.0 */ def tableNames(databaseName: String): Array[String] = { - sessionState.catalog.listTables(databaseName).map(_.table).toArray + sessionState.catalog.getTables(Some(databaseName)).map { + case (tableName, _) => tableName + }.toArray } @transient @@ -1008,18 +1025,4 @@ object SQLContext { } sqlListener.get() } - - /** - * Extract `spark.sql.*` properties from the conf and return them as a [[Properties]]. - */ - private[sql] def getSQLProperties(sparkConf: SparkConf): Properties = { - val properties = new Properties - sparkConf.getAll.foreach { case (key, value) => - if (key.startsWith("spark.sql")) { - properties.setProperty(key, value) - } - } - properties - } - } http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 964f0a7..59c3ffc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -339,12 +339,10 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableComma override def run(sqlContext: SQLContext): Seq[Row] = { // Since we need to return a Seq of rows, we will call getTables directly // instead of calling tables in sqlContext. 
- val catalog = sqlContext.sessionState.catalog - val db = databaseName.getOrElse(catalog.getCurrentDatabase) - val rows = catalog.listTables(db).map { t => - val isTemp = t.database.isEmpty - Row(t.table, isTemp) + val rows = sqlContext.sessionState.catalog.getTables(databaseName).map { + case (tableName, isTemporary) => Row(tableName, isTemporary) } + rows } } http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 24923bb..9e8e035 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -93,21 +93,15 @@ case class CreateTempTableUsing( provider: String, options: Map[String, String]) extends RunnableCommand { - if (tableIdent.database.isDefined) { - throw new AnalysisException( - s"Temporary table '$tableIdent' should not have specified a database") - } - def run(sqlContext: SQLContext): Seq[Row] = { val dataSource = DataSource( sqlContext, userSpecifiedSchema = userSpecifiedSchema, className = provider, options = options) - sqlContext.sessionState.catalog.createTempTable( - tableIdent.table, - Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan, - ignoreIfExists = true) + sqlContext.sessionState.catalog.registerTable( + tableIdent, + Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan) Seq.empty[Row] } @@ -121,11 +115,6 @@ case class CreateTempTableUsingAsSelect( options: Map[String, String], query: LogicalPlan) extends RunnableCommand { - if (tableIdent.database.isDefined) { - throw new AnalysisException( - s"Temporary table '$tableIdent' should not have specified a database") - } - override def run(sqlContext: SQLContext): Seq[Row] = { val df = Dataset.ofRows(sqlContext, query) val dataSource = DataSource( @@ -135,10 +124,9 @@ case class CreateTempTableUsingAsSelect( bucketSpec = None, options = options) val result = dataSource.write(mode, df) - sqlContext.sessionState.catalog.createTempTable( - tableIdent.table, - Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan, - ignoreIfExists = true) + sqlContext.sessionState.catalog.registerTable( + tableIdent, + Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan) Seq.empty[Row] } http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 28ac458..63f0e4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -19,12 +19,10 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext} import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering} import 
org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation} /** @@ -101,9 +99,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { /** * A rule to do various checks before inserting into or writing to a data source table. */ -private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) - extends (LogicalPlan => Unit) { - +private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) { def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } def apply(plan: LogicalPlan): Unit = { @@ -143,7 +139,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) } PartitioningUtils.validatePartitionColumnDataTypes( - r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis) + r.schema, part.keySet.toSeq, catalog.conf.caseSensitiveAnalysis) // Get all input data source relations of the query. val srcRelations = query.collect { @@ -194,7 +190,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) } PartitioningUtils.validatePartitionColumnDataTypes( - c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis) + c.child.schema, c.partitionColumns, catalog.conf.caseSensitiveAnalysis) for { spec <- c.bucketSpec http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index e5f02ca..e6be0ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.internal import org.apache.spark.sql.{ContinuousQueryManager, ExperimentalMethods, SQLContext, UDFRegistration} -import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} -import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -46,7 +45,7 @@ private[sql] class SessionState(ctx: SQLContext) { /** * Internal catalog for managing table and database states. */ - lazy val catalog = new SessionCatalog(ctx.externalCatalog, conf) + lazy val catalog: Catalog = new SimpleCatalog(conf) /** * Internal catalog for managing functions registered by the user. 
@@ -69,7 +68,7 @@ private[sql] class SessionState(ctx: SQLContext) { DataSourceAnalysis :: (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil) - override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog)) + override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala index bb54c52..2820e4f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala @@ -33,8 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex } after { - sqlContext.sessionState.catalog.dropTable( - TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true) + sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) } test("get all tables") { @@ -46,22 +45,20 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) - sqlContext.sessionState.catalog.dropTable( - TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true) + sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0) } - test("getting all tables with a database name has no impact on returned table names") { + test("getting all Tables with a database name has no impact on returned table names") { checkAnswer( - sqlContext.tables("default").filter("tableName = 'ListTablesSuiteTable'"), + sqlContext.tables("DB").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) checkAnswer( - sql("show TABLES in default").filter("tableName = 'ListTablesSuiteTable'"), + sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) - sqlContext.sessionState.catalog.dropTable( - TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true) + sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0) } http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index 2f62ad4..2ad92b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf -class SQLContextSuite extends SparkFunSuite with SharedSparkContext { +class SQLContextSuite extends SparkFunSuite with SharedSparkContext{ object DummyRule extends Rule[LogicalPlan] { def apply(p: LogicalPlan): LogicalPlan = p @@ -78,11 +78,4 @@ class SQLContextSuite extends SparkFunSuite with 
SharedSparkContext { sqlContext.experimental.extraOptimizations = Seq(DummyRule) assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule)) } - - test("SQLContext can access `spark.sql.*` configs") { - sc.conf.set("spark.sql.with.or.without.you", "my love") - val sqlContext = new SQLContext(sc) - assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love") - } - } http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4f36b1b..eb486a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1397,16 +1397,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("SPARK-4699 case sensitivity SQL query") { - val orig = sqlContext.getConf(SQLConf.CASE_SENSITIVE) - try { - sqlContext.setConf(SQLConf.CASE_SENSITIVE, false) - val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil - val rdd = sparkContext.parallelize((0 to 1).map(i => data(i))) - rdd.toDF().registerTempTable("testTable1") - checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1")) - } finally { - sqlContext.setConf(SQLConf.CASE_SENSITIVE, orig) - } + sqlContext.setConf(SQLConf.CASE_SENSITIVE, false) + val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil + val rdd = sparkContext.parallelize((0 to 1).map(i => data(i))) + rdd.toDF().registerTempTable("testTable1") + checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1")) + sqlContext.setConf(SQLConf.CASE_SENSITIVE, true) } test("SPARK-6145: ORDER BY test for nested fields") { @@ -1680,8 +1676,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { .format("parquet") .save(path) - // We don't support creating a temporary table while specifying a database - intercept[AnalysisException] { + val message = intercept[AnalysisException] { sqlContext.sql( s""" |CREATE TEMPORARY TABLE db.t @@ -1691,8 +1686,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { |) """.stripMargin) }.getMessage + assert(message.contains("Specifying database name or other qualifiers are not allowed")) - // If you use backticks to quote the name then it's OK. + // If you use backticks to quote the name of a temporary table having dot in it. 
sqlContext.sql( s""" |CREATE TEMPORARY TABLE `db.t` http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 2f806eb..f8166c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -51,8 +51,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple)) } - sqlContext.sessionState.catalog.dropTable( - TableIdentifier("tmp"), ignoreIfNotExists = true) + sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp")) } test("overwriting") { @@ -62,8 +61,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple)) } - sqlContext.sessionState.catalog.dropTable( - TableIdentifier("tmp"), ignoreIfNotExists = true) + sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp")) } test("self-join") { http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 80a85a6..d483585 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -189,8 +189,8 @@ private[sql] trait SQLTestUtils * `f` returns. 
*/ protected def activateDatabase(db: String)(f: => Unit): Unit = { - sqlContext.sessionState.catalog.setCurrentDatabase(db) - try f finally sqlContext.sessionState.catalog.setCurrentDatabase("default") + sqlContext.sql(s"USE $db") + try f finally sqlContext.sql(s"USE default") } /** http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 5769328..7fe31b0 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -150,8 +150,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { } if (sessionState.database != null) { - SparkSQLEnv.hiveContext.sessionState.catalog.setCurrentDatabase( - s"${sessionState.database}") + SparkSQLEnv.hiveContext.runSqlHive(s"USE ${sessionState.database}") } // Execute -i init files (always in silent mode) http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 8e1ebe2..032965d 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -193,7 +193,10 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { ) runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "SHOW TABLES;"))( - "" -> "hive_test" + "" + -> "OK", + "" + -> "hive_test" ) } http://git-wip-us.apache.org/repos/asf/spark/blob/c44d140c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala index 0722fb0..491f2ae 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala @@ -85,6 +85,7 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit withClient { getTable(db, table) } } + // -------------------------------------------------------------------------- // Databases // -------------------------------------------------------------------------- @@ -181,10 +182,6 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit client.getTable(db, table) } - override def tableExists(db: String, table: String): Boolean = withClient { - client.getTableOption(db, table).isDefined - } - override def listTables(db: String): Seq[String] = withClient { requireDbExists(db) client.listTables(db) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: 
commits-help@spark.apache.org
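----------------------------------------------------------------------
Note on the API shapes above: this revert puts SQLContext back on the old analysis.Catalog interface (registerTable/unregisterTable/getTables) in place of SessionCatalog (createTempTable/dropTable/listTables), and PreWriteCheck now takes only the Catalog, reading case sensitivity from catalog.conf rather than a separate SQLConf. The Scala sketch below is not part of the commit; it is a minimal illustration modeled on the "+" call sites in SQLContext.scala and ListTablesSuite.scala, assumes the source tree at commit c44d140c (sessionState and df.logicalPlan are private[sql], hence the package declaration), and uses placeholder values (df, "t") introduced only for the example.

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.TableIdentifier

// Sketch only: the temp-table call shapes restored by this revert.
object CatalogRevertSketch {
  def example(sqlContext: SQLContext, df: DataFrame): Unit = {
    val catalog = sqlContext.sessionState.catalog
    val ident = sqlContext.sessionState.sqlParser.parseTableIdentifier("t")

    // Previously: catalog.createTempTable(ident.table, df.logicalPlan, ignoreIfExists = true)
    catalog.registerTable(ident, df.logicalPlan)

    // Previously: catalog.listTables(db).map(_.table); getTables returns
    // (name, isTemporary) pairs, which tableNames() and ShowTablesCommand map over.
    val tableNames: Array[String] =
      catalog.getTables(None).map { case (name, _) => name }.toArray

    // Previously: catalog.dropTable(TableIdentifier("t"), ignoreIfNotExists = true)
    catalog.unregisterTable(TableIdentifier("t"))
  }
}
----------------------------------------------------------------------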