From commits-return-9350-archive-asf-public=cust-asf.ponee.io@carbondata.apache.org Tue Feb 27 17:38:55 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id F30BC1807E2 for ; Tue, 27 Feb 2018 17:38:49 +0100 (CET) Received: (qmail 11909 invoked by uid 500); 27 Feb 2018 16:38:49 -0000 Mailing-List: contact commits-help@carbondata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.apache.org Delivered-To: mailing list commits@carbondata.apache.org Received: (qmail 11412 invoked by uid 99); 27 Feb 2018 16:38:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Feb 2018 16:38:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1C8A6F4E35; Tue, 27 Feb 2018 16:38:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jackylk@apache.org To: commits@carbondata.apache.org Date: Tue, 27 Feb 2018 16:39:17 -0000 Message-Id: <0aec117bc47a4ff7b8306d2e3eab5747@git.apache.org> In-Reply-To: <990165cd2a4944e9b1cc7d62642e27fe@git.apache.org> References: <990165cd2a4944e9b1cc7d62642e27fe@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [31/50] carbondata git commit: [REBASE] resolve conflict after rebasing to master [REBASE] resolve conflict after rebasing to master Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6f41f36b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6f41f36b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6f41f36b Branch: refs/heads/carbonstore-rebase4 Commit: 6f41f36b53ce6f3226b8e3d59d51adf1d427df79 Parents: 25b0940 Author: Jacky Li Authored: Tue Feb 27 08:51:25 2018 +0800 Committer: Jacky Li Committed: Tue Feb 27 16:59:48 2018 +0800 ---------------------------------------------------------------------- .../carbondata/core/datamap/TableDataMap.java | 2 +- .../core/datamap/dev/AbstractDataMapWriter.java | 5 ++-- .../core/datamap/dev/DataMapFactory.java | 2 +- .../core/indexstore/BlockletDetailsFetcher.java | 2 +- .../indexstore/SegmentPropertiesFetcher.java | 3 +- .../blockletindex/BlockletDataMap.java | 2 +- .../blockletindex/BlockletDataMapFactory.java | 21 ++++++------- .../core/metadata/SegmentFileStore.java | 2 +- .../RowLevelRangeGrtThanFiterExecuterImpl.java | 1 + ...elRangeGrtrThanEquaToFilterExecuterImpl.java | 1 + ...velRangeLessThanEqualFilterExecuterImpl.java | 1 + .../RowLevelRangeLessThanFiterExecuterImpl.java | 1 + .../SegmentUpdateStatusManager.java | 26 ++++------------ .../apache/carbondata/core/util/CarbonUtil.java | 16 ++++------ .../testsuite/datamap/CGDataMapTestCase.scala | 26 ++++++++-------- .../testsuite/datamap/DataMapWriterSuite.scala | 19 ++++++------ .../testsuite/datamap/FGDataMapTestCase.scala | 31 +++++++++----------- .../iud/DeleteCarbonTableTestCase.scala | 2 +- .../TestInsertAndOtherCommandConcurrent.scala | 14 +++++---- .../StandardPartitionTableCleanTestCase.scala | 12 ++++---- .../spark/rdd/NewCarbonDataLoadRDD.scala | 2 +- .../carbondata/spark/util/DataLoadingUtil.scala | 4 +-- .../CreatePreAggregateTableCommand.scala | 2 +- .../apache/spark/sql/hive/CarbonRelation.scala | 3 +- .../datamap/DataMapWriterListener.java | 2 +- .../loading/model/CarbonLoadModel.java | 2 +- .../processing/merger/CarbonDataMergerUtil.java | 15 +++------- .../merger/CompactionResultSortProcessor.java | 6 ++-- .../merger/RowResultMergerProcessor.java | 6 ++-- .../partition/spliter/RowResultProcessor.java | 3 +- .../util/CarbonDataProcessorUtil.java | 4 +-- .../processing/util/CarbonLoaderUtil.java | 2 +- 32 files changed, 104 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index eed650e3..2a6ceaa 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -143,7 +143,7 @@ public final class TableDataMap extends OperationEventListener { blocklets.addAll( dataMap.prune( filterExp, - segmentPropertiesFetcher.getSegmentProperties(distributable.getSegmentId()), + segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment()), partitions)); } BlockletSerializer serializer = new BlockletSerializer(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java index bcc9bad..de6dcb1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java @@ -18,6 +18,7 @@ package org.apache.carbondata.core.datamap.dev; import java.io.IOException; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -35,10 +36,10 @@ public abstract class AbstractDataMapWriter { protected String writeDirectoryPath; - public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId, + public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, Segment segment, String writeDirectoryPath) { this.identifier = identifier; - this.segmentId = segmentId; + this.segmentId = segment.getSegmentNo(); this.writeDirectoryPath = writeDirectoryPath; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index df5670d..50ac279 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -39,7 +39,7 @@ public interface DataMapFactory { /** * Return a new write for this datamap */ - AbstractDataMapWriter createWriter(Segment segment); + AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath); /** * Get the datamap for segmentid http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java index 5a5fc1e..dd592c0 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java @@ -53,5 +53,5 @@ public interface BlockletDetailsFetcher { * @param segment * @return */ - List getAllBlocklets(Segment segment, List partitions) throws IOException; + List getAllBlocklets(Segment segment, List partitions) throws IOException; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java index ec2ae93..6f94be5 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java @@ -19,6 +19,7 @@ package org.apache.carbondata.core.indexstore; import java.io.IOException; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.block.SegmentProperties; /** @@ -32,5 +33,5 @@ public interface SegmentPropertiesFetcher { * @return * @throws IOException */ - SegmentProperties getSegmentProperties(String segmentId) throws IOException; + SegmentProperties getSegmentProperties(Segment segment) throws IOException; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index ef1bd33..ce6193b 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -660,7 +660,7 @@ public class BlockletDataMap extends AbstractCoarseGrainDataMap implements Cache } @Override - public List prune(FilterResolverIntf filterExp, List partitions) { + public List prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List partitions) { if (unsafeMemoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 5ca3ac5..ee849bd 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -38,13 +38,11 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.SegmentFileStore; -import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.util.DataFileFooterConverter; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.events.Event; @@ -57,8 +55,7 @@ import org.apache.hadoop.fs.RemoteIterator; * Table map for blocklet */ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory - implements BlockletDetailsFetcher, - SegmentPropertiesFetcher { + implements BlockletDetailsFetcher, SegmentPropertiesFetcher { private AbsoluteTableIdentifier identifier; @@ -75,12 +72,12 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory } @Override - public DataMapWriter createWriter(Segment segment) { + public AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath) { throw new UnsupportedOperationException("not implemented"); } @Override - public List getDataMaps(Segment segment) throws IOException { + public List getDataMaps(Segment segment) throws IOException { List tableBlockIndexUniqueIdentifiers = getTableBlockIndexUniqueIdentifiers(segment); return cache.getAll(tableBlockIndexUniqueIdentifiers); @@ -262,8 +259,8 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory return null; } - @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException { - List dataMaps = getDataMaps(segmentId); + @Override public SegmentProperties getSegmentProperties(Segment segment) throws IOException { + List dataMaps = getDataMaps(segment); assert (dataMaps.size() > 0); AbstractCoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0); assert (coarseGrainDataMap instanceof BlockletDataMap); @@ -271,12 +268,12 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory return dataMap.getSegmentProperties(); } - @Override public List getAllBlocklets(String segmentId, List partitions) + @Override public List getAllBlocklets(Segment segment, List partitions) throws IOException { List blocklets = new ArrayList<>(); - List dataMaps = getDataMaps(segmentId); + List dataMaps = getDataMaps(segment); for (AbstractCoarseGrainDataMap dataMap : dataMaps) { - blocklets.addAll(dataMap.prune(null, getSegmentProperties(segmentId), partitions)); + blocklets.addAll(dataMap.prune(null, getSegmentProperties(segment), partitions)); } return blocklets; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index b5f5a25..f48cc6d 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -432,7 +432,7 @@ public class SegmentFileStore { boolean forceDelete) throws IOException { LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath()); + SegmentStatusManager.readLoadMetadata(table.getMetadataPath()); // scan through each segment. for (LoadMetadataDetails segment : details) { // if this segment is valid then only we will go for deletion of related http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java index 1f63a81..e35bb8a 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java @@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.FilterUtil; import org.apache.carbondata.core.scan.filter.intf.RowIntf; import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo; http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java index 9140a11..d48abf9 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java @@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.FilterUtil; import org.apache.carbondata.core.scan.filter.intf.RowIntf; import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo; http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java index 120671f..d89a488 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java @@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.FilterUtil; import org.apache.carbondata.core.scan.filter.intf.RowIntf; import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo; http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java index 547ecaa..a00c7db 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java @@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.FilterUtil; import org.apache.carbondata.core.scan.filter.intf.RowIntf; import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo; http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index 25ce0c8..6ec6fa2 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -233,10 +233,7 @@ public class SegmentUpdateStatusManager { * @throws Exception */ public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception { -// int tableFactPathLength = CarbonTablePath.getFactDir(identifier.getTablePath()).length() + 1; -// String blockId = blockFilePath.substring(tableFactPathLength); - - String blockId = CarbonUtil.getBlockId(absoluteTableIdentifier, blockFilePath, segmentId); + String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId); String tupleId; if (isPartitionTable) { tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId); @@ -249,13 +246,8 @@ public class SegmentUpdateStatusManager { /** * Returns all delta file paths of specified block - * - * @param tupleId - * @param extension - * @return - * @throws Exception */ - public List getDeltaFiles(String tupleId, String extension) throws Exception { + private List getDeltaFiles(String tupleId, String extension) throws Exception { try { String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID); String completeBlockName = CarbonTablePath.addDataPartPrefix( @@ -263,11 +255,11 @@ public class SegmentUpdateStatusManager { + CarbonCommonConstants.FACT_FILE_EXT); String blockPath; if (isPartitionTable) { - blockPath = absoluteTableIdentifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + blockPath = identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID) .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; } else { - String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment); + String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment); blockPath = carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; } @@ -391,16 +383,10 @@ public class SegmentUpdateStatusManager { * @return */ public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) { - - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); - - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId.getSegmentNo()); - + String segmentPath = + CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId.getSegmentNo()); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); - for (SegmentUpdateDetails block : updateDetails) { if ((block.getBlockName().equalsIgnoreCase(blockName)) && (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo())) http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 5ec0158..c9b4337 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -2361,21 +2361,19 @@ public final class CarbonUtil { } // Get the total size of carbon data and the total size of carbon index - public static HashMap getDataSizeAndIndexSize(CarbonTablePath carbonTablePath, + public static HashMap getDataSizeAndIndexSize(String tablePath, Segment segment) throws IOException { if (segment.getSegmentFileName() != null) { - SegmentFileStore fileStore = - new SegmentFileStore(carbonTablePath.getPath(), segment.getSegmentFileName()); + SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName()); return getDataSizeAndIndexSize(fileStore); } else { - return getDataSizeAndIndexSize(carbonTablePath, segment.getSegmentNo()); + return getDataSizeAndIndexSize(tablePath, segment.getSegmentNo()); } } // Get the total size of segment. - public static long getSizeOfSegment(CarbonTablePath carbonTablePath, - Segment segment) throws IOException { - HashMap dataSizeAndIndexSize = getDataSizeAndIndexSize(carbonTablePath, segment); + public static long getSizeOfSegment(String tablePath, Segment segment) throws IOException { + HashMap dataSizeAndIndexSize = getDataSizeAndIndexSize(tablePath, segment); long size = 0; for (Long eachSize: dataSizeAndIndexSize.values()) { size += eachSize; @@ -2585,9 +2583,7 @@ public final class CarbonUtil { String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length()); String tablePath = identifier.getTablePath(); if (filePath.startsWith(tablePath)) { - String factDir = - CarbonStorePath.getCarbonTablePath(tablePath, identifier.getCarbonTableIdentifier()) - .getFactDir(); + String factDir = CarbonTablePath.getFactDir(tablePath); if (filePath.startsWith(factDir)) { blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId + CarbonCommonConstants.FILE_SEPARATOR + blockName; http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala index 4b6f231..1cbbcb4 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala @@ -27,14 +27,14 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory} import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel} -import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment} import org.apache.carbondata.core.datastore.FileReader import org.apache.carbondata.core.datastore.block.SegmentProperties import org.apache.carbondata.core.datastore.compression.SnappyCompressor import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.indexstore.Blocklet +import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec} import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.scan.expression.Expression @@ -62,16 +62,16 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory { /** * Return a new write for this datamap */ - override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { - new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName) + override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = { + new CGDataMapWriter(identifier, segment, dataWritePath, dataMapName) } /** * Get the datamap for segmentid */ - override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = { + override def getDataMaps(segment: Segment): java.util.List[AbstractCoarseGrainDataMap] = { val file = FileFactory.getCarbonFile( - CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) val files = file.listFiles(new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") @@ -108,9 +108,9 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory { * * @return */ - override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = { + override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = { val file = FileFactory.getCarbonFile( - CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) val files = file.listFiles(new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") @@ -125,7 +125,7 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory { /** * Clears datamap of the segment */ - override def clear(segmentId: String): Unit = { + override def clear(segment: Segment): Unit = { } @@ -175,7 +175,7 @@ class CGDataMap extends AbstractCoarseGrainDataMap { override def prune( filterExp: FilterResolverIntf, segmentProperties: SegmentProperties, - partitions: java.util.List[String]): java.util.List[Blocklet] = { + partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = { val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]() val expression = filterExp.getFilterExpression getEqualToExpression(expression, buffer) @@ -184,7 +184,7 @@ class CGDataMap extends AbstractCoarseGrainDataMap { } val meta = findMeta(value(0).getBytes) meta.map { f=> - new Blocklet(f._1, f._2+"") + new Blocklet(f._1, f._2 + "") }.asJava } @@ -219,10 +219,10 @@ class CGDataMap extends AbstractCoarseGrainDataMap { } class CGDataMapWriter(identifier: AbsoluteTableIdentifier, - segmentId: String, + segment: Segment, dataWritePath: String, dataMapName: String) - extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) { + extends AbstractDataMapWriter(identifier, segment, dataWritePath) { var currentBlockId: String = null val cgwritepath = dataWritePath + "/" + http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index 2f8a1d1..7e93959 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -20,21 +20,19 @@ package org.apache.carbondata.spark.testsuite.datamap import java.util import scala.collection.JavaConverters._ + import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.{DataFrame, SaveMode} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter} import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment} -import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter +import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap} import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory} -import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.scan.filter.intf.ExpressionType -import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event @@ -49,15 +47,16 @@ class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory { override def fireEvent(event: Event): Unit = ??? - override def clear(segmentId: Segment): Unit = {} + override def clear(segment: Segment): Unit = {} override def clear(): Unit = {} - override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ??? + override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ??? - override def getDataMaps(segmentId: Segment): util.List[DataMap] = ??? + override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ??? - override def createWriter(segmentId: Segment): AbstractDataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock + override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = + DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath) override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava) @@ -175,9 +174,9 @@ object DataMapWriterSuite { var callbackSeq: Seq[String] = Seq[String]() - def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String, + def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segment: Segment, dataWritePath: String) = - new AbstractDataMapWriter(identifier, segmentId, dataWritePath) { + new AbstractDataMapWriter(identifier, segment, dataWritePath) { override def onPageAdded( blockletId: Int, http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala index d1bb65f..9c8cc15 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala @@ -27,14 +27,14 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, AbstractFineGrainDataMapFactory} import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel} -import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment} import org.apache.carbondata.core.datastore.FileReader import org.apache.carbondata.core.datastore.block.SegmentProperties import org.apache.carbondata.core.datastore.compression.SnappyCompressor import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.indexstore.FineGrainBlocklet +import org.apache.carbondata.core.indexstore.{Blocklet, FineGrainBlocklet, PartitionSpec} import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.scan.expression.Expression @@ -62,16 +62,16 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory { /** * Return a new write for this datamap */ - override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { - new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName) + override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = { + new FGDataMapWriter(identifier, segment, dataWritePath, dataMapName) } /** * Get the datamap for segmentid */ - override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainDataMap] = { + override def getDataMaps(segment: Segment): java.util.List[AbstractFineGrainDataMap] = { val file = FileFactory - .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) val files = file.listFiles(new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") @@ -99,9 +99,9 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory { * * @return */ - override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = { - val file = FileFactory - .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = { + val file = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) val files = file.listFiles(new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") @@ -112,7 +112,6 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory { }.toList.asJava } - /** * * @param event @@ -124,7 +123,7 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory { /** * Clears datamap of the segment */ - override def clear(segmentId: String): Unit = { + override def clear(segment: Segment): Unit = { } /** @@ -173,7 +172,7 @@ class FGDataMap extends AbstractFineGrainDataMap { override def prune( filterExp: FilterResolverIntf, segmentProperties: SegmentProperties, - partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = { + partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = { val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]() val expression = filterExp.getFilterExpression getEqualToExpression(expression, buffer) @@ -187,7 +186,7 @@ class FGDataMap extends AbstractFineGrainDataMap { } private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int), - value: Array[Byte]): Option[FineGrainBlocklet] = { + value: Array[Byte]): Option[Blocklet] = { val bytes = FileReader.readByteArray(filePath, meta._4, meta._5) val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes)) val obj = new ObjectInputStream(outputStream) @@ -211,12 +210,10 @@ class FGDataMap extends AbstractFineGrainDataMap { pg.setRowId(f._2(p._2).toArray) pg } - pages Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava)) } else { None } - } private def findMeta(value: Array[Byte]) = { @@ -249,8 +246,8 @@ class FGDataMap extends AbstractFineGrainDataMap { } class FGDataMapWriter(identifier: AbsoluteTableIdentifier, - segmentId: String, dataWriterPath: String, dataMapName: String) - extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) { + segment: Segment, dataWriterPath: String, dataMapName: String) + extends AbstractDataMapWriter(identifier, segment, dataWriterPath) { var currentBlockId: String = null val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap" http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala index 22aa385..f92649a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala @@ -194,7 +194,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("delete from update_status_files where age=5").show() val carbonTable = CarbonEnv .getCarbonTable(Some("iud_db"), "update_status_files")(sqlContext.sparkSession) - val metaPath = carbonTable.getMetaDataFilepath + val metaPath = carbonTable.getMetadataPath val files = FileFactory.getCarbonFile(metaPath) assert(files.listFiles().length == 2) sql("drop table update_status_files") http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index 5550358..b39c44c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -269,7 +269,11 @@ object Global { class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory { - override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { } + private var identifier: AbsoluteTableIdentifier = _ + + override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { + this.identifier = identifier + } override def fireEvent(event: Event): Unit = ??? @@ -277,12 +281,12 @@ class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory { override def clear(): Unit = {} - override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ??? + override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ??? - override def getDataMaps(segmentId: Segment): util.List[DataMap] = ??? + override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ??? - override def createWriter(segmentId: Segment): AbstractDataMapWriter = { - new AbstractDataMapWriter { + override def createWriter(segment: Segment, writeDirectoryPath: String): AbstractDataMapWriter = { + new AbstractDataMapWriter(identifier, segment, writeDirectoryPath) { override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { } override def onBlockletEnd(blockletId: Int): Unit = { } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala index f238d2b..cfc6983 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala @@ -52,14 +52,12 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA def validateDataFiles(tableUniqueName: String, segmentId: String, partition: Int, indexes: Int): Unit = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) - val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, - carbonTable.getTablePath) - val partitions = CarbonFilters - .getPartitions(Seq.empty, - sqlContext.sparkSession, - TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))) + val partitions = CarbonFilters.getPartitions( + Seq.empty, + sqlContext.sparkSession, + TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))) assert(partitions.get.length == partition) - val details = SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath) + val details = SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath)) val segLoad = details.find(_.getLoadName.equals(segmentId)).get val seg = new SegmentFileStore(carbonTable.getTablePath, segLoad.getSegmentFile) assert(seg.getIndexFiles.size == indexes) http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 8ba2767..97e3061 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -291,7 +291,7 @@ class NewCarbonDataLoadRDD[K, V]( val fileList: java.util.List[String] = new java.util.ArrayList[String]( CarbonCommonConstants.CONSTANT_SIZE_TEN) CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, fileList, ",") - model = carbonLoadModel.getCopyWithPartition( + model = carbonLoadModel.getCopyWithPartition("0", carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter) StandardLogService.setThreadName(StandardLogService .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala index cf35c12..49e4420 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala @@ -441,8 +441,8 @@ object DataLoadingUtil { private def isUpdationRequired(isForceDeletion: Boolean, carbonTable: CarbonTable, - absoluteTableIdentifier: AbsoluteTableIdentifier) = { - val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath) + absoluteTableIdentifier: AbsoluteTableIdentifier): (Array[LoadMetadataDetails], Boolean) = { + val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) // Delete marked loads val isUpdationRequired = DeleteLoadFolders.deleteLoadFoldersFromFileSystem( http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index 59c43aa..4d0a4c5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -184,7 +184,7 @@ case class CreatePreAggregateTableCommand( CarbonFilters.getCurrentPartitions(sparkSession, TableIdentifier(parentTable.getTableName, Some(parentTable.getDatabaseName))).map(_.asJava).orNull) - val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath) + val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath) if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS || load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) { throw new UnsupportedOperationException( http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala index c9833d0..5771503 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala @@ -222,9 +222,8 @@ case class CarbonRelation( // for each segment calculate the size segments.foreach {validSeg => if (validSeg.getSegmentFileName != null) { - val fileStore = new SegmentFileStore(tablePath, validSeg.getSegmentFileName) size = size + CarbonUtil.getSizeOfSegment( - carbonTablePath, new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName)) + tablePath, new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName)) } else { size = size + FileFactory.getDirectorySize( CarbonTablePath.getSegmentPath(tablePath, validSeg.getSegmentNo)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index 5083ab5..1104229 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -74,7 +74,7 @@ public class DataMapWriterListener { } List columns = factory.getMeta().getIndexedColumns(); List writers = registry.get(columns); - AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null)); + AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath); if (writers != null) { writers.add(writer); } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java index a17178a..638ad39 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java @@ -478,7 +478,7 @@ public class CarbonLoadModel implements Serializable { * @param delimiter * @return */ - public CarbonLoadModel getCopyWithPartition(String uniqueId, List filesForPartition, + public CarbonLoadModel getCopyWithPartition(String uniqueId, String header, String delimiter) { CarbonLoadModel copyObj = new CarbonLoadModel(); copyObj.tableName = tableName; http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 89326a3..d2faef5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -44,7 +44,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -612,10 +611,10 @@ public final class CarbonDataMergerUtil { // variable to store one segment size across partition. long sizeOfOneSegmentAcrossPartition; if (segment.getSegmentFile() != null) { - sizeOfOneSegmentAcrossPartition = CarbonUtil - .getSizeOfSegment(carbonTablePath, new Segment(segId, segment.getSegmentFile())); + sizeOfOneSegmentAcrossPartition = CarbonUtil.getSizeOfSegment( + tablePath, new Segment(segId, segment.getSegmentFile())); } else { - sizeOfOneSegmentAcrossPartition = getSizeOfSegment(tablePath, tableIdentifier, segId); + sizeOfOneSegmentAcrossPartition = getSizeOfSegment(tablePath, segId); } // if size of a segment is greater than the Major compaction size. then ignore it. @@ -1006,14 +1005,8 @@ public final class CarbonDataMergerUtil { /** * This method traverses Update Delta Files inside the seg and return true * if UpdateDelta Files are more than IUD Compaction threshold. - * - * @param seg - * @param identifier - * @param segmentUpdateStatusManager - * @param numberDeltaFilesThreshold - * @return */ - public static Boolean checkUpdateDeltaFilesInSeg(Segment seg, + private static Boolean checkUpdateDeltaFilesInSeg(Segment seg, AbsoluteTableIdentifier absoluteTableIdentifier, SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index b71612a..ea11e22 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -406,14 +406,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { + carbonLoadModel.getFactTimeStamp() + ".tmp"; } else { carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(carbonTable.getTablePath(), carbonLoadModel.getDatabaseName(), - tableName, carbonLoadModel.getPartitionId(), carbonLoadModel.getSegmentId()); + .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName, carbonLoadModel.getSegmentId()); } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName, tempStoreLocation, carbonStoreLocation); - setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable, - carbonFactDataHandlerModel); + setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel); dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index b41829f..278d5bb 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -77,14 +77,12 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { .getFactTimeStamp() + ".tmp"; } else { carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(), - tableName, loadModel.getPartitionId(), loadModel.getSegmentId()); + .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, tempStoreLocation, carbonStoreLocation); - setDataFileAttributesInModel(loadModel, compactionType, carbonTable, - carbonFactDataHandlerModel); + setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel); carbonFactDataHandlerModel.setCompactionFlow(true); dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java index ff6ca93..df2e2a2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java @@ -48,8 +48,7 @@ public class RowResultProcessor { this.segmentProperties = segProp; String tableName = carbonTable.getTableName(); String carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(), - tableName, loadModel.getPartitionId(), loadModel.getSegmentId()); + .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, tempStoreLocation, carbonStoreLocation); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 2c08c18..dc8ffd7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.metadata.CarbonMetadata; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; @@ -371,8 +370,7 @@ public final class CarbonDataProcessorUtil { * * @return data directory path */ - public static String createCarbonStoreLocation(String factStoreLocation, - String databaseName, String tableName, String segmentId) { + public static String createCarbonStoreLocation(String databaseName, String tableName, String segmentId) { CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f41f36b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 1eea61d..a3e889a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -1066,7 +1066,7 @@ public final class CarbonLoaderUtil { */ public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails, String segmentId, CarbonTable carbonTable) throws IOException { - Map dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTable.getAbsoluteTableIdentifier(), + Map dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTable.getTablePath(), new Segment(segmentId, loadMetadataDetails.getSegmentFile())); Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE); loadMetadataDetails.setDataSize(String.valueOf(dataSize));