spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-13884][SQL] Remove DescribeCommand's dependency on LogicalPlan
Date Tue, 15 Mar 2016 06:09:12 GMT
Repository: spark
Updated Branches:
  refs/heads/master f72743d97 -> e64958001


[SPARK-13884][SQL] Remove DescribeCommand's dependency on LogicalPlan

## What changes were proposed in this pull request?
This patch removes DescribeCommand's dependency on LogicalPlan. After this patch, DescribeCommand simply accepts a TableIdentifier. This minimizes the dependency and unblocks my next patch, which removes the SQLContext dependency from SparkPlanner.
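
In short, the logical node's signature changes as follows (a condensed sketch of the ddl.scala hunk below, not new API):

```scala
// Before: the logical DescribeCommand carried a (possibly unresolved) plan.
case class DescribeCommand(table: LogicalPlan, isExtended: Boolean)
  extends LogicalPlan with logical.Command

// After: it carries only the table's identifier; resolution is deferred
// to execution time via the session catalog.
case class DescribeCommand(table: TableIdentifier, isExtended: Boolean)
  extends LogicalPlan with logical.Command
```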

## How was this patch tested?
Should be covered by existing unit tests and Hive compatibility tests that run describe table.

Author: Reynold Xin <rxin@databricks.com>

Closes #11710 from rxin/SPARK-13884.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e6495800
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e6495800
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e6495800

Branch: refs/heads/master
Commit: e64958001cb95d53c441131f8c7a92556f49fd7d
Parents: f72743d
Author: Reynold Xin <rxin@databricks.com>
Authored: Mon Mar 14 23:09:10 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Mon Mar 14 23:09:10 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/SparkQl.scala    |  5 +-
 .../spark/sql/execution/SparkStrategies.scala   |  7 ++-
 .../spark/sql/execution/command/commands.scala  |  7 +--
 .../spark/sql/execution/datasources/ddl.scala   |  5 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  | 14 +----
 .../execution/DescribeHiveTableCommand.scala    | 57 ++++++++++++--------
 6 files changed, 49 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e6495800/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
index 8dde308..11391bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -271,15 +271,14 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) {
                   // issue.
                   val tableIdent = TableIdentifier(
                     cleanIdentifier(tableName), Some(cleanIdentifier(dbName)))
-                  datasources.DescribeCommand(
-                    UnresolvedRelation(tableIdent, None), isExtended = extended.isDefined)
+                  datasources.DescribeCommand(tableIdent, isExtended = extended.isDefined)
                 case Token(dbName, Nil) :: Token(tableName, Nil) :: Token(colName, Nil) :: Nil =>
                   // It is describing a column with the format like "describe db.table column".
                   nodeToDescribeFallback(node)
                 case tableName :: Nil =>
                   // It is describing a table with the format like "describe table".
                   datasources.DescribeCommand(
-                    UnresolvedRelation(TableIdentifier(cleanIdentifier(tableName.text)), None),
+                    TableIdentifier(cleanIdentifier(tableName.text)),
                     isExtended = extended.isDefined)
                 case _ =>
                   nodeToDescribeFallback(node)
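
Concretely, after this change a statement like "DESCRIBE db.tbl" yields roughly the following logical node (an illustration only; the db/table names are made up):

```scala
// Illustrative sketch of what the parser now constructs for "DESCRIBE db.tbl".
datasources.DescribeCommand(
  TableIdentifier("tbl", Some("db")),
  isExtended = false)
```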

http://git-wip-us.apache.org/repos/asf/spark/blob/e6495800/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index bae0750..6352c48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -398,11 +398,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
 
       case describe @ LogicalDescribeCommand(table, isExtended) =>
-        val resultPlan = self.sqlContext.executePlan(table).executedPlan
-        ExecutedCommand(
-          RunnableDescribeCommand(resultPlan, describe.output, isExtended)) :: Nil
+        ExecutedCommand(RunnableDescribeCommand(table, describe.output, isExtended)) :: Nil
 
-      case logical.ShowFunctions(db, pattern) => ExecutedCommand(ShowFunctions(db, pattern)) :: Nil
+      case logical.ShowFunctions(db, pattern) =>
+        ExecutedCommand(ShowFunctions(db, pattern)) :: Nil
 
       case logical.DescribeFunction(function, extended) =>
         ExecutedCommand(DescribeFunction(function, extended)) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/e6495800/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 54cdcb1..6e36a15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -22,7 +22,7 @@ import java.util.NoSuchElementException
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Row, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical
@@ -293,13 +293,14 @@ case object ClearCacheCommand extends RunnableCommand {
 
 
 case class DescribeCommand(
-    child: SparkPlan,
+    table: TableIdentifier,
     override val output: Seq[Attribute],
     isExtended: Boolean)
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    child.schema.fields.map { field =>
+    val relation = sqlContext.sessionState.catalog.lookupRelation(table)
+    relation.schema.fields.map { field =>
       val cmtKey = "comment"
       val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else ""
       Row(field.name, field.dataType.simpleString, comment)
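
For context, a hedged usage sketch of the execution-time lookup this enables (the table name "people" is invented for illustration, and this assumes a temp table registered under that name resolves through the session catalog):

```scala
// Hypothetical usage; "people" is an illustrative table name.
// After this patch, the identifier is resolved inside run(), via
// sqlContext.sessionState.catalog, rather than at planning time.
sqlContext.range(3).toDF("id").registerTempTable("people")
sqlContext.sql("DESCRIBE people").collect().foreach(println)
```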

http://git-wip-us.apache.org/repos/asf/spark/blob/e6495800/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 903c991..04e5173 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -33,8 +33,9 @@ import org.apache.spark.sql.types._
  *                   It is effective only when the table is a Hive table.
  */
 case class DescribeCommand(
-    table: LogicalPlan,
-    isExtended: Boolean) extends LogicalPlan with logical.Command {
+    table: TableIdentifier,
+    isExtended: Boolean)
+  extends LogicalPlan with logical.Command {
 
   override def children: Seq[LogicalPlan] = Seq.empty
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e6495800/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index a19dc21..f44937e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -102,18 +102,8 @@ private[hive] trait HiveStrategies {
   case class HiveCommandStrategy(context: HiveContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case describe: DescribeCommand =>
-        val resolvedTable = context.executePlan(describe.table).analyzed
-        resolvedTable match {
-          case t: MetastoreRelation =>
-            ExecutedCommand(
-              DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil
-
-          case o: LogicalPlan =>
-            val resultPlan = context.executePlan(o).executedPlan
-            ExecutedCommand(RunnableDescribeCommand(
-              resultPlan, describe.output, describe.isExtended)) :: Nil
-        }
-
+        ExecutedCommand(
+          DescribeHiveTableCommand(describe.table, describe.output, describe.isExtended)) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e6495800/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index 57293fc..8481324 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -22,8 +22,10 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 
 import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{DescribeCommand, RunnableCommand}
 import org.apache.spark.sql.hive.MetastoreRelation
 
 /**
@@ -31,33 +33,44 @@ import org.apache.spark.sql.hive.MetastoreRelation
  */
 private[hive]
 case class DescribeHiveTableCommand(
-    table: MetastoreRelation,
+    tableId: TableIdentifier,
     override val output: Seq[Attribute],
     isExtended: Boolean) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    // Trying to mimic the format of Hive's output. But not exactly the same.
-    var results: Seq[(String, String, String)] = Nil
-
-    val columns: Seq[FieldSchema] = table.hiveQlTable.getCols.asScala
-    val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols.asScala
-    results ++= columns.map(field => (field.getName, field.getType, field.getComment))
-    if (partitionColumns.nonEmpty) {
-      val partColumnInfo =
-        partitionColumns.map(field => (field.getName, field.getType, field.getComment))
-      results ++=
-        partColumnInfo ++
-          Seq(("# Partition Information", "", "")) ++
-          Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++
-          partColumnInfo
-    }
+    // There are two modes here:
+    // For metastore tables, create an output similar to Hive's.
+    // For other tables, delegate to DescribeCommand.
 
-    if (isExtended) {
-      results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString,
""))
-    }
+    // In the future, we will consolidate the two and simply report what the catalog reports.
+    sqlContext.sessionState.catalog.lookupRelation(tableId) match {
+      case table: MetastoreRelation =>
+        // Trying to mimic the format of Hive's output. But not exactly the same.
+        var results: Seq[(String, String, String)] = Nil
+
+        val columns: Seq[FieldSchema] = table.hiveQlTable.getCols.asScala
+        val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols.asScala
+        results ++= columns.map(field => (field.getName, field.getType, field.getComment))
+        if (partitionColumns.nonEmpty) {
+          val partColumnInfo =
+            partitionColumns.map(field => (field.getName, field.getType, field.getComment))
+          results ++=
+            partColumnInfo ++
+              Seq(("# Partition Information", "", "")) ++
+              Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++
+              partColumnInfo
+        }
+
+        if (isExtended) {
+          results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString,
""))
+        }
+
+        results.map { case (name, dataType, comment) =>
+          Row(name, dataType, comment)
+        }
 
-    results.map { case (name, dataType, comment) =>
-      Row(name, dataType, comment)
+      case o: LogicalPlan =>
+        DescribeCommand(tableId, output, isExtended).run(sqlContext)
     }
   }
 }
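
The net effect of the Hive-side change, as a simplified sketch (buildHiveStyleRows is a made-up name standing in for the inlined metastore logic above):

```scala
// Simplified sketch of the new two-mode dispatch (not the literal code).
sqlContext.sessionState.catalog.lookupRelation(tableId) match {
  case t: MetastoreRelation =>
    // Hive metastore tables: mimic Hive's DESCRIBE output, including
    // partition columns and, when isExtended, detailed table info.
    buildHiveStyleRows(t) // hypothetical helper for the inlined logic
  case _ =>
    // Anything else: delegate to the generic DescribeCommand.
    DescribeCommand(tableId, output, isExtended).run(sqlContext)
}
```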


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

