From commits-return-10405-archive-asf-public=cust-asf.ponee.io@carbondata.apache.org Tue Apr 10 12:59:13 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 034B218067B for ; Tue, 10 Apr 2018 12:59:11 +0200 (CEST) Received: (qmail 17556 invoked by uid 500); 10 Apr 2018 10:59:11 -0000 Mailing-List: contact commits-help@carbondata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.apache.org Delivered-To: mailing list commits@carbondata.apache.org Received: (qmail 17540 invoked by uid 99); 10 Apr 2018 10:59:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Apr 2018 10:59:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E7C07F4E5A; Tue, 10 Apr 2018 10:59:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: manishgupta88@apache.org To: commits@carbondata.apache.org Date: Tue, 10 Apr 2018 10:59:11 -0000 Message-Id: In-Reply-To: <76810906d4fc456ab6c4898ba35d3198@git.apache.org> References: <76810906d4fc456ab6c4898ba35d3198@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] carbondata git commit: [CARBONDATA-2310] Refactored code to improve Distributable interface [CARBONDATA-2310] Refactored code to improve Distributable interface Refactored code to improve Distributable interface This closes #2134 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3c48df39 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3c48df39 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3c48df39 Branch: refs/heads/branch-1.3 Commit: 3c48df396f2bafc9efc8091fc7abefca089922d7 Parents: 31c7b50 Author: dhatchayani Authored: Tue Apr 3 11:19:43 2018 +0530 Committer: manishgupta88 Committed: Tue Apr 10 16:16:27 2018 +0530 ---------------------------------------------------------------------- .../org/apache/carbondata/core/cache/Cache.java | 10 ++ .../dictionary/AbstractDictionaryCache.java | 4 + .../core/constants/CarbonCommonConstants.java | 3 + .../core/datamap/dev/CacheableDataMap.java | 47 ++++++ .../carbondata/core/datamap/dev/DataMap.java | 3 +- .../core/datastore/BlockIndexStore.java | 4 + .../core/datastore/SegmentTaskIndexStore.java | 4 + .../core/indexstore/AbstractMemoryDMStore.java | 63 +++++++ .../indexstore/BlockletDataMapIndexStore.java | 92 ++++------ .../core/indexstore/SafeMemoryDMStore.java | 94 +++++++++++ .../TableBlockIndexUniqueIdentifier.java | 3 +- .../core/indexstore/UnsafeMemoryDMStore.java | 23 +-- .../blockletindex/BlockletDataMap.java | 169 +++++++++++++------ .../BlockletDataMapDistributable.java | 18 +- .../blockletindex/BlockletDataMapFactory.java | 95 +++++++---- .../blockletindex/BlockletDataMapModel.java | 13 ++ .../core/indexstore/row/DataMapRow.java | 13 +- .../core/indexstore/row/UnsafeDataMapRow.java | 7 +- .../core/indexstore/schema/CarbonRowSchema.java | 4 +- .../core/util/BlockletDataMapUtil.java | 140 +++++++++++++++ .../carbondata/core/util/SessionParams.java | 5 + .../TestBlockletDataMapFactory.java | 108 ++++++++++++ .../apache/carbondata/hadoop/CacheClient.java | 43 +++++ .../hadoop/api/AbstractDataMapJob.java | 43 +++++ .../hadoop/api/CarbonTableInputFormat.java | 61 ++++++- .../carbondata/hadoop/api/DataMapJob.java | 6 + .../hadoop/util/CarbonInputFormatUtil.java | 44 +++++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 9 +- .../carbondata/spark/rdd/SparkDataMapJob.scala | 4 +- .../org/apache/spark/sql/CarbonCountStar.scala | 13 ++ .../execution/command/CarbonHiveCommands.scala | 9 + 31 files changed, 973 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/cache/Cache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java index 04fa18a..6df36fc 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java @@ -20,6 +20,8 @@ package org.apache.carbondata.core.cache; import java.io.IOException; import java.util.List; +import org.apache.carbondata.core.memory.MemoryException; + /** * A semi-persistent mapping from keys to values. Cache entries are manually added using * #get(Key), #getAll(List) , and are stored in the cache until @@ -69,6 +71,14 @@ public interface Cache { void invalidate(K key); /** + * This method will add the value to the cache for the given key + * + * @param key + * @param value + */ + void put(K key, V value) throws IOException, MemoryException; + + /** * Access count of Cacheable entry will be decremented * * @param keys http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java index 598d00e..9ed9007 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java @@ -59,6 +59,10 @@ public abstract class AbstractDictionaryCache getAllUncachedDistributables(List distributables) + throws IOException; + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java index f3642d6..f096dd7 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.datamap.dev; import java.io.IOException; +import java.io.Serializable; import java.util.List; import org.apache.carbondata.core.indexstore.Blocklet; @@ -27,7 +28,7 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; /** * Datamap is an entity which can store and retrieve index data. */ -public interface DataMap { +public interface DataMap extends Serializable { /** * It is called to load the data map to memory or to initialize it. http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java index f2c38fa..609bf1c 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java @@ -229,6 +229,10 @@ public class BlockIndexStore extends AbstractBlockIndexStoreCache { .remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo)); } + @Override public void put(TableBlockUniqueIdentifier key, AbstractIndex value) { + throw new UnsupportedOperationException("Operation not supported"); + } + @Override public void clearAccessCount(List keys) { for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : keys) { SegmentTaskIndexWrapper cacheable = (SegmentTaskIndexWrapper) lruCache http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java index 8ed5c18..744cc93 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java @@ -140,6 +140,10 @@ public class SegmentTaskIndexStore lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); } + @Override public void put(TableSegmentUniqueIdentifier key, SegmentTaskIndexWrapper value) { + throw new UnsupportedOperationException("Operation not supported"); + } + /** * returns block timestamp value from the given task * @param taskKey http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java new file mode 100644 index 0000000..41b929a --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.indexstore; + +import java.io.Serializable; + +import org.apache.carbondata.core.indexstore.row.DataMapRow; +import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; + +/** + * Store the data map row @{@link DataMapRow} + */ +public abstract class AbstractMemoryDMStore implements Serializable { + + protected boolean isMemoryFreed; + + protected CarbonRowSchema[] schema; + + protected final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); + + public AbstractMemoryDMStore(CarbonRowSchema[] schema) { + this.schema = schema; + } + + public abstract void addIndexRow(DataMapRow indexRow) throws MemoryException; + + public abstract DataMapRow getDataMapRow(int index); + + public abstract void freeMemory(); + + public abstract int getMemoryUsed(); + + public CarbonRowSchema[] getSchema() { + return schema; + } + + public abstract int getRowCount(); + + public void finishWriting() throws MemoryException { + // do nothing in default implementation + } + + public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException { + throw new UnsupportedOperationException("Operation not allowed"); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index ac14105..ce0fe8b 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -18,7 +18,6 @@ package org.apache.carbondata.core.indexstore; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -31,19 +30,11 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CarbonLRUCache; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; -import org.apache.carbondata.core.util.DataFileFooterConverter; - -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; +import org.apache.carbondata.core.util.BlockletDataMapUtil; /** * Class to handle loading, unloading,clearing,storing of the table @@ -86,7 +77,7 @@ public class BlockletDataMapIndexStore SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); Set filesRead = new HashSet<>(); Map blockMetaInfoMap = - getBlockMetaInfoMap(identifier, indexFileStore, filesRead); + BlockletDataMapUtil.getBlockMetaInfoMap(identifier, indexFileStore, filesRead); dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap); } catch (MemoryException e) { LOGGER.error("memory exception when loading datamap: " + e.getMessage()); @@ -96,54 +87,6 @@ public class BlockletDataMapIndexStore return dataMap; } - private Map getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier, - SegmentIndexFileStore indexFileStore, Set filesRead) throws IOException { - if (identifier.getMergeIndexFileName() != null) { - CarbonFile indexMergeFile = FileFactory.getCarbonFile( - identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getMergeIndexFileName()); - if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) { - indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile }); - filesRead.add(indexMergeFile.getPath()); - } - } - if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) { - indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile( - identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getIndexFileName()) }); - } - DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); - Map blockMetaInfoMap = new HashMap<>(); - List indexInfo = fileFooterConverter.getIndexInfo( - identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName())); - for (DataFileFooter footer : indexInfo) { - String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath(); - if (FileFactory.isFileExist(blockPath)) { - blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath)); - } else { - LOGGER.warn("Skipping invalid block " + footer.getBlockInfo().getBlockUniqueName() - + " The block does not exist. The block might be got deleted due to clean up post" - + " update/delete operation over table."); - } - } - return blockMetaInfoMap; - } - - private BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException { - CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile); - if (carbonFile instanceof AbstractDFSCarbonFile) { - RemoteIterator iter = - ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile)); - LocatedFileStatus fileStatus = iter.next(); - String[] location = fileStatus.getBlockLocations()[0].getHosts(); - long len = fileStatus.getLen(); - return new BlockMetaInfo(location, len); - } else { - return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize()); - } - } - @Override public List getAll( List tableSegmentUniqueIdentifiers) throws IOException { @@ -165,7 +108,7 @@ public class BlockletDataMapIndexStore Set filesRead = new HashSet<>(); for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) { Map blockMetaInfoMap = - getBlockMetaInfoMap(identifier, indexFileStore, filesRead); + BlockletDataMapUtil.getBlockMetaInfoMap(identifier, indexFileStore, filesRead); blockletDataMaps.add( loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap)); } @@ -206,6 +149,35 @@ public class BlockletDataMapIndexStore lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); } + @Override public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, + BlockletDataMap blockletDataMap) throws IOException, MemoryException { + String uniqueTableSegmentIdentifier = + tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(); + Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier); + if (lock == null) { + lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier); + } + // As dataMap will use unsafe memory, it is not recommended to overwrite an existing entry + // as in that case clearing unsafe memory need to be taken card. If at all datamap entry + // in the cache need to be overwritten then use the invalidate interface + // and then use the put interface + if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) { + synchronized (lock) { + if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) { + try { + blockletDataMap.convertToUnsafeDMStore(); + lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(), + blockletDataMap, blockletDataMap.getMemorySize()); + } catch (Throwable e) { + // clear all the memory acquired by data map in case of any failure + blockletDataMap.clear(); + throw new IOException("Problem in adding datamap to cache.", e); + } + } + } + } + } + /** * Below method will be used to load the segment of segments * One segment may have multiple task , so table segment will be loaded http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java new file mode 100644 index 0000000..d51f28d --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.indexstore; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.indexstore.row.DataMapRow; +import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; +import org.apache.carbondata.core.memory.MemoryException; + +/** + * Store the data map row @{@link DataMapRow} data to memory. + */ +public class SafeMemoryDMStore extends AbstractMemoryDMStore { + + /** + * holds all blocklets metadata in memory + */ + private List dataMapRows = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + private int runningLength; + + public SafeMemoryDMStore(CarbonRowSchema[] schema) { + super(schema); + } + + /** + * Add the index row to dataMapRows, basically to in memory. + * + * @param indexRow + * @return + */ + @Override + public void addIndexRow(DataMapRow indexRow) throws MemoryException { + dataMapRows.add(indexRow); + runningLength += indexRow.getTotalSizeInBytes(); + } + + @Override + public DataMapRow getDataMapRow(int index) { + assert (index < dataMapRows.size()); + return dataMapRows.get(index); + } + + @Override + public void freeMemory() { + if (!isMemoryFreed) { + if (null != dataMapRows) { + dataMapRows.clear(); + dataMapRows = null; + } + isMemoryFreed = true; + } + } + + @Override + public int getMemoryUsed() { + return runningLength; + } + + @Override + public int getRowCount() { + return dataMapRows.size(); + } + + @Override + public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException { + UnsafeMemoryDMStore unsafeMemoryDMStore = new UnsafeMemoryDMStore(schema); + for (DataMapRow dataMapRow : dataMapRows) { + unsafeMemoryDMStore.addIndexRow(dataMapRow); + } + unsafeMemoryDMStore.finishWriting(); + return unsafeMemoryDMStore; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java index c907fa8..8118fe4 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.indexstore; +import java.io.Serializable; import java.util.Objects; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -24,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; /** * Class holds the indexFile information to uniquely identitify the carbon index */ -public class TableBlockIndexUniqueIdentifier { +public class TableBlockIndexUniqueIdentifier implements Serializable { private String indexFilePath; http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java index 31ecac2..6fe7fd2 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ -24,7 +24,6 @@ import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.util.ThreadLocalTaskInfo; import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET; import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe; @@ -32,9 +31,11 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe; /** * Store the data map row @{@link DataMapRow} data to unsafe. */ -public class UnsafeMemoryDMStore { +public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { - private MemoryBlock memoryBlock; + private static final long serialVersionUID = -5344592407101055335L; + + private transient MemoryBlock memoryBlock; private static int capacity = 8 * 1024; @@ -42,18 +43,12 @@ public class UnsafeMemoryDMStore { private int runningLength; - private boolean isMemoryFreed; - - private CarbonRowSchema[] schema; - private int[] pointers; private int rowCount; - private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); - public UnsafeMemoryDMStore(CarbonRowSchema[] schema) throws MemoryException { - this.schema = schema; + super(schema); this.allocatedSize = capacity; this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize); this.pointers = new int[1000]; @@ -92,7 +87,7 @@ public class UnsafeMemoryDMStore { * @param indexRow * @return */ - public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException { + public void addIndexRow(DataMapRow indexRow) throws MemoryException { // First calculate the required memory to keep the row in unsafe int rowSize = indexRow.getTotalSizeInBytes(); // Check whether allocated memory is sufficient or not. @@ -172,7 +167,7 @@ public class UnsafeMemoryDMStore { } } - public UnsafeDataMapRow getUnsafeRow(int index) { + public DataMapRow getDataMapRow(int index) { assert (index < rowCount); return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]); } @@ -205,10 +200,6 @@ public class UnsafeMemoryDMStore { return runningLength; } - public CarbonRowSchema[] getSchema() { - return schema; - } - public int getRowCount() { return rowCount; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 9ec7a46..66fa0aa 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -39,11 +39,14 @@ import org.apache.carbondata.core.datamap.dev.DataMapModel; import org.apache.carbondata.core.datastore.IndexKey; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore; import org.apache.carbondata.core.indexstore.BlockMetaInfo; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailInfo; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.SafeMemoryDMStore; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore; import org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; @@ -82,6 +85,8 @@ public class BlockletDataMap implements DataMap, Cacheable { public static final String NAME = "clustered.btree.blocklet"; + private static final long serialVersionUID = 4121938766748899140L; + private static int KEY_INDEX = 0; private static int MIN_VALUES_INDEX = 1; @@ -120,14 +125,17 @@ public class BlockletDataMap implements DataMap, Cacheable { private static int SEGMENTID = 5; - private UnsafeMemoryDMStore unsafeMemoryDMStore; + private AbstractMemoryDMStore memoryDMStore; - private UnsafeMemoryDMStore unsafeMemorySummaryDMStore; + private AbstractMemoryDMStore summaryDMStore; - private SegmentProperties segmentProperties; + // As it is a heavy object it is not recommended to serialize this object + private transient SegmentProperties segmentProperties; private int[] columnCardinality; + private TableBlockIndexUniqueIdentifier tableBlockUniqueIdentifier; + @Override public void init(DataMapModel dataMapModel) throws IOException, MemoryException { long startTime = System.currentTimeMillis(); @@ -153,9 +161,10 @@ public class BlockletDataMap implements DataMap, Cacheable { schemaBinary = convertSchemaToBinary(columnInTable); columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); segmentProperties = new SegmentProperties(columnInTable, columnCardinality); - createSchema(segmentProperties); + createSchema(segmentProperties, + ((BlockletDataMapModel) dataMapModel).isAddToUnsafe()); createSummarySchema(segmentProperties, schemaBinary, filePath, fileName, - segmentId); + segmentId, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe()); } TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); BlockMetaInfo blockMetaInfo = @@ -186,20 +195,20 @@ public class BlockletDataMap implements DataMap, Cacheable { } } } - if (unsafeMemoryDMStore != null) { - unsafeMemoryDMStore.finishWriting(); + if (memoryDMStore != null) { + memoryDMStore.finishWriting(); } - if (null != unsafeMemorySummaryDMStore) { + if (null != summaryDMStore) { addTaskSummaryRowToUnsafeMemoryStore( summaryRow, schemaBinary, filePath, fileName, segmentId); - unsafeMemorySummaryDMStore.finishWriting(); + summaryDMStore.finishWriting(); } LOGGER.info( - "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + ( + "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is " + ( System.currentTimeMillis() - startTime)); } @@ -208,10 +217,10 @@ public class BlockletDataMap implements DataMap, Cacheable { BlockMetaInfo blockMetaInfo, int relativeBlockletId) { int[] minMaxLen = segmentProperties.getColumnsValueSize(); List blockletList = fileFooter.getBlockletList(); - CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema(); + CarbonRowSchema[] schema = memoryDMStore.getSchema(); // Add one row to maintain task level min max for segment pruning if (!blockletList.isEmpty() && summaryRow == null) { - summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema()); + summaryRow = new DataMapRowImpl(summaryDMStore.getSchema()); } for (int index = 0; index < blockletList.size(); index++) { DataMapRow row = new DataMapRowImpl(schema); @@ -227,7 +236,7 @@ public class BlockletDataMap implements DataMap, Cacheable { row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal); // compute and set task level min values addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues, + summaryDMStore.getSchema()[taskMinMaxOrdinal], minValues, TASK_MIN_VALUES_INDEX, true); ordinal++; taskMinMaxOrdinal++; @@ -235,7 +244,7 @@ public class BlockletDataMap implements DataMap, Cacheable { row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal); // compute and set task level max values addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues, + summaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues, TASK_MAX_VALUES_INDEX, false); ordinal++; @@ -270,7 +279,7 @@ public class BlockletDataMap implements DataMap, Cacheable { row.setShort((short) relativeBlockletId++, ordinal++); // Store block size row.setLong(blockMetaInfo.getSize(), ordinal); - unsafeMemoryDMStore.addIndexRowToUnsafe(row); + memoryDMStore.addIndexRow(row); } catch (Exception e) { throw new RuntimeException(e); } @@ -296,10 +305,10 @@ public class BlockletDataMap implements DataMap, Cacheable { BlockMetaInfo blockMetaInfo) { int[] minMaxLen = segmentProperties.getColumnsValueSize(); BlockletIndex blockletIndex = fileFooter.getBlockletIndex(); - CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema(); + CarbonRowSchema[] schema = memoryDMStore.getSchema(); // Add one row to maintain task level min max for segment pruning if (summaryRow == null) { - summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema()); + summaryRow = new DataMapRowImpl(summaryDMStore.getSchema()); } DataMapRow row = new DataMapRowImpl(schema); int ordinal = 0; @@ -318,14 +327,14 @@ public class BlockletDataMap implements DataMap, Cacheable { row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMinValues), ordinal); // compute and set task level min values addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues, + summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues, TASK_MIN_VALUES_INDEX, true); ordinal++; taskMinMaxOrdinal++; row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMaxValues), ordinal); // compute and set task level max values addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues, + summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues, TASK_MAX_VALUES_INDEX, false); ordinal++; @@ -358,7 +367,7 @@ public class BlockletDataMap implements DataMap, Cacheable { // store block size row.setLong(blockMetaInfo.getSize(), ordinal); - unsafeMemoryDMStore.addIndexRowToUnsafe(row); + memoryDMStore.addIndexRow(row); } catch (Exception e) { throw new RuntimeException(e); } @@ -379,7 +388,7 @@ public class BlockletDataMap implements DataMap, Cacheable { summaryRow.setByteArray(fileName, INDEX_FILE_NAME); summaryRow.setByteArray(segmentId, SEGMENTID); try { - unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow); + summaryDMStore.addIndexRow(summaryRow); } catch (Exception e) { throw new RuntimeException(e); } @@ -517,7 +526,8 @@ public class BlockletDataMap implements DataMap, Cacheable { taskMinMaxRow.setRow(row, ordinal); } - private void createSchema(SegmentProperties segmentProperties) throws MemoryException { + private void createSchema(SegmentProperties segmentProperties, boolean addToUnsafe) + throws MemoryException { List indexSchemas = new ArrayList<>(); // Index key @@ -554,8 +564,8 @@ public class BlockletDataMap implements DataMap, Cacheable { // for storing block length. indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG)); - unsafeMemoryDMStore = - new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()])); + CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]); + memoryDMStore = getMemoryDMStore(schema, addToUnsafe); } /** @@ -566,7 +576,7 @@ public class BlockletDataMap implements DataMap, Cacheable { * @throws MemoryException */ private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary, - byte[] filePath, byte[] fileName, byte[] segmentId) + byte[] filePath, byte[] fileName, byte[] segmentId, boolean addToUnsafe) throws MemoryException { List taskMinMaxSchemas = new ArrayList<>(); getMinMaxSchema(segmentProperties, taskMinMaxSchemas); @@ -582,8 +592,9 @@ public class BlockletDataMap implements DataMap, Cacheable { // for storing segmentid taskMinMaxSchemas.add( new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length)); - unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore( - taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()])); + CarbonRowSchema[] schema = + taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]); + summaryDMStore = getMemoryDMStore(schema, addToUnsafe); } private void getMinMaxSchema(SegmentProperties segmentProperties, @@ -612,8 +623,8 @@ public class BlockletDataMap implements DataMap, Cacheable { public boolean isScanRequired(FilterResolverIntf filterExp) { FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); - for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) { - DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i); + for (int i = 0; i < summaryDMStore.getRowCount(); i++) { + DataMapRow unsafeRow = summaryDMStore.getDataMapRow(i); boolean isScanRequired = FilterExpressionProcessor .isScanRequired(filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX), getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX)); @@ -626,25 +637,25 @@ public class BlockletDataMap implements DataMap, Cacheable { @Override public List prune(FilterResolverIntf filterExp) { - if (unsafeMemoryDMStore.getRowCount() == 0) { + if (memoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } List blocklets = new ArrayList<>(); if (filterExp == null) { - int rowCount = unsafeMemoryDMStore.getRowCount(); + int rowCount = memoryDMStore.getRowCount(); for (int i = 0; i < rowCount; i++) { - DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow(); + DataMapRow safeRow = memoryDMStore.getDataMapRow(i).convertToSafeRow(); blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX))); } } else { // Remove B-tree jump logic as start and end key prepared is not // correct for old store scenarios int startIndex = 0; - int endIndex = unsafeMemoryDMStore.getRowCount(); + int endIndex = memoryDMStore.getRowCount(); FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); while (startIndex < endIndex) { - DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow(); + DataMapRow safeRow = memoryDMStore.getDataMapRow(startIndex).convertToSafeRow(); int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX); String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS); @@ -663,7 +674,7 @@ public class BlockletDataMap implements DataMap, Cacheable { @Override public List prune(FilterResolverIntf filterExp, List partitions) { - if (unsafeMemoryDMStore.getRowCount() == 0) { + if (memoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } // if it has partitioned datamap but there is no partitioned information stored, it means @@ -736,7 +747,7 @@ public class BlockletDataMap implements DataMap, Cacheable { public ExtendedBlocklet getDetailedBlocklet(String blockletId) { int index = Integer.parseInt(blockletId); - DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow(); + DataMapRow safeRow = memoryDMStore.getDataMapRow(index).convertToSafeRow(); return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)); } @@ -787,7 +798,7 @@ public class BlockletDataMap implements DataMap, Cacheable { private String[] getFileDetails() { try { String[] fileDetails = new String[3]; - DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0); + DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0); fileDetails[0] = new String(unsafeRow.getByteArray(INDEX_PATH), CarbonCommonConstants.DEFAULT_CHARSET); fileDetails[1] = new String(unsafeRow.getByteArray(INDEX_FILE_NAME), @@ -811,14 +822,14 @@ public class BlockletDataMap implements DataMap, Cacheable { private int findStartIndex(DataMapRow key, Comparator comparator) { int childNodeIndex; int low = 0; - int high = unsafeMemoryDMStore.getRowCount() - 1; + int high = memoryDMStore.getRowCount() - 1; int mid = 0; int compareRes = -1; // while (low <= high) { mid = (low + high) >>> 1; // compare the entries - compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid)); + compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid)); if (compareRes < 0) { high = mid - 1; } else if (compareRes > 0) { @@ -827,7 +838,7 @@ public class BlockletDataMap implements DataMap, Cacheable { // if key is matched then get the first entry int currentPos = mid; while (currentPos - 1 >= 0 - && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) { + && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos - 1)) == 0) { currentPos--; } mid = currentPos; @@ -859,14 +870,14 @@ public class BlockletDataMap implements DataMap, Cacheable { private int findEndIndex(DataMapRow key, Comparator comparator) { int childNodeIndex; int low = 0; - int high = unsafeMemoryDMStore.getRowCount() - 1; + int high = memoryDMStore.getRowCount() - 1; int mid = 0; int compareRes = -1; // while (low <= high) { mid = (low + high) >>> 1; // compare the entries - compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid)); + compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid)); if (compareRes < 0) { high = mid - 1; } else if (compareRes > 0) { @@ -874,8 +885,8 @@ public class BlockletDataMap implements DataMap, Cacheable { } else { int currentPos = mid; // if key is matched then get the first entry - while (currentPos + 1 < unsafeMemoryDMStore.getRowCount() - && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) { + while (currentPos + 1 < memoryDMStore.getRowCount() + && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos + 1)) == 0) { currentPos++; } mid = currentPos; @@ -903,13 +914,13 @@ public class BlockletDataMap implements DataMap, Cacheable { buffer.putInt(key.getNoDictionaryKeys().length); buffer.put(key.getDictionaryKeys()); buffer.put(key.getNoDictionaryKeys()); - DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema()); + DataMapRowImpl dataMapRow = new DataMapRowImpl(memoryDMStore.getSchema()); dataMapRow.setByteArray(buffer.array(), 0); return dataMapRow; } private byte[] getColumnSchemaBinary() { - DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0); + DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0); return unsafeRow.getByteArray(SCHEMA); } @@ -933,15 +944,15 @@ public class BlockletDataMap implements DataMap, Cacheable { @Override public void clear() { - if (unsafeMemoryDMStore != null) { - unsafeMemoryDMStore.freeMemory(); - unsafeMemoryDMStore = null; + if (memoryDMStore != null) { + memoryDMStore.freeMemory(); + memoryDMStore = null; segmentProperties = null; } // clear task min/max unsafe memory - if (null != unsafeMemorySummaryDMStore) { - unsafeMemorySummaryDMStore.freeMemory(); - unsafeMemorySummaryDMStore = null; + if (null != summaryDMStore) { + summaryDMStore.freeMemory(); + summaryDMStore = null; } } @@ -958,13 +969,59 @@ public class BlockletDataMap implements DataMap, Cacheable { @Override public long getMemorySize() { long memoryUsed = 0L; - if (unsafeMemoryDMStore != null) { - memoryUsed += unsafeMemoryDMStore.getMemoryUsed(); + if (memoryDMStore != null) { + memoryUsed += memoryDMStore.getMemoryUsed(); } - if (null != unsafeMemorySummaryDMStore) { - memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed(); + if (null != summaryDMStore) { + memoryUsed += summaryDMStore.getMemoryUsed(); } return memoryUsed; } + public TableBlockIndexUniqueIdentifier getTableBlockUniqueIdentifier() { + return tableBlockUniqueIdentifier; + } + + public void setTableBlockUniqueIdentifier( + TableBlockIndexUniqueIdentifier tableBlockUniqueIdentifier) { + this.tableBlockUniqueIdentifier = tableBlockUniqueIdentifier; + } + + public void setSegmentProperties(SegmentProperties segmentProperties) { + this.segmentProperties = segmentProperties; + } + + public int[] getColumnCardinality() { + return columnCardinality; + } + + private AbstractMemoryDMStore getMemoryDMStore(CarbonRowSchema[] schema, boolean addToUnsafe) + throws MemoryException { + AbstractMemoryDMStore memoryDMStore; + if (addToUnsafe) { + memoryDMStore = new UnsafeMemoryDMStore(schema); + } else { + memoryDMStore = new SafeMemoryDMStore(schema); + } + return memoryDMStore; + } + + /** + * This method will ocnvert safe to unsafe memory DM store + * + * @throws MemoryException + */ + public void convertToUnsafeDMStore() throws MemoryException { + if (memoryDMStore instanceof SafeMemoryDMStore) { + UnsafeMemoryDMStore unsafeMemoryDMStore = memoryDMStore.convertToUnsafeDMStore(); + memoryDMStore.freeMemory(); + memoryDMStore = unsafeMemoryDMStore; + } + if (summaryDMStore instanceof SafeMemoryDMStore) { + UnsafeMemoryDMStore unsafeSummaryMemoryDMStore = summaryDMStore.convertToUnsafeDMStore(); + summaryDMStore.freeMemory(); + summaryDMStore = unsafeSummaryMemoryDMStore; + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java index 99e48a5..02ac8d7 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java @@ -16,7 +16,10 @@ */ package org.apache.carbondata.core.indexstore.blockletindex; +import java.util.Set; + import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; /** * This class contains required information to make the Blocklet datamap distributable. @@ -31,11 +34,24 @@ public class BlockletDataMapDistributable extends DataMapDistributable { */ private String filePath; - public BlockletDataMapDistributable(String indexFilePath) { + private Set tableBlockIndexUniqueIdentifiers; + + public BlockletDataMapDistributable(String indexFilePath, + Set tableBlockIndexUniqueIdentifiers) { this.filePath = indexFilePath; + this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers; } public String getFilePath() { return filePath; } + + public Set getTableBlockIndexUniqueIdentifiers() { + return tableBlockIndexUniqueIdentifiers; + } + + public void setTableBlockIndexUniqueIdentifiers( + Set tableBlockIndexUniqueIdentifiers) { + this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 5eb077f..c08c87e 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -21,13 +21,16 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CacheProvider; import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.CacheableDataMap; import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.datamap.dev.DataMapWriter; @@ -37,8 +40,10 @@ import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.util.BlockletDataMapUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.events.Event; @@ -50,12 +55,13 @@ import org.apache.hadoop.fs.RemoteIterator; /** * Table map for blocklet */ -public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher { +public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher, + CacheableDataMap { private AbsoluteTableIdentifier identifier; // segmentId -> list of index file - private Map> segmentMap = new HashMap<>(); + private Map> segmentMap = new HashMap<>(); private Cache cache; @@ -73,33 +79,47 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe @Override public List getDataMaps(Segment segment) throws IOException { + Set identifiers = getTableBlockIndexUniqueIdentifiers(segment); List tableBlockIndexUniqueIdentifiers = - getTableBlockIndexUniqueIdentifiers(segment); + new ArrayList<>(identifiers.size()); + tableBlockIndexUniqueIdentifiers.addAll(identifiers); return cache.getAll(tableBlockIndexUniqueIdentifiers); } - private List getTableBlockIndexUniqueIdentifiers( + @Override public void cache(DataMap dataMap) throws IOException, MemoryException { + BlockletDataMap blockletDataMap = (BlockletDataMap) dataMap; + cache.put(blockletDataMap.getTableBlockUniqueIdentifier(), blockletDataMap); + } + + @Override + public List getAllUncachedDistributables( + List distributables) throws IOException { + List distributablesToBeLoaded = new ArrayList<>(distributables.size()); + for (DataMapDistributable distributable : distributables) { + Segment segment = distributable.getSegment(); + Set tableBlockIndexUniqueIdentifiers = + getTableBlockIndexUniqueIdentifiers(segment); + // filter out the tableBlockIndexUniqueIdentifiers based on distributable + Set validIdentifiers = BlockletDataMapUtil + .filterIdentifiersBasedOnDistributable(tableBlockIndexUniqueIdentifiers, + (BlockletDataMapDistributable) distributable); + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : validIdentifiers) { + if (null == cache.getIfPresent(tableBlockIndexUniqueIdentifier)) { + distributablesToBeLoaded.add(distributable); + break; + } + } + } + return distributablesToBeLoaded; + } + + private Set getTableBlockIndexUniqueIdentifiers( Segment segment) throws IOException { - List tableBlockIndexUniqueIdentifiers = + Set tableBlockIndexUniqueIdentifiers = segmentMap.get(segment.getSegmentNo()); if (tableBlockIndexUniqueIdentifiers == null) { - tableBlockIndexUniqueIdentifiers = new ArrayList<>(); - Map indexFiles; - if (segment.getSegmentFileName() == null) { - String path = - CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()); - indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path); - } else { - SegmentFileStore fileStore = - new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName()); - indexFiles = fileStore.getIndexFiles(); - } - for (Map.Entry indexFileEntry: indexFiles.entrySet()) { - Path indexFile = new Path(indexFileEntry.getKey()); - tableBlockIndexUniqueIdentifiers.add( - new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), - indexFile.getName(), indexFileEntry.getValue(), segment.getSegmentNo())); - } + tableBlockIndexUniqueIdentifiers = + BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment, identifier.getTablePath()); segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers); } return tableBlockIndexUniqueIdentifiers; @@ -121,7 +141,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe } return detailedBlocklets; } - List identifiers = + Set identifiers = getTableBlockIndexUniqueIdentifiers(segment); // Retrieve each blocklets detail information from blocklet datamap for (Blocklet blocklet : blocklets) { @@ -136,12 +156,12 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe if (blocklet instanceof ExtendedBlocklet) { return (ExtendedBlocklet) blocklet; } - List identifiers = + Set identifiers = getTableBlockIndexUniqueIdentifiers(segment); return getExtendedBlocklet(identifiers, blocklet); } - private ExtendedBlocklet getExtendedBlocklet(List identifiers, + private ExtendedBlocklet getExtendedBlocklet(Set identifiers, Blocklet blocklet) throws IOException { String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getPath()); for (TableBlockIndexUniqueIdentifier identifier : identifiers) { @@ -156,6 +176,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe @Override public List toDistributable(Segment segment) { List distributables = new ArrayList<>(); + Map indexFiles = null; try { CarbonFile[] carbonIndexFiles; if (segment.getSegmentFileName() == null) { @@ -164,11 +185,20 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe } else { SegmentFileStore fileStore = new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName()); - Map indexFiles = fileStore.getIndexFiles(); + indexFiles = fileStore.getIndexFiles(); carbonIndexFiles = new CarbonFile[indexFiles.size()]; int i = 0; - for (String indexFile : indexFiles.keySet()) { - carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile); + for (Map.Entry entry : indexFiles.entrySet()) { + String indexFile = entry.getKey(); + String mergeFileName = entry.getValue(); + if (null != mergeFileName) { + String mergeIndexPath = indexFile + .substring(0, indexFile.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1) + + mergeFileName; + carbonIndexFiles[i++] = FileFactory.getCarbonFile(mergeIndexPath); + } else { + carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile); + } } } for (int i = 0; i < carbonIndexFiles.length; i++) { @@ -179,7 +209,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe LocatedFileStatus fileStatus = iter.next(); String[] location = fileStatus.getBlockLocations()[0].getHosts(); BlockletDataMapDistributable distributable = - new BlockletDataMapDistributable(path.toString()); + new BlockletDataMapDistributable(path.toString(), null); distributable.setLocations(location); distributables.add(distributable); @@ -197,7 +227,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe @Override public void clear(Segment segment) { - List blockIndexes = segmentMap.remove(segment.getSegmentNo()); + Set blockIndexes = segmentMap.remove(segment.getSegmentNo()); if (blockIndexes != null) { for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) { DataMap dataMap = cache.getIfPresent(blockIndex); @@ -251,4 +281,9 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe // TODO: pass SORT_COLUMNS into this class return null; } + + public Map> getSegmentMap() { + return segmentMap; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java index ebeb278..17b2463 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java @@ -32,6 +32,8 @@ public class BlockletDataMapModel extends DataMapModel { private String segmentId; + private boolean addToUnsafe = true; + public BlockletDataMapModel(String filePath, byte[] fileData, Map blockMetaInfoMap, String segmentId) { super(filePath); @@ -40,6 +42,13 @@ public class BlockletDataMapModel extends DataMapModel { this.segmentId = segmentId; } + public BlockletDataMapModel(String filePath, byte[] fileData, + Map blockMetaInfoMap, String segmentId, boolean addToUnsafe) { + this(filePath, fileData, blockMetaInfoMap, segmentId); + this.addToUnsafe = addToUnsafe; + } + + public byte[] getFileData() { return fileData; } @@ -51,4 +60,8 @@ public class BlockletDataMapModel extends DataMapModel { public String getSegmentId() { return segmentId; } + + public boolean isAddToUnsafe() { + return addToUnsafe; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java index b764bdf..496a1d0 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java @@ -16,13 +16,15 @@ */ package org.apache.carbondata.core.indexstore.row; +import java.io.Serializable; + import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; /** * It is just a normal row to store data. Implementation classes could be safe and unsafe. * TODO move this class a global row and use across loading after DataType is changed class */ -public abstract class DataMapRow { +public abstract class DataMapRow implements Serializable { protected CarbonRowSchema[] schemas; @@ -88,4 +90,13 @@ public abstract class DataMapRow { public int getColumnCount() { return schemas.length; } + + /** + * default implementation + * + * @return + */ + public DataMapRow convertToSafeRow() { + return this; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java index 1b95984..323fb24 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java @@ -30,7 +30,12 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe; */ public class UnsafeDataMapRow extends DataMapRow { - private MemoryBlock block; + private static final long serialVersionUID = -8649767299407770884L; + + // As it is an unsafe memory block it is not recommended to serialize. + // If at all required to be serialized then override writeObject methods + // to which should take care of clearing the unsafe memory post serialization + private transient MemoryBlock block; private int pointer; http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java index 813be4a..adb8715 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java @@ -16,12 +16,14 @@ */ package org.apache.carbondata.core.indexstore.schema; +import java.io.Serializable; + import org.apache.carbondata.core.metadata.datatype.DataType; /** * It just have 2 types right now, either fixed or variable. */ -public abstract class CarbonRowSchema { +public abstract class CarbonRowSchema implements Serializable { protected DataType dataType; http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java new file mode 100644 index 0000000..766650d --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.BlockMetaInfo; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + + +public class BlockletDataMapUtil { + + public static Map getBlockMetaInfoMap( + TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore, + Set filesRead) throws IOException { + if (identifier.getMergeIndexFileName() != null) { + CarbonFile indexMergeFile = FileFactory.getCarbonFile( + identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier + .getMergeIndexFileName()); + if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) { + indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile }); + filesRead.add(indexMergeFile.getPath()); + } + } + if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) { + indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile( + identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier + .getIndexFileName()) }); + } + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + Map blockMetaInfoMap = new HashMap<>(); + List indexInfo = fileFooterConverter.getIndexInfo( + identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier + .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName())); + for (DataFileFooter footer : indexInfo) { + String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath(); + blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath)); + } + return blockMetaInfoMap; + } + + private static BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException { + CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile); + if (carbonFile instanceof AbstractDFSCarbonFile) { + RemoteIterator iter = + ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile)); + LocatedFileStatus fileStatus = iter.next(); + String[] location = fileStatus.getBlockLocations()[0].getHosts(); + long len = fileStatus.getLen(); + return new BlockMetaInfo(location, len); + } else { + return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize()); + } + } + + public static Set getTableBlockUniqueIdentifiers(Segment segment, + String tablePath) throws IOException { + Set tableBlockIndexUniqueIdentifiers = new HashSet<>(); + Map indexFiles; + if (segment.getSegmentFileName() == null) { + String path = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo()); + indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path); + } else { + SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName()); + indexFiles = fileStore.getIndexFiles(); + } + for (Map.Entry indexFileEntry : indexFiles.entrySet()) { + Path indexFile = new Path(indexFileEntry.getKey()); + tableBlockIndexUniqueIdentifiers.add( + new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), indexFile.getName(), + indexFileEntry.getValue(), segment.getSegmentNo())); + } + return tableBlockIndexUniqueIdentifiers; + } + + /** + * This method will filter out the TableBlockIndexUniqueIdentifiers belongs to that distributable + * + * @param tableBlockIndexUniqueIdentifiers + * @param distributable + * @return + */ + public static Set filterIdentifiersBasedOnDistributable( + Set tableBlockIndexUniqueIdentifiers, + BlockletDataMapDistributable distributable) { + Set validIdentifiers = + new HashSet<>(tableBlockIndexUniqueIdentifiers.size()); + if (distributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : + tableBlockIndexUniqueIdentifiers) { + if (null != tableBlockIndexUniqueIdentifier.getMergeIndexFileName()) { + validIdentifiers.add(tableBlockIndexUniqueIdentifier); + } + } + } else { + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : + tableBlockIndexUniqueIdentifiers) { + if (null == tableBlockIndexUniqueIdentifier.getMergeIndexFileName()) { + validIdentifiers.add(tableBlockIndexUniqueIdentifier); + } + } + } + return validIdentifiers; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java index c232b1e..08b5929 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java +++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java @@ -203,6 +203,11 @@ public class SessionParams implements Serializable { isValid = true; } else if (key.equalsIgnoreCase(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP)) { isValid = true; + } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) { + isValid = CarbonUtil.validateBoolean(value); + if (!isValid) { + throw new InvalidConfigurationException("Invalid value " + value + " for key " + key); + } } else { throw new InvalidConfigurationException( "The key " + key + " not supported for dynamic configuration."); http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java new file mode 100644 index 0000000..67c50e5 --- /dev/null +++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; + +import mockit.Mock; +import mockit.MockUp; +import org.junit.Before; +import org.junit.Test; + +public class TestBlockletDataMapFactory { + + private AbsoluteTableIdentifier absoluteTableIdentifier; + + private BlockletDataMapFactory blockletDataMapFactory; + + private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier; + + private Cache cache; + + @Before public void setUp() { + blockletDataMapFactory = new BlockletDataMapFactory(); + blockletDataMapFactory.init(absoluteTableIdentifier, "dataMapName"); + tableBlockIndexUniqueIdentifier = + new TableBlockIndexUniqueIdentifier("/opt/store/default/carbon_table/Fact/Part0/Segment_0", + "0_batchno0-0-1521012756709.carbonindex", null, "0"); + cache = CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP); + } + + @Test public void addDataMapToCache() + throws IOException, MemoryException, NoSuchMethodException, InvocationTargetException, + IllegalAccessException { + BlockletDataMap dataMap = new BlockletDataMap(); + dataMap.setTableBlockUniqueIdentifier(tableBlockIndexUniqueIdentifier); + Method method = + BlockletDataMapFactory.class.getDeclaredMethod("cache", DataMap.class); + method.setAccessible(true); + method.invoke(blockletDataMapFactory, dataMap); + DataMap result = cache.getIfPresent(tableBlockIndexUniqueIdentifier); + assert null != result; + } + + @Test public void getValidDistributables() throws IOException { + BlockletDataMapDistributable blockletDataMapDistributable = new BlockletDataMapDistributable( + "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex", null); + Segment segment = new Segment("0", null); + blockletDataMapDistributable.setSegment(segment); + BlockletDataMapDistributable blockletDataMapDistributable1 = new BlockletDataMapDistributable( + "/opt/store/default/carbon_table/Fact/Part0/Segment_0/1521012756710.carbonindexmerge", null); + blockletDataMapDistributable1.setSegment(segment); + List dataMapDistributables = new ArrayList<>(2); + dataMapDistributables.add(blockletDataMapDistributable); + dataMapDistributables.add(blockletDataMapDistributable1); + new MockUp() { + @Mock Set getTableBlockIndexUniqueIdentifiers( + Segment segment) { + TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier1 = + new TableBlockIndexUniqueIdentifier( + "/opt/store/default/carbon_table/Fact/Part0/Segment_0", + "0_batchno0-0-1521012756701.carbonindex", "1521012756710.carbonindexmerge", "0"); + TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier2 = + new TableBlockIndexUniqueIdentifier( + "/opt/store/default/carbon_table/Fact/Part0/Segment_0", + "0_batchno0-0-1521012756702.carbonindex", "1521012756710.carbonindexmerge", "0"); + Set tableBlockIndexUniqueIdentifiers = new HashSet<>(3); + tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier); + tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier1); + tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier2); + return tableBlockIndexUniqueIdentifiers; + } + }; + List validDistributables = + blockletDataMapFactory.getAllUncachedDistributables(dataMapDistributables); + assert 1 == validDistributables.size(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java index 8be1e2e..9f67b22 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java @@ -16,21 +16,38 @@ */ package org.apache.carbondata.hadoop; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CacheProvider; import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.datastore.SegmentTaskIndexStore; import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier; +import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; /** * CacheClient : Holds all the Cache access clients for Btree, Dictionary */ public class CacheClient { + private static final LogService LOGGER = + LogServiceFactory.getLogService(CacheClient.class.getName()); + // segment access client for driver LRU cache private CacheAccessClient segmentAccessClient; + private static Map + segmentProperties = + new HashMap(); + public CacheClient() { Cache segmentCache = CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE); @@ -45,4 +62,30 @@ public class CacheClient { public void close() { segmentAccessClient.close(); } + + /** + * Method to get the segment properties and avoid construction of new segment properties until + * the schema is not modified + * + * @param tableIdentifier + * @param columnsInTable + * @param columnCardinality + */ + public SegmentProperties getSegmentProperties(AbsoluteTableIdentifier tableIdentifier, + List columnsInTable, int[] columnCardinality) { + SegmentTaskIndexStore.SegmentPropertiesWrapper segmentPropertiesWrapper = + new SegmentTaskIndexStore.SegmentPropertiesWrapper(tableIdentifier, columnsInTable, + columnCardinality); + SegmentProperties segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper); + if (null == segmentProperties) { + // create a metadata details + // this will be useful in query handling + // all the data file metadata will have common segment properties we + // can use first one to get create the segment properties + LOGGER.info("Constructing new SegmentProperties"); + segmentProperties = new SegmentProperties(columnsInTable, columnCardinality); + this.segmentProperties.put(segmentPropertiesWrapper, segmentProperties); + } + return segmentProperties; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java new file mode 100644 index 0000000..35d4b39 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.api; + +import java.util.List; + +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; + +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +/** + * abstract class for data map job + */ +public abstract class AbstractDataMapJob implements DataMapJob { + + @Override + public List execute(CarbonTable carbonTable, FileInputFormat format) { + return null; + } + + @Override public List execute(DistributableDataMapFormat dataMapFormat, + FilterResolverIntf resolverIntf) { + return null; + } +}