carbondata-commits mailing list archives

From kumarvisha...@apache.org
Subject carbondata git commit: [CARBONDATA-1881] Insert overwrite value for pre-aggregate load was incorrect
Date Wed, 13 Dec 2017 08:44:30 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 29bae4d28 -> c08fe933c


[CARBONDATA-1881] Insert overwrite value for pre-aggregate load was incorrect

During pre-aggregate table loading, the insert overwrite flag was hard-coded to false.
The pre-aggregate table load now uses the insert overwrite value set for the main table.

This closes #1639
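
In outline, the fix threads the main table's overwrite flag through the OperationContext so the
pre-aggregate load reuses it rather than hard-coding false. The sketch below is a simplified
stand-in, not the actual CarbonData classes; only the property name "isOverwrite" and the general
hand-off come from the diff that follows.

// Sketch only: compressed stand-ins showing how the overwrite flag travels from the
// main table load to the pre-aggregate (child table) load.
import scala.collection.mutable

object OverwriteFlagFlow {

  // Minimal stand-in for CarbonData's OperationContext property bag.
  class OperationContext {
    private val props = mutable.Map[String, Any]()
    def setProperty(key: String, value: Any): Unit = props(key) = value
    def getProperty(key: String): Any = props.getOrElse(key, false)
  }

  // Main table load path (cf. CarbonDataRDDFactory): record whether this load is an
  // overwrite before the pre-status-update event fires.
  def loadMainTable(overwriteTable: Boolean, ctx: OperationContext): Unit = {
    ctx.setProperty("isOverwrite", overwriteTable)
    loadPreAggregate(ctx) // in CarbonData this runs via OperationListenerBus.fireEvent
  }

  // Pre-aggregate listener path (cf. LoadPostAggregateListener): read the flag from the
  // context instead of passing isOverwriteTable = false unconditionally.
  def loadPreAggregate(ctx: OperationContext): Unit = {
    val isOverwrite = ctx.getProperty("isOverwrite").asInstanceOf[Boolean]
    println(s"child table load with isOverwriteTable = $isOverwrite")
  }

  def main(args: Array[String]): Unit = {
    val ctx = new OperationContext
    loadMainTable(overwriteTable = true, ctx) // prints: child table load with isOverwriteTable = true
  }
}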


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c08fe933
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c08fe933
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c08fe933

Branch: refs/heads/master
Commit: c08fe933c1479b05b41c3ef9c3b342c8de674526
Parents: 29bae4d
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Mon Dec 11 11:20:43 2017 +0530
Committer: kumarvishal <kumarvishal.1802@gmail.com>
Committed: Wed Dec 13 14:13:41 2017 +0530

----------------------------------------------------------------------
 .../preaggregate/TestPreAggregateLoad.scala         | 16 ++++++++++++++++
 .../carbondata/spark/rdd/CarbonDataRDDFactory.scala |  1 +
 .../CreatePreAggregateTableCommand.scala            |  1 +
 .../preaaggregate/PreAggregateListeners.scala       |  3 +++
 .../command/preaaggregate/PreAggregateUtil.scala    |  3 ++-
 5 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c08fe933/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index 569439c..6a5f221 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -187,4 +187,20 @@ class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 52))
   }
 
+  test("test if pre-aagregate is overwritten if main table is inserted with insert overwrite")
{
+    sql("DROP TABLE IF EXISTS maintable")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(
+      s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,
sum(age) from maintable group by id"""
+        .stripMargin)
+    sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)")
+    sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)")
+    sql(s"insert overwrite table maintable values(1, 'xyz', 'delhi', 29)")
+    checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 29))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c08fe933/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8f4af1b..7955e71 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -503,6 +503,7 @@ object CarbonDataRDDFactory {
         sqlContext.sparkSession,
         carbonTable.getCarbonTableIdentifier,
         carbonLoadModel)
+      operationContext.setProperty("isOverwrite", overwriteTable)
       OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
       val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
       if (!done) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c08fe933/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 9a84450..8c02f3b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -143,6 +143,7 @@ case class CreatePreAggregateTableCommand(
           queryString,
           segmentToLoad = "*",
           validateSegments = true,
+          isOverwrite = false,
           sparkSession = sparkSession)
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c08fe933/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 747e447..1e5b305 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -70,12 +70,15 @@ object LoadPostAggregateListener extends OperationEventListener {
               databasename)
           }
         }
+        val isOverwrite =
+          operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
         PreAggregateUtil.startDataLoadForDataMap(
             table,
             TableIdentifier(childTableName, Some(childDatabaseName)),
             childSelectQuery,
             carbonLoadModel.getSegmentId,
             validateSegments = false,
+            isOverwrite,
             sparkSession)
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c08fe933/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 5ad5308..81ccbd2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -504,6 +504,7 @@ object PreAggregateUtil {
       queryString: String,
       segmentToLoad: String,
       validateSegments: Boolean,
+      isOverwrite: Boolean,
       sparkSession: SparkSession): Unit = {
     CarbonSession.threadSet(
       CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
@@ -525,7 +526,7 @@ object PreAggregateUtil {
         null,
         Nil,
         Map("fileheader" -> headers),
-        isOverwriteTable = false,
+        isOverwriteTable = isOverwrite,
         dataFrame = Some(dataFrame),
         internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).
         run(sparkSession)

