carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [25/50] [abbrv] carbondata git commit: [CARBONDATA-2010] Block streaming on main table of preaggregate datamap
Date Sun, 28 Jan 2018 06:45:54 GMT
[CARBONDATA-2010] Block streaming on main table of preaggregate datamap

If the table has 'preaggregate' DataMap, it doesn't support streaming now

This closes #1791


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fc81d831
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fc81d831
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fc81d831

Branch: refs/heads/fgdatamap
Commit: fc81d831cdc608435104d27eb2c39a57f66c80ed
Parents: 943588d
Author: QiangCai <qiangcai@qq.com>
Authored: Thu Jan 11 15:04:22 2018 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Mon Jan 15 14:43:58 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java      | 15 +++++++++++++++
 .../spark/sql/execution/strategy/DDLStrategy.scala   | 10 ++++++++++
 .../carbondata/TestStreamingTableOperation.scala     | 12 ++++++++++++
 3 files changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc81d831/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index e27b126..74dfef6 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -733,6 +733,21 @@ public class CarbonTable implements Serializable {
     return streaming != null && streaming.equalsIgnoreCase("true");
   }
 
+  /**
+   * whether this table has aggregation DataMap or not
+   */
+  public boolean hasAggregationDataMap() {
+    List<DataMapSchema> dataMapSchemaList = tableInfo.getDataMapSchemaList();
+    if (dataMapSchemaList != null && !dataMapSchemaList.isEmpty()) {
+      for (DataMapSchema dataMapSchema : dataMapSchemaList) {
+        if (dataMapSchema instanceof AggregationDataMapSchema) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   public int getDimensionOrdinalMax() {
     return dimensionOrdinalMax;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc81d831/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 6ff762a..f058e96 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -218,6 +218,16 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
       case AlterTableSetPropertiesCommand(tableName, properties, isView)
         if CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(tableName)(sparkSession) => {
+
+        // TODO remove this limiation after streaming table support 'preaggregate' DataMap
+        // if the table has 'preaggregate' DataMap, it doesn't support streaming now
+        val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .lookupRelation(tableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+        if (carbonTable.hasAggregationDataMap) {
+          throw new MalformedCarbonCommandException(
+            "The table has 'preaggregate' DataMap, it doesn't support streaming")
+        }
+
         // TODO remove this limitation later
         val property = properties.find(_._1.equalsIgnoreCase("streaming"))
         if (property.isDefined) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc81d831/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index a8ab6fb..62076bf 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -128,6 +128,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll
{
 
     // 18. block drop table while streaming is in progress
     createTable(tableName = "stream_table_drop", streaming = true, withBatchLoad = false)
+
+    // 19. block streaming on 'preaggregate' main table
+    createTable(tableName = "agg_table_block", streaming = false, withBatchLoad = false)
   }
 
   test("validate streaming property") {
@@ -216,6 +219,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll
{
     sql("drop table if exists streaming.stream_table_close_auto_handoff")
     sql("drop table if exists streaming.stream_table_reopen")
     sql("drop table if exists streaming.stream_table_drop")
+    sql("drop table if exists streaming.agg_table_block")
+    sql("drop table if exists streaming.agg_table_block_agg0")
   }
 
   // normal table not support streaming ingest
@@ -995,6 +1000,13 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll
{
     assertResult(1)(resultStreaming.length)
     assertResult("true")(resultStreaming(0).getString(1).trim)
   }
+
+  test("block streaming for 'preaggregate' table") {
+    sql("create datamap agg_table_block_agg0 on table streaming.agg_table_block using 'preaggregate'
as select city, count(name) from streaming.agg_table_block group by city")
+    val msg = intercept[MalformedCarbonCommandException](sql("ALTER TABLE streaming.agg_table_block
SET TBLPROPERTIES('streaming'='true')"))
+    assertResult("The table has 'preaggregate' DataMap, it doesn't support streaming")(msg.getMessage)
+  }
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,


Mime
View raw message