carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: Supporting scalar subquery in carbon
Date Mon, 27 Feb 2017 10:23:21 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 740358c13 -> 565bc9b8f


Supporting scalar subquery in carbon

Removed unused imports

Changed format

Changed format

changed testcases


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/538b64ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/538b64ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/538b64ec

Branch: refs/heads/master
Commit: 538b64ec4d9467fc41a953af7bac1ea46fcd9d84
Parents: 740358c
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Wed Feb 1 23:04:05 2017 +0530
Committer: jackylk <jacky.likun@huawei.com>
Committed: Mon Feb 27 00:29:52 2017 +0800

----------------------------------------------------------------------
 .../sql/CarbonDatasourceHadoopRelation.scala    |  3 +-
 .../spark/sql/hive/CarbonSessionState.scala     | 37 ++++++++++++++++++--
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  6 ++++
 .../bucketing/TableBucketingTestCase.scala      | 14 ++++++++
 4 files changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/538b64ec/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index eb1730f..1491cff 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -39,7 +39,8 @@ case class CarbonDatasourceHadoopRelation(
     sparkSession: SparkSession,
     paths: Array[String],
     parameters: Map[String, String],
-    tableSchema: Option[StructType])
+    tableSchema: Option[StructType],
+    var isSubquery: Boolean = false)
   extends BaseRelation with InsertableRelation {
 
   lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/538b64ec/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 066acce..331efe9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -16,10 +16,16 @@
  */
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions.ScalarSubquery
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.execution.{CarbonLateDecodeStrategy, SparkOptimizer}
 import org.apache.spark.sql.execution.command.DDLStrategy
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
@@ -35,4 +41,31 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
     Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
   experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
 
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+}
+
+class CarbonOptimizer(
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer
rule, And
+    // optimize whole plan at once.
+    val transFormedPlan = plan.transform {
+      case filter: Filter =>
+        filter.transformExpressions {
+          case s: ScalarSubquery =>
+            val tPlan = s.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery = true
+                lr
+            }
+            ScalarSubquery(tPlan, s.children, s.exprId)
+        }
+    }
+    super.execute(transFormedPlan)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/538b64ec/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index afbbae1..a20326f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -58,6 +58,12 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = {
     relations = collectCarbonRelation(plan)
     if (relations.nonEmpty && !isOptimized(plan)) {
+      // In case scalar subquery skip the transformation and update the flag.
+      if (relations.exists(_.carbonRelation.isSubquery)) {
+        relations.foreach(p => p.carbonRelation.isSubquery = false)
+        LOGGER.info("Skip CarbonOptimizer for scalar sub query")
+        return plan
+      }
       LOGGER.info("Starting to optimize plan")
       val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
       val queryStatistic = new QueryStatistic()

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/538b64ec/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 65a726a..33f710b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -191,6 +191,20 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
     assert(shuffleExists, "shuffle should exist on non bucket tables")
   }
 
+  test("test scalar subquery with equal") {
+    sql(
+      """select sum(salary) from t4 t1
+        |where ID = (select sum(ID) from t4 t2 where t1.name = t2.name)""".stripMargin)
+      .count()
+  }
+
+  test("test scalar subquery with lessthan") {
+    sql(
+      """select sum(salary) from t4 t1
+        |where ID < (select sum(ID) from t4 t2 where t1.name = t2.name)""".stripMargin)
+      .count()
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS t3")
     sql("DROP TABLE IF EXISTS t4")


Mime
View raw message