carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [16/42] carbondata git commit: Fixed full join query issue with aggregate
Date Thu, 15 Jun 2017 11:50:21 GMT
Fixed full join query issue with aggregate

Fixed in spark-1.6

Fixed style
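
The failing scenario is a full outer join between two aggregated sub-queries on a dictionary-encoded column. A minimal sketch of such a query, mirroring the regression test added below (table and column names are taken from that test and are illustrative only):

    SELECT c1.empno, c1.empname, c2.empno
    FROM (SELECT empno, empname FROM alldatatypestableJoin GROUP BY empno, empname) c1
    FULL JOIN (SELECT empno FROM alldatatypestableJoin GROUP BY empno) c2
      ON c1.empno = c2.empno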


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e67003cf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e67003cf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e67003cf

Branch: refs/heads/branch-1.1
Commit: e67003cf657e743194cf449792b67f896b1adc74
Parents: 0c6f5f3
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Tue May 23 10:32:21 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Jun 15 12:57:35 2017 +0530

----------------------------------------------------------------------
 .../joinquery/AllDataTypesTestCaseJoin.scala    |   9 +-
 .../spark/sql/optimizer/CarbonOptimizer.scala   | 101 ++++++++++++-------
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 101 +++++++++++--------
 3 files changed, 131 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e67003cf/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala
index be0f8e6..08fad0b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 class AllDataTypesTestCaseJoin extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
-    sql("CREATE TABLE alldatatypestableJoin (empno int, empname String, designation String,
doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization
int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql("CREATE TABLE alldatatypestableJoin (empno int, empname String, designation String,
doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization
int,salary int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_INCLUDE'='empno','TABLE_BLOCKSIZE'='4')")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE alldatatypestableJoin
OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""");
 
     sql("CREATE TABLE alldatatypestableJoin_hive (empno int, empname String, designation
String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname
String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization
int,salary int)row format delimited fields terminated by ','")
@@ -90,6 +90,13 @@ class AllDataTypesTestCaseJoin extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon_table2")
   }
 
+  test("join with aggregate plan") {
+    checkAnswer(sql("SELECT c1.empno,c1.empname, c2.empno FROM (SELECT empno,empname FROM alldatatypestableJoin GROUP BY empno,empname) c1 FULL JOIN " +
+                    "(SELECT empno FROM alldatatypestableJoin GROUP BY empno) c2 ON c1.empno = c2.empno"),
+      sql("SELECT c1.empno,c1.empname, c2.empno FROM (SELECT empno,empname FROM alldatatypestableJoin_hive GROUP BY empno,empname) c1 FULL JOIN " +
+          "(SELECT empno FROM alldatatypestableJoin_hive GROUP BY empno) c2 ON c1.empno = c2.empno"))
+  }
+
   override def afterAll {
     sql("drop table alldatatypestableJoin")
     sql("drop table alldatatypestableJoin_hive")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e67003cf/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 9aa8158..02ac5f8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -206,6 +206,47 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
     relations.foreach(_.fillAttributeMap(attrMap))
 
     def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = {
+
+      def transformAggregateExpression(agg: Aggregate,
+          aggonGroups: util.HashSet[AttributeReferenceWrapper] = null): LogicalPlan = {
+        val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
+        if (aggonGroups != null) {
+          attrsOndimAggs.addAll(aggonGroups)
+        }
+        agg.aggregateExpressions.map {
+          case attr: AttributeReference =>
+          case a@Alias(attr: AttributeReference, name) =>
+          case aggExp: AggregateExpression =>
+            aggExp.transform {
+              case aggExp: AggregateExpression =>
+                collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, attrMap)
+                aggExp
+            }
+          case others =>
+            others.collect {
+              case attr: AttributeReference
+                if isDictionaryEncoded(attr, attrMap, aliasMap) =>
+                attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
+            }
+        }
+        var child = agg.child
+        // In case the child is also an aggregate, push the decoder down to the child
+        if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
+          child = CarbonDictionaryTempDecoder(attrsOndimAggs,
+            new util.HashSet[AttributeReferenceWrapper](),
+            agg.child)
+        }
+        if (!decoder && aggonGroups == null) {
+          decoder = true
+          CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+            new util.HashSet[AttributeReferenceWrapper](),
+            Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child),
+            isOuter = true)
+        } else {
+          Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
+        }
+      }
+
       currentPlan match {
         case limit@Limit(_, child: Sort) =>
           if (!decoder) {
@@ -288,39 +329,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
           }
 
         case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
-          agg.aggregateExpressions.map {
-            case attr: AttributeReference =>
-            case a@Alias(attr: AttributeReference, name) =>
-            case aggExp: AggregateExpression =>
-              aggExp.transform {
-                case aggExp: AggregateExpression =>
-                  collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, attrMap)
-                  aggExp
-              }
-            case others =>
-              others.collect {
-                case attr: AttributeReference
-                  if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                  attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-              }
-          }
-          var child = agg.child
-          // Incase if the child also aggregate then push down decoder to child
-          if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
-            child = CarbonDictionaryTempDecoder(attrsOndimAggs,
-              new util.HashSet[AttributeReferenceWrapper](),
-              agg.child)
-          }
-          if (!decoder) {
-            decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
-              new util.HashSet[AttributeReferenceWrapper](),
-              Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child),
-              isOuter = true)
-          } else {
-            Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
-          }
+          transformAggregateExpression(agg)
         case expand: Expand if !expand.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
           val attrsOnExpand = new util.HashSet[AttributeReferenceWrapper]
           expand.projections.map {s =>
@@ -410,15 +419,29 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
             var rightPlan = j.right
             if (leftCondAttrs.size() > 0 &&
                 !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-              leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
-                new util.HashSet[AttributeReferenceWrapper](),
-                j.left)
+              leftPlan = leftPlan match {
+                case agg: Aggregate =>
+                  CarbonDictionaryTempDecoder(leftCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    transformAggregateExpression(agg, leftCondAttrs))
+                case _ =>
+                  CarbonDictionaryTempDecoder(leftCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    j.left)
+              }
             }
             if (rightCondAttrs.size() > 0 &&
                 !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-              rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
-                new util.HashSet[AttributeReferenceWrapper](),
-                j.right)
+              rightPlan = rightPlan match {
+                case agg: Aggregate =>
+                  CarbonDictionaryTempDecoder(rightCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    transformAggregateExpression(agg, rightCondAttrs))
+                case _ =>
+                  CarbonDictionaryTempDecoder(rightCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    j.right)
+              }
             }
             if (!decoder) {
               decoder = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e67003cf/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index fd6f14e..d1a0c90 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -178,6 +178,46 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     relations.foreach(_.fillAttributeMap(attrMap))
 
     def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = {
+
+      def transformAggregateExpression(agg: Aggregate,
+          attrsOnGroup: util.HashSet[AttributeReferenceWrapper] = null): LogicalPlan = {
+        val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
+        if (attrsOnGroup != null) {
+          attrsOndimAggs.addAll(attrsOnGroup)
+        }
+        agg.aggregateExpressions.map {
+          case attr: AttributeReference =>
+          case a@Alias(attr: AttributeReference, name) =>
+          case aggExp: AggregateExpression =>
+            aggExp.transform {
+              case aggExp: AggregateExpression =>
+                collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, attrMap)
+                aggExp
+            }
+          case others =>
+            others.collect {
+              case attr: AttributeReference
+                if isDictionaryEncoded(attr, attrMap, aliasMap) =>
+                attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
+            }
+        }
+        var child = agg.child
+        // In case the child is also an aggregate, push the decoder down to the child
+        if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
+          child = CarbonDictionaryTempDecoder(attrsOndimAggs,
+            new util.HashSet[AttributeReferenceWrapper](),
+            agg.child)
+        }
+        if (!decoder && attrsOnGroup == null) {
+          decoder = true
+          CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+            new util.HashSet[AttributeReferenceWrapper](),
+            Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child),
+            isOuter = true)
+        } else {
+          Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
+        }
+      }
       currentPlan match {
         case limit@GlobalLimit(_, LocalLimit(_, child: Sort)) =>
           if (!decoder) {
@@ -259,39 +299,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
             Union(children)
           }
         case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
-          agg.aggregateExpressions.map {
-            case attr: AttributeReference =>
-            case a@Alias(attr: AttributeReference, name) =>
-            case aggExp: AggregateExpression =>
-              aggExp.transform {
-                case aggExp: AggregateExpression =>
-                  collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, attrMap)
-                  aggExp
-              }
-            case others =>
-              others.collect {
-                case attr: AttributeReference
-                  if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                  attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-              }
-          }
-          var child = agg.child
-          // Incase if the child also aggregate then push down decoder to child
-          if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
-            child = CarbonDictionaryTempDecoder(attrsOndimAggs,
-              new util.HashSet[AttributeReferenceWrapper](),
-              agg.child)
-          }
-          if (!decoder) {
-            decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
-              new util.HashSet[AttributeReferenceWrapper](),
-              Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child),
-              isOuter = true)
-          } else {
-            Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
-          }
+          transformAggregateExpression(agg)
         case expand: Expand if !expand.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
           val attrsOnExpand = new util.HashSet[AttributeReferenceWrapper]
           expand.projections.map {s =>
@@ -381,15 +389,29 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
             var rightPlan = j.right
             if (leftCondAttrs.size() > 0 &&
                 !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-              leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
-                new util.HashSet[AttributeReferenceWrapper](),
-                j.left)
+              leftPlan = leftPlan match {
+                case agg: Aggregate =>
+                  CarbonDictionaryTempDecoder(leftCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    transformAggregateExpression(agg, leftCondAttrs))
+                case _ =>
+                  CarbonDictionaryTempDecoder(leftCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    j.left)
+              }
             }
             if (rightCondAttrs.size() > 0 &&
                 !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-              rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
-                new util.HashSet[AttributeReferenceWrapper](),
-                j.right)
+              rightPlan = rightPlan match {
+                case agg: Aggregate =>
+                  CarbonDictionaryTempDecoder(rightCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    transformAggregateExpression(agg, rightCondAttrs))
+                case _ =>
+                  CarbonDictionaryTempDecoder(rightCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    j.right)
+              }
             }
             Join(leftPlan, rightPlan, j.joinType, j.condition)
           } else {
@@ -503,7 +525,6 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
 
         case others => others
       }
-
     }
 
     val transFormedPlan =

