spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-13078][SQL] API and test cases for internal catalog
Date Mon, 01 Feb 2016 22:11:55 GMT
Repository: spark
Updated Branches:
  refs/heads/master a2973fed3 -> be7a2fc07


[SPARK-13078][SQL] API and test cases for internal catalog

This pull request creates an internal catalog API. The creation of this API is the first step
towards consolidating SQLContext and HiveContext. I envision we will have two different implementations
in Spark 2.0: (1) a simple in-memory implementation, and (2) an implementation based on the
current HiveClient (ClientWrapper).

I took a look at Hive's internal metastore interface/implementation, and then created
this API based on it. I believe this is the minimal set needed in order to achieve all the
needed functionality.

Author: Reynold Xin <rxin@databricks.com>

Closes #10982 from rxin/SPARK-13078.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be7a2fc0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be7a2fc0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be7a2fc0

Branch: refs/heads/master
Commit: be7a2fc0716b7d25327b6f8f683390fc62532e3b
Parents: a2973fe
Author: Reynold Xin <rxin@databricks.com>
Authored: Mon Feb 1 14:11:52 2016 -0800
Committer: Reynold Xin <rxin@databricks.com>
Committed: Mon Feb 1 14:11:52 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 246 +++++++++++++++++
 .../spark/sql/catalyst/catalog/interface.scala  | 178 +++++++++++++
 .../sql/catalyst/catalog/CatalogTestCases.scala | 263 +++++++++++++++++++
 .../catalyst/catalog/InMemoryCatalogSuite.scala |  23 ++
 4 files changed, 710 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/be7a2fc0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
new file mode 100644
index 0000000..9e6dfb7
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.AnalysisException
+
+
+/**
+ * An in-memory (ephemeral) implementation of the system catalog.
+ *
+ * All public methods should be synchronized for thread-safety.
+ */
+class InMemoryCatalog extends Catalog {
+
+  private class TableDesc(var table: Table) {
+    val partitions = new mutable.HashMap[String, TablePartition]
+  }
+
+  private class DatabaseDesc(var db: Database) {
+    val tables = new mutable.HashMap[String, TableDesc]
+    val functions = new mutable.HashMap[String, Function]
+  }
+
+  private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
+
+  private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
+    val regex = pattern.replaceAll("\\*", ".*").r
+    names.filter { funcName => regex.pattern.matcher(funcName).matches() }
+  }
+
+  private def existsFunction(db: String, funcName: String): Boolean = {
+    catalog(db).functions.contains(funcName)
+  }
+
+  private def existsTable(db: String, table: String): Boolean = {
+    catalog(db).tables.contains(table)
+  }
+
+  private def assertDbExists(db: String): Unit = {
+    if (!catalog.contains(db)) {
+      throw new AnalysisException(s"Database $db does not exist")
+    }
+  }
+
+  /**
+   * Fails with an [[AnalysisException]] unless database `db` exists and holds a function
+   * registered under `funcName`.
+   */
+  private def assertFunctionExists(db: String, funcName: String): Unit = {
+    // Check the database first: existsFunction looks the database up directly.
+    assertDbExists(db)
+    if (!existsFunction(db, funcName)) {
+      throw new AnalysisException(s"Function $funcName does not exist in $db database")
+    }
+  }
+
+  /**
+   * Fails with an [[AnalysisException]] unless database `db` exists and holds a table
+   * registered under `table`.
+   */
+  private def assertTableExists(db: String, table: String): Unit = {
+    // Check the database first: existsTable looks the database up directly.
+    assertDbExists(db)
+    if (!existsTable(db, table)) {
+      throw new AnalysisException(s"Table $table does not exist in $db database")
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  /**
+   * Registers a new database under `dbDefinition.name`.
+   *
+   * If a database with that name is already registered, an [[AnalysisException]] is thrown
+   * unless `ifNotExists` is true, in which case the existing entry is left untouched.
+   */
+  override def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit = synchronized {
+    if (catalog.contains(dbDefinition.name)) {
+      if (!ifNotExists) {
+        throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
+      }
+    } else {
+      catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
+    }
+  }
+
+  /**
+   * Removes database `db` from the catalog.
+   *
+   * @param db name of the database to remove
+   * @param ignoreIfNotExists when false, a missing database raises an [[AnalysisException]];
+   *                          when true, a missing database is ignored
+   * @param cascade when false, an [[AnalysisException]] is thrown if the database still
+   *                holds tables or functions; when true, the database is removed along with
+   *                whatever it contains
+   */
+  override def dropDatabase(
+    db: String,
+    ignoreIfNotExists: Boolean,
+    cascade: Boolean): Unit = synchronized {
+    if (catalog.contains(db)) {
+      if (!cascade) {
+        // If cascade is false, make sure the database is empty.
+        if (catalog(db).tables.nonEmpty) {
+          throw new AnalysisException(s"Database $db is not empty. One or more tables exist.")
+        }
+        if (catalog(db).functions.nonEmpty) {
+          throw new AnalysisException(s"Database $db is not empty. One or more functions exist.")
+        }
+      }
+      // Remove the database.
+      catalog.remove(db)
+    } else {
+      if (!ignoreIfNotExists) {
+        throw new AnalysisException(s"Database $db does not exist")
+      }
+    }
+  }
+
+  override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized {
+    assertDbExists(db)
+    assert(db == dbDefinition.name)
+    catalog(db).db = dbDefinition
+  }
+
+  override def getDatabase(db: String): Database = synchronized {
+    assertDbExists(db)
+    catalog(db).db
+  }
+
+  override def listDatabases(): Seq[String] = synchronized {
+    catalog.keySet.toSeq
+  }
+
+  override def listDatabases(pattern: String): Seq[String] = synchronized {
+    filterPattern(listDatabases(), pattern)
+  }
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  /**
+   * Registers `tableDefinition` in database `db` under `tableDefinition.name`.
+   *
+   * Throws an [[AnalysisException]] when `db` does not exist. If a table with that name is
+   * already registered, an [[AnalysisException]] is thrown unless `ifNotExists` is true, in
+   * which case the existing table is left untouched.
+   */
+  override def createTable(db: String, tableDefinition: Table, ifNotExists: Boolean)
+  : Unit = synchronized {
+    assertDbExists(db)
+    if (existsTable(db, tableDefinition.name)) {
+      if (!ifNotExists) {
+        throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
+      }
+    } else {
+      catalog(db).tables.put(tableDefinition.name, new TableDesc(tableDefinition))
+    }
+  }
+
+  override def dropTable(db: String, table: String, ignoreIfNotExists: Boolean)
+  : Unit = synchronized {
+    assertDbExists(db)
+    if (existsTable(db, table)) {
+      catalog(db).tables.remove(table)
+    } else {
+      if (!ignoreIfNotExists) {
+        throw new AnalysisException(s"Table $table does not exist in $db database")
+      }
+    }
+  }
+
+  /**
+   * Renames table `oldName` in database `db` to `newName`.
+   *
+   * The existing table descriptor (including its partition map) is reused; only the name
+   * stored in its table definition and the key it is registered under change. An entry
+   * already registered under `newName` is replaced.
+   *
+   * Throws an [[AnalysisException]] when the database or the table `oldName` does not exist.
+   */
+  override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
+    assertTableExists(db, oldName)
+    val tables = catalog(db).tables
+    val desc = tables(oldName)
+    desc.table = desc.table.copy(name = newName)
+    // Remove the old key before inserting the new one, so that renaming a table to its
+    // current name leaves it registered instead of deleting it.
+    tables.remove(oldName)
+    tables.put(newName, desc)
+  }
+
+  /**
+   * Replaces the definition of table `table` in database `db` with `tableDefinition`.
+   *
+   * Throws an [[AnalysisException]] when the database or the table does not exist. The new
+   * definition must carry the same name as `table`; this is checked with `assert`.
+   */
+  override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized {
+    assertTableExists(db, table)
+    assert(table == tableDefinition.name)
+    catalog(db).tables(table).table = tableDefinition
+  }
+
+  override def getTable(db: String, table: String): Table = synchronized {
+    assertTableExists(db, table)
+    catalog(db).tables(table).table
+  }
+
+  override def listTables(db: String): Seq[String] = synchronized {
+    assertDbExists(db)
+    catalog(db).tables.keySet.toSeq
+  }
+
+  override def listTables(db: String, pattern: String): Seq[String] = synchronized {
+    assertDbExists(db)
+    filterPattern(listTables(db), pattern)
+  }
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  override def alterPartition(db: String, table: String, part: TablePartition)
+  : Unit = synchronized {
+    throw new UnsupportedOperationException
+  }
+
+  override def alterPartitions(db: String, table: String, parts: Seq[TablePartition])
+  : Unit = synchronized {
+    throw new UnsupportedOperationException
+  }
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  /**
+   * Registers `func` in database `db` under `func.name`.
+   *
+   * Throws an [[AnalysisException]] when `db` does not exist. If a function with that name
+   * is already registered, an [[AnalysisException]] is thrown unless `ifNotExists` is true,
+   * in which case the existing function is left untouched.
+   */
+  override def createFunction(
+      db: String, func: Function, ifNotExists: Boolean): Unit = synchronized {
+    assertDbExists(db)
+
+    if (existsFunction(db, func.name)) {
+      if (!ifNotExists) {
+        // Report the function by name, matching the table and database messages.
+        throw new AnalysisException(s"Function ${func.name} already exists in $db database")
+      }
+    } else {
+      catalog(db).functions.put(func.name, func)
+    }
+  }
+
+  override def dropFunction(db: String, funcName: String): Unit = synchronized {
+    assertFunctionExists(db, funcName)
+    catalog(db).functions.remove(funcName)
+  }
+
+  /**
+   * Replaces the definition of function `funcName` in database `db` with `funcDefinition`.
+   *
+   * When `funcDefinition.name` differs from `funcName` this is also a rename: the entry
+   * under the old name is removed and the new definition is registered under its own name.
+   *
+   * Throws an [[AnalysisException]] when the database or the function does not exist.
+   */
+  override def alterFunction(db: String, funcName: String, funcDefinition: Function)
+    : Unit = synchronized {
+    assertFunctionExists(db, funcName)
+    val functions = catalog(db).functions
+    if (funcName != funcDefinition.name) {
+      // Also a rename; drop the entry stored under the old name.
+      functions.remove(funcName)
+    }
+    // Key the entry by the definition's own name so a renamed function can be looked up
+    // under its new name (for a plain alter the two names are identical).
+    functions.put(funcDefinition.name, funcDefinition)
+  }
+
+  override def getFunction(db: String, funcName: String): Function = synchronized {
+    assertFunctionExists(db, funcName)
+    catalog(db).functions(funcName)
+  }
+
+  /**
+   * Lists the names of the functions in database `db` that match `pattern`, where `*` in
+   * the pattern stands for any sequence of characters (see `filterPattern`).
+   *
+   * Throws an [[AnalysisException]] when `db` does not exist.
+   */
+  override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
+    assertDbExists(db)
+    // filterPattern compiles the pattern itself; no separate regex is needed here.
+    filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/be7a2fc0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
new file mode 100644
index 0000000..a6caf91
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.sql.AnalysisException
+
+
+/**
+ * Interface for the system catalog (of columns, partitions, tables, and databases).
+ *
+ * This is only used for non-temporary items, and implementations must be thread-safe as they
+ * can be accessed in multiple threads.
+ *
+ * Implementations should throw [[AnalysisException]] when table or database don't exist.
+ */
+abstract class Catalog {
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit
+
+  def dropDatabase(
+    db: String,
+    ignoreIfNotExists: Boolean,
+    cascade: Boolean): Unit
+
+  def alterDatabase(db: String, dbDefinition: Database): Unit
+
+  def getDatabase(db: String): Database
+
+  def listDatabases(): Seq[String]
+
+  def listDatabases(pattern: String): Seq[String]
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  def createTable(db: String, tableDefinition: Table, ignoreIfExists: Boolean): Unit
+
+  def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
+
+  def renameTable(db: String, oldName: String, newName: String): Unit
+
+  def alterTable(db: String, table: String, tableDefinition: Table): Unit
+
+  def getTable(db: String, table: String): Table
+
+  def listTables(db: String): Seq[String]
+
+  def listTables(db: String, pattern: String): Seq[String]
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  // TODO: need more functions for partitioning.
+
+  def alterPartition(db: String, table: String, part: TablePartition): Unit
+
+  def alterPartitions(db: String, table: String, parts: Seq[TablePartition]): Unit
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  def createFunction(db: String, funcDefinition: Function, ignoreIfExists: Boolean): Unit
+
+  def dropFunction(db: String, funcName: String): Unit
+
+  def alterFunction(db: String, funcName: String, funcDefinition: Function): Unit
+
+  def getFunction(db: String, funcName: String): Function
+
+  def listFunctions(db: String, pattern: String): Seq[String]
+
+}
+
+
+/**
+ * A function defined in the catalog.
+ *
+ * @param name name of the function
+ * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
+ */
+case class Function(
+  name: String,
+  className: String
+)
+
+
+/**
+ * Storage format, used to describe how a partition or a table is stored.
+ */
+case class StorageFormat(
+  locationUri: String,
+  inputFormat: String,
+  outputFormat: String,
+  serde: String,
+  serdeProperties: Map[String, String]
+)
+
+
+/**
+ * A column in a table.
+ */
+case class Column(
+  name: String,
+  dataType: String,
+  nullable: Boolean,
+  comment: String
+)
+
+
+/**
+ * A partition (Hive style) defined in the catalog.
+ *
+ * @param values values for the partition columns
+ * @param storage storage format of the partition
+ */
+case class TablePartition(
+  values: Seq[String],
+  storage: StorageFormat
+)
+
+
+/**
+ * A table defined in the catalog.
+ *
+ * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
+ * future once we have a better understanding of how we want to handle skewed columns.
+ */
+case class Table(
+  name: String,
+  description: String,
+  schema: Seq[Column],
+  partitionColumns: Seq[Column],
+  sortColumns: Seq[Column],
+  storage: StorageFormat,
+  numBuckets: Int,
+  properties: Map[String, String],
+  tableType: String,
+  createTime: Long,
+  lastAccessTime: Long,
+  viewOriginalText: Option[String],
+  viewText: Option[String]) {
+
+  require(tableType == "EXTERNAL_TABLE" || tableType == "INDEX_TABLE" ||
+    tableType == "MANAGED_TABLE" || tableType == "VIRTUAL_VIEW")
+}
+
+
+/**
+ * A database defined in the catalog.
+ */
+case class Database(
+  name: String,
+  description: String,
+  locationUri: String,
+  properties: Map[String, String]
+)

http://git-wip-us.apache.org/repos/asf/spark/blob/be7a2fc0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
new file mode 100644
index 0000000..ab9d5ac
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+
+
+/**
+ * A reasonably complete test suite (i.e. behaviors) for a [[Catalog]].
+ *
+ * Implementations of the [[Catalog]] interface can create test suites by extending this.
+ */
+abstract class CatalogTestCases extends SparkFunSuite {
+
+  protected def newEmptyCatalog(): Catalog
+
+  /**
+   * Creates a basic catalog, with the following structure:
+   *
+   * db1
+   * db2
+   *   - tbl1
+   *   - tbl2
+   *   - func1
+   */
+  private def newBasicCatalog(): Catalog = {
+    val catalog = newEmptyCatalog()
+    catalog.createDatabase(newDb("db1"), ifNotExists = false)
+    catalog.createDatabase(newDb("db2"), ifNotExists = false)
+
+    catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false)
+    catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false)
+    catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+    catalog
+  }
+
+  private def newFunc(): Function = Function("funcname", "org.apache.spark.MyFunc")
+
+  private def newDb(name: String = "default"): Database =
+    Database(name, name + " description", "uri", Map.empty)
+
+  /**
+   * Builds a minimal "EXTERNAL_TABLE" definition named `name` for use in tests: empty
+   * description, no columns, no storage format (null), no properties, zero timestamps and
+   * no view text.
+   */
+  private def newTable(name: String): Table =
+    Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0,
+      None, None)
+
+  private def newFunc(name: String): Function = Function(name, "class.name")
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  test("basic create, drop and list databases") {
+    val catalog = newEmptyCatalog()
+    catalog.createDatabase(newDb(), ifNotExists = false)
+    assert(catalog.listDatabases().toSet == Set("default"))
+
+    catalog.createDatabase(newDb("default2"), ifNotExists = false)
+    assert(catalog.listDatabases().toSet == Set("default", "default2"))
+  }
+
+  test("get database when a database exists") {
+    val db1 = newBasicCatalog().getDatabase("db1")
+    assert(db1.name == "db1")
+    assert(db1.description.contains("db1"))
+  }
+
+  // Looking up an unregistered database must surface as an AnalysisException.
+  test("get database should throw exception when the database does not exist") {
+    intercept[AnalysisException] { newBasicCatalog().getDatabase("db_that_does_not_exist") }
+  }
+
+  test("list databases without pattern") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listDatabases().toSet == Set("db1", "db2"))
+  }
+
+  test("list databases with pattern") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listDatabases("db").toSet == Set.empty)
+    assert(catalog.listDatabases("db*").toSet == Set("db1", "db2"))
+    assert(catalog.listDatabases("*1").toSet == Set("db1"))
+    assert(catalog.listDatabases("db2").toSet == Set("db2"))
+  }
+
+  test("drop database") {
+    val catalog = newBasicCatalog()
+    catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
+    assert(catalog.listDatabases().toSet == Set("db2"))
+  }
+
+  test("drop database when the database is not empty") {
+    // Throw exception if there are functions left
+    val catalog1 = newBasicCatalog()
+    catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+    catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
+    intercept[AnalysisException] {
+      catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+    }
+
+    // Throw exception if there are tables left
+    val catalog2 = newBasicCatalog()
+    catalog2.dropFunction("db2", "func1")
+    intercept[AnalysisException] {
+      catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+    }
+
+    // When cascade is true, it should drop them
+    val catalog3 = newBasicCatalog()
+    catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
+    assert(catalog3.listDatabases().toSet == Set("db1"))
+  }
+
+  // Dropping an unregistered database throws unless ignoreIfNotExists is set.
+  test("drop database when the database does not exist") {
+    val catalog = newBasicCatalog()
+
+    intercept[AnalysisException] {
+      catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
+    }
+
+    catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
+  }
+
+  test("alter database") {
+    val catalog = newBasicCatalog()
+    catalog.alterDatabase("db1", Database("db1", "new description", "lll", Map.empty))
+    assert(catalog.getDatabase("db1").description == "new description")
+  }
+
+  test("alter database should throw exception when the database does not exist") {
+    intercept[AnalysisException] {
+      newBasicCatalog().alterDatabase("no_db", Database("no_db", "ddd", "lll", Map.empty))
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  test("drop table") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+    assert(catalog.listTables("db2").toSet == Set("tbl2"))
+  }
+
+  test("drop table when database / table does not exist") {
+    val catalog = newBasicCatalog()
+
+    // Should always throw exception when the database does not exist
+    intercept[AnalysisException] {
+      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
+    }
+
+    intercept[AnalysisException] {
+      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
+    }
+
+    // Should throw exception when the table does not exist, if ignoreIfNotExists is false
+    intercept[AnalysisException] {
+      catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
+    }
+
+    catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
+  }
+
+  test("rename table") {
+    val catalog = newBasicCatalog()
+
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    catalog.renameTable("db2", "tbl1", "tblone")
+    assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
+  }
+
+  test("rename table when database / table does not exist") {
+    val catalog = newBasicCatalog()
+
+    intercept[AnalysisException] {  // Throw exception when the database does not exist
+      catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
+    }
+
+    intercept[AnalysisException] {  // Throw exception when the table does not exist
+      catalog.renameTable("db2", "unknown_table", "unknown_table")
+    }
+  }
+
+  test("alter table") {
+    val catalog = newBasicCatalog()
+    catalog.alterTable("db2", "tbl1", newTable("tbl1").copy(createTime = 10))
+    assert(catalog.getTable("db2", "tbl1").createTime == 10)
+  }
+
+  test("alter table when database / table does not exist") {
+    val catalog = newBasicCatalog()
+
+    intercept[AnalysisException] {  // Throw exception when the database does not exist
+      catalog.alterTable("unknown_db", "unknown_table", newTable("unknown_table"))
+    }
+
+    intercept[AnalysisException] {  // Throw exception when the table does not exist
+      catalog.alterTable("db2", "unknown_table", newTable("unknown_table"))
+    }
+  }
+
+  test("get table") {
+    assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
+  }
+
+  test("get table when database / table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.getTable("unknown_db", "unknown_table")
+    }
+
+    intercept[AnalysisException] {
+      catalog.getTable("db2", "unknown_table")
+    }
+  }
+
+  test("list tables without pattern") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listTables("db1").toSet == Set.empty)
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+  }
+
+  test("list tables with pattern") {
+    val catalog = newBasicCatalog()
+
+    // Test when database does not exist
+    intercept[AnalysisException] { catalog.listTables("unknown_db") }
+
+    assert(catalog.listTables("db1", "*").toSet == Set.empty)
+    assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
+    assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
+    assert(catalog.listTables("db2", "*1").toSet == Set("tbl1"))
+  }
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  // TODO: Add tests cases for partitions
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  // TODO: Add tests cases for functions
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/be7a2fc0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
new file mode 100644
index 0000000..871f0a0
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+/** Test suite for the [[InMemoryCatalog]]. */
+class InMemoryCatalogSuite extends CatalogTestCases {
+  override protected def newEmptyCatalog(): Catalog = new InMemoryCatalog
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message