From commits-return-12801-archive-asf-public=cust-asf.ponee.io@carbondata.apache.org Tue Aug 7 15:09:38 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 903581807AC for ; Tue, 7 Aug 2018 15:09:36 +0200 (CEST) Received: (qmail 53365 invoked by uid 500); 7 Aug 2018 13:09:27 -0000 Mailing-List: contact commits-help@carbondata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.apache.org Delivered-To: mailing list commits@carbondata.apache.org Received: (qmail 52703 invoked by uid 99); 7 Aug 2018 13:09:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Aug 2018 13:09:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3A43CE11A7; Tue, 7 Aug 2018 13:09:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jackylk@apache.org To: commits@carbondata.apache.org Date: Tue, 07 Aug 2018 13:10:08 -0000 Message-Id: <2d11ab762e2e42b9bb0285d8550567f6@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [44/50] [abbrv] carbondata git commit: [CARBONDATA-2809][DataMap] Block rebuilding for bloom/lucene and preagg datamap [CARBONDATA-2809][DataMap] Block rebuilding for bloom/lucene and preagg datamap Because manual refresh currently only works correctly for MV and has bugs with other types of datamap (such as preaggregate, timeseries, lucene, and bloomfilter), we block the 'deferred rebuild' option for those datamaps, and also block the rebuild command for them. This change also fixes bugs in deferred rebuild for MV: an MV datamap will be rebuilt in a deferred manner regardless of whether the deferred flag is set. 
This closes #2594 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/abcd4f6e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/abcd4f6e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/abcd4f6e Branch: refs/heads/external-format Commit: abcd4f6e23f8b8a9ac543e7dfad01cff40bf4ae1 Parents: b702a1b Author: xuchuanyin Authored: Mon Aug 6 19:26:04 2018 +0800 Committer: Jacky Li Committed: Tue Aug 7 18:12:07 2018 +0800 ---------------------------------------------------------------------- .../core/datamap/DataMapProvider.java | 1 + .../core/datamap/dev/DataMapFactory.java | 6 +++ .../mv/datamap/MVDataMapProvider.scala | 2 + docs/datamap/datamap-management.md | 32 ++++++++------- .../lucene/LuceneFineGrainDataMapSuite.scala | 33 +++++++++++++-- ...eneFineGrainDataMapWithSearchModeSuite.scala | 2 +- .../preaggregate/TestPreAggCreateCommand.scala | 10 ----- .../preaggregate/TestPreAggregateLoad.scala | 36 +++++++++++++++++ .../testsuite/datamap/TestDataMapCommand.scala | 1 - .../testsuite/datamap/TestDataMapStatus.scala | 2 + .../datamap/IndexDataMapProvider.java | 5 +++ .../datamap/PreAggregateDataMapProvider.java | 5 +++ .../datamap/CarbonCreateDataMapCommand.scala | 21 +++++++--- .../datamap/CarbonDataMapRebuildCommand.scala | 19 +++++++-- .../datamap/CarbonDataMapShowCommand.scala | 11 ++++- .../bloom/BloomCoarseGrainDataMapSuite.scala | 42 +++++++++++++++++--- 16 files changed, 182 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java index 
086a1c0..cc05d31 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java @@ -125,4 +125,5 @@ public abstract class DataMapProvider { public abstract DataMapFactory getDataMapFactory(); + public abstract boolean supportRebuild(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index 67f82b2..de8dc58 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -173,4 +173,10 @@ public abstract class DataMapFactory { return false; } + /** + * whether this datamap support rebuild + */ + public boolean supportRebuild() { + return false; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala index 2aba23e..7108bf8 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala @@ -122,4 +122,6 @@ class MVDataMapProvider( override def getDataMapFactory: DataMapFactory[_ <: DataMap[_ <: Blocklet]] = { throw new UnsupportedOperationException } + + override def supportRebuild(): Boolean = true } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/docs/datamap/datamap-management.md ---------------------------------------------------------------------- diff --git a/docs/datamap/datamap-management.md b/docs/datamap/datamap-management.md index 23f1517..b5d1aaa 100644 --- a/docs/datamap/datamap-management.md +++ b/docs/datamap/datamap-management.md @@ -1,17 +1,17 @@ @@ -31,26 +31,30 @@ DataMap can be created using following DDL SELECT statement ``` -Currently, there are 5 DataMap implementation in CarbonData. +Currently, there are 5 DataMap implementations in CarbonData. | DataMap Provider | Description | DMPROPERTIES | Management | | ---------------- | ---------------------------------------- | ---------------------------------------- | ---------------- | | preaggregate | single table pre-aggregate table | No DMPROPERTY is required | Automatic | -| timeseries | time dimension rollup table. | event_time, xx_granularity, please refer to [Timeseries DataMap](https://github.com/apache/carbondata/blob/master/docs/datamap/timeseries-datamap-guide.md) | Automatic | -| mv | multi-table pre-aggregate table, | No DMPROPERTY is required | Manual | -| lucene | lucene indexing for text column | index_columns to specifying the index columns | Manual/Automatic | -| bloomfilter | bloom filter for high cardinality column, geospatial column | index_columns to specifying the index columns | Manual/Automatic | +| timeseries | time dimension rollup table | event_time, xx_granularity, please refer to [Timeseries DataMap](https://github.com/apache/carbondata/blob/master/docs/datamap/timeseries-datamap-guide.md) | Automatic | +| mv | multi-table pre-aggregate table | No DMPROPERTY is required | Manual | +| lucene | lucene indexing for text column | index_columns to specifying the index columns | Automatic | +| bloomfilter | bloom filter for high cardinality column, geospatial column | index_columns to specifying the index columns | Automatic | ## DataMap Management There 
are two kinds of management semantic for DataMap. -1. Automatic Refresh: Create datamap without `WITH DEFERED REBUILD` in the statement, which is by default. -2. Manual Refresh: Create datamap with `WITH DEFERED REBUILD` in the statement +1. Automatic Refresh: Create datamap without `WITH DEFERRED REBUILD` in the statement, which is by default. +2. Manual Refresh: Create datamap with `WITH DEFERRED REBUILD` in the statement + +**CAUTION:** +Manual refresh currently only works fine for MV, it has some bugs with other types of datamap in Carbondata 1.4.1, so we block this option for them in this version. +If user create MV datamap without specifying `WITH DEFERRED REBUILD`, carbondata will give a warning and treat the datamap as deferred rebuild. ### Automatic Refresh -When user creates a datamap on the main table without using `WITH DEFERED REBUILD` syntax, the datamap will be managed by system automatically. +When user creates a datamap on the main table without using `WITH DEFERRED REBUILD` syntax, the datamap will be managed by system automatically. For every data load to the main table, system will immediately triger a load to the datamap automatically. These two data loading (to main table and datamap) is executed in a transactional manner, meaning that it will be either both success or neither success. The data loading to datamap is incremental based on Segment concept, avoiding a expesive total rebuild. 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala index 54cad00..e7bd366 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala @@ -132,7 +132,34 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { sql("drop datamap dm on table datamap_test") } - test("test lucene rebuild data map") { + // for CARBONDATA-2820, we will first block deferred rebuild for lucene + test("test block rebuild for lucene") { + val deferredRebuildException = intercept[MalformedDataMapCommandException] { + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test + | USING 'lucene' + | WITH DEFERRED REBUILD + | DMProperties('INDEX_COLUMNS'='city') + """.stripMargin) + } + assert(deferredRebuildException.getMessage.contains( + s"DEFERRED REBUILD is not supported on this datamap dm with provider lucene")) + + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test + | USING 'lucene' + | DMProperties('INDEX_COLUMNS'='city') + """.stripMargin) + val exception = intercept[MalformedDataMapCommandException] { + sql(s"REBUILD DATAMAP dm ON TABLE datamap_test") + } + sql("drop datamap dm on table datamap_test") + assert(exception.getMessage.contains("Non-lazy datamap dm does not support rebuild")) + } + + ignore("test lucene rebuild data map") { sql("DROP TABLE IF EXISTS datamap_test4") sql( """ @@ -658,7 +685,7 @@ class 
LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { assert(ex6.getMessage.contains("Delete operation is not supported")) } - test("test lucene fine grain multiple data map on table") { + ignore("test lucene fine grain multiple data map on table") { sql("DROP TABLE IF EXISTS datamap_test5") sql( """ @@ -691,7 +718,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS datamap_test5") } - test("test lucene fine grain datamap rebuild") { + ignore("test lucene fine grain datamap rebuild") { sql("DROP TABLE IF EXISTS datamap_test5") sql( """ http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala index e6a2a36..0ac6e72 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala @@ -133,7 +133,7 @@ class LuceneFineGrainDataMapWithSearchModeSuite extends QueryTest with BeforeAnd sql("DROP TABLE IF EXISTS datamap_test_table") } - test("test lucene fine grain datamap rebuild") { + ignore("test lucene fine grain datamap rebuild") { sql("DROP TABLE IF EXISTS datamap_test5") sql( """ http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index ddfb9e7..f0c335d 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -437,16 +437,6 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { } } - test("test pre agg datamap with deferred rebuild") { - val e = intercept[MalformedDataMapCommandException] { - sql("create datamap failure on table PreAggMain1 " + - "using 'preaggregate' " + - "with deferred rebuild " + - "as select a as a1,sum(b) as sum from PreAggMain1 group by a") - } - assert(e.getMessage.contains("DEFERRED REBUILD is not supported on this DataMap")) - } - // TODO: Need to Fix ignore("test creation of multiple preaggregate of same name concurrently") { sql("DROP TABLE IF EXISTS tbl_concurr") http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala index 55994e8..818dd7c 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala @@ -964,4 +964,40 @@ test("check load and select for avg double datatype") { .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false") } + test("test deferred rebuild is not supported for preagg") { + val baseTable = "maintable" + val preagg = "preaggtable" + sql(s"DROP TABLE IF EXISTS $baseTable") + sql( + s""" + | CREATE TABLE $baseTable(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + val deferredRebuildException = intercept[MalformedDataMapCommandException] { + sql( + s""" + | CREATE DATAMAP $preagg ON TABLE $baseTable + | USING 'preaggregate' + | WITH DEFERRED REBUILD + | AS select id, sum(age) from $baseTable group by id + """.stripMargin) + } + assert(deferredRebuildException.getMessage.contains( + s"DEFERRED REBUILD is not supported on this datamap $preagg with provider preaggregate")) + + sql( + s""" + | CREATE DATAMAP $preagg ON TABLE $baseTable + | USING 'preaggregate' + | AS select id, sum(age) from $baseTable group by id + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table $baseTable") + checkExistence(sql(s"SHOW DATAMAP ON TABLE $baseTable"), true, preagg, "preaggregate") + val exception = intercept[MalformedDataMapCommandException] { + sql(s"REBUILD DATAMAP $preagg ON TABLE $baseTable").show() + } + LOGGER.error(s"XU ${exception.getMessage}") + assert(exception.getMessage.contains(s"Non-lazy datamap $preagg does not support rebuild")) + sql(s"DROP TABLE IF EXISTS $baseTable") + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala index afca3b2..8ebed1f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala @@ -225,7 +225,6 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { sql( s""" | create datamap $datamapName3 on table $tableName using 'bloomfilter' - | with deferred rebuild | DMPROPERTIES ('index_columns'='c') """.stripMargin) var result = sql(s"show datamap on table $tableName").cache() http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala index f1c9432..fec2279 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala @@ -310,4 +310,6 @@ class TestDataMapFactory( } } } + + override def supportRebuild(): Boolean = true } http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java ---------------------------------------------------------------------- 
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java index cb5a1b1..487148d 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java @@ -127,4 +127,9 @@ public class IndexDataMapProvider extends DataMapProvider { public DataMapFactory getDataMapFactory() { return dataMapFactory; } + + @Override + public boolean supportRebuild() { + return dataMapFactory.supportRebuild(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java index 099d65d..233c41f 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java @@ -104,4 +104,9 @@ public class PreAggregateDataMapProvider extends DataMapProvider { public DataMapFactory getDataMapFactory() { throw new UnsupportedOperationException(); } + + @Override + public boolean supportRebuild() { + return false; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index 1e4c2c3..17376a9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -44,9 +44,10 @@ case class CarbonCreateDataMapCommand( dmProperties: Map[String, String], queryString: Option[String], ifNotExistsSet: Boolean = false, - deferredRebuild: Boolean = false) + var deferredRebuild: Boolean = false) extends AtomicRunnableCommand { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) private var dataMapProvider: DataMapProvider = _ private var mainTable: CarbonTable = _ private var dataMapSchema: DataMapSchema = _ @@ -89,6 +90,13 @@ case class CarbonCreateDataMapCommand( val property = dmProperties.map(x => (x._1.trim, x._2.trim)).asJava val javaMap = new java.util.HashMap[String, String](property) + // for MV, it is deferred rebuild by default and cannot be non-deferred rebuild + if (dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.MV.getShortName)) { + if (!deferredRebuild) { + LOGGER.warn(s"DEFERRED REBUILD is enabled by default for MV datamap $dataMapName") + } + deferredRebuild = true + } javaMap.put(DataMapProperty.DEFERRED_REBUILD, deferredRebuild.toString) dataMapSchema.setProperties(javaMap) @@ -97,6 +105,12 @@ case class CarbonCreateDataMapCommand( "For this datamap, main table is required. Use `CREATE DATAMAP ... 
ON TABLE ...` ") } dataMapProvider = DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession) + if (deferredRebuild && !dataMapProvider.supportRebuild()) { + throw new MalformedDataMapCommandException( + s"DEFERRED REBUILD is not supported on this datamap $dataMapName" + + s" with provider ${dataMapSchema.getProviderName}") + } + val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation val operationContext: OperationContext = new OperationContext() @@ -138,10 +152,6 @@ case class CarbonCreateDataMapCommand( dataMapProvider.initMeta(queryString.orNull) DataMapStatusManager.disableDataMap(dataMapName) case _ => - if (deferredRebuild) { - throw new MalformedDataMapCommandException( - "DEFERRED REBUILD is not supported on this DataMap") - } dataMapProvider.initMeta(queryString.orNull) } val createDataMapPostExecutionEvent: CreateDataMapPostExecutionEvent = @@ -149,7 +159,6 @@ case class CarbonCreateDataMapCommand( systemFolderLocation, tableIdentifier, dmProviderName) OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent, operationContext) - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) LOGGER.audit(s"DataMap $dataMapName successfully added") Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala index f3db6ca..0c85fe1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala @@ -21,10 +21,10 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.DataCommand -import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager} +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRebuildRDD} +import org.apache.carbondata.datamap.DataMapManager import org.apache.carbondata.events.{UpdateDataMapPostExecutionEvent, _} /** @@ -36,7 +36,19 @@ case class CarbonDataMapRebuildCommand( tableIdentifier: Option[TableIdentifier]) extends DataCommand { override def processData(sparkSession: SparkSession): Seq[Row] = { - val schema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName) + import scala.collection.JavaConverters._ + val schemaOption = CarbonDataMapShowCommand(tableIdentifier).getAllDataMaps(sparkSession) + .asScala + .find(p => p.getDataMapName.equalsIgnoreCase(dataMapName)) + if (schemaOption.isEmpty) { + throw new MalformedDataMapCommandException( + s"Datamap with name $dataMapName does not exist on table ${tableIdentifier.get.table}") + } + val schema = schemaOption.get + if (!schema.isLazy) { + throw new MalformedDataMapCommandException( + s"Non-lazy datamap $dataMapName does not support rebuild") + } val table = tableIdentifier match { case Some(identifier) => @@ -47,6 +59,7 @@ case class CarbonDataMapRebuildCommand( schema.getRelationIdentifier.getTableName )(sparkSession) } + val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession) provider.rebuild() 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala index 3ee8e67..b583a30 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala @@ -47,6 +47,13 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier]) } override def processData(sparkSession: SparkSession): Seq[Row] = { + convertToRow(getAllDataMaps(sparkSession)) + } + + /** + * get all datamaps for this table, including preagg, index datamaps and mv + */ + def getAllDataMaps(sparkSession: SparkSession): util.List[DataMapSchema] = { val dataMapSchemaList: util.List[DataMapSchema] = new util.ArrayList[DataMapSchema]() tableIdentifier match { case Some(table) => @@ -59,10 +66,10 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier]) if (!indexSchemas.isEmpty) { dataMapSchemaList.addAll(indexSchemas) } - convertToRow(dataMapSchemaList) case _ => - convertToRow(DataMapStoreManager.getInstance().getAllDataMapSchemas) + dataMapSchemaList.addAll(DataMapStoreManager.getInstance().getAllDataMapSchemas) } + dataMapSchemaList } private def convertToRow(schemaList: util.List[DataMapSchema]) = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/abcd4f6e/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala index 1d57268..cca1b67 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala @@ -177,7 +177,39 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") } - test("test create bloom datamap and REBUILD DATAMAP") { + // for CARBONDATA-2820, we will first block deferred rebuild for bloom + test("test block deferred rebuild for bloom") { + sql( + s""" + | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128') + | """.stripMargin) + val deferredRebuildException = intercept[MalformedDataMapCommandException] { + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable + | USING 'bloomfilter' + | WITH DEFERRED REBUILD + | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000') + """.stripMargin) + } + assert(deferredRebuildException.getMessage.contains( + s"DEFERRED REBUILD is not supported on this datamap $dataMapName with provider bloomfilter")) + + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable + | USING 'bloomfilter' + | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000') + """.stripMargin) + val exception = intercept[MalformedDataMapCommandException] { + sql(s"REBUILD DATAMAP $dataMapName ON TABLE $bloomDMSampleTable") + } + assert(exception.getMessage.contains(s"Non-lazy datamap $dataMapName does not support rebuild")) + } + + ignore("test create bloom datamap and REBUILD 
DATAMAP") { sql( s""" | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, @@ -219,7 +251,7 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") } - test("test create bloom datamap with DEFERRED REBUILD, query hit datamap") { + ignore("test create bloom datamap with DEFERRED REBUILD, query hit datamap") { sql( s""" | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, @@ -297,7 +329,7 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") } - test("test create bloom datamap with DEFERRED REBUILD, query not hit datamap") { + ignore("test create bloom datamap with DEFERRED REBUILD, query not hit datamap") { sql( s""" | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, @@ -466,7 +498,7 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with sql( s""" | CREATE DATAMAP $dataMapName ON TABLE $normalTable - | USING 'bloomfilter' WITH DEFERRED REBUILD + | USING 'bloomfilter' | DMProperties( 'INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000') """.stripMargin) val exception: MalformedCarbonCommandException = intercept[MalformedCarbonCommandException] { @@ -487,7 +519,6 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with s""" | CREATE DATAMAP $dataMapName ON TABLE $normalTable | USING 'bloomfilter' - | WITH DEFERRED REBUILD | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000') """.stripMargin) val exception: MalformedCarbonCommandException = intercept[MalformedCarbonCommandException] { @@ -544,7 +575,6 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with s""" | CREATE DATAMAP $dataMapName ON TABLE $normalTable | USING 'bloomfilter' - | WITH DEFERRED REBUILD | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000') | """.stripMargin) }