carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [03/18] carbondata git commit: [CARBONDATA-1576][PREAGG][DATAMAP] Support DataMap drop
Date Tue, 14 Nov 2017 17:50:25 GMT
[CARBONDATA-1576][PREAGG][DATAMAP] Support DataMap drop

Added a DROP DATAMAP SQL parser rule to support dropping a datamap from a table.

DROP DATAMAP IF EXISTS datamapname ON TABLE tablename
Added a restriction so that a child (datamap) table cannot be dropped directly with DROP TABLE; it must be removed through DROP DATAMAP.
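
For illustration, the new command in action (table and datamap names are hypothetical, written in the style of this commit's test suites):

  sql("create table sales (country string, amount int) stored by 'carbondata'")
  sql("create datamap agg_sales on table sales using 'preaggregate' as select country,sum(amount) from sales group by country")
  // IF EXISTS makes the drop a no-op when the datamap is absent
  sql("drop datamap if exists agg_sales on table sales")
  // dropping the child table directly is now rejected:
  // sql("drop table sales_agg_sales") throws an exception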

This closes #1489


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1a621895
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1a621895
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1a621895

Branch: refs/heads/fgdatamap
Commit: 1a621895b9c2646480a5d29aa8e6eafb9252c51a
Parents: 1e408c5
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Mon Nov 13 21:27:38 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Tue Nov 14 16:17:01 2017 +0800

----------------------------------------------------------------------
 .../core/datamap/DataMapStoreManager.java       |  23 ++-
 .../ThriftWrapperSchemaConverterImpl.java       |   4 +-
 .../presto/impl/CarbonTableReader.java          |   4 +-
 .../preaggregate/TestPreAggCreateCommand.scala  |  24 +--
 .../preaggregate/TestPreAggregateDrop.scala     |  57 ++++++-
 .../testsuite/datamap/TestDataMapCommand.scala  |  14 +-
 .../carbondata/events/DropDataMapEvents.scala   |  54 +++++++
 .../org/apache/spark/sql/CarbonSession.scala    |   4 +-
 .../command/CarbonDropTableCommand.scala        |  20 ++-
 .../datamap/CarbonDropDataMapCommand.scala      | 148 +++++++++++++++++++
 .../command/datamap/CreateDataMapCommand.scala  |  69 +++++++++
 .../command/datamap/DataMapListeners.scala      |  72 +++++++++
 .../CreatePreAggregateTableCommand.scala        |  11 +-
 .../preaaggregate/PreAggregateListeners.scala   |  25 ----
 .../preaaggregate/PreAggregateUtil.scala        |   2 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |   2 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |  45 +-----
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |   8 +-
 .../spark/sql/hive/CarbonSessionState.scala     |   3 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |  15 +-
 20 files changed, 484 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 2b5d5cd..d30483a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -144,23 +144,40 @@ public final class DataMapStoreManager {
    * Clear the datamap/datamaps of a table from memory
    * @param identifier Table identifier
    */
-  public void clearDataMap(AbsoluteTableIdentifier identifier) {
+  public void clearDataMaps(AbsoluteTableIdentifier identifier) {
     List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
     segmentRefreshMap.remove(identifier.uniqueName());
     if (tableDataMaps != null) {
-      int i = 0;
       for (TableDataMap tableDataMap: tableDataMaps) {
         if (tableDataMap != null) {
           tableDataMap.clear();
           break;
         }
-        i++;
       }
       allDataMaps.remove(identifier.uniqueName());
     }
   }
 
   /**
+   * Clear a single datamap of a table from memory, looked up by its name
+   * @param identifier Table identifier
+   */
+  public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
+    List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
+    if (tableDataMaps != null) {
+      int i = 0;
+      for (TableDataMap tableDataMap: tableDataMaps) {
+        if (tableDataMap != null && dataMapName.equalsIgnoreCase(tableDataMap.getDataMapName())) {
+          tableDataMap.clear();
+          tableDataMaps.remove(i);
+          break;
+        }
+        i++;
+      }
+    }
+  }
+
+  /**
    * Get the blocklet datamap factory to get the detail information of blocklets
    * @param identifier
    * @return

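A minimal sketch of the renamed and new cache-clearing calls (the store path and identifiers below are assumed for illustration):

  import org.apache.carbondata.core.datamap.DataMapStoreManager
  import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier

  val identifier = AbsoluteTableIdentifier.from("/tmp/carbon.store", "default", "sales")
  // clear one named datamap from the in-memory cache
  DataMapStoreManager.getInstance().clearDataMap(identifier, "agg_sales")
  // clear every datamap cached for the table
  DataMapStoreManager.getInstance().clearDataMaps(identifier)
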
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index fef2e0f..2027df2 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -326,8 +326,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
       if (wrapperChildSchema.getRelationIdentifier() != null) {
         org.apache.carbondata.format.RelationIdentifier relationIdentifier =
             new org.apache.carbondata.format.RelationIdentifier();
-        relationIdentifier.setDatabaseName(
-            wrapperChildSchema.getRelationIdentifier().getDatabaseName());
+        relationIdentifier
+            .setDatabaseName(wrapperChildSchema.getRelationIdentifier().getDatabaseName());
         relationIdentifier.setTableName(wrapperChildSchema.getRelationIdentifier().getTableName());
         relationIdentifier.setTableId(wrapperChildSchema.getRelationIdentifier().getTableId());
         thriftChildSchema.setRelationIdentifire(relationIdentifier);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index d61322d..9839fc8 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -42,8 +42,6 @@ import org.apache.carbondata.core.reader.ThriftReader;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.service.impl.PathFactory;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.events.OperationEventListener;
-import org.apache.carbondata.events.OperationListenerBus;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
@@ -137,7 +135,7 @@ public class CarbonTableReader {
   }
 
   private void removeTableFromCache(SchemaTableName table) {
-    DataMapStoreManager.getInstance().clearDataMap(cc.get(table).carbonTable.getAbsoluteTableIdentifier());
+    DataMapStoreManager.getInstance().clearDataMaps(cc.get(table).carbonTable.getAbsoluteTableIdentifier());
     cc.remove(table);
     tableList.remove(table);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index ce2d1d2..7c06634 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -19,28 +19,28 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("create datamap preagg1 on table PreAggMain using 'preaggregate' as select a,sum(b) from PreAggMain group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg1"), true, "preaggmain_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg1"), true, "preaggmain_b_sum")
-    sql("drop table PreAggMain_preagg1")
+    sql("drop datamap preagg1 on table PreAggMain")
   }
 
   test("test pre agg create table Two") {
     sql("create datamap preagg2 on table PreAggMain using 'preaggregate' as select a as a1,sum(b) from PreAggMain group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, "preaggmain_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, "preaggmain_b_sum")
-    sql("drop table PreAggMain_preagg2")
+    sql("drop datamap preagg2 on table PreAggMain")
   }
 
   test("test pre agg create table Three") {
     sql("create datamap preagg3 on table PreAggMain using 'preaggregate' as select a,sum(b) as sum from PreAggMain group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg3"), true, "preaggmain_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg3"), true, "preaggmain_b_sum")
-    sql("drop table PreAggMain_preagg3")
+    sql("drop datamap preagg3 on table PreAggMain")
   }
 
   test("test pre agg create table four") {
     sql("create datamap preagg4 on table PreAggMain using 'preaggregate' as select a as a1,sum(b) as sum from PreAggMain group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg4"), true, "preaggmain_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg4"), true, "preaggmain_b_sum")
-    sql("drop table PreAggMain_preagg4")
+    sql("drop datamap preagg4 on table PreAggMain")
   }
 
 
@@ -49,7 +49,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg11"), true, "preaggmain1_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg11"), true, "preaggmain1_b_sum")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg11"), true, "DICTIONARY")
-    sql("drop table PreAggMain1_preagg11")
+    sql("drop datamap preagg11 on table PreAggMain1")
   }
 
   test("test pre agg create table six") {
@@ -57,7 +57,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, "preaggmain1_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, "preaggmain1_b_sum")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, "DICTIONARY")
-    sql("drop table PreAggMain1_preagg12")
+    sql("drop datamap preagg12 on table PreAggMain1")
   }
 
   test("test pre agg create table seven") {
@@ -65,7 +65,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg13"), true, "preaggmain1_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg13"), true, "preaggmain1_b_sum")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg13"), true, "DICTIONARY")
-    sql("drop table PreAggMain1_preagg13")
+    sql("drop datamap preagg13 on table PreAggMain1")
   }
 
   test("test pre agg create table eight") {
@@ -73,7 +73,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), true, "preaggmain1_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), true, "preaggmain1_b_sum")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), true, "DICTIONARY")
-    sql("drop table PreAggMain1_preagg14")
+    sql("drop datamap preagg14 on table PreAggMain1")
   }
 
 
@@ -82,28 +82,28 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg15"), true, "preaggmain2_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg15"), true, "preaggmain2_b_sum")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg15"), true, "preaggmain2_b_count")
-    sql("drop table PreAggMain2_preagg15")
+    sql("drop datamap preagg15 on table PreAggMain2")
   }
 
   test("test pre agg create table ten") {
     sql("create datamap preagg16 on table PreAggMain2 using 'preaggregate' as select a as a1,max(b) from PreAggMain2 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg16"), true, "preaggmain2_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg16"), true, "preaggmain2_b_max")
-    sql("drop table PreAggMain2_preagg16")
+    sql("drop datamap preagg16 on table PreAggMain2")
   }
 
   test("test pre agg create table eleven") {
     sql("create datamap preagg17 on table PreAggMain2 using 'preaggregate' as select a,min(b) from PreAggMain2 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg17"), true, "preaggmain2_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg17"), true, "preaggmain2_b_min")
-    sql("drop table PreAggMain2_preagg17")
+    sql("drop datamap preagg17 on table PreAggMain2")
   }
 
   test("test pre agg create table twelve") {
     sql("create datamap preagg18 on table PreAggMain2 using 'preaggregate' as select a as a1,count(b) from PreAggMain2 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg18"), true, "preaggmain2_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg18"), true, "preaggmain2_b_count")
-    sql("drop table PreAggMain2_preagg18")
+    sql("drop datamap preagg18 on table PreAggMain2")
   }
 
   test("test pre agg create table thirteen") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
index fee73a7..147cb6d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
@@ -24,8 +24,8 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
     sql("drop table if exists maintable")
-    sql("drop table if exists maintable_preagg1")
-    sql("drop table if exists maintable_preagg2")
+    sql("drop datamap if exists preagg1 on table maintable")
+    sql("drop datamap if exists preagg2 on table maintable")
     sql("create table maintable (a string, b string, c string) stored by 'carbondata'")
   }
 
@@ -33,7 +33,7 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
     sql(
       "create datamap preagg1 on table maintable using 'preaggregate' as select" +
       " a,sum(b) from maintable group by a")
-    sql("drop table if exists maintable_preagg1")
+    sql("drop datamap if exists preagg1 on table maintable")
     checkExistence(sql("show tables"), false, "maintable_preagg1")
   }
 
@@ -42,14 +42,54 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
       "create datamap preagg1 on table maintable using 'preaggregate' as select" +
       " a,sum(b) from maintable group by a")
     sql(
-      "create datamap preagg2 on table maintable using 'preaggregate'  as select" +
+      "create datamap preagg2 on table maintable using 'preaggregate' as select" +
       " a,sum(c) from maintable group by a")
-    sql("drop table if exists maintable_preagg2")
+    sql("drop datamap if exists preagg2 on table maintable")
     val showTables = sql("show tables")
     checkExistence(showTables, false, "maintable_preagg2")
     checkExistence(showTables, true, "maintable_preagg1")
   }
-  
+
+  test("drop datamap which is not existed") {
+    intercept[RuntimeException] {
+      sql("drop datamap newpreagg on table maintable")
+    }
+  }
+
+  test("drop datamap with same name on different tables") {
+    sql("drop table if exists maintable1")
+    sql("create datamap preagg_same on table maintable using 'preaggregate' as select" +
+    " a,sum(c) from maintable group by a")
+    sql("create table maintable1 (a string, b string, c string) stored by 'carbondata'")
+    sql("create datamap preagg_same on table maintable1 using 'preaggregate' as select" +
+    " a,sum(c) from maintable1 group by a")
+
+    sql("drop datamap preagg_same on table maintable")
+    var showTables = sql("show tables")
+    checkExistence(showTables, false, "maintable_preagg_same")
+    checkExistence(showTables, true, "maintable1_preagg_same")
+    sql("drop datamap preagg_same on table maintable1")
+    showTables = sql("show tables")
+    checkExistence(showTables, false, "maintable1_preagg_same")
+    sql("drop table if exists maintable1")
+  }
+
+  test("drop datamap and create again with same name") {
+    sql("create datamap preagg_same1 on table maintable using 'preaggregate' as select" +
+    " a,sum(c) from maintable group by a")
+
+    sql("drop datamap preagg_same1 on table maintable")
+    var showTables = sql("show tables")
+    checkExistence(showTables, false, "maintable_preagg_same1")
+    sql("create datamap preagg_same1 on table maintable using 'preaggregate' as select" +
+        " a,sum(c) from maintable group by a")
+    showTables = sql("show tables")
+    checkExistence(showTables, true, "maintable_preagg_same1")
+    sql("drop datamap preagg_same1 on table maintable")
+  }
+
+
+
   test("drop main table and check if preaggreagte is deleted") {
     sql(
       "create datamap preagg2 on table maintable using 'preaggregate' as select" +
@@ -60,8 +100,9 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
 
   override def afterAll() {
     sql("drop table if exists maintable")
-    sql("drop table if exists maintable_preagg1")
-    sql("drop table if exists maintable_preagg2")
+    sql("drop table if exists maintable1")
+    sql("drop datamap if exists preagg1 on table maintable")
+    sql("drop datamap if exists preagg2 on table maintable")
   }
   
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 2512def..b7121b7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -63,7 +63,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test datamap create with preagg") {
-    sql("drop table if exists datamap3")
+    sql("drop datamap if exists datamap3 on table datamaptest")
     sql(
       "create datamap datamap3 on table datamaptest using 'preaggregate' dmproperties('key'='value') as select count(a) from datamaptest")
     val table = CarbonMetadata.getInstance().getCarbonTable("default_datamaptest")
@@ -77,7 +77,6 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
 
   test("test datamap create with preagg with duplicate name") {
     intercept[Exception] {
-      sql("drop table if exists datamap2")
       sql(
         "create datamap datamap2 on table datamaptest using 'preaggregate' dmproperties('key'='value') as select count(a) from datamaptest")
 
@@ -88,6 +87,17 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     assert(dataMapSchemaList.size() == 3)
   }
 
+  test("test datamap drop with preagg") {
+    intercept[Exception] {
+      sql("drop table datamap3")
+
+    }
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 3)
+  }
+
 
   override def afterAll {
     sql("drop table if exists datamaptest")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropDataMapEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropDataMapEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropDataMapEvents.scala
new file mode 100644
index 0000000..5254c4e
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropDataMapEvents.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+
+
+/**
+ * This event is fired before dropping the datamap.
+ * @param ifExistsSet
+ * @param sparkSession
+ */
+case class DropDataMapPreEvent(
+    dataMapSchema: Option[DataMapSchema],
+    ifExistsSet: Boolean,
+    sparkSession: SparkSession) extends Event
+
+
+/**
+ * This event is fired after dropping the datamap.
+ * @param ifExistsSet
+ * @param sparkSession
+ */
+case class DropDataMapPostEvent(
+    dataMapSchema: Option[DataMapSchema],
+    ifExistsSet: Boolean,
+    sparkSession: SparkSession) extends Event
+
+
+/**
+ * This event is fired when a drop datamap operation is aborted.
+ * @param ifExistsSet
+ * @param sparkSession
+ */
+case class DropDataMapAbortEvent(
+    dataMapSchema: Option[DataMapSchema],
+    ifExistsSet: Boolean,
+    sparkSession: SparkSession) extends Event

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index c0393d3..88cbc36 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
+import org.apache.spark.sql.execution.command.datamap.{DataMapDropTablePostListener, DropDataMapPostListener}
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener
 import org.apache.spark.sql.hive.CarbonSessionState
@@ -227,7 +228,7 @@ object CarbonSession {
 
   def initListeners(): Unit = {
     OperationListenerBus.getInstance()
-      .addListener(classOf[DropTablePostEvent], DropPreAggregateTablePostListener)
+      .addListener(classOf[DropTablePostEvent], DataMapDropTablePostListener)
       .addListener(classOf[LoadTablePostExecutionEvent], LoadPostAggregateListener)
       .addListener(classOf[DeleteSegmentByIdPreEvent], PreAggregateDeleteSegmentByIdPreListener)
       .addListener(classOf[DeleteSegmentByDatePreEvent], PreAggregateDeleteSegmentByDatePreListener)
@@ -238,5 +239,6 @@ object CarbonSession {
       .addListener(classOf[AlterTableRenamePreEvent], PreAggregateRenameTablePreListener)
       .addListener(classOf[AlterTableDataTypeChangePreEvent], PreAggregateDataTypeChangePreListener)
       .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener)
+      .addListener(classOf[DropDataMapPostEvent], DropDataMapPostListener)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
index a8e6c37..1bf17b3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
@@ -27,16 +27,17 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{DropTablePostEvent, DropTablePreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events._
 
 case class CarbonDropTableCommand(
     ifExistsSet: Boolean,
     databaseNameOp: Option[String],
-    tableName: String)
+    tableName: String,
+    dropChildTable: Boolean = false)
   extends RunnableCommand with SchemaProcessCommand with DataProcessCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -76,6 +77,19 @@ case class CarbonDropTableCommand(
               None
           }
         }
+      if (carbonTable.isDefined) {
+        val relationIdentifiers = carbonTable.get.getTableInfo.getParentRelationIdentifiers
+        if (relationIdentifiers != null && !relationIdentifiers.isEmpty) {
+          if (!dropChildTable) {
+            if (!ifExistsSet) {
+              throw new Exception("Child table which is associated with a datamap cannot " +
+                                  "be dropped; use the DROP DATAMAP command to drop it")
+            } else {
+              return Seq.empty
+            }
+          }
+        }
+      }
       val operationContext = new OperationContext
       val dropTablePreEvent: DropTablePreEvent =
         DropTablePreEvent(

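The effect of the guard above, sketched (names and the sparkSession value are assumed): a plain DROP TABLE on a datamap's child table is rejected, while the drop-datamap code path bypasses the check by passing dropChildTable = true:

  import org.apache.spark.sql.execution.command.CarbonDropTableCommand

  // fails: the table carries parent relation identifiers and dropChildTable is false
  sql("drop table sales_agg_sales")
  // succeeds: issued internally once the datamap itself is being dropped
  CarbonDropTableCommand(
    ifExistsSet = true,
    databaseNameOp = Some("default"),
    tableName = "sales_agg_sales",
    dropChildTable = true).run(sparkSession)
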
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
new file mode 100644
index 0000000..dc3b1ae
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.datamap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.execution.command.{DataProcessCommand, RunnableCommand, SchemaProcessCommand}
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.events._
+
+
+/**
+ * Drops the datamap and any child tables associated with it
+ * @param dataMapName
+ * @param ifExistsSet
+ * @param databaseNameOp
+ * @param tableName
+ */
+case class CarbonDropDataMapCommand(
+    dataMapName: String,
+    ifExistsSet: Boolean,
+    databaseNameOp: Option[String],
+    tableName: String)
+  extends RunnableCommand with SchemaProcessCommand with DataProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+    processData(sparkSession)
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
+    val identifier = TableIdentifier(tableName, Option(dbName))
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
+    val carbonEnv = CarbonEnv.getInstance(sparkSession)
+    val catalog = carbonEnv.carbonMetastore
+    val tableIdentifier =
+      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
+        dbName.toLowerCase, tableName.toLowerCase)
+    catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
+    val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
+    try {
+      locksToBeAcquired foreach {
+        lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
+      }
+      LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
+      val carbonTable: Option[CarbonTable] =
+        catalog.getTableFromMetadataCache(dbName, tableName) match {
+          case Some(tableMeta) => Some(tableMeta.carbonTable)
+          case None => try {
+            Some(catalog.lookupRelation(identifier)(sparkSession)
+              .asInstanceOf[CarbonRelation].metaData.carbonTable)
+          } catch {
+            case ex: NoSuchTableException =>
+              if (!ifExistsSet) {
+                throw ex
+              }
+              None
+          }
+        }
+      if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
+        val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
+          find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
+        if (dataMapSchema.isDefined) {
+
+          val operationContext = new OperationContext
+          val dropDataMapPreEvent =
+            DropDataMapPreEvent(
+              Some(dataMapSchema.get._1),
+              ifExistsSet,
+              sparkSession)
+          OperationListenerBus.getInstance.fireEvent(dropDataMapPreEvent, operationContext)
+
+          carbonTable.get.getTableInfo.getDataMapSchemaList.remove(dataMapSchema.get._2)
+          val schemaConverter = new ThriftWrapperSchemaConverterImpl
+          PreAggregateUtil.updateSchemaInfo(
+            carbonTable.get,
+            schemaConverter.fromWrapperToExternalTableInfo(
+              carbonTable.get.getTableInfo,
+              dbName,
+              tableName))(sparkSession)
+          // fires the event after dropping datamap from main table schema
+          val dropDataMapPostEvent =
+            DropDataMapPostEvent(
+              Some(dataMapSchema.get._1),
+              ifExistsSet,
+              sparkSession)
+          OperationListenerBus.getInstance.fireEvent(dropDataMapPostEvent, operationContext)
+        } else if (!ifExistsSet) {
+          throw new IllegalArgumentException(
+            s"Datamap with name $dataMapName does not exist under table $tableName")
+        }
+      }
+
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, s"Dropping datamap $dataMapName failed")
+        sys.error(s"Dropping datamap $dataMapName failed: ${ ex.getMessage }")
+    }
+    finally {
+      if (carbonLocks.nonEmpty) {
+        val unlocked = carbonLocks.forall(_.unlock())
+        if (unlocked) {
+          logInfo("Table MetaData Unlocked Successfully")
+        }
+      }
+    }
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    // delete the table folder
+    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
+    val tableIdentifier =
+      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
+    DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
+    Seq.empty
+  }
+}
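
For reference, a sketch of invoking this command directly, as the dropDataMap parser rule added below does (names and the sparkSession value are assumed):

  import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand

  CarbonDropDataMapCommand(
    dataMapName = "agg_sales",
    ifExistsSet = true,
    databaseNameOp = Some("default"),
    tableName = "sales").run(sparkSession)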

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CreateDataMapCommand.scala
new file mode 100644
index 0000000..abf461e
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CreateDataMapCommand.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.datamap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+
+/**
+ * Command class used to create a datamap on a table
+ * and update the parent table with the datamap information
+ *
+ * @param queryString
+ */
+case class CreateDataMapCommand(
+    dataMapName: String,
+    tableIdentifier: TableIdentifier,
+    dmClassName: String,
+    dmproperties: Map[String, String],
+    queryString: Option[String])
+  extends RunnableCommand with SchemaProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
+        dmClassName.equalsIgnoreCase("preaggregate")) {
+      CreatePreAggregateTableCommand(dataMapName,
+        tableIdentifier,
+        dmClassName,
+        dmproperties,
+        queryString.get).run(sparkSession)
+    } else {
+
+      val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+      dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))
+      val dbName = GetDB.getDatabaseName(tableIdentifier.database, sparkSession)
+      // updating the parent table with the datamap schema
+      PreAggregateUtil.updateMainTable(dbName, tableIdentifier.table, dataMapSchema, sparkSession)
+    }
+    LOGGER.audit(s"DataMap ${dataMapName} successfully added to Table ${tableIdentifier.table}")
+    Seq.empty
+  }
+}
+
+

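A usage sketch of the dispatch above (names hypothetical): a 'preaggregate' class name routes to CreatePreAggregateTableCommand, while any other class name is recorded on the parent table as a generic DataMapSchema:

  import org.apache.spark.sql.catalyst.TableIdentifier
  import org.apache.spark.sql.execution.command.datamap.CreateDataMapCommand

  CreateDataMapCommand(
    dataMapName = "agg_sales",
    tableIdentifier = TableIdentifier("sales", Some("default")),
    dmClassName = "preaggregate", // any other name takes the generic path
    dmproperties = Map("key" -> "value"),
    queryString = Some("select country,sum(amount) from sales group by country")
  ).run(sparkSession)
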
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
new file mode 100644
index 0000000..a693d4c
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.datamap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.execution.command.CarbonDropTableCommand
+
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.events.{DropDataMapPostEvent, DropTablePostEvent, Event, OperationContext, OperationEventListener}
+
+object DataMapDropTablePostListener extends OperationEventListener {
+
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
+    val carbonTable = dropPostEvent.carbonTable
+    val sparkSession = dropPostEvent.sparkSession
+    if (carbonTable.isDefined && carbonTable.get.hasDataMapSchema) {
+      val childSchemas = carbonTable.get.getTableInfo.getDataMapSchemaList
+      for (childSchema: DataMapSchema <- childSchemas.asScala) {
+        if (childSchema.getRelationIdentifier != null) {
+          CarbonDropTableCommand(
+            ifExistsSet = true,
+            Some(childSchema.getRelationIdentifier.getDatabaseName),
+            childSchema.getRelationIdentifier.getTableName, true
+          ).run(sparkSession)
+        }
+      }
+    }
+
+  }
+}
+
+object DropDataMapPostListener extends OperationEventListener {
+
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val dropPostEvent = event.asInstanceOf[DropDataMapPostEvent]
+    val dataMapSchema = dropPostEvent.dataMapSchema
+    val sparkSession = dropPostEvent.sparkSession
+    if (dataMapSchema.isDefined) {
+      if (dataMapSchema.get.getRelationIdentifier != null) {
+        CarbonDropTableCommand(ifExistsSet = true,
+          Some(dataMapSchema.get.getRelationIdentifier.getDatabaseName),
+          dataMapSchema.get.getRelationIdentifier.getTableName, true).run(sparkSession)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 3a78968..dd002f0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -86,11 +86,11 @@ case class CreatePreAggregateTableCommand(
     CarbonCreateTableCommand(tableModel).run(sparkSession)
     try {
       val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
-        lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+      lookupRelation( tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
       val tableInfo = relation.tableMeta.carbonTable.getTableInfo
      // child schema object which will be updated on the parent table
-      val childSchema = tableInfo.getFactTable.buildChildSchema(
-        dataMapName, CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
+      val childSchema = tableInfo.getFactTable
+        .buildChildSchema(dataMapName, CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
         tableInfo.getDatabaseName, queryString, "AGGREGATION")
       dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
       // updating the parent table about child table
@@ -102,8 +102,9 @@ case class CreatePreAggregateTableCommand(
       }
     } catch {
       case e: Exception =>
-        sparkSession.sql(
-          s"""DROP TABLE IF EXISTS ${ tableModel.databaseName }.${ tableModel.tableName }""")
+        CarbonDropTableCommand(
+          ifExistsSet = true,
+          Some( tableModel.databaseName ), tableModel.tableName ).run(sparkSession)
         throw e
 
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 6785f73..506a405 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -26,31 +26,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.events._
 
-object DropPreAggregateTablePostListener extends OperationEventListener {
-
-  /**
-   * Called on a specified event occurrence
-   *
-   * @param event
-   */
-  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
-    val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
-    val carbonTable = dropPostEvent.carbonTable
-    val sparkSession = dropPostEvent.sparkSession
-    if (carbonTable.isDefined && carbonTable.get.hasDataMapSchema) {
-      val childSchemas = carbonTable.get.getTableInfo.getDataMapSchemaList
-      for (childSchema: DataMapSchema <- childSchemas.asScala) {
-        if (childSchema.getRelationIdentifier != null) {
-          CarbonDropTableCommand(ifExistsSet = true,
-            Some(childSchema.getRelationIdentifier.getDatabaseName),
-            childSchema.getRelationIdentifier.getTableName).run(sparkSession)
-        }
-      }
-    }
-
-  }
-}
-
 object LoadPostAggregateListener extends OperationEventListener {
   /**
    * Called on a specified event occurrence

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 62e7623..d693061 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -298,7 +298,7 @@ object PreAggregateUtil {
         precision = precision,
         scale = scale,
         rawSchema = rawSchema), dataMapField)
-    } else {
+} else {
       (Field(column = actualColumnName,
         dataType = Some(dataType.typeName),
         name = Some(actualColumnName),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 874bdfb..c2e5cf0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -86,7 +86,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
       carbonTable = tableMeta.carbonTable
      // invalidate the datamap for the old table, see CARBON-1690
       val oldTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath)
-      DataMapStoreManager.getInstance().clearDataMap(oldTableIdentifier)
+      DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
       // get the latest carbon table and check for column existence
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier)
       val tableMetadataFile = carbonTablePath.getPath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index a5e1ec4..0343402 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -450,41 +450,6 @@ class CarbonFileMetastore extends CarbonMetaStore {
     }
   }
 
-  protected def updateParentTableInfo(parentRelationIdentifier: RelationIdentifier,
-      childCarbonTable: CarbonTable)(sparkSession: SparkSession): Unit = {
-    val dbName = parentRelationIdentifier.getDatabaseName
-    val tableName = parentRelationIdentifier.getTableName
-    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    try {
-      val tableMeta = metaStore.getTableFromMetadataCache(dbName, tableName)
-      if (tableMeta.isDefined) {
-        val parentCarbonTable = tableMeta.get.carbonTable
-        val childSchemas = parentCarbonTable.getTableInfo.getDataMapSchemaList
-        if (childSchemas == null) {
-          throw UninitializedFieldError("Child schemas is not initialized")
-        }
-        val childSchemaIterator = childSchemas.iterator()
-        while (childSchemaIterator.hasNext) {
-          val childSchema = childSchemaIterator.next().getChildSchema
-          if (childSchema != null &&
-              childSchema.equals(childCarbonTable.getTableInfo.getFactTable)) {
-            childSchemaIterator.remove()
-          }
-        }
-        val schemaConverter = new ThriftWrapperSchemaConverterImpl
-        PreAggregateUtil
-          .updateSchemaInfo(parentCarbonTable,
-            schemaConverter
-              .fromWrapperToExternalTableInfo(parentCarbonTable.getTableInfo,
-                dbName,
-                tableName))(sparkSession)
-      }
-    } catch {
-      case ex: Exception =>
-        LOGGER.error(ex, s"Updating parent table $dbName.$tableName failed.")
-        throw ex
-    }
-  }
 
   def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession) {
@@ -498,14 +463,6 @@ class CarbonFileMetastore extends CarbonMetaStore {
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
     val fileType = FileFactory.getFileType(metadataFilePath)
-    if (carbonTable != null) {
-      val parentRelations = carbonTable.getTableInfo.getParentRelationIdentifiers
-      if (parentRelations != null && !parentRelations.isEmpty) {
-        for (parentRelation: RelationIdentifier <- parentRelations.asScala) {
-          updateParentTableInfo(parentRelation, carbonTable)(sparkSession)
-        }
-      }
-    }
 
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
       // while drop we should refresh the schema modified time so that if any thing has changed
@@ -518,7 +475,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
       // discard cached table info in cachedDataSourceTables
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-      DataMapStoreManager.getInstance().clearDataMap(identifier)
+      DataMapStoreManager.getInstance().clearDataMaps(identifier)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 6bd80f3..a500f00 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -81,17 +81,11 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
     checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
-    val parentRelations = carbonTable.getTableInfo.getParentRelationIdentifiers
-    if (parentRelations != null && !parentRelations.isEmpty) {
-      for (parentRelation: RelationIdentifier <- parentRelations.asScala) {
-        updateParentTableInfo(parentRelation, carbonTable)(sparkSession)
-      }
-    }
     removeTableFromMetadata(dbName, tableName)
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
     sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-    DataMapStoreManager.getInstance().clearDataMap(identifier)
+    DataMapStoreManager.getInstance().clearDataMaps(identifier)
   }
 
   override def checkSchemasModifiedTimeAndReloadTables(storePath: String) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 939f627..825f6ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.execution.command.datamap.{DataMapDropTablePostListener, DropDataMapPostListener}
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
@@ -117,7 +118,7 @@ class CarbonSessionCatalog(
           carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
       refreshTable(identifier)
       DataMapStoreManager.getInstance().
-        clearDataMap(AbsoluteTableIdentifier.from(storePath,
+        clearDataMaps(AbsoluteTableIdentifier.from(storePath,
           identifier.database.getOrElse("default"), identifier.table))
       isRefreshed = true
       logInfo(s"Schema changes have been detected for table: $identifier")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a621895/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 3bed9d1..be89248 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.datamap.CarbonCreateDataMapCommand
+import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDropDataMapCommand}
 import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand}
 import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
@@ -80,7 +80,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     alterAddPartition | alterSplitPartition | alterDropPartition
 
   protected lazy val datamapManagement: Parser[LogicalPlan] =
-    createDataMap
+    createDataMap | dropDataMap
 
   protected lazy val alterAddPartition: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~>
@@ -141,6 +141,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           TableIdentifier(tableName, dbName), className, map, query)
     }
 
+  /**
+   * The below syntax is used to drop the datamap.
+   * DROP DATAMAP IF EXISTS datamapName ON TABLE tablename
+   */
+  protected lazy val dropDataMap: Parser[LogicalPlan] =
+    DROP ~> DATAMAP ~> opt(IF ~> EXISTS) ~ ident ~ (ON ~ TABLE) ~
+    (ident <~ ".").? ~ ident <~ opt(";")  ^^ {
+      case ifexists ~ dmname ~ ontable ~ dbName ~ tableName =>
+        CarbonDropDataMapCommand(dmname, ifexists.isDefined, dbName, tableName)
+    }
+
   protected lazy val deleteRecords: Parser[LogicalPlan] =
     (DELETE ~> FROM ~> table) ~ restInput.? <~ opt(";") ^^ {
       case table ~ rest =>

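Statements the new dropDataMap rule accepts, for reference (identifiers hypothetical):

  sql("DROP DATAMAP agg_sales ON TABLE sales")
  sql("DROP DATAMAP IF EXISTS agg_sales ON TABLE sales")
  sql("DROP DATAMAP IF EXISTS agg_sales ON TABLE db1.sales;")
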
