spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-14821][SQL] Implement AnalyzeTable in sql/core and remove HiveSqlAstBuilder
Date Fri, 22 Apr 2016 00:41:33 GMT
Repository: spark
Updated Branches:
  refs/heads/master 4e726227a -> f181aee07


[SPARK-14821][SQL] Implement AnalyzeTable in sql/core and remove HiveSqlAstBuilder

## What changes were proposed in this pull request?
This patch moves analyze table parsing into SparkSqlAstBuilder and removes HiveSqlAstBuilder.

To avoid extensive refactoring, I created a common trait, CatalogRelation, that both
SimpleCatalogRelation (the renamed CatalogRelation case class) and MetastoreRelation
implement, and match on that trait instead. In the future we should probably consolidate
the two relations into a single class so this common trait is no longer needed.
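
As an illustration, here is a minimal, self-contained sketch of the trait-and-match
pattern (the type names below are simplified stand-ins, not the actual Spark classes;
in the diff the real implementors are SimpleCatalogRelation and MetastoreRelation):

    // Hypothetical, simplified stand-ins for the real catalyst types.
    case class CatalogTable(database: String, table: String)

    // The common trait: any relation that can expose its underlying catalog table.
    trait CatalogRelation {
      def catalogTable: CatalogTable
    }

    case class SimpleRelation(catalogTable: CatalogTable) extends CatalogRelation
    case class MetastoreStyleRelation(catalogTable: CatalogTable) extends CatalogRelation

    // Commands can match on the trait rather than on each concrete relation class.
    def tableOf(plan: AnyRef): CatalogTable = plan match {
      case r: CatalogRelation => r.catalogTable
      case other => sys.error(s"unsupported relation: ${other.getClass.getSimpleName}")
    }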

## How was this patch tested?
Updated unit tests (SessionCatalogSuite, StatisticsSuite, and PruningSuite).
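
For instance, the StatisticsSuite change below now drives the command through SQL instead
of the removed HiveSessionState.analyze override; a condensed sketch of that test pattern
(abbreviated from the diff, not the full suite):

    // Analyzing a temp table is still unsupported, so the command must throw.
    sql("""SELECT * FROM src""").registerTempTable("tempTable")
    intercept[UnsupportedOperationException] {
      hiveContext.sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
    }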

Author: Reynold Xin <rxin@databricks.com>

Closes #12584 from rxin/SPARK-14821.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f181aee0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f181aee0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f181aee0

Branch: refs/heads/master
Commit: f181aee07c0ee105b2a34581105eeeada7d42363
Parents: 4e72622
Author: Reynold Xin <rxin@databricks.com>
Authored: Thu Apr 21 17:41:29 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu Apr 21 17:41:29 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |   2 +-
 .../spark/sql/catalyst/catalog/interface.scala  |  22 +++-
 .../catalyst/catalog/SessionCatalogSuite.scala  |   8 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  19 ++++
 .../sql/execution/command/AnalyzeTable.scala    | 111 +++++++++++++++++++
 .../spark/sql/internal/SessionState.scala       |  10 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   4 +-
 .../spark/sql/hive/HiveSessionState.scala       |  18 ---
 .../spark/sql/hive/MetastoreRelation.scala      |  62 +++++------
 .../sql/hive/execution/HiveSqlParser.scala      |  70 ------------
 .../spark/sql/hive/execution/commands.scala     |  92 +--------------
 .../apache/spark/sql/hive/StatisticsSuite.scala |   5 +-
 .../spark/sql/hive/execution/PruningSuite.scala |   2 +-
 13 files changed, 199 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 34e1cb7..ab5124e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -266,7 +266,7 @@ class SessionCatalog(
     val relation =
       if (name.database.isDefined || !tempTables.contains(table)) {
         val metadata = externalCatalog.getTable(db, table)
-        CatalogRelation(db, metadata, alias)
+        SimpleCatalogRelation(db, metadata, alias)
       } else {
         tempTables(table)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index ad989a9..d2294ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -295,17 +295,31 @@ object ExternalCatalog {
 
 
 /**
+ * An interface that is implemented by logical plans to return the underlying catalog table.
+ * If we can in the future consolidate SimpleCatalogRelation and MetastoreRelation, we should
+ * probably remove this interface.
+ */
+trait CatalogRelation {
+  def catalogTable: CatalogTable
+}
+
+
+/**
  * A [[LogicalPlan]] that wraps [[CatalogTable]].
+ *
+ * Note that in the future we should consolidate this and HiveCatalogRelation.
  */
-case class CatalogRelation(
-    db: String,
+case class SimpleCatalogRelation(
+    databaseName: String,
     metadata: CatalogTable,
     alias: Option[String] = None)
-  extends LeafNode {
+  extends LeafNode with CatalogRelation {
 
   // TODO: implement this
   override def output: Seq[Attribute] = Seq.empty
 
-  require(metadata.identifier.database == Some(db),
+  override def catalogTable: CatalogTable = metadata
+
+  require(metadata.identifier.database == Some(databaseName),
     "provided database does not match the one specified in the table definition")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 426273e..27205c4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -372,25 +372,25 @@ class SessionCatalogSuite extends SparkFunSuite {
     sessionCatalog.setCurrentDatabase("db2")
     // If we explicitly specify the database, we'll look up the relation in that database
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
-      == SubqueryAlias("tbl1", CatalogRelation("db2", metastoreTable1)))
+      == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1)))
     // Otherwise, we'll first look up a temporary table with the same name
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
       == SubqueryAlias("tbl1", tempTable1))
     // Then, if that does not exist, look up the relation in the current database
     sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
-      == SubqueryAlias("tbl1", CatalogRelation("db2", metastoreTable1)))
+      == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1)))
   }
 
   test("lookup table relation with alias") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val alias = "monster"
     val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
-    val relation = SubqueryAlias("tbl1", CatalogRelation("db2", tableMetadata))
+    val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata))
     val relationWithAlias =
       SubqueryAlias(alias,
         SubqueryAlias("tbl1",
-          CatalogRelation("db2", tableMetadata, Some(alias))))
+          SimpleCatalogRelation("db2", tableMetadata, Some(alias))))
     assert(catalog.lookupRelation(
       TableIdentifier("tbl1", Some("db2")), alias = None) == relation)
     assert(catalog.lookupRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 05fb1ef..cae6430 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -87,6 +87,25 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
+   * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
+   * options are passed on to Hive) e.g.:
+   * {{{
+   *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
+   * }}}
+   */
+  override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
+    if (ctx.partitionSpec == null &&
+      ctx.identifier != null &&
+      ctx.identifier.getText.toLowerCase == "noscan") {
+      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+    } else {
+      // Always just run the no scan analyze. We should fix this and implement full analyze
+      // command in the future.
+      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+    }
+  }
+
+  /**
    * Create a [[SetDatabaseCommand]] logical plan.
    */
   override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
new file mode 100644
index 0000000..7fa246b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+
+
+/**
+ * Analyzes the given table in the current database to generate statistics, which will be
+ * used in query optimizations.
+ *
+ * Right now, it only supports Hive tables and it only updates the size of a Hive table
+ * in the Hive metastore.
+ */
+case class AnalyzeTable(tableName: String) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val sessionState = sqlContext.sessionState
+    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
+    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+    relation match {
+      case relation: CatalogRelation =>
+        val catalogTable: CatalogTable = relation.catalogTable
+        // This method is mainly based on
+        // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+        // in Hive 0.13 (except that we do not use fs.getContentSummary).
+        // TODO: Generalize statistics collection.
+        // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
+        // Can we use fs.getContentSummary in future?
+        // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
+        // countFileSize to count the table size.
+        val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
+        def calculateTableSize(fs: FileSystem, path: Path): Long = {
+          val fileStatus = fs.getFileStatus(path)
+          val size = if (fileStatus.isDirectory) {
+            fs.listStatus(path)
+              .map { status =>
+                if (!status.getPath.getName.startsWith(stagingDir)) {
+                  calculateTableSize(fs, status.getPath)
+                } else {
+                  0L
+                }
+              }.sum
+          } else {
+            fileStatus.getLen
+          }
+
+          size
+        }
+
+        val tableParameters = catalogTable.properties
+        val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
+        val newTotalSize =
+          catalogTable.storage.locationUri.map { p =>
+            val path = new Path(p)
+            try {
+              val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+              calculateTableSize(fs, path)
+            } catch {
+              case NonFatal(e) =>
+                logWarning(
+                  s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
+                    s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
+                0L
+            }
+          }.getOrElse(0L)
+
+        // Update the Hive metastore if the total size of the table is different than the size
+        // recorded in the Hive metastore.
+        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
+        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+          sessionState.catalog.alterTable(
+            catalogTable.copy(
+              properties = relation.catalogTable.properties +
+                (AnalyzeTable.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
+        }
+
+      case otherRelation =>
+        throw new UnsupportedOperationException(
+          s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
+    }
+    Seq.empty[Row]
+  }
+}
+
+object AnalyzeTable {
+  val TOTAL_SIZE_FIELD = "totalSize"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index e1be4b8..c423b84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.AnalyzeTable
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
@@ -162,8 +163,15 @@ private[sql] class SessionState(ctx: SQLContext) {
     ctx.sparkContext.addJar(path)
   }
 
+  /**
+   * Analyzes the given table in the current database to generate statistics, which will be
+   * used in query optimizations.
+   *
+   * Right now, it only supports catalog tables and it only updates the size of a catalog table
+   * in the external catalog.
+   */
   def analyze(tableName: String): Unit = {
-    throw new UnsupportedOperationException
+    AnalyzeTable(tableName).run(ctx)
   }
 
   def runNativeSql(sql: String): Seq[String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ca39791..df2b6be 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -392,7 +392,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
-        val pathsInMetastore = metastoreRelation.table.storage.locationUri.toSeq
+        val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
 
         expectedFileFormat match {
@@ -467,7 +467,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
         Some(partitionSpec))
 
       val hadoopFsRelation = cached.getOrElse {
-        val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil
+        val paths = new Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil
         val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
 
         val inferredSchema = if (fileType.equals("parquet")) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 6f4332c..4db0d78 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -24,12 +24,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
-import org.apache.spark.sql.hive.execution.{AnalyzeTable, HiveSqlParser}
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 
 
@@ -106,11 +104,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
   }
 
   /**
-   * Parser for HiveQl query texts.
-   */
-  override lazy val sqlParser: ParserInterface = new HiveSqlParser(conf)
-
-  /**
    * Planner that takes into account Hive-specific strategies.
    */
   override def planner: SparkPlanner = {
@@ -175,17 +168,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
   }
 
   /**
-   * Analyzes the given table in the current database to generate statistics, which will be
-   * used in query optimizations.
-   *
-   * Right now, it only supports Hive tables and it only updates the size of a Hive table
-   * in the Hive metastore.
-   */
-  override def analyze(tableName: String): Unit = {
-    AnalyzeTable(tableName).run(ctx)
-  }
-
-  /**
    * Execute a SQL statement by passing the query text directly to Hive.
    */
   override def runNativeSql(sql: String): Seq[String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index a66c325..cd45706 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTablePartition, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
@@ -37,13 +37,13 @@ import org.apache.spark.sql.hive.client.HiveClient
 
 
 private[hive] case class MetastoreRelation(
-  databaseName: String,
-  tableName: String,
-  alias: Option[String])
-  (val table: CatalogTable,
-    @transient private val client: HiveClient,
-    @transient private val sqlContext: SQLContext)
-  extends LeafNode with MultiInstanceRelation with FileRelation {
+    databaseName: String,
+    tableName: String,
+    alias: Option[String])
+    (val catalogTable: CatalogTable,
+     @transient private val client: HiveClient,
+     @transient private val sqlContext: SQLContext)
+  extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {
 
   override def equals(other: Any): Boolean = other match {
     case relation: MetastoreRelation =>
@@ -58,7 +58,7 @@ private[hive] case class MetastoreRelation(
     Objects.hashCode(databaseName, tableName, alias, output)
   }
 
-  override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
+  override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sqlContext :: Nil
 
   private def toHiveColumn(c: CatalogColumn): FieldSchema = {
     new FieldSchema(c.name, c.dataType, c.comment.orNull)
@@ -69,14 +69,14 @@ private[hive] case class MetastoreRelation(
     // We start by constructing an API table as Hive performs several important transformations
     // internally when converting an API table to a QL table.
     val tTable = new org.apache.hadoop.hive.metastore.api.Table()
-    tTable.setTableName(table.identifier.table)
-    tTable.setDbName(table.database)
+    tTable.setTableName(catalogTable.identifier.table)
+    tTable.setDbName(catalogTable.database)
 
     val tableParameters = new java.util.HashMap[String, String]()
     tTable.setParameters(tableParameters)
-    table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
+    catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
 
-    tTable.setTableType(table.tableType match {
+    tTable.setTableType(catalogTable.tableType match {
       case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
       case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
       case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
@@ -87,22 +87,22 @@ private[hive] case class MetastoreRelation(
     tTable.setSd(sd)
 
     // Note: In Hive the schema and partition columns must be disjoint sets
-    val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
-      table.partitionColumnNames.contains(c.getName)
+    val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
+      catalogTable.partitionColumnNames.contains(c.getName)
     }
     sd.setCols(schema.asJava)
     tTable.setPartitionKeys(partCols.asJava)
 
-    table.storage.locationUri.foreach(sd.setLocation)
-    table.storage.inputFormat.foreach(sd.setInputFormat)
-    table.storage.outputFormat.foreach(sd.setOutputFormat)
+    catalogTable.storage.locationUri.foreach(sd.setLocation)
+    catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
+    catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
 
     val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-    table.storage.serde.foreach(serdeInfo.setSerializationLib)
+    catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
     sd.setSerdeInfo(serdeInfo)
 
     val serdeParameters = new java.util.HashMap[String, String]()
-    table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
     serdeInfo.setParameters(serdeParameters)
 
     new HiveTable(tTable)
@@ -130,11 +130,11 @@ private[hive] case class MetastoreRelation(
 
   // When metastore partition pruning is turned off, we cache the list of all partitions to
   // mimic the behavior of Spark < 1.5
-  private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
+  private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(catalogTable)
 
   def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
     val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
-      client.getPartitionsByFilter(table, predicates)
+      client.getPartitionsByFilter(catalogTable, predicates)
     } else {
       allPartitions
     }
@@ -147,7 +147,7 @@ private[hive] case class MetastoreRelation(
 
       val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
       tPartition.setSd(sd)
-      sd.setCols(table.schema.map(toHiveColumn).asJava)
+      sd.setCols(catalogTable.schema.map(toHiveColumn).asJava)
       p.storage.locationUri.foreach(sd.setLocation)
       p.storage.inputFormat.foreach(sd.setInputFormat)
       p.storage.outputFormat.foreach(sd.setOutputFormat)
@@ -158,7 +158,7 @@ private[hive] case class MetastoreRelation(
       p.storage.serde.foreach(serdeInfo.setSerializationLib)
 
       val serdeParameters = new java.util.HashMap[String, String]()
-      table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+      catalogTable.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
       p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
       serdeInfo.setParameters(serdeParameters)
 
@@ -195,12 +195,12 @@ private[hive] case class MetastoreRelation(
   }
 
   /** PartitionKey attributes */
-  val partitionKeys = table.partitionColumns.map(_.toAttribute)
+  val partitionKeys = catalogTable.partitionColumns.map(_.toAttribute)
 
   /** Non-partitionKey attributes */
   // TODO: just make this hold the schema itself, not just non-partition columns
-  val attributes = table.schema
-    .filter { c => !table.partitionColumnNames.contains(c.name) }
+  val attributes = catalogTable.schema
+    .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
     .map(_.toAttribute)
 
   val output = attributes ++ partitionKeys
@@ -213,19 +213,19 @@ private[hive] case class MetastoreRelation(
 
   override def inputFiles: Array[String] = {
     val partLocations = client
-      .getPartitionsByFilter(table, Nil)
+      .getPartitionsByFilter(catalogTable, Nil)
       .flatMap(_.storage.locationUri)
       .toArray
     if (partLocations.nonEmpty) {
       partLocations
     } else {
       Array(
-        table.storage.locationUri.getOrElse(
-          sys.error(s"Could not get the location of ${table.qualifiedName}.")))
+        catalogTable.storage.locationUri.getOrElse(
+          sys.error(s"Could not get the location of ${catalogTable.qualifiedName}.")))
     }
   }
 
   override def newInstance(): MetastoreRelation = {
-    MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
+    MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sqlContext)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
deleted file mode 100644
index 35530b9..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.catalyst.parser._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.HiveNativeCommand
-import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
-
-/**
- * Concrete parser for HiveQl statements.
- */
-class HiveSqlParser(conf: SQLConf) extends AbstractSqlParser {
-
-  val astBuilder = new HiveSqlAstBuilder(conf)
-
-  private val substitutor = new VariableSubstitution(conf)
-
-  protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
-    super.parse(substitutor.substitute(command))(toResult)
-  }
-
-  protected override def nativeCommand(sqlText: String): LogicalPlan = {
-    HiveNativeCommand(substitutor.substitute(sqlText))
-  }
-}
-
-/**
- * Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier.
- */
-class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
-
-  import ParserUtils._
-
-  /**
-   * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
-   * options are passed on to Hive) e.g.:
-   * {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
-   * }}}
-   */
-  override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.partitionSpec == null &&
-      ctx.identifier != null &&
-      ctx.identifier.getText.toLowerCase == "noscan") {
-      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
-    } else {
-      // Always just run the no scan analyze. We should fix this and implement full analyze
-      // command in the future.
-      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 7e9669a..6899f46 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -17,109 +17,19 @@
 
 package org.apache.spark.sql.hive.execution
 
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.hive.{HiveSessionState, MetastoreRelation}
+import org.apache.spark.sql.hive.HiveSessionState
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
 
-/**
- * Analyzes the given table in the current database to generate statistics, which will be
- * used in query optimizations.
- *
- * Right now, it only supports Hive tables and it only updates the size of a Hive table
- * in the Hive metastore.
- */
-private[hive]
-case class AnalyzeTable(tableName: String) extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-    val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
-
-    relation match {
-      case relation: MetastoreRelation =>
-        val catalogTable: CatalogTable = relation.table
-        // This method is mainly based on
-        // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
-        // in Hive 0.13 (except that we do not use fs.getContentSummary).
-        // TODO: Generalize statistics collection.
-        // TODO: Why fs.getContentSummary returns wrong size on Jenkins?
-        // Can we use fs.getContentSummary in future?
-        // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
-        // countFileSize to count the table size.
-        val stagingDir = sessionState.metadataHive.getConf(
-          HiveConf.ConfVars.STAGINGDIR.varname,
-          HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
-
-        def calculateTableSize(fs: FileSystem, path: Path): Long = {
-          val fileStatus = fs.getFileStatus(path)
-          val size = if (fileStatus.isDirectory) {
-            fs.listStatus(path)
-              .map { status =>
-              if (!status.getPath().getName().startsWith(stagingDir)) {
-                calculateTableSize(fs, status.getPath)
-              } else {
-                0L
-              }
-            }
-              .sum
-          } else {
-            fileStatus.getLen
-          }
-
-          size
-        }
-
-        val tableParameters = catalogTable.properties
-        val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
-        val newTotalSize =
-          catalogTable.storage.locationUri.map { p =>
-            val path = new Path(p)
-            try {
-              val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-              calculateTableSize(fs, path)
-            } catch {
-              case NonFatal(e) =>
-                logWarning(
-                  s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
-                    s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
-                0L
-            }
-          }.getOrElse(0L)
-
-        // Update the Hive metastore if the total size of the table is different than the size
-        // recorded in the Hive metastore.
-        // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
-        if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-          sessionState.catalog.alterTable(
-            catalogTable.copy(
-              properties = relation.table.properties +
-                (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
-        }
-
-      case otherRelation =>
-        throw new UnsupportedOperationException(
-          s"Analyze only works for Hive tables, but $tableName is a ${otherRelation.nodeName}")
-    }
-    Seq.empty[Row]
-  }
-}
-
 private[hive]
 case class CreateMetastoreDataSource(
     tableIdent: TableIdentifier,

http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 7a6f1ce..565b310 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,9 +21,8 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.HiveNativeCommand
+import org.apache.spark.sql.execution.command.AnalyzeTable
 import org.apache.spark.sql.execution.joins._
-import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 
@@ -117,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
     // Try to analyze a temp table
     sql("""SELECT * FROM src""").registerTempTable("tempTable")
     intercept[UnsupportedOperationException] {
-      hiveContext.sessionState.analyze("tempTable")
+      hiveContext.sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
     }
     hiveContext.sessionState.catalog.dropTable(
       TableIdentifier("tempTable"), ignoreIfNotExists = true)

http://git-wip-us.apache.org/repos/asf/spark/blob/f181aee0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 79ac53c..12f30e2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -153,7 +153,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
       val (actualScannedColumns, actualPartValues) = plan.collect {
         case p @ HiveTableScan(columns, relation, _) =>
           val columnNames = columns.map(_.name)
-          val partValues = if (relation.table.partitionColumnNames.nonEmpty) {
+          val partValues = if (relation.catalogTable.partitionColumnNames.nonEmpty) {
             p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
           } else {
             Seq.empty


