carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/2] carbondata git commit: reject alter table to disable streaming property
Date Sat, 25 Nov 2017 07:33:12 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master a13d03c74 -> 85f039203


reject alter table to disable streaming property


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/19f9e6cd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/19f9e6cd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/19f9e6cd

Branch: refs/heads/master
Commit: 19f9e6cd8e065251a1fc8b348ff19d407cb0b6c7
Parents: a13d03c
Author: QiangCai <qiangcai@qq.com>
Authored: Fri Nov 24 19:38:01 2017 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Sat Nov 25 15:31:24 2017 +0800

----------------------------------------------------------------------
 .../sql/execution/strategy/DDLStrategy.scala    | 11 ++++++++
 .../TestStreamingTableOperation.scala           | 29 ++++++++++++++++++++
 2 files changed, 40 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/19f9e6cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index d0dca68..e1f1ef2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -156,11 +156,22 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
       case AlterTableSetPropertiesCommand(tableName, properties, isView)
         if CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(tableName)(sparkSession) => {
+        val property = properties.find(_._1.equalsIgnoreCase("streaming"))
+        if (property.isDefined) {
+          if (!property.get._2.trim.equalsIgnoreCase("true")) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported alter table to disable streaming property")
+          }
+        }
         ExecutedCommandExec(AlterTableSetCommand(tableName, properties, isView)) :: Nil
       }
       case AlterTableUnsetPropertiesCommand(tableName, propKeys, ifExists, isView)
         if CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(tableName)(sparkSession) => {
+        if (propKeys.find(_.equalsIgnoreCase("streaming")).isDefined) {
+          throw new MalformedCarbonCommandException(
+            "Unsupported alter table to unset streaming properties")
+        }
         ExecutedCommandExec(AlterTableUnsetCommand(tableName, propKeys, ifExists, isView)) :: Nil
       }
       case _ => Nil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/19f9e6cd/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 33aa2c9..7cbec04 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -106,6 +106,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     // 11. table for delete segment test
     createTable(tableName = "stream_table_delete", streaming = true, withBatchLoad = false)
+
+    // 12. reject alter streaming properties
+    createTable(tableName = "stream_table_alter", streaming = true, withBatchLoad = false)
   }
 
   test("validate streaming property") {
@@ -185,6 +188,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_new")
     sql("drop table if exists streaming.stream_table_tolerant")
     sql("drop table if exists streaming.stream_table_delete")
+    sql("drop table if exists streaming.stream_table_alter")
   }
 
   // normal table not support streaming ingest
@@ -637,6 +641,31 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("reject alter streaming properties") {
+    try {
+      sql("ALTER TABLE stream_table_alter UNSET TBLPROPERTIES IF EXISTS ('streaming')")
+      assert(false, "unsupport to unset streaming property")
+    } catch {
+      case _ =>
+        assert(true)
+    }
+    try {
+      sql("ALTER TABLE stream_table_alter SET TBLPROPERTIES('streaming'='true')")
+      assert(true)
+    } catch {
+      case _ =>
+        assert(false, "should support set table to streaming")
+    }
+
+    try {
+      sql("ALTER TABLE stream_table_alter SET TBLPROPERTIES('streaming'='false')")
+      assert(false,  "unsupport disable streaming properties")
+    } catch {
+      case _ =>
+        assert(true)
+    }
+  }
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,


Mime
View raw message