spark-commits mailing list archives

From r...@apache.org
Subject [1/2] spark git commit: [SPARK-13080][SQL] Implement new Catalog API using Hive
Date Sun, 21 Feb 2016 23:00:26 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7eb83fefd -> 6c3832b26


http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 752c037..5801051 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.ParseUtils._
@@ -39,7 +40,6 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.SparkQl
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
-import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.AnalysisException
@@ -55,7 +55,7 @@ private[hive] case object NativePlaceholder extends LogicalPlan {
 }
 
 private[hive] case class CreateTableAsSelect(
-    tableDesc: HiveTable,
+    tableDesc: CatalogTable,
     child: LogicalPlan,
     allowExisting: Boolean) extends UnaryNode with Command {
 
@@ -63,14 +63,14 @@ private[hive] case class CreateTableAsSelect(
   override lazy val resolved: Boolean =
     tableDesc.specifiedDatabase.isDefined &&
     tableDesc.schema.nonEmpty &&
-    tableDesc.serde.isDefined &&
-    tableDesc.inputFormat.isDefined &&
-    tableDesc.outputFormat.isDefined &&
+    tableDesc.storage.serde.isDefined &&
+    tableDesc.storage.inputFormat.isDefined &&
+    tableDesc.storage.outputFormat.isDefined &&
     childrenResolved
 }
 
 private[hive] case class CreateViewAsSelect(
-    tableDesc: HiveTable,
+    tableDesc: CatalogTable,
     child: LogicalPlan,
     allowExisting: Boolean,
     replace: Boolean,
@@ -193,7 +193,7 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
       view: ASTNode,
       viewNameParts: ASTNode,
       query: ASTNode,
-      schema: Seq[HiveColumn],
+      schema: Seq[CatalogColumn],
       properties: Map[String, String],
       allowExist: Boolean,
       replace: Boolean): CreateViewAsSelect = {
@@ -201,18 +201,20 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
 
     val originalText = query.source
 
-    val tableDesc = HiveTable(
+    val tableDesc = CatalogTable(
       specifiedDatabase = dbName,
       name = viewName,
+      tableType = CatalogTableType.VIRTUAL_VIEW,
       schema = schema,
-      partitionColumns = Seq.empty[HiveColumn],
+      storage = CatalogStorageFormat(
+        locationUri = None,
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        serdeProperties = Map.empty[String, String]
+      ),
       properties = properties,
-      serdeProperties = Map[String, String](),
-      tableType = VirtualView,
-      location = None,
-      inputFormat = None,
-      outputFormat = None,
-      serde = None,
+      viewOriginalText = Some(originalText),
       viewText = Some(originalText))
 
     // We need to keep the original SQL string so that if `spark.sql.nativeView` is
@@ -314,8 +316,8 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
           val schema = maybeColumns.map { cols =>
             // We can't specify column types when create view, so fill it with null first, and
             // update it after the schema has been resolved later.
-            nodeToColumns(cols, lowerCase = true).map(_.copy(hiveType = null))
-          }.getOrElse(Seq.empty[HiveColumn])
+            nodeToColumns(cols, lowerCase = true).map(_.copy(dataType = null))
+          }.getOrElse(Seq.empty[CatalogColumn])
 
           val properties = scala.collection.mutable.Map.empty[String, String]
 
@@ -369,19 +371,23 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
         val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
 
         // TODO add bucket support
-        var tableDesc: HiveTable = HiveTable(
+        var tableDesc: CatalogTable = CatalogTable(
           specifiedDatabase = dbName,
           name = tblName,
-          schema = Seq.empty[HiveColumn],
-          partitionColumns = Seq.empty[HiveColumn],
-          properties = Map[String, String](),
-          serdeProperties = Map[String, String](),
-          tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
-          location = None,
-          inputFormat = None,
-          outputFormat = None,
-          serde = None,
-          viewText = None)
+          tableType =
+            if (externalTable.isDefined) {
+              CatalogTableType.EXTERNAL_TABLE
+            } else {
+              CatalogTableType.MANAGED_TABLE
+            },
+          storage = CatalogStorageFormat(
+            locationUri = None,
+            inputFormat = None,
+            outputFormat = None,
+            serde = None,
+            serdeProperties = Map.empty[String, String]
+          ),
+          schema = Seq.empty[CatalogColumn])
 
         // default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.)
         val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
@@ -392,9 +398,10 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
             outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
         }
 
-        hiveSerDe.inputFormat.foreach(f => tableDesc = tableDesc.copy(inputFormat = Some(f)))
-        hiveSerDe.outputFormat.foreach(f => tableDesc = tableDesc.copy(outputFormat = Some(f)))
-        hiveSerDe.serde.foreach(f => tableDesc = tableDesc.copy(serde = Some(f)))
+        tableDesc = tableDesc.withNewStorage(
+          inputFormat = hiveSerDe.inputFormat.orElse(tableDesc.storage.inputFormat),
+          outputFormat = hiveSerDe.outputFormat.orElse(tableDesc.storage.outputFormat),
+          serde = hiveSerDe.serde.orElse(tableDesc.storage.serde))
 
         children.collect {
           case list @ Token("TOK_TABCOLLIST", _) =>
@@ -440,13 +447,13 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
               // TODO support the nullFormat
               case _ => assert(false)
             }
-            tableDesc = tableDesc.copy(
-              serdeProperties = tableDesc.serdeProperties ++ serdeParams.asScala)
+            tableDesc = tableDesc.withNewStorage(
+              serdeProperties = tableDesc.storage.serdeProperties ++ serdeParams.asScala)
           case Token("TOK_TABLELOCATION", child :: Nil) =>
             val location = EximUtil.relativeToAbsolutePath(hiveConf, unescapeSQLString(child.text))
-            tableDesc = tableDesc.copy(location = Option(location))
+            tableDesc = tableDesc.withNewStorage(locationUri = Option(location))
           case Token("TOK_TABLESERIALIZER", child :: Nil) =>
-            tableDesc = tableDesc.copy(
+            tableDesc = tableDesc.withNewStorage(
               serde = Option(unescapeSQLString(child.children.head.text)))
             if (child.numChildren == 2) {
               // This is based on the readProps(..) method in
@@ -459,59 +466,59 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
                     .orNull
                   (unescapeSQLString(prop), value)
               }.toMap
-              tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+              tableDesc = tableDesc.withNewStorage(
+                serdeProperties = tableDesc.storage.serdeProperties ++ serdeParams)
             }
           case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
             child.text.toLowerCase(Locale.ENGLISH) match {
               case "orc" =>
-                tableDesc = tableDesc.copy(
+                tableDesc = tableDesc.withNewStorage(
                   inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
                   outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
-                if (tableDesc.serde.isEmpty) {
-                  tableDesc = tableDesc.copy(
+                if (tableDesc.storage.serde.isEmpty) {
+                  tableDesc = tableDesc.withNewStorage(
                     serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
                 }
 
               case "parquet" =>
-                tableDesc = tableDesc.copy(
+                tableDesc = tableDesc.withNewStorage(
                   inputFormat =
                     Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
                   outputFormat =
                     Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
-                if (tableDesc.serde.isEmpty) {
-                  tableDesc = tableDesc.copy(
+                if (tableDesc.storage.serde.isEmpty) {
+                  tableDesc = tableDesc.withNewStorage(
                     serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
                 }
 
               case "rcfile" =>
-                tableDesc = tableDesc.copy(
+                tableDesc = tableDesc.withNewStorage(
                   inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
                   outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
-                if (tableDesc.serde.isEmpty) {
-                  tableDesc = tableDesc.copy(serde =
-                    Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
+                if (tableDesc.storage.serde.isEmpty) {
+                  tableDesc = tableDesc.withNewStorage(
+                    serde =
+                      Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
                 }
 
               case "textfile" =>
-                tableDesc = tableDesc.copy(
-                  inputFormat =
-                    Option("org.apache.hadoop.mapred.TextInputFormat"),
-                  outputFormat =
-                    Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+                tableDesc = tableDesc.withNewStorage(
+                  inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+                  outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
 
               case "sequencefile" =>
-                tableDesc = tableDesc.copy(
+                tableDesc = tableDesc.withNewStorage(
                   inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
                   outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
 
               case "avro" =>
-                tableDesc = tableDesc.copy(
+                tableDesc = tableDesc.withNewStorage(
                   inputFormat =
                     Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
                   outputFormat =
                     Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
-                if (tableDesc.serde.isEmpty) {
-                  tableDesc = tableDesc.copy(
+                if (tableDesc.storage.serde.isEmpty) {
+                  tableDesc = tableDesc.withNewStorage(
                     serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
                 }
 
@@ -522,23 +529,21 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
 
           case Token("TOK_TABLESERIALIZER",
           Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
-            tableDesc = tableDesc.copy(serde = Option(unquoteString(serdeName)))
+            tableDesc = tableDesc.withNewStorage(serde = Option(unquoteString(serdeName)))
 
             otherProps match {
               case Token("TOK_TABLEPROPERTIES", list :: Nil) :: Nil =>
-                tableDesc = tableDesc.copy(
-                  serdeProperties = tableDesc.serdeProperties ++ getProperties(list))
+                tableDesc = tableDesc.withNewStorage(
+                  serdeProperties = tableDesc.storage.serdeProperties ++ getProperties(list))
               case _ =>
             }
 
           case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
             tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
           case list @ Token("TOK_TABLEFILEFORMAT", _) =>
-            tableDesc = tableDesc.copy(
-              inputFormat =
-                Option(unescapeSQLString(list.children.head.text)),
-              outputFormat =
-                Option(unescapeSQLString(list.children(1).text)))
+            tableDesc = tableDesc.withNewStorage(
+              inputFormat = Option(unescapeSQLString(list.children.head.text)),
+              outputFormat = Option(unescapeSQLString(list.children(1).text)))
           case Token("TOK_STORAGEHANDLER", _) =>
             throw new AnalysisException(
               "CREATE TABLE AS SELECT cannot be used for a non-native table")
@@ -678,15 +683,15 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
 
   // This is based the getColumns methods in
   // ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
-  protected def nodeToColumns(node: ASTNode, lowerCase: Boolean): Seq[HiveColumn] = {
+  protected def nodeToColumns(node: ASTNode, lowerCase: Boolean): Seq[CatalogColumn] = {
     node.children.map(_.children).collect {
       case Token(rawColName, Nil) :: colTypeNode :: comment =>
-        val colName = if (!lowerCase) rawColName
-        else rawColName.toLowerCase
-        HiveColumn(
-          cleanIdentifier(colName),
-          nodeToTypeString(colTypeNode),
-          comment.headOption.map(n => unescapeSQLString(n.text)).orNull)
+        val colName = if (!lowerCase) rawColName else rawColName.toLowerCase
+        CatalogColumn(
+          name = cleanIdentifier(colName),
+          dataType = nodeToTypeString(colTypeNode),
+          nullable = true,
+          comment.headOption.map(n => unescapeSQLString(n.text)))
     }
   }
 

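The HiveQl changes above replace the flat location/inputFormat/outputFormat/serde fields of the
old HiveTable with a nested CatalogStorageFormat, and every storage update now goes through a
withNewStorage copy helper rather than copy. That helper is defined on CatalogTable in
sql/catalyst and is not part of this diff; a minimal sketch of the pattern, using the field names
from the hunks above, could look like this:

    case class CatalogStorageFormat(
        locationUri: Option[String],
        inputFormat: Option[String],
        outputFormat: Option[String],
        serde: Option[String],
        serdeProperties: Map[String, String])

    // Illustrative only: the real CatalogTable carries many more fields (schema, tableType, ...).
    case class CatalogTableSketch(storage: CatalogStorageFormat) {
      // Copy the table, replacing only the storage fields that are explicitly passed in.
      def withNewStorage(
          locationUri: Option[String] = storage.locationUri,
          inputFormat: Option[String] = storage.inputFormat,
          outputFormat: Option[String] = storage.outputFormat,
          serde: Option[String] = storage.serde,
          serdeProperties: Map[String, String] = storage.serdeProperties): CatalogTableSketch =
        copy(storage = CatalogStorageFormat(
          locationUri, inputFormat, outputFormat, serde, serdeProperties))
    }

A call such as tableDesc.withNewStorage(serde = Option(serdeName)) therefore leaves the other
storage fields untouched, which is why the repeated tableDesc.copy(...) chains collapse into
single calls in the hunks above.
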
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index f681cc6..6a0a089 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -18,67 +18,11 @@
 package org.apache.spark.sql.hive.client
 
 import java.io.PrintStream
-import java.util.{Map => JMap}
-import javax.annotation.Nullable
 
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 
-private[hive] case class HiveDatabase(name: String, location: String)
-
-private[hive] abstract class TableType { val name: String }
-private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
-private[hive] case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
-private[hive] case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
-private[hive] case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }
-
-// TODO: Use this for Tables and Partitions
-private[hive] case class HiveStorageDescriptor(
-    location: String,
-    inputFormat: String,
-    outputFormat: String,
-    serde: String,
-    serdeProperties: Map[String, String])
-
-private[hive] case class HivePartition(
-    values: Seq[String],
-    storage: HiveStorageDescriptor)
-
-private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)
-private[hive] case class HiveTable(
-    specifiedDatabase: Option[String],
-    name: String,
-    schema: Seq[HiveColumn],
-    partitionColumns: Seq[HiveColumn],
-    properties: Map[String, String],
-    serdeProperties: Map[String, String],
-    tableType: TableType,
-    location: Option[String] = None,
-    inputFormat: Option[String] = None,
-    outputFormat: Option[String] = None,
-    serde: Option[String] = None,
-    viewText: Option[String] = None) {
-
-  @transient
-  private[client] var client: HiveClient = _
-
-  private[client] def withClient(ci: HiveClient): this.type = {
-    client = ci
-    this
-  }
-
-  def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))
-
-  def isPartitioned: Boolean = partitionColumns.nonEmpty
-
-  def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
-
-  def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
-    client.getPartitionsByFilter(this, predicates)
-
-  // Hive does not support backticks when passing names to the client.
-  def qualifiedName: String = s"$database.$name"
-}
 
 /**
  * An externally visible interface to the Hive client.  This interface is shared across both the
@@ -106,6 +50,9 @@ private[hive] trait HiveClient {
   /** Returns the names of all tables in the given database. */
   def listTables(dbName: String): Seq[String]
 
+  /** Returns the names of tables in the given database that match the given pattern. */
+  def listTables(dbName: String, pattern: String): Seq[String]
+
   /** Returns the name of the active database. */
   def currentDatabase: String
 
@@ -113,46 +60,133 @@ private[hive] trait HiveClient {
   def setCurrentDatabase(databaseName: String): Unit
 
   /** Returns the metadata for specified database, throwing an exception if it doesn't exist */
-  def getDatabase(name: String): HiveDatabase = {
-    getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
+  final def getDatabase(name: String): CatalogDatabase = {
+    getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name))
   }
 
   /** Returns the metadata for a given database, or None if it doesn't exist. */
-  def getDatabaseOption(name: String): Option[HiveDatabase]
+  def getDatabaseOption(name: String): Option[CatalogDatabase]
+
+  /** List the names of all the databases that match the specified pattern. */
+  def listDatabases(pattern: String): Seq[String]
 
   /** Returns the specified table, or throws [[NoSuchTableException]]. */
-  def getTable(dbName: String, tableName: String): HiveTable = {
-    getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
+  final def getTable(dbName: String, tableName: String): CatalogTable = {
+    getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException(dbName, tableName))
   }
 
-  /** Returns the metadata for the specified table or None if it doens't exist. */
-  def getTableOption(dbName: String, tableName: String): Option[HiveTable]
+  /** Returns the metadata for the specified table or None if it doesn't exist. */
+  def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
 
   /** Creates a view with the given metadata. */
-  def createView(view: HiveTable): Unit
+  def createView(view: CatalogTable): Unit
 
   /** Updates the given view with new metadata. */
-  def alertView(view: HiveTable): Unit
+  def alertView(view: CatalogTable): Unit
 
   /** Creates a table with the given metadata. */
-  def createTable(table: HiveTable): Unit
+  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
 
-  /** Updates the given table with new metadata. */
-  def alterTable(table: HiveTable): Unit
+  /** Drop the specified table. */
+  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
+
+  /** Alter a table whose name matches the one specified in `table`, assuming it exists. */
+  final def alterTable(table: CatalogTable): Unit = alterTable(table.name, table)
+
+  /** Updates the given table with new metadata, optionally renaming the table. */
+  def alterTable(tableName: String, table: CatalogTable): Unit
 
   /** Creates a new database with the given name. */
-  def createDatabase(database: HiveDatabase): Unit
+  def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
+
+  /**
+   * Drop the specified database, if it exists.
+   *
+   * @param name database to drop
+   * @param ignoreIfNotExists if true, do not throw error if the database does not exist
+   * @param cascade whether to remove all associated objects such as tables and functions
+   */
+  def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
+
+  /**
+   * Alter a database whose name matches the one specified in `database`, assuming it exists.
+   */
+  def alterDatabase(database: CatalogDatabase): Unit
+
+  /**
+   * Create one or many partitions in the given table.
+   */
+  def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit
+
+  /**
+   * Drop one or many partitions in the given table.
+   *
+   * Note: Unfortunately, Hive does not currently provide a way to ignore this call if the
+   * partitions do not already exist. The seemingly relevant flag `ifExists` in
+   * [[org.apache.hadoop.hive.metastore.PartitionDropOptions]] is not read anywhere.
+   */
+  def dropPartitions(
+      db: String,
+      table: String,
+      specs: Seq[Catalog.TablePartitionSpec]): Unit
 
-  /** Returns the specified paritition or None if it does not exist. */
+  /**
+   * Rename one or many existing table partitions, assuming they exist.
+   */
+  def renamePartitions(
+      db: String,
+      table: String,
+      specs: Seq[Catalog.TablePartitionSpec],
+      newSpecs: Seq[Catalog.TablePartitionSpec]): Unit
+
+  /**
+   * Alter one or more table partitions whose specs match the ones specified in `newParts`,
+   * assuming the partitions exist.
+   */
+  def alterPartitions(
+      db: String,
+      table: String,
+      newParts: Seq[CatalogTablePartition]): Unit
+
+  /** Returns the specified partition, or throws [[NoSuchPartitionException]]. */
+  final def getPartition(
+      dbName: String,
+      tableName: String,
+      spec: Catalog.TablePartitionSpec): CatalogTablePartition = {
+    getPartitionOption(dbName, tableName, spec).getOrElse {
+      throw new NoSuchPartitionException(dbName, tableName, spec)
+    }
+  }
+
+  /** Returns the specified partition or None if it does not exist. */
+  final def getPartitionOption(
+      db: String,
+      table: String,
+      spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = {
+    getPartitionOption(getTable(db, table), spec)
+  }
+
+  /** Returns the specified partition or None if it does not exist. */
   def getPartitionOption(
-      hTable: HiveTable,
-      partitionSpec: JMap[String, String]): Option[HivePartition]
+      table: CatalogTable,
+      spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition]
+
+  /** Returns all partitions for the given table. */
+  final def getAllPartitions(db: String, table: String): Seq[CatalogTablePartition] = {
+    getAllPartitions(getTable(db, table))
+  }
 
   /** Returns all partitions for the given table. */
-  def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
+  def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition]
 
   /** Returns partitions filtered by predicates for the given table. */
-  def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
+  def getPartitionsByFilter(
+      table: CatalogTable,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition]
 
   /** Loads a static partition into an existing table. */
   def loadPartition(
@@ -181,6 +215,29 @@ private[hive] trait HiveClient {
       holdDDLTime: Boolean,
       listBucketingEnabled: Boolean): Unit
 
+  /** Create a function in an existing database. */
+  def createFunction(db: String, func: CatalogFunction): Unit
+
+  /** Drop an existing function in the database. */
+  def dropFunction(db: String, name: String): Unit
+
+  /** Rename an existing function in the database. */
+  def renameFunction(db: String, oldName: String, newName: String): Unit
+
+  /** Alter a function whose name matches the one specified in `func`, assuming it exists. */
+  def alterFunction(db: String, func: CatalogFunction): Unit
+
+  /** Return an existing function in the database, assuming it exists. */
+  final def getFunction(db: String, name: String): CatalogFunction = {
+    getFunctionOption(db, name).getOrElse(throw new NoSuchFunctionException(db, name))
+  }
+
+  /** Return an existing function in the database, or None if it doesn't exist. */
+  def getFunctionOption(db: String, name: String): Option[CatalogFunction]
+
+  /** Return the names of all functions that match the given pattern in the database. */
+  def listFunctions(db: String, pattern: String): Seq[String]
+
   /** Add a jar into class loader */
   def addJar(path: String): Unit
 
@@ -192,4 +249,5 @@ private[hive] trait HiveClient {
 
   /** Used for testing only.  Removes all metadata from this instance of Hive. */
   def reset(): Unit
+
 }

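The reworked HiveClient trait now pairs each abstract *Option lookup with a final variant that
throws (getTable/getTableOption, getPartition/getPartitionOption, getFunction/getFunctionOption),
so implementations only have to supply the Option form. A caller sketch against the new surface,
assuming Catalog.TablePartitionSpec is a map from partition column name to value (its definition
lives in sql/catalyst and is not shown in this diff):

    // Sketch only: HiveClient is private[hive], so real callers live under
    // org.apache.spark.sql.hive. The partition spec below is a hypothetical example.
    def describePartition(client: HiveClient, db: String, table: String): Unit = {
      val spec = Map("ds" -> "2016-02-21")  // assumed shape: Map[partition column -> value]
      client.getPartitionOption(db, table, spec) match {
        case Some(p) => println(s"location: ${p.storage.locationUri.getOrElse("<none>")}")
        case None    => println(s"$db.$table has no partition for $spec")
      }
    }
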
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index cf1ff55..7a007d2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -18,24 +18,25 @@
 package org.apache.spark.sql.hive.client
 
 import java.io.{File, PrintStream}
-import java.util.{Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{TableType => HTableType}
-import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
-import org.apache.hadoop.hive.ql.{metadata, Driver}
-import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceUri}
+import org.apache.hadoop.hive.ql.Driver
+import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -234,167 +235,184 @@ private[hive] class HiveClientImpl(
     if (getDatabaseOption(databaseName).isDefined) {
       state.setCurrentDatabase(databaseName)
     } else {
-      throw new NoSuchDatabaseException
+      throw new NoSuchDatabaseException(databaseName)
     }
   }
 
-  override def createDatabase(database: HiveDatabase): Unit = withHiveState {
+  override def createDatabase(
+      database: CatalogDatabase,
+      ignoreIfExists: Boolean): Unit = withHiveState {
     client.createDatabase(
-      new Database(
+      new HiveDatabase(
         database.name,
-        "",
-        new File(database.location).toURI.toString,
-        new java.util.HashMap),
-        true)
+        database.description,
+        database.locationUri,
+        database.properties.asJava),
+        ignoreIfExists)
   }
 
-  override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState {
+  override def dropDatabase(
+      name: String,
+      ignoreIfNotExists: Boolean,
+      cascade: Boolean): Unit = withHiveState {
+    client.dropDatabase(name, true, ignoreIfNotExists, cascade)
+  }
+
+  override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
+    client.alterDatabase(
+      database.name,
+      new HiveDatabase(
+        database.name,
+        database.description,
+        database.locationUri,
+        database.properties.asJava))
+  }
+
+  override def getDatabaseOption(name: String): Option[CatalogDatabase] = withHiveState {
     Option(client.getDatabase(name)).map { d =>
-      HiveDatabase(
+      CatalogDatabase(
         name = d.getName,
-        location = d.getLocationUri)
+        description = d.getDescription,
+        locationUri = d.getLocationUri,
+        properties = d.getParameters.asScala.toMap)
     }
   }
 
+  override def listDatabases(pattern: String): Seq[String] = withHiveState {
+    client.getDatabasesByPattern(pattern).asScala.toSeq
+  }
+
   override def getTableOption(
       dbName: String,
-      tableName: String): Option[HiveTable] = withHiveState {
-
+      tableName: String): Option[CatalogTable] = withHiveState {
     logDebug(s"Looking up $dbName.$tableName")
-
-    val hiveTable = Option(client.getTable(dbName, tableName, false))
-    val converted = hiveTable.map { h =>
-
-      HiveTable(
-        name = h.getTableName,
+    Option(client.getTable(dbName, tableName, false)).map { h =>
+      CatalogTable(
         specifiedDatabase = Option(h.getDbName),
-        schema = h.getCols.asScala.map(f => HiveColumn(f.getName, f.getType, f.getComment)),
-        partitionColumns = h.getPartCols.asScala.map(f =>
-          HiveColumn(f.getName, f.getType, f.getComment)),
-        properties = h.getParameters.asScala.toMap,
-        serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap,
+        name = h.getTableName,
         tableType = h.getTableType match {
-          case HTableType.MANAGED_TABLE => ManagedTable
-          case HTableType.EXTERNAL_TABLE => ExternalTable
-          case HTableType.VIRTUAL_VIEW => VirtualView
-          case HTableType.INDEX_TABLE => IndexTable
+          case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE
+          case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE
+          case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX_TABLE
+          case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIRTUAL_VIEW
         },
-        location = shim.getDataLocation(h),
-        inputFormat = Option(h.getInputFormatClass).map(_.getName),
-        outputFormat = Option(h.getOutputFormatClass).map(_.getName),
-        serde = Option(h.getSerializationLib),
-        viewText = Option(h.getViewExpandedText)).withClient(this)
+        schema = h.getCols.asScala.map(fromHiveColumn),
+        partitionColumns = h.getPartCols.asScala.map(fromHiveColumn),
+        sortColumns = Seq(),
+        numBuckets = h.getNumBuckets,
+        createTime = h.getTTable.getCreateTime.toLong * 1000,
+        lastAccessTime = h.getLastAccessTime.toLong * 1000,
+        storage = CatalogStorageFormat(
+          locationUri = shim.getDataLocation(h),
+          inputFormat = Option(h.getInputFormatClass).map(_.getName),
+          outputFormat = Option(h.getOutputFormatClass).map(_.getName),
+          serde = Option(h.getSerializationLib),
+          serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap
+        ),
+        properties = h.getParameters.asScala.toMap,
+        viewOriginalText = Option(h.getViewOriginalText),
+        viewText = Option(h.getViewExpandedText))
     }
-    converted
   }
 
-  private def toInputFormat(name: String) =
-    Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
-
-  private def toOutputFormat(name: String) =
-    Utils.classForName(name)
-      .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
-
-  private def toQlTable(table: HiveTable): metadata.Table = {
-    val qlTable = new metadata.Table(table.database, table.name)
-
-    qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-    qlTable.setPartCols(
-      table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-    table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
-    table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
-
-    // set owner
-    qlTable.setOwner(conf.getUser)
-    // set create time
-    qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
-
-    table.location.foreach { loc => shim.setDataLocation(qlTable, loc) }
-    table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass)
-    table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass)
-    table.serde.foreach(qlTable.setSerializationLib)
-
-    qlTable
+  override def createView(view: CatalogTable): Unit = withHiveState {
+    client.createTable(toHiveViewTable(view))
   }
 
-  private def toViewTable(view: HiveTable): metadata.Table = {
-    // TODO: this is duplicated with `toQlTable` except the table type stuff.
-    val tbl = new metadata.Table(view.database, view.name)
-    tbl.setTableType(HTableType.VIRTUAL_VIEW)
-    tbl.setSerializationLib(null)
-    tbl.clearSerDeInfo()
-
-    // TODO: we will save the same SQL string to original and expanded text, which is different
-    // from Hive.
-    tbl.setViewOriginalText(view.viewText.get)
-    tbl.setViewExpandedText(view.viewText.get)
-
-    tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-    view.properties.foreach { case (k, v) => tbl.setProperty(k, v) }
-
-    // set owner
-    tbl.setOwner(conf.getUser)
-    // set create time
-    tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
-
-    tbl
+  override def alertView(view: CatalogTable): Unit = withHiveState {
+    client.alterTable(view.qualifiedName, toHiveViewTable(view))
   }
 
-  override def createView(view: HiveTable): Unit = withHiveState {
-    client.createTable(toViewTable(view))
+  override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
+    client.createTable(toHiveTable(table), ignoreIfExists)
   }
 
-  override def alertView(view: HiveTable): Unit = withHiveState {
-    client.alterTable(view.qualifiedName, toViewTable(view))
+  override def dropTable(
+      dbName: String,
+      tableName: String,
+      ignoreIfNotExists: Boolean): Unit = withHiveState {
+    client.dropTable(dbName, tableName, true, ignoreIfNotExists)
   }
 
-  override def createTable(table: HiveTable): Unit = withHiveState {
-    val qlTable = toQlTable(table)
-    client.createTable(qlTable)
+  override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
+    val hiveTable = toHiveTable(table)
+    // Do not use `table.qualifiedName` here because this may be a rename
+    val qualifiedTableName = s"${table.database}.$tableName"
+    client.alterTable(qualifiedTableName, hiveTable)
   }
 
-  override def alterTable(table: HiveTable): Unit = withHiveState {
-    val qlTable = toQlTable(table)
-    client.alterTable(table.qualifiedName, qlTable)
+  override def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = withHiveState {
+    val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
+    parts.foreach { s =>
+      addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+    }
+    client.createPartitions(addPartitionDesc)
+  }
+
+  override def dropPartitions(
+      db: String,
+      table: String,
+      specs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState {
+    // TODO: figure out how to drop multiple partitions in one call
+    specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, true) }
+  }
+
+  override def renamePartitions(
+      db: String,
+      table: String,
+      specs: Seq[Catalog.TablePartitionSpec],
+      newSpecs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState {
+    require(specs.size == newSpecs.size, "number of old and new partition specs differ")
+    val catalogTable = getTable(db, table)
+    val hiveTable = toHiveTable(catalogTable)
+    specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
+      val hivePart = getPartitionOption(catalogTable, oldSpec)
+        .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) }
+        .getOrElse { throw new NoSuchPartitionException(db, table, oldSpec) }
+      client.renamePartition(hiveTable, oldSpec.asJava, hivePart)
+    }
   }
 
-  private def toHivePartition(partition: metadata.Partition): HivePartition = {
-    val apiPartition = partition.getTPartition
-    HivePartition(
-      values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty),
-      storage = HiveStorageDescriptor(
-        location = apiPartition.getSd.getLocation,
-        inputFormat = apiPartition.getSd.getInputFormat,
-        outputFormat = apiPartition.getSd.getOutputFormat,
-        serde = apiPartition.getSd.getSerdeInfo.getSerializationLib,
-        serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap))
+  override def alterPartitions(
+      db: String,
+      table: String,
+      newParts: Seq[CatalogTablePartition]): Unit = withHiveState {
+    val hiveTable = toHiveTable(getTable(db, table))
+    client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
   }
 
   override def getPartitionOption(
-      table: HiveTable,
-      partitionSpec: JMap[String, String]): Option[HivePartition] = withHiveState {
-
-    val qlTable = toQlTable(table)
-    val qlPartition = client.getPartition(qlTable, partitionSpec, false)
-    Option(qlPartition).map(toHivePartition)
+      table: CatalogTable,
+      spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
+    val hiveTable = toHiveTable(table)
+    val hivePartition = client.getPartition(hiveTable, spec.asJava, false)
+    Option(hivePartition).map(fromHivePartition)
   }
 
-  override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState {
-    val qlTable = toQlTable(hTable)
-    shim.getAllPartitions(client, qlTable).map(toHivePartition)
+  override def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition] = withHiveState {
+    val hiveTable = toHiveTable(table)
+    shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
   }
 
   override def getPartitionsByFilter(
-      hTable: HiveTable,
-      predicates: Seq[Expression]): Seq[HivePartition] = withHiveState {
-    val qlTable = toQlTable(hTable)
-    shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition)
+      table: CatalogTable,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
+    val hiveTable = toHiveTable(table)
+    shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition)
   }
 
   override def listTables(dbName: String): Seq[String] = withHiveState {
     client.getAllTables(dbName).asScala
   }
 
+  override def listTables(dbName: String, pattern: String): Seq[String] = withHiveState {
+    client.getTablesByPattern(dbName, pattern).asScala
+  }
+
   /**
    * Runs the specified SQL query using Hive.
    */
@@ -508,6 +526,34 @@ private[hive] class HiveClientImpl(
       listBucketingEnabled)
   }
 
+  override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState {
+    client.createFunction(toHiveFunction(func, db))
+  }
+
+  override def dropFunction(db: String, name: String): Unit = withHiveState {
+    client.dropFunction(db, name)
+  }
+
+  override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState {
+    val catalogFunc = getFunction(db, oldName).copy(name = newName)
+    val hiveFunc = toHiveFunction(catalogFunc, db)
+    client.alterFunction(db, oldName, hiveFunc)
+  }
+
+  override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState {
+    client.alterFunction(db, func.name, toHiveFunction(func, db))
+  }
+
+  override def getFunctionOption(
+      db: String,
+      name: String): Option[CatalogFunction] = withHiveState {
+    Option(client.getFunction(db, name)).map(fromHiveFunction)
+  }
+
+  override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState {
+    client.getFunctions(db, pattern).asScala
+  }
+
   def addJar(path: String): Unit = {
     val uri = new Path(path).toUri
     val jarURL = if (uri.getScheme == null) {
@@ -541,4 +587,97 @@ private[hive] class HiveClientImpl(
         client.dropDatabase(db, true, false, true)
       }
   }
+
+
+  /* -------------------------------------------------------- *
+   |  Helper methods for converting to and from Hive classes  |
+   * -------------------------------------------------------- */
+
+  private def toInputFormat(name: String) =
+    Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
+
+  private def toOutputFormat(name: String) =
+    Utils.classForName(name)
+      .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
+
+  private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
+    new HiveFunction(
+      f.name,
+      db,
+      f.className,
+      null,
+      PrincipalType.USER,
+      (System.currentTimeMillis / 1000).toInt,
+      FunctionType.JAVA,
+      List.empty[ResourceUri].asJava)
+  }
+
+  private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
+    new CatalogFunction(hf.getFunctionName, hf.getClassName)
+  }
+
+  private def toHiveColumn(c: CatalogColumn): FieldSchema = {
+    new FieldSchema(c.name, c.dataType, c.comment.orNull)
+  }
+
+  private def fromHiveColumn(hc: FieldSchema): CatalogColumn = {
+    new CatalogColumn(
+      name = hc.getName,
+      dataType = hc.getType,
+      nullable = true,
+      comment = Option(hc.getComment))
+  }
+
+  private def toHiveTable(table: CatalogTable): HiveTable = {
+    val hiveTable = new HiveTable(table.database, table.name)
+    hiveTable.setTableType(table.tableType match {
+      case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE
+      case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE
+      case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE
+      case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW
+    })
+    hiveTable.setFields(table.schema.map(toHiveColumn).asJava)
+    hiveTable.setPartCols(table.partitionColumns.map(toHiveColumn).asJava)
+    // TODO: set sort columns here too
+    hiveTable.setOwner(conf.getUser)
+    hiveTable.setNumBuckets(table.numBuckets)
+    hiveTable.setCreateTime((table.createTime / 1000).toInt)
+    hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
+    table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }
+    table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
+    table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
+    table.storage.serde.foreach(hiveTable.setSerializationLib)
+    table.storage.serdeProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) }
+    table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) }
+    table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) }
+    table.viewText.foreach { t => hiveTable.setViewExpandedText(t) }
+    hiveTable
+  }
+
+  private def toHiveViewTable(view: CatalogTable): HiveTable = {
+    val tbl = toHiveTable(view)
+    tbl.setTableType(HiveTableType.VIRTUAL_VIEW)
+    tbl.setSerializationLib(null)
+    tbl.clearSerDeInfo()
+    tbl
+  }
+
+  private def toHivePartition(
+      p: CatalogTablePartition,
+      ht: HiveTable): HivePartition = {
+    new HivePartition(ht, p.spec.asJava, p.storage.locationUri.map { l => new Path(l) }.orNull)
+  }
+
+  private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
+    val apiPartition = hp.getTPartition
+    CatalogTablePartition(
+      spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
+      storage = CatalogStorageFormat(
+        locationUri = Option(apiPartition.getSd.getLocation),
+        inputFormat = Option(apiPartition.getSd.getInputFormat),
+        outputFormat = Option(apiPartition.getSd.getOutputFormat),
+        serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
+        serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap))
+  }
+
 }

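One detail worth calling out in the conversion helpers above: the Hive metastore stores create
and last-access times as whole seconds in an Int, while CatalogTable carries milliseconds in a
Long, hence the multiplication by 1000 in getTableOption and the division by 1000 in toHiveTable.
A small illustration of the unit handling (not code from the patch):

    val hiveCreateTimeSeconds: Int = 1456095626                              // as stored by the metastore
    val catalogCreateTimeMillis: Long = hiveCreateTimeSeconds.toLong * 1000  // fromHive direction
    val backToSeconds: Int = (catalogCreateTimeMillis / 1000).toInt          // toHive direction
    assert(backToSeconds == hiveCreateTimeSeconds)  // no precision below one second existed to lose
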
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 4c0aae6..3f81c99 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, MetastoreRelation}
-import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
 
 /**
  * Create table and insert the query result into it.
@@ -33,7 +33,7 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
  */
 private[hive]
 case class CreateTableAsSelect(
-    tableDesc: HiveTable,
+    tableDesc: CatalogTable,
     query: LogicalPlan,
     allowExisting: Boolean)
   extends RunnableCommand {
@@ -51,25 +51,25 @@ case class CreateTableAsSelect(
       import org.apache.hadoop.mapred.TextInputFormat
 
       val withFormat =
-        tableDesc.copy(
+        tableDesc.withNewStorage(
           inputFormat =
-            tableDesc.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
+            tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
           outputFormat =
-            tableDesc.outputFormat
+            tableDesc.storage.outputFormat
               .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
-          serde = tableDesc.serde.orElse(Some(classOf[LazySimpleSerDe].getName())))
+          serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)))
 
       val withSchema = if (withFormat.schema.isEmpty) {
         // Hive doesn't support specifying the column list for target table in CTAS
         // However we don't think SparkSQL should follow that.
-        tableDesc.copy(schema =
-        query.output.map(c =>
-          HiveColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType), null)))
+        tableDesc.copy(schema = query.output.map { c =>
+          CatalogColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType))
+        })
       } else {
         withFormat
       }
 
-      hiveContext.catalog.client.createTable(withSchema)
+      hiveContext.catalog.client.createTable(withSchema, ignoreIfExists = false)
 
       // Get the Metastore Relation
       hiveContext.catalog.lookupRelation(tableIdentifier, None) match {

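In CreateTableAsSelect, each storage field is filled with tableDesc.storage.<field>.orElse(default),
so a format the user specified always wins and the plain-text defaults only apply when nothing was
given. Reusing the CatalogStorageFormat sketch from above, with the fully qualified class names
written out in place of the classOf[...].getName calls in the patch:

    def withTextDefaults(storage: CatalogStorageFormat): CatalogStorageFormat = storage.copy(
      inputFormat = storage.inputFormat
        .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
      outputFormat = storage.outputFormat
        .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
      serde = storage.serde
        .orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
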
http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 5da58a7..2914d03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -21,11 +21,11 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, SQLBuilder}
-import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, SQLBuilder}
 
 /**
  * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
@@ -34,7 +34,7 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
 // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is different
 // from Hive and may not work for some cases like create view on self join.
 private[hive] case class CreateViewAsSelect(
-    tableDesc: HiveTable,
+    tableDesc: CatalogTable,
     child: LogicalPlan,
     allowExisting: Boolean,
     orReplace: Boolean) extends RunnableCommand {
@@ -72,7 +72,7 @@ private[hive] case class CreateViewAsSelect(
     Seq.empty[Row]
   }
 
-  private def prepareTable(sqlContext: SQLContext): HiveTable = {
+  private def prepareTable(sqlContext: SQLContext): CatalogTable = {
     val expandedText = if (sqlContext.conf.canonicalView) {
       try rebuildViewQueryString(sqlContext) catch {
         case NonFatal(e) => wrapViewTextWithSelect
@@ -83,12 +83,16 @@ private[hive] case class CreateViewAsSelect(
 
     val viewSchema = {
       if (tableDesc.schema.isEmpty) {
-        childSchema.map { attr =>
-          HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+        childSchema.map { a =>
+          CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType))
         }
       } else {
-        childSchema.zip(tableDesc.schema).map { case (attr, col) =>
-          HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
+        childSchema.zip(tableDesc.schema).map { case (a, col) =>
+          CatalogColumn(
+            col.name,
+            HiveMetastoreTypes.toMetastoreType(a.dataType),
+            nullable = true,
+            col.comment)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index feb133d..d316664 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -205,7 +205,7 @@ case class InsertIntoHiveTable(
         val oldPart =
           catalog.client.getPartitionOption(
             catalog.client.getTable(table.databaseName, table.tableName),
-            partitionSpec.asJava)
+            partitionSpec)
 
         if (oldPart.isEmpty || !ifNotExists) {
             catalog.client.loadPartition(

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
new file mode 100644
index 0000000..f73e7e2
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
+import org.apache.spark.util.Utils
+
+
+/**
+ * Test suite for the [[HiveCatalog]].
+ */
+class HiveCatalogSuite extends CatalogTestCases {
+
+  private val client: HiveClient = {
+    IsolatedClientLoader.forVersion(
+      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+      hadoopVersion = VersionInfo.getVersion).createClient()
+  }
+
+  protected override val tableInputFormat: String =
+    "org.apache.hadoop.mapred.SequenceFileInputFormat"
+  protected override val tableOutputFormat: String =
+    "org.apache.hadoop.mapred.SequenceFileOutputFormat"
+
+  protected override def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath
+
+  protected override def resetState(): Unit = client.reset()
+
+  protected override def newEmptyCatalog(): Catalog = new HiveCatalog(client)
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 14a83d5..f8764d4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{QueryTest, Row, SaveMode, SQLConf}
-import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
@@ -83,16 +83,16 @@ class DataSourceWithHiveMetastoreCatalogSuite
         }
 
         val hiveTable = catalog.client.getTable("default", "t")
-        assert(hiveTable.inputFormat === Some(inputFormat))
-        assert(hiveTable.outputFormat === Some(outputFormat))
-        assert(hiveTable.serde === Some(serde))
+        assert(hiveTable.storage.inputFormat === Some(inputFormat))
+        assert(hiveTable.storage.outputFormat === Some(outputFormat))
+        assert(hiveTable.storage.serde === Some(serde))
 
-        assert(!hiveTable.isPartitioned)
-        assert(hiveTable.tableType === ManagedTable)
+        assert(hiveTable.partitionColumns.isEmpty)
+        assert(hiveTable.tableType === CatalogTableType.MANAGED_TABLE)
 
         val columns = hiveTable.schema
         assert(columns.map(_.name) === Seq("d1", "d2"))
-        assert(columns.map(_.hiveType) === Seq("decimal(10,3)", "string"))
+        assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
 
         checkAnswer(table("t"), testDF)
         assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
@@ -114,16 +114,17 @@ class DataSourceWithHiveMetastoreCatalogSuite
           }
 
           val hiveTable = catalog.client.getTable("default", "t")
-          assert(hiveTable.inputFormat === Some(inputFormat))
-          assert(hiveTable.outputFormat === Some(outputFormat))
-          assert(hiveTable.serde === Some(serde))
+          assert(hiveTable.storage.inputFormat === Some(inputFormat))
+          assert(hiveTable.storage.outputFormat === Some(outputFormat))
+          assert(hiveTable.storage.serde === Some(serde))
 
-          assert(hiveTable.tableType === ExternalTable)
-          assert(hiveTable.location.get === path.toURI.toString.stripSuffix(File.separator))
+          assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE)
+          assert(hiveTable.storage.locationUri ===
+            Some(path.toURI.toString.stripSuffix(File.separator)))
 
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))
-          assert(columns.map(_.hiveType) === Seq("decimal(10,3)", "string"))
+          assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
 
           checkAnswer(table("t"), testDF)
           assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
@@ -143,17 +144,16 @@ class DataSourceWithHiveMetastoreCatalogSuite
              """.stripMargin)
 
           val hiveTable = catalog.client.getTable("default", "t")
-          assert(hiveTable.inputFormat === Some(inputFormat))
-          assert(hiveTable.outputFormat === Some(outputFormat))
-          assert(hiveTable.serde === Some(serde))
+          assert(hiveTable.storage.inputFormat === Some(inputFormat))
+          assert(hiveTable.storage.outputFormat === Some(outputFormat))
+          assert(hiveTable.storage.serde === Some(serde))
 
-          assert(hiveTable.isPartitioned === false)
-          assert(hiveTable.tableType === ExternalTable)
-          assert(hiveTable.partitionColumns.length === 0)
+          assert(hiveTable.partitionColumns.isEmpty)
+          assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE)
 
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))
-          assert(columns.map(_.hiveType) === Seq("int", "string"))
+          assert(columns.map(_.dataType) === Seq("int", "string"))
 
           checkAnswer(table("t"), Row(1, "val_1"))
           assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
index 137dadd..e869c0e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
@@ -22,15 +22,15 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.SimpleParserConf
 import org.apache.spark.sql.catalyst.plans.logical.Generate
-import org.apache.spark.sql.hive.client.{ExternalTable, HiveColumn, HiveTable, ManagedTable}
 
 class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
   val parser = new HiveQl(SimpleParserConf())
 
-  private def extractTableDesc(sql: String): (HiveTable, Boolean) = {
+  private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
     parser.parsePlan(sql).collect {
       case CreateTableAsSelect(desc, child, allowExisting) => (desc, allowExisting)
     }.head
@@ -53,28 +53,29 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
         |AS SELECT * FROM src""".stripMargin
 
     val (desc, exists) = extractTableDesc(s1)
-    assert(exists == true)
+    assert(exists)
     assert(desc.specifiedDatabase == Some("mydb"))
     assert(desc.name == "page_view")
-    assert(desc.tableType == ExternalTable)
-    assert(desc.location == Some("/user/external/page_view"))
+    assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+    assert(desc.storage.locationUri == Some("/user/external/page_view"))
     assert(desc.schema ==
-      HiveColumn("viewtime", "int", null) ::
-        HiveColumn("userid", "bigint", null) ::
-        HiveColumn("page_url", "string", null) ::
-        HiveColumn("referrer_url", "string", null) ::
-        HiveColumn("ip", "string", "IP Address of the User") ::
-        HiveColumn("country", "string", "country of origination") :: Nil)
+      CatalogColumn("viewtime", "int") ::
+      CatalogColumn("userid", "bigint") ::
+      CatalogColumn("page_url", "string") ::
+      CatalogColumn("referrer_url", "string") ::
+      CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
+      CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
     // TODO will be SQLText
     assert(desc.viewText == Option("This is the staging page view table"))
     assert(desc.partitionColumns ==
-      HiveColumn("dt", "string", "date type") ::
-        HiveColumn("hour", "string", "hour of the day") :: Nil)
-    assert(desc.serdeProperties ==
+      CatalogColumn("dt", "string", comment = Some("date type")) ::
+      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+    assert(desc.storage.serdeProperties ==
       Map((serdeConstants.SERIALIZATION_FORMAT, "\054"), (serdeConstants.FIELD_DELIM, "\054")))
-    assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
-    assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
-    assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
+    assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
+    assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+    assert(desc.storage.serde ==
+      Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
     assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
   }
 
@@ -98,27 +99,27 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
         |AS SELECT * FROM src""".stripMargin
 
     val (desc, exists) = extractTableDesc(s2)
-    assert(exists == true)
+    assert(exists)
     assert(desc.specifiedDatabase == Some("mydb"))
     assert(desc.name == "page_view")
-    assert(desc.tableType == ExternalTable)
-    assert(desc.location == Some("/user/external/page_view"))
+    assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+    assert(desc.storage.locationUri == Some("/user/external/page_view"))
     assert(desc.schema ==
-      HiveColumn("viewtime", "int", null) ::
-        HiveColumn("userid", "bigint", null) ::
-        HiveColumn("page_url", "string", null) ::
-        HiveColumn("referrer_url", "string", null) ::
-        HiveColumn("ip", "string", "IP Address of the User") ::
-        HiveColumn("country", "string", "country of origination") :: Nil)
+      CatalogColumn("viewtime", "int") ::
+      CatalogColumn("userid", "bigint") ::
+      CatalogColumn("page_url", "string") ::
+      CatalogColumn("referrer_url", "string") ::
+      CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
+      CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
     // TODO will be SQLText
     assert(desc.viewText == Option("This is the staging page view table"))
     assert(desc.partitionColumns ==
-      HiveColumn("dt", "string", "date type") ::
-        HiveColumn("hour", "string", "hour of the day") :: Nil)
-    assert(desc.serdeProperties == Map())
-    assert(desc.inputFormat == Option("parquet.hive.DeprecatedParquetInputFormat"))
-    assert(desc.outputFormat == Option("parquet.hive.DeprecatedParquetOutputFormat"))
-    assert(desc.serde == Option("parquet.hive.serde.ParquetHiveSerDe"))
+      CatalogColumn("dt", "string", comment = Some("date type")) ::
+      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+    assert(desc.storage.serdeProperties == Map())
+    assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
+    assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
+    assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe"))
     assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
   }
 
@@ -128,14 +129,15 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
     assert(exists == false)
     assert(desc.specifiedDatabase == None)
     assert(desc.name == "page_view")
-    assert(desc.tableType == ManagedTable)
-    assert(desc.location == None)
-    assert(desc.schema == Seq.empty[HiveColumn])
+    assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+    assert(desc.storage.locationUri == None)
+    assert(desc.schema == Seq.empty[CatalogColumn])
     assert(desc.viewText == None) // TODO will be SQLText
-    assert(desc.serdeProperties == Map())
-    assert(desc.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat"))
-    assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
-    assert(desc.serde.isEmpty)
+    assert(desc.storage.serdeProperties == Map())
+    assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
+    assert(desc.storage.outputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+    assert(desc.storage.serde.isEmpty)
     assert(desc.properties == Map())
   }
 
@@ -162,14 +164,14 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
     assert(exists == false)
     assert(desc.specifiedDatabase == None)
     assert(desc.name == "ctas2")
-    assert(desc.tableType == ManagedTable)
-    assert(desc.location == None)
-    assert(desc.schema == Seq.empty[HiveColumn])
+    assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+    assert(desc.storage.locationUri == None)
+    assert(desc.schema == Seq.empty[CatalogColumn])
     assert(desc.viewText == None) // TODO will be SQLText
-    assert(desc.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
-    assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
-    assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
-    assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"))
+    assert(desc.storage.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
+    assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
+    assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+    assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"))
     assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22")))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d9e4b02..0c288bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -25,9 +25,9 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
-import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
@@ -724,20 +724,25 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     val tableName = "spark6655"
     withTable(tableName) {
       val schema = StructType(StructField("int", IntegerType, true) :: Nil)
-      val hiveTable = HiveTable(
+      val hiveTable = CatalogTable(
         specifiedDatabase = Some("default"),
         name = tableName,
+        tableType = CatalogTableType.MANAGED_TABLE,
         schema = Seq.empty,
-        partitionColumns = Seq.empty,
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          serdeProperties = Map(
+            "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
+        ),
         properties = Map(
           "spark.sql.sources.provider" -> "json",
           "spark.sql.sources.schema" -> schema.json,
-          "EXTERNAL" -> "FALSE"),
-        tableType = ManagedTable,
-        serdeProperties = Map(
-          "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))))
+          "EXTERNAL" -> "FALSE"))
 
-      catalog.client.createTable(hiveTable)
+      catalog.client.createTable(hiveTable, ignoreIfExists = false)
 
       invalidateTable(tableName)
       val actualSchema = table(tableName).schema
@@ -916,7 +921,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     // As a proxy for verifying that the table was stored in Hive compatible format, we verify that
     // each column of the table is of native type StringType.
     assert(catalog.client.getTable("default", "not_skip_hive_metadata").schema
-      .forall(column => HiveMetastoreTypes.toDataType(column.hiveType) == StringType))
+      .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
 
     catalog.createDataSourceTable(
       tableIdent = TableIdentifier("skip_hive_metadata"),
@@ -930,6 +935,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     // As a proxy for verifying that the table was stored in SparkSQL format, we verify that
     // the table has a column type as array of StringType.
     assert(catalog.client.getTable("default", "skip_hive_metadata").schema
-      .forall(column => HiveMetastoreTypes.toDataType(column.hiveType) == ArrayType(StringType)))
+      .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index c2c896e..488f298 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -26,9 +26,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
 
   private def checkTablePath(dbName: String, tableName: String): Unit = {
     val metastoreTable = hiveContext.catalog.client.getTable(dbName, tableName)
-    val expectedPath = hiveContext.catalog.client.getDatabase(dbName).location + "/" + tableName
+    val expectedPath = hiveContext.catalog.client.getDatabase(dbName).locationUri + "/" + tableName
 
-    assert(metastoreTable.serdeProperties("path") === expectedPath)
+    assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
   }
 
   test(s"saveAsTable() to non-default database - with USE - Overwrite") {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 1344a2c..d850d52 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveContext
@@ -60,8 +61,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
       hadoopVersion = VersionInfo.getVersion,
       config = buildConf(),
       ivyPath = ivyPath).createClient()
-    val db = new HiveDatabase("default", "")
-    badClient.createDatabase(db)
+    val db = new CatalogDatabase("default", "desc", "loc", Map())
+    badClient.createDatabase(db, ignoreIfExists = true)
   }
 
   private def getNestedMessages(e: Throwable): String = {
@@ -116,29 +117,27 @@ class VersionsSuite extends SparkFunSuite with Logging {
     }
 
     test(s"$version: createDatabase") {
-      val db = HiveDatabase("default", "")
-      client.createDatabase(db)
+      val db = CatalogDatabase("default", "desc", "loc", Map())
+      client.createDatabase(db, ignoreIfExists = true)
     }
 
     test(s"$version: createTable") {
       val table =
-        HiveTable(
+        CatalogTable(
           specifiedDatabase = Option("default"),
           name = "src",
-          schema = Seq(HiveColumn("key", "int", "")),
-          partitionColumns = Seq.empty,
-          properties = Map.empty,
-          serdeProperties = Map.empty,
-          tableType = ManagedTable,
-          location = None,
-          inputFormat =
-            Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
-          outputFormat =
-            Some(classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
-          serde =
-            Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()))
-
-      client.createTable(table)
+          tableType = CatalogTableType.MANAGED_TABLE,
+          schema = Seq(CatalogColumn("key", "int")),
+          storage = CatalogStorageFormat(
+            locationUri = None,
+            inputFormat = Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
+            outputFormat = Some(
+              classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
+            serde = Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()),
+            serdeProperties = Map.empty
+          ))
+
+      client.createTable(table, ignoreIfExists = false)
     }
 
     test(s"$version: getTable") {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c3832b2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index b91248b..37c0179 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -149,7 +149,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
       val (actualScannedColumns, actualPartValues) = plan.collect {
         case p @ HiveTableScan(columns, relation, _) =>
           val columnNames = columns.map(_.name)
-          val partValues = if (relation.table.isPartitioned) {
+          val partValues = if (relation.table.partitionColumns.nonEmpty) {
             p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
           } else {
             Seq.empty


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

