carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [23/26] carbondata git commit: [CARBONDATA-3136] Fix JVM crash with preaggregate datamap when average of decimal column is taken with orderby
Date Fri, 30 Nov 2018 16:34:16 GMT
[CARBONDATA-3136] Fix JVM crash with preaggregate datamap when average of decimal column is
taken with orderby

problem: The JVM crashes when a table has a preaggregate datamap and the average of a
decimal column is queried together with ORDER BY.

cause: When preparing the plan with a preaggregate datamap, the decimal column is cast to
double in the average expression. This led to a JVM crash in Spark because the values were
filled with the wrong precision (the call stack is given in the JIRA).

solution: For a decimal column, the division result of the average should be cast to decimal
instead of double.

This closes #2958


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/63d16098
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/63d16098
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/63d16098

Branch: refs/heads/branch-1.5
Commit: 63d16098ea7901f45eab6e4caa7133c2e309105e
Parents: 2036dc0
Author: ajantha-bhat <ajanthabhat@gmail.com>
Authored: Tue Nov 27 19:37:49 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Fri Nov 30 21:57:21 2018 +0530

----------------------------------------------------------------------
 .../TestPreAggregateExpressions.scala           | 11 +++++++++
 .../sql/hive/CarbonPreAggregateRules.scala      | 26 +++++++++++++++-----
 2 files changed, 31 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/63d16098/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
index b3b71a6..a7511fd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
@@ -164,6 +164,17 @@ class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll
{
     }
   }
 
+  test("Test Pre_aggregate with decimal column with order by") {
+    sql("drop table if exists maintable")
+    sql("create table maintable(name string, decimal_col decimal(30,16)) stored by 'carbondata'")
+    sql("insert into table maintable select 'abc',452.564")
+    sql(
+      "create datamap ag1 on table maintable using 'preaggregate' as select name,avg(decimal_col)"
+
+      " from maintable group by name")
+    checkAnswer(sql("select avg(decimal_col) from maintable group by name order by name"),
+      Seq(Row(452.56400000000000000000)))
+  }
+
   override def afterAll: Unit = {
     sql("DROP TABLE IF EXISTS mainTable")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/63d16098/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 76ff41a..9b204f8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -989,8 +989,15 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends
Rule
           Cast(AggregateExpression(Count(exp), aggExp.mode, false), DoubleType))
         newExp
       case Average(exp: Expression) =>
-        val newExp = Seq(AggregateExpression(Sum(Cast(exp, DoubleType)), aggExp.mode, false),
-          Cast(AggregateExpression(Count(exp), aggExp.mode, false), DoubleType))
+        val dataType =
+          if (exp.dataType.isInstanceOf[DecimalType]) {
+            // decimal must not go as double precision.
+            exp.dataType.asInstanceOf[DecimalType]
+          } else {
+            DoubleType
+          }
+        val newExp = Seq(AggregateExpression(Sum(Cast(exp, dataType)), aggExp.mode, false),
+          Cast(AggregateExpression(Count(exp), aggExp.mode, false), dataType))
         newExp
       case _ =>
         val newExp = Seq(aggExp)
@@ -1663,6 +1670,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession)
extends Rule
               false))
         }
       case Average(exp: Expression) =>
+        val dataType =
+          if (exp.dataType.isInstanceOf[DecimalType]) {
+            // decimal must not go as double precision.
+            exp.dataType.asInstanceOf[DecimalType]
+          } else {
+            DoubleType
+          }
         // for handling Normal table case/Aggregate node added in case of streaming table
         if (!isStreamingTable) {
           // In case of average aggregate function select 2 columns from aggregate table
@@ -1670,24 +1684,24 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession)
extends Rule
           // Then add divide(sum(column with sum), sum(column with count)).
           Seq(Divide(AggregateExpression(Sum(Cast(
             attrs.head,
-            DoubleType)),
+            dataType)),
             aggExp.mode,
             false),
             AggregateExpression(Sum(Cast(
               attrs.last,
-              DoubleType)),
+              dataType)),
               aggExp.mode,
               false)))
         } else {
           // in case of streaming aggregate table return two aggregate function sum and count
           Seq(AggregateExpression(Sum(Cast(
             attrs.head,
-            DoubleType)),
+            dataType)),
             aggExp.mode,
             false),
             AggregateExpression(Sum(Cast(
               attrs.last,
-              DoubleType)),
+              dataType)),
               aggExp.mode,
               false))
         }


Mime
View raw message