carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [1/2] incubator-carbondata git commit: Fixed Union bug
Date Mon, 25 Jul 2016 13:18:34 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master e71e5ca25 -> 10ed89ab5


Fixed Union bug


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a1e9a994
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a1e9a994
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a1e9a994

Branch: refs/heads/master
Commit: a1e9a994ef1a3a7459bde72763326af97a311c22
Parents: e71e5ca
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Sat Jul 16 14:45:34 2016 +0530
Committer: chenliang613 <chenliang613@apache.org>
Committed: Mon Jul 25 21:16:22 2016 +0800

----------------------------------------------------------------------
 .../CarbonDecoderOptimizerHelper.scala          | 14 ++--
 .../spark/sql/optimizer/CarbonOptimizer.scala   | 79 +++++++++++---------
 .../AllDataTypesTestCaseAggregate.scala         | 14 ++++
 3 files changed, 65 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1e9a994/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
index 0f583d0..3784f84 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
@@ -29,7 +29,7 @@ abstract class AbstractNode
 
 case class Node(cd: CarbonDictionaryTempDecoder) extends AbstractNode
 
-case class JoinNode(left: util.List[AbstractNode], right: util.List[AbstractNode])
+case class BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode])
   extends AbstractNode
 
 case class CarbonDictionaryTempDecoder(
@@ -58,7 +58,7 @@ class CarbonDecoderProcessor {
       case j: BinaryNode =>
         val leftList = new util.ArrayList[AbstractNode]
         val rightList = new util.ArrayList[AbstractNode]
-        nodeList.add(JoinNode(leftList, rightList))
+        nodeList.add(BinaryCarbonNode(leftList, rightList))
         process(j.left, leftList)
         process(j.right, rightList)
       case e: UnaryNode => process(e.child, nodeList)
@@ -79,7 +79,7 @@ class CarbonDecoderProcessor {
         decoderNotDecode.asScala.foreach(cd.attrsNotDecode.add)
         decoderNotDecode.asScala.foreach(cd.attrList.remove)
         decoderNotDecode.addAll(cd.attrList)
-      case JoinNode(left: util.List[AbstractNode], right: util.List[AbstractNode]) =>
+      case BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode]) =>
         val leftNotDecode = new util.HashSet[Attribute]
         val rightNotDecode = new util.HashSet[Attribute]
         updateDecoderInternal(left.asScala, leftNotDecode)
@@ -91,7 +91,7 @@ class CarbonDecoderProcessor {
 
 }
 
-case class Marker(set: util.Set[Attribute], join: Boolean = false)
+case class Marker(set: util.Set[Attribute], binary: Boolean = false)
 
 class CarbonPlanMarker {
   val markerStack = new util.Stack[Marker]
@@ -101,8 +101,8 @@ class CarbonPlanMarker {
     markerStack.push(Marker(attrs))
   }
 
-  def pushJoinMarker(attrs: util.Set[Attribute]): Unit = {
-    markerStack.push(Marker(attrs, join = true))
+  def pushBinaryMarker(attrs: util.Set[Attribute]): Unit = {
+    markerStack.push(Marker(attrs, binary = true))
     joinCount = joinCount + 1
   }
 
@@ -110,7 +110,7 @@ class CarbonPlanMarker {
     if (joinCount > 0) {
       while (!markerStack.empty()) {
         val marker = markerStack.pop()
-        if (marker.join) {
+        if (marker.binary) {
           joinCount = joinCount - 1
           return marker.set
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1e9a994/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 9241474..995c109 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -115,25 +115,50 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
             Sort(sort.order, sort.global, child)
           }
 
+        case union: Union
+          if !(union.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
+               union.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
+          val leftCondAttrs = new util.HashSet[Attribute]
+          val rightCondAttrs = new util.HashSet[Attribute]
+          union.left.output.foreach(attr => leftCondAttrs.add(aliasMap.getOrElse(attr, attr)))
+          union.right.output.foreach(attr => rightCondAttrs.add(aliasMap.getOrElse(attr, attr)))
+          var leftPlan = union.left
+          var rightPlan = union.right
+          if (leftCondAttrs.size() > 0 &&
+              !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+            leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
+              new util.HashSet[Attribute](),
+              union.left)
+          }
+          if (rightCondAttrs.size() > 0 &&
+              !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+            rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
+              new util.HashSet[Attribute](),
+              union.right)
+          }
+          if (!decoder) {
+            decoder = true
+            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+              new util.HashSet[Attribute](),
+              Union(leftPlan, rightPlan),
+              isOuter = true)
+          } else {
+            Union(leftPlan, rightPlan)
+          }
+
         case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
           val attrsOndimAggs = new util.HashSet[Attribute]
           agg.aggregateExpressions.map {
             case attr: AttributeReference =>
-            case a@Alias(attr: AttributeReference, name) => aliasMap.put(a.toAttribute, attr)
+            case a@Alias(attr: AttributeReference, name) =>
             case aggExp: AggregateExpression =>
               aggExp.transform {
                 case aggExp: AggregateExpression =>
                   collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap)
                   aggExp
-                case a@Alias(attr: Attribute, name) =>
-                  aliasMap.put(a.toAttribute, attr)
-                  a
               }
             case others =>
               others.collect {
-                case a@ Alias(attr: AttributeReference, _) => aliasMap.put(a.toAttribute, attr)
-                case a@Alias(exp, _) if !exp.isInstanceOf[AttributeReference] =>
-                  aliasMap.put(a.toAttribute, new AttributeReference("", StringType)())
                 case attr: AttributeReference
                   if isDictionaryEncoded(attr, relations, aliasMap) =>
                   attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
@@ -160,7 +185,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
           expand.projections.map {s =>
             s.map {
               case attr: AttributeReference =>
-              case a@Alias(attr: AttributeReference, name) => aliasMap.put(a.toAttribute, attr)
+              case a@Alias(attr: AttributeReference, name) =>
               case others =>
                 others.collect {
                   case attr: AttributeReference
@@ -264,7 +289,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
           val attrsOnProjects = new util.HashSet[Attribute]
           p.projectList.map {
             case attr: AttributeReference =>
-            case a@Alias(attr: AttributeReference, name) => aliasMap.put(a.toAttribute, attr)
+            case a@Alias(attr: AttributeReference, name) =>
             case others =>
               others.collect {
                 case attr: AttributeReference
@@ -417,8 +442,11 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
         }
         Filter(filterExps, filter.child)
       case j: Join =>
-        marker.pushJoinMarker(allAttrsNotDecode)
+        marker.pushBinaryMarker(allAttrsNotDecode)
         j
+      case u: Union =>
+        marker.pushBinaryMarker(allAttrsNotDecode)
+        u
       case p: Project if relations.nonEmpty =>
         val prExps = p.projectList.map { prExp =>
           prExp.transform {
@@ -487,31 +515,13 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
 
   private def collectInformationOnAttributes(plan: LogicalPlan,
       aliasMap: CarbonAliasDecoderRelation) {
-    plan transformUp {
-      case project: Project =>
-        project.projectList.map { p =>
-          p transform {
-            case a@Alias(attr: Attribute, name) =>
-              aliasMap.put(a.toAttribute, attr)
-              a
-            case a@Alias(_, name) =>
-              aliasMap.put(a.toAttribute, new AttributeReference("", StringType)())
-              a
-          }
-        }
-        project
-      case agg: Aggregate =>
-        agg.aggregateExpressions.map { aggExp =>
-          aggExp.transform {
-            case a@Alias(attr: Attribute, name) =>
-              aliasMap.put(a.toAttribute, attr)
-              a
-            case a@Alias(_, name) =>
-              aliasMap.put(a.toAttribute, new AttributeReference("", StringType)())
-              a
-          }
+    plan transformAllExpressions  {
+      case a@Alias(exp, name) =>
+        exp match {
+          case attr: Attribute => aliasMap.put(a.toAttribute, attr)
+          case _ => aliasMap.put(a.toAttribute, new AttributeReference("", StringType)())
         }
-        agg
+        a
     }
   }
 
@@ -522,7 +532,6 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
     aggExp collect {
       case attr: AttributeReference if isDictionaryEncoded(attr, relations, aliasMap) =>
         attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
-      case a@Alias(attr: Attribute, name) => aliasMap.put(a.toAttribute, attr)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1e9a994/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index a64e6ac..fe56a58 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -1087,4 +1087,18 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
       Seq(Row(96981.54360516652)))
   })
 
+  test("CARBONDATA-60-union-defect")({
+    sql("drop table if exists carbonunion")
+    import implicits._
+    val df=sc.parallelize(1 to 1000).map(x => (x+"", (x+100)+"")).toDF("c1", "c2")
+    df.registerTempTable("sparkunion")
+    import org.carbondata.spark._
+    df.saveAsCarbonFile(Map("tableName" -> "carbonunion"))
+
+    checkAnswer(
+      sql("select c1,count(c1) from (select c1 as c1,c2 as c2 from carbonunion union all select c2 as c1,c1 as c2 from carbonunion)t where c1='200' group by c1"),
+      sql("select c1,count(c1) from (select c1 as c1,c2 as c2 from sparkunion union all select c2 as c1,c1 as c2 from sparkunion)t where c1='200' group by c1"))
+  })
+  sql("drop table if exists carbonunion")
+
 }
\ No newline at end of file


Mime
View raw message