carbondata-commits mailing list archives

From: kumarvisha...@apache.org
Subject: [1/2] incubator-carbondata git commit: Fixed order by limit with select * query
Date: Thu, 06 Apr 2017 11:01:21 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 68cbe1508 -> 1b20c2dfd


Fixed order by limit with select * query
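
For context, the regression involves a full projection combined with ORDER BY and
LIMIT on a table with dictionary-encoded columns. A minimal reproduction, assuming
an existing CarbonData-enabled SparkSession `spark` and an illustrative table `t`
(both hypothetical, not from this commit):

    // Before this fix, plans of this shape were mishandled because the late
    // dictionary decode did not convert the select * output attributes.
    spark.sql("select * from t order by name limit 10").collect()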


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/82b61d47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/82b61d47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/82b61d47

Branch: refs/heads/master
Commit: 82b61d4799a0e2eea6064d4e997fa1524c7f7b1d
Parents: 68cbe15
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Tue Apr 4 15:18:46 2017 +0530
Committer: kumarvishal <kumarvishal.1802@gmail.com>
Committed: Thu Apr 6 16:28:21 2017 +0530

----------------------------------------------------------------------
 .../sortexpr/AllDataTypesTestCaseSort.scala     | 18 +++++++++--
 .../spark/sql/CarbonCatalystOperators.scala     | 19 +++++++++--
 .../spark/sql/CarbonDictionaryDecoder.scala     | 33 +++++++++++++++++++-
 .../execution/CarbonLateDecodeStrategy.scala    | 14 ++++++---
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 27 ++++++++++++++--
 5 files changed, 98 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala
index bdb470a..34d3cee 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala
@@ -27,6 +27,8 @@ import org.scalatest.BeforeAndAfterAll
 class AllDataTypesTestCaseSort extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
+    sql("drop table if exists alldatatypestablesort")
+    sql("drop table if exists alldatatypestablesort_hive")
     sql("CREATE TABLE alldatatypestablesort (empno int, empname String, designation String,
doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization
int,salary int) STORED BY 'org.apache.carbondata.format'")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE alldatatypestablesort
OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""");
 
@@ -41,8 +43,20 @@ class AllDataTypesTestCaseSort extends QueryTest with BeforeAndAfterAll {
       sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestablesort_hive where empname in ('arvind','ayushi') group by empno,empname,utilization order by empno"))
   }
 
+  test("select * from alldatatypestablesort order by empname limit 10") {
+    sql("select * from alldatatypestablesort order by empname limit 10").collect()
+  }
+
+  test("select * from alldatatypestablesort order by salary limit 2") {
+    sql("select * from alldatatypestablesort order by salary limit 2").collect()
+  }
+
+  test("select * from alldatatypestablesort where empname='arvind' order by salary limit
2") {
+    sql("select * from alldatatypestablesort where empname='arvind' order by salary limit
2").collect()
+  }
+
   override def afterAll {
-    sql("drop table alldatatypestablesort")
-    sql("drop table alldatatypestablesort_hive")
+    sql("drop table if exists alldatatypestablesort")
+    sql("drop table if exists alldatatypestablesort_hive")
   }
 }
\ No newline at end of file
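
The setup and teardown now both use "drop table if exists", so a previously failed
run cannot leave stale tables that break beforeAll, and afterAll no longer throws
when a table was never created. The idempotent fixture pattern in isolation (table
name illustrative):

    // Safe whether or not the table already exists.
    sql("drop table if exists t")
    sql("CREATE TABLE t (c int) STORED BY 'org.apache.carbondata.format'")
    // ... run tests ...
    sql("drop table if exists t")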

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 4070088..9b1533e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.{HiveContext, HiveSessionCatalog}
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.types.{StringType, TimestampType}
@@ -33,8 +34,22 @@ case class CarbonDictionaryCatalystDecoder(
     isOuter: Boolean,
     child: LogicalPlan) extends UnaryNode {
   // The output should be updated with the converted datatypes; this is needed for the limit+sort plan.
-  override val output: Seq[Attribute] =
-    CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap)
+  override def output: Seq[Attribute] = {
+    child match {
+      case l: LogicalRelation =>
+        // If the child is a logical relation, first update all dictionary attributes to IntegerType
+        val logicalOut =
+          CarbonDictionaryDecoder.updateAttributes(child.output, relations, aliasMap)
+        CarbonDictionaryDecoder.convertOutput(logicalOut, relations, profile, aliasMap)
+      case Filter(cond, l: LogicalRelation) =>
+        // If the child is a filter over a logical relation, first update all dictionary attributes to IntegerType
+        val logicalOut =
+          CarbonDictionaryDecoder.updateAttributes(child.output, relations, aliasMap)
+        CarbonDictionaryDecoder.convertOutput(logicalOut, relations, profile, aliasMap)
+      case _ => CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap)
+    }
+  }
+
 }
 
 abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
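
A note on the val-to-def switch above: a `val` override is evaluated once at
construction, while a `def` is re-evaluated on every access, so `output` can track
a child whose attributes are rewritten during optimization. A self-contained sketch
of the difference (illustrative types, not CarbonData code):

    trait Node { def output: Seq[String] }

    // `val`: the child's output is captured once, at construction time.
    class Frozen(child: () => Seq[String]) extends Node {
      override val output: Seq[String] = child()
    }

    // `def`: the child's output is recomputed on each access.
    class Live(child: () => Seq[String]) extends Node {
      override def output: Seq[String] = child()
    }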

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 543da6f..d450b69 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -347,11 +347,42 @@ object CarbonDictionaryDecoder {
   }
 
   /**
+   * Updates all dictionary attributes with integer datatype.
+   */
+  def updateAttributes(output: Seq[Attribute],
+      relations: Seq[CarbonDecoderRelation],
+      aliasMap: CarbonAliasDecoderRelation): Seq[Attribute] = {
+    output.map { a =>
+      val attr = aliasMap.getOrElse(a, a)
+      val relation = relations.find(p => p.contains(attr))
+      if (relation.isDefined) {
+        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+        val carbonDimension = carbonTable
+          .getDimensionByName(carbonTable.getFactTableName, attr.name)
+        if (carbonDimension != null &&
+            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+            !carbonDimension.isComplex()) {
+          val newAttr = AttributeReference(a.name,
+            IntegerType,
+            a.nullable,
+            a.metadata)(a.exprId).asInstanceOf[Attribute]
+          newAttr
+        } else {
+          a
+        }
+      } else {
+        a
+      }
+    }
+  }
+
+  /**
   * Whether the attribute requires decoding, based on the profile.
    */
   def canBeDecoded(attr: Attribute, profile: CarbonProfile): Boolean = {
     profile match {
-      case ip: IncludeProfile if ip.attributes.nonEmpty =>
+      case ip: IncludeProfile =>
         ip.attributes
           .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
       case ep: ExcludeProfile =>
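
Dropping the `nonEmpty` guard changes the empty-IncludeProfile case: `exists` over
an empty attribute list is simply false, so an empty include profile now decodes
nothing instead of falling through to the later cases of the match. The relevant
collection behavior:

    // exists on an empty sequence is always false, so with an empty
    // IncludeProfile no attribute is considered decodable.
    Seq.empty[String].exists(_ == "empname")   // false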

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 16e8a99..ed5d362 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -61,11 +61,15 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
             a.map(_.name).toArray, f), needDecoder)) ::
             Nil
       case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
-        CarbonDictionaryDecoder(relations,
-          profile,
-          aliasMap,
-          planLater(child)
-        ) :: Nil
+        if (profile.isInstanceOf[IncludeProfile] && profile.isEmpty) {
+          planLater(child) :: Nil
+        } else {
+          CarbonDictionaryDecoder(relations,
+            profile,
+            aliasMap,
+            planLater(child)
+          ) :: Nil
+        }
       case _ => Nil
     }
   }
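
With an empty include profile there is nothing to decode, so the strategy now plans
the child directly instead of wrapping it in a no-op CarbonDictionaryDecoder. A
condensed sketch of eliding a do-nothing wrapper during planning (illustrative
types, not the actual SparkStrategy API):

    sealed trait Plan
    case class Decoder(include: Seq[String], child: Plan) extends Plan
    case class Scan(table: String) extends Plan

    def plan(p: Plan): Plan = p match {
      case Decoder(include, child) if include.isEmpty => plan(child) // elide no-op
      case Decoder(include, child) => Decoder(include, plan(child))
      case leaf => leaf
    }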

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 36478b4..181328d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -168,7 +168,21 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
           if (attrsOnSort.size() > 0 && !child.isInstanceOf[Sort]) {
             child = CarbonDictionaryTempDecoder(attrsOnSort,
               new util.HashSet[AttributeReferenceWrapper](), sort.child)
+          } else {
+            // In case of a select * query the child is a logical relation and there is
+            // no way to convert the datatypes of the attributes, so just add this dummy
+            // decoder to convert them to dictionary datatypes.
+            child match {
+              case l: LogicalRelation =>
+                child = CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+                  new util.HashSet[AttributeReferenceWrapper](), sort.child)
+              case Filter(cond, l: LogicalRelation) =>
+                child = CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+                  new util.HashSet[AttributeReferenceWrapper](), sort.child)
+              case _ =>
+            }
           }
+
           if (!decoder) {
             decoder = true
             CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
@@ -609,9 +623,16 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     }
     // Remove unnecessary decoders
     val finalPlan = transFormedPlan transform {
-      case CarbonDictionaryCatalystDecoder(_, profile, _, false, child)
-        if profile.isInstanceOf[IncludeProfile] && profile.isEmpty =>
-        child
+      case cd@ CarbonDictionaryCatalystDecoder(_, profile, _, false, child) =>
+        if (profile.isInstanceOf[IncludeProfile] && profile.isEmpty) {
+          child match {
+            case l: LogicalRelation => cd
+            case Filter(condition, l: LogicalRelation) => cd
+            case _ => child
+          }
+        } else {
+          cd
+        }
     }
     finalPlan
   }
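
The cleanup pass above still strips empty-include decoders in general, but now keeps
those sitting directly on a LogicalRelation (or on a Filter over one), since these
are exactly the dummy decoders inserted earlier to carry the converted select *
datatypes. The keep-or-strip decision, restated as a standalone predicate (a sketch
against the Spark 2.x plan classes):

    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
    import org.apache.spark.sql.execution.datasources.LogicalRelation

    // Keep the decoder only when it is the datatype fix-up directly over the
    // scanned relation; anywhere else an empty include profile is a no-op.
    def keepEmptyIncludeDecoder(child: LogicalPlan): Boolean = child match {
      case _: LogicalRelation => true
      case Filter(_, _: LogicalRelation) => true
      case _ => false
    }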

