From: rxin@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-13884][SQL] Remove DescribeCommand's dependency on LogicalPlan
Date: Tue, 15 Mar 2016 06:09:12 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master f72743d97 -> e64958001


[SPARK-13884][SQL] Remove DescribeCommand's dependency on LogicalPlan

## What changes were proposed in this pull request?

This patch removes DescribeCommand's dependency on LogicalPlan. After this
patch, DescribeCommand simply accepts a TableIdentifier. This minimizes the
dependency and unblocks my next patch, which removes the SQLContext
dependency from SparkPlanner.

## How was this patch tested?

Should be covered by existing unit tests and the Hive compatibility tests
that run "describe table".

Author: Reynold Xin

Closes #11710 from rxin/SPARK-13884.
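[Editor's note: a minimal before/after sketch of the signature change, with
stand-in types rather than Spark's real ones; the actual definitions are in
the diff below.]

    object DescribeCommandSketch {
      // Stand-ins for Spark's real types (see the diff below for the originals).
      case class TableIdentifier(table: String, database: Option[String] = None)
      trait LogicalPlan

      // Before this patch: the logical command carried a LogicalPlan child,
      // so the table had to be wrapped in an UnresolvedRelation up front.
      case class OldDescribeCommand(table: LogicalPlan, isExtended: Boolean)

      // After this patch: the command names the table directly; the relation
      // is looked up from the session catalog only when the command runs.
      case class NewDescribeCommand(table: TableIdentifier, isExtended: Boolean)

      def main(args: Array[String]): Unit =
        println(NewDescribeCommand(TableIdentifier("t", Some("db")), isExtended = true))
    }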
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e6495800
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e6495800
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e6495800

Branch: refs/heads/master
Commit: e64958001cb95d53c441131f8c7a92556f49fd7d
Parents: f72743d
Author: Reynold Xin
Authored: Mon Mar 14 23:09:10 2016 -0700
Committer: Reynold Xin
Committed: Mon Mar 14 23:09:10 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/SparkQl.scala    |  5 +-
 .../spark/sql/execution/SparkStrategies.scala   |  7 ++-
 .../spark/sql/execution/command/commands.scala  |  7 +--
 .../spark/sql/execution/datasources/ddl.scala   |  5 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  | 14 +----
 .../execution/DescribeHiveTableCommand.scala    | 57 ++++++++++++--------
 6 files changed, 49 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e6495800/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
index 8dde308..11391bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -271,15 +271,14 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
           // issue.
           val tableIdent = TableIdentifier(
             cleanIdentifier(tableName), Some(cleanIdentifier(dbName)))
-          datasources.DescribeCommand(
-            UnresolvedRelation(tableIdent, None), isExtended = extended.isDefined)
+          datasources.DescribeCommand(tableIdent, isExtended = extended.isDefined)
         case Token(dbName, Nil) :: Token(tableName, Nil) :: Token(colName, Nil) :: Nil =>
           // It is describing a column with the format like "describe db.table column".
           nodeToDescribeFallback(node)
         case tableName :: Nil =>
           // It is describing a table with the format like "describe table".
           datasources.DescribeCommand(
-            UnresolvedRelation(TableIdentifier(cleanIdentifier(tableName.text)), None),
+            TableIdentifier(cleanIdentifier(tableName.text)),
             isExtended = extended.isDefined)
         case _ =>
           nodeToDescribeFallback(node)


http://git-wip-us.apache.org/repos/asf/spark/blob/e6495800/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index bae0750..6352c48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -398,11 +398,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")

         case describe @ LogicalDescribeCommand(table, isExtended) =>
-          val resultPlan = self.sqlContext.executePlan(table).executedPlan
-          ExecutedCommand(
-            RunnableDescribeCommand(resultPlan, describe.output, isExtended)) :: Nil
+          ExecutedCommand(RunnableDescribeCommand(table, describe.output, isExtended)) :: Nil

-        case logical.ShowFunctions(db, pattern) => ExecutedCommand(ShowFunctions(db, pattern)) :: Nil
+        case logical.ShowFunctions(db, pattern) =>
+          ExecutedCommand(ShowFunctions(db, pattern)) :: Nil

         case logical.DescribeFunction(function, extended) =>
           ExecutedCommand(DescribeFunction(function, extended)) :: Nil


http://git-wip-us.apache.org/repos/asf/spark/blob/e6495800/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 54cdcb1..6e36a15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -22,7 +22,7 @@ import java.util.NoSuchElementException
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Row, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical
@@ -293,13 +293,14 @@ case object ClearCacheCommand extends RunnableCommand {

 case class DescribeCommand(
-    child: SparkPlan,
+    table: TableIdentifier,
     override val output: Seq[Attribute],
     isExtended: Boolean)
   extends RunnableCommand {

   override def run(sqlContext: SQLContext): Seq[Row] = {
-    child.schema.fields.map { field =>
+    val relation = sqlContext.sessionState.catalog.lookupRelation(table)
+    relation.schema.fields.map { field =>
       val cmtKey = "comment"
       val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else ""
       Row(field.name, field.dataType.simpleString, comment)


http://git-wip-us.apache.org/repos/asf/spark/blob/e6495800/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 903c991..04e5173 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -33,8 +33,9 @@ import org.apache.spark.sql.types._
  * It is effective only when the table is a Hive table.
  */
 case class DescribeCommand(
-    table: LogicalPlan,
-    isExtended: Boolean) extends LogicalPlan with logical.Command {
+    table: TableIdentifier,
+    isExtended: Boolean)
+  extends LogicalPlan with logical.Command {

   override def children: Seq[LogicalPlan] = Seq.empty


http://git-wip-us.apache.org/repos/asf/spark/blob/e6495800/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index a19dc21..f44937e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -102,18 +102,8 @@ private[hive] trait HiveStrategies {
   case class HiveCommandStrategy(context: HiveContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case describe: DescribeCommand =>
-        val resolvedTable = context.executePlan(describe.table).analyzed
-        resolvedTable match {
-          case t: MetastoreRelation =>
-            ExecutedCommand(
-              DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil
-
-          case o: LogicalPlan =>
-            val resultPlan = context.executePlan(o).executedPlan
-            ExecutedCommand(RunnableDescribeCommand(
-              resultPlan, describe.output, describe.isExtended)) :: Nil
-        }
-
+        ExecutedCommand(
+          DescribeHiveTableCommand(describe.table, describe.output, describe.isExtended)) :: Nil
       case _ => Nil
     }
   }


http://git-wip-us.apache.org/repos/asf/spark/blob/e6495800/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index 57293fc..8481324 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -22,8 +22,10 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.hive.metastore.api.FieldSchema

 import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{DescribeCommand, RunnableCommand}
 import org.apache.spark.sql.hive.MetastoreRelation

 /**
@@ -31,33 +33,44 @@ import org.apache.spark.sql.hive.MetastoreRelation
  */
 private[hive]
 case class DescribeHiveTableCommand(
-    table: MetastoreRelation,
+    tableId: TableIdentifier,
     override val output: Seq[Attribute],
     isExtended: Boolean) extends RunnableCommand {

   override def run(sqlContext: SQLContext): Seq[Row] = {
-    // Trying to mimic the format of Hive's output. But not exactly the same.
-    var results: Seq[(String, String, String)] = Nil
-
-    val columns: Seq[FieldSchema] = table.hiveQlTable.getCols.asScala
-    val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols.asScala
-    results ++= columns.map(field => (field.getName, field.getType, field.getComment))
-    if (partitionColumns.nonEmpty) {
-      val partColumnInfo =
-        partitionColumns.map(field => (field.getName, field.getType, field.getComment))
-      results ++=
-        partColumnInfo ++
-        Seq(("# Partition Information", "", "")) ++
-        Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++
-        partColumnInfo
-    }
+    // There are two modes here:
+    // For metastore tables, create an output similar to Hive's.
+    // For other tables, delegate to DescribeCommand.

-    if (isExtended) {
-      results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
-    }
+    // In the future, we will consolidate the two and simply report what the catalog reports.
+    sqlContext.sessionState.catalog.lookupRelation(tableId) match {
+      case table: MetastoreRelation =>
+        // Trying to mimic the format of Hive's output. But not exactly the same.
+        var results: Seq[(String, String, String)] = Nil
+
+        val columns: Seq[FieldSchema] = table.hiveQlTable.getCols.asScala
+        val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols.asScala
+        results ++= columns.map(field => (field.getName, field.getType, field.getComment))
+        if (partitionColumns.nonEmpty) {
+          val partColumnInfo =
+            partitionColumns.map(field => (field.getName, field.getType, field.getComment))
+          results ++=
+            partColumnInfo ++
+            Seq(("# Partition Information", "", "")) ++
+            Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++
+            partColumnInfo
+        }
+
+        if (isExtended) {
+          results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
+        }
+
+        results.map { case (name, dataType, comment) =>
+          Row(name, dataType, comment)
+        }

-    results.map { case (name, dataType, comment) =>
-      Row(name, dataType, comment)
+      case o: LogicalPlan =>
+        DescribeCommand(tableId, output, isExtended).run(sqlContext)
     }
   }
 }
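
[Editor's note: the pattern the patch adopts, resolving the relation inside
run() rather than accepting a pre-planned child, can be sketched as follows.
All names here are stand-ins for illustration, not Spark's API.]

    object RunTimeLookupSketch {
      case class TableIdentifier(table: String)
      case class Field(name: String, dataType: String, comment: String)
      case class Relation(schema: Seq[Field])
      case class Row(name: String, dataType: String, comment: String)

      // A catalog interface standing in for Spark's session catalog.
      trait Catalog { def lookupRelation(id: TableIdentifier): Relation }

      // The command holds only an identifier; resolution is deferred to
      // execution time, so no analyzed plan (or SQLContext) is needed at
      // construction time.
      def describe(catalog: Catalog, table: TableIdentifier): Seq[Row] = {
        val relation = catalog.lookupRelation(table)
        relation.schema.map(f => Row(f.name, f.dataType, f.comment))
      }

      def main(args: Array[String]): Unit = {
        val catalog = new Catalog {
          def lookupRelation(id: TableIdentifier): Relation =
            Relation(Seq(Field("id", "int", ""), Field("name", "string", "user name")))
        }
        describe(catalog, TableIdentifier("users")).foreach(println)
      }
    }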