From: xuchuanyin@apache.org
To: commits@carbondata.apache.org
Subject: carbondata git commit: [CARBONDATA-3040][BloomDataMap] Fix bug for merging bloom index
Date: Thu, 25 Oct 2018 06:20:14 +0000 (UTC)

Repository: carbondata
Updated Branches:
  refs/heads/master e4806b9a0 -> 33a6dc2ac


[CARBONDATA-3040][BloomDataMap] Fix bug for merging bloom index

Problem
There is a bug which causes query failure when we create two bloom
datamaps on the same table which already contains data.

Analyze
Since the table already has data, each CREATE DATAMAP triggers a
rebuild datamap task, which in turn triggers bloom index file merging.
By debugging, we found that the first datamap's bloom index files were
merged twice, and the second merge left the bloom index files empty.
The procedure goes as below:
1. create table
2. load data
3. create bloom datamap1: rebuild datamap1 for existing data; the event
   listener is triggered to merge index files for all bloom datamaps
   (currently only datamap1)
4. create bloom datamap2: rebuild datamap2 for existing data; the event
   listener is triggered to merge index files for all bloom datamaps
   (currently datamap1 and datamap2)

Because the event carries no information about which datamap was
rebuilt, the listener always merges the index files of all bloom
datamaps. So datamap1's bloom index files are merged twice, but by the
second merge only the mergeShard folder remains, leaving no input files
to merge, and the resulting merged bloom index files are empty.

Solution
Send the datamap name in the rebuild event so the listener can filter
and merge bloom index files only for that specific datamap. Also check
whether mergeShard already exists before merging.
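For context, the failing procedure above can be reproduced with a few
statements. The following is a minimal sketch, assuming a CarbonData-
enabled SparkSession named `spark`; the table, column and datamap names
are hypothetical:

    // Hypothetical repro; assumes `spark` is a CarbonData-enabled SparkSession
    spark.sql("CREATE TABLE t1 (id INT, city STRING) STORED AS carbondata")
    spark.sql("LOAD DATA INPATH '/tmp/t1.csv' INTO TABLE t1")

    // The table already has data, so each CREATE DATAMAP triggers a rebuild,
    // which fires the merge-bloom-index listener for all bloom datamaps
    spark.sql("CREATE DATAMAP dm1 ON TABLE t1 USING 'bloomfilter' " +
      "DMPROPERTIES ('INDEX_COLUMNS' = 'id')")

    // Before this fix, rebuilding dm2 re-merged dm1's bloom index files:
    // only dm1's mergeShard folder remained as input, so its merged index
    // came out empty and subsequent filter queries on t1 failed
    spark.sql("CREATE DATAMAP dm2 ON TABLE t1 USING 'bloomfilter' " +
      "DMPROPERTIES ('INDEX_COLUMNS' = 'city')")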
This closes #2851


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/33a6dc2a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/33a6dc2a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/33a6dc2a

Branch: refs/heads/master
Commit: 33a6dc2ac996cbb0bfb4f354d7fc80b297d652bb
Parents: e4806b9
Author: Manhua
Authored: Wed Oct 24 16:20:13 2018 +0800
Committer: xuchuanyin
Committed: Thu Oct 25 14:17:29 2018 +0800

----------------------------------------------------------------------
 .../datamap/bloom/BloomIndexFileStore.java     | 20 +++++++++++++++-----
 .../carbondata/events/DataMapEvents.scala      | 10 +++++++++-
 .../datamap/IndexDataMapRebuildRDD.scala       |  2 +-
 .../spark/rdd/CarbonTableCompactor.scala       |  2 +-
 .../events/MergeBloomIndexEventListener.scala  | 10 ++++++++--
 .../management/CarbonLoadDataCommand.scala     |  2 +-
 6 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/33a6dc2a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
index 17813ba..3d6ad9b 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
@@ -60,6 +60,9 @@ public class BloomIndexFileStore {
 
   public static void mergeBloomIndexFile(String dmSegmentPathString, List<String> indexCols) {
+
+    // Step 1. check current folders
+
     // get all shard paths of old store
     CarbonFile segmentPath = FileFactory.getCarbonFile(dmSegmentPathString,
         FileFactory.getFileType(dmSegmentPathString));
@@ -72,6 +75,9 @@ public class BloomIndexFileStore {
     String mergeShardPath = dmSegmentPathString + File.separator + MERGE_BLOOM_INDEX_SHARD_NAME;
     String mergeInprogressFile = dmSegmentPathString + File.separator + MERGE_INPROGRESS_FILE;
+
+    // Step 2. prepare for fail-safe merging
+
     try {
       // delete mergeShard folder if exists
       if (FileFactory.isFileExist(mergeShardPath)) {
@@ -87,10 +93,12 @@ public class BloomIndexFileStore {
         throw new RuntimeException("Failed to create directory " + mergeShardPath);
       }
     } catch (IOException e) {
-      LOGGER.error("Error occurs while create directory " + mergeShardPath, e);
-      throw new RuntimeException("Error occurs while create directory " + mergeShardPath);
+      throw new RuntimeException(e);
     }
 
+    // Step 3. merge index files
+    // Query won't use mergeShard until MERGE_INPROGRESS_FILE is deleted
+
     // for each index column, merge the bloomindex files from all shards into one
     for (String indexCol: indexCols) {
       String mergeIndexFile = getMergeBloomIndexFile(mergeShardPath, indexCol);
@@ -115,15 +123,17 @@ public class BloomIndexFileStore {
         }
       } catch (IOException e) {
         LOGGER.error("Error occurs while merge bloom index file of column: " + indexCol, e);
-        // delete merge shard of bloom index for this segment when failed
+        // if any column failed, delete merge shard for this segment and exit
         FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(mergeShardPath));
+        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(mergeInprogressFile));
         throw new RuntimeException(
-            "Error occurs while merge bloom index file of column: " + indexCol);
+            "Error occurs while merge bloom index file of column: " + indexCol, e);
       } finally {
         CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
       }
     }
-    // delete flag file and mergeShard can be used
+
+    // Step 4. delete flag file and mergeShard can be used
     try {
       FileFactory.deleteFile(mergeInprogressFile, FileFactory.getFileType(mergeInprogressFile));
     } catch (IOException e) {
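The four step comments added above amount to a flag-file protocol:
readers treat mergeShard as invalid while the in-progress file exists.
Below is a minimal stand-alone sketch of the same idea in Scala over
java.nio; the path names and helpers are illustrative, not CarbonData's
FileFactory API:

    import java.nio.file.{Files, Path}

    object MergeFlagSketch {
      // illustrative recursive delete, standing in for deleteAllCarbonFilesOfDir
      def deleteRecursively(p: Path): Unit = {
        if (Files.isDirectory(p)) {
          val children = Files.list(p).iterator()
          while (children.hasNext) deleteRecursively(children.next())
        }
        Files.deleteIfExists(p)
      }

      def mergeWithFlag(segmentDir: Path)(merge: Path => Unit): Unit = {
        val mergeShard = segmentDir.resolve("mergeShard")
        val inProgress = segmentDir.resolve("mergeBloomIndex.inprogress")
        // Step 1/2: clear stale output and raise the in-progress flag;
        // readers must ignore mergeShard while the flag file exists
        deleteRecursively(mergeShard)
        Files.deleteIfExists(inProgress)
        Files.createFile(inProgress)
        Files.createDirectories(mergeShard)
        try {
          // Step 3: write the merged index files into mergeShard
          merge(mergeShard)
        } catch {
          case e: Exception =>
            // on failure remove both partial output and flag, then rethrow
            deleteRecursively(mergeShard)
            Files.deleteIfExists(inProgress)
            throw e
        }
        // Step 4: dropping the flag publishes mergeShard to readers
        Files.delete(inProgress)
      }
    }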
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33a6dc2a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
index 503729a..06da3d3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
@@ -58,9 +58,17 @@ case class BuildDataMapPreExecutionEvent(sparkSession: SparkSession,
 /**
  * For handling operation's after finish of index build over table with index datamap
  * example: bloom datamap, Lucene datamap
+ *
+ * @param sparkSession
+ * @param identifier
+ * @param dmName set to specify datamap name in rebuild process;
+ *               set to Null in loading and compaction and it will deal all datamaps
+ * @param segmentIdList
+ * @param isFromRebuild set to false in loading process for skipping lazy datamap
  */
 case class BuildDataMapPostExecutionEvent(sparkSession: SparkSession,
-    identifier: AbsoluteTableIdentifier, segmentIdList: Seq[String], isFromRebuild: Boolean)
+    identifier: AbsoluteTableIdentifier, dmName: String,
+    segmentIdList: Seq[String], isFromRebuild: Boolean)
   extends Event with TableEventInfo
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33a6dc2a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 3f486d0..a35de58 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -133,7 +133,7 @@ object IndexDataMapRebuildRDD {
     }
 
     val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession,
-      tableIdentifier, validSegments.asScala.map(_.getSegmentNo), true)
+      tableIdentifier, schema.getDataMapName, validSegments.asScala.map(_.getSegmentNo), true)
     OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext)
   }
 }
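Taken together, the three call sites in this patch (one above, two
below) follow a simple contract: the rebuild path names the datamap it
just built, while load and compaction pass null so the listener
considers every bloom datamap. A condensed sketch of that contract,
with `sparkSession`, `tableIdentifier`, `schema`, `segmentIds` and
`operationContext` assumed to be in scope:

    // rebuild path: name the datamap so only its index files get merged
    val rebuildEvent = BuildDataMapPostExecutionEvent(sparkSession,
      tableIdentifier, schema.getDataMapName, segmentIds, isFromRebuild = true)

    // load/compaction path: dmName = null means "consider all datamaps"
    val loadEvent = BuildDataMapPostExecutionEvent(sparkSession,
      tableIdentifier, null, segmentIds, isFromRebuild = false)

    OperationListenerBus.getInstance().fireEvent(rebuildEvent, operationContext)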
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33a6dc2a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 756d30c..ac83212 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -289,7 +289,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       if (null != tableDataMaps) {
         val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(
           sqlContext.sparkSession, carbonTable.getAbsoluteTableIdentifier,
-          Seq(carbonLoadModel.getSegmentId), true)
+          null, Seq(mergedLoadNumber), true)
         OperationListenerBus.getInstance()
           .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33a6dc2a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
index 2d4fe84..ea0f9e7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
@@ -48,8 +48,14 @@ class MergeBloomIndexEventListener extends OperationEventListener with Logging {
         _.getDataMapSchema.getProviderName.equalsIgnoreCase(
           DataMapClassProvider.BLOOMFILTER.getShortName))
 
-    // for load process, filter lazy datamap
-    if (!datamapPostEvent.isFromRebuild) {
+    if (datamapPostEvent.isFromRebuild) {
+      if (null != datamapPostEvent.dmName) {
+        // for rebuild process
+        bloomDatamaps = bloomDatamaps.filter(
+          _.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName))
+      }
+    } else {
+      // for load process, skip lazy datamap
       bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy)
     }
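The listener's new branching reduces to a small pure function. A
distilled sketch; the `Dm` stand-in type and `selectDatamaps` helper
are hypothetical, keeping only the two fields the listener reads:

    // hypothetical distillation of MergeBloomIndexEventListener's filter
    case class Dm(name: String, isLazy: Boolean)

    def selectDatamaps(dms: Seq[Dm], isFromRebuild: Boolean, dmName: String): Seq[Dm] =
      if (isFromRebuild) {
        // rebuild: if the event names a datamap, merge only that one
        if (dmName != null) dms.filter(_.name.equalsIgnoreCase(dmName)) else dms
      } else {
        // load: skip lazy datamaps, which are not built during load
        dms.filterNot(_.isLazy)
      }

With this filter in place, the scenario from the commit message merges
datamap1's shards exactly once, when datamap1 itself is rebuilt.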
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33a6dc2a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index f8077ae..29cc6a9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -332,7 +332,7 @@ case class CarbonLoadDataCommand(
     OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
     if (tableDataMaps.size() > 0) {
       val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession,
-        table.getAbsoluteTableIdentifier, Seq(carbonLoadModel.getSegmentId), false)
+        table.getAbsoluteTableIdentifier, null, Seq(carbonLoadModel.getSegmentId), false)
       OperationListenerBus.getInstance()
         .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
     }