Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 72141200BF4 for ; Fri, 6 Jan 2017 14:57:13 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 70C99160B49; Fri, 6 Jan 2017 13:57:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 47A77160B48 for ; Fri, 6 Jan 2017 14:57:12 +0100 (CET) Received: (qmail 28464 invoked by uid 500); 6 Jan 2017 13:57:11 -0000 Mailing-List: contact commits-help@carbondata.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.incubator.apache.org Delivered-To: mailing list commits@carbondata.incubator.apache.org Received: (qmail 28455 invoked by uid 99); 6 Jan 2017 13:57:11 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Jan 2017 13:57:11 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 09C8C1A05E5 for ; Fri, 6 Jan 2017 13:57:11 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id FW5FEsVuC4ee for ; Fri, 6 Jan 2017 13:57:06 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id C33185FDC3 for ; Fri, 6 Jan 2017 13:57:02 +0000 (UTC) Received: (qmail 28194 invoked by uid 99); 6 Jan 2017 13:57:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Jan 2017 13:57:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A946EDFC2F; Fri, 6 Jan 2017 13:57:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gvramana@apache.org To: commits@carbondata.incubator.apache.org Date: Fri, 06 Jan 2017 13:57:15 -0000 Message-Id: <6e7c58cf59894955a72817fceda1efe5@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [15/22] incubator-carbondata git commit: IUD Integration to query flow archived-at: Fri, 06 Jan 2017 13:57:13 -0000 IUD Integration to query flow Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d0b4a981 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d0b4a981 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d0b4a981 Branch: refs/heads/master Commit: d0b4a981d80dce839a7339f4c02192f26051acb8 Parents: 427b202 Author: Venkata Ramana G Authored: Mon Jan 2 11:41:37 2017 +0530 Committer: Venkata Ramana G Committed: Fri Jan 6 19:16:29 2017 +0530 ---------------------------------------------------------------------- .../core/carbon/datastore/BlockIndexStore.java | 49 +++++++- .../carbon/datastore/SegmentTaskIndexStore.java | 4 + .../apache/carbondata/core/update/UpdateVO.java | 10 ++ .../SegmentUpdateStatusManager.java | 1 + .../carbondata/scan/model/QueryModel.java | 13 ++ .../carbondata/hadoop/CarbonInputFormat.java | 123 ++++++++++++++----- .../carbondata/hadoop/CarbonInputSplit.java | 11 ++ 7 files changed, 181 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java index e431805..cdaedd8 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java @@ -22,7 +22,9 @@ package org.apache.carbondata.core.carbon.datastore; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; @@ -43,7 +45,11 @@ import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier; import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.update.CarbonUpdateUtil; +import org.apache.carbondata.core.update.UpdateVO; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.scan.model.QueryModel; + /** * This class is used to load the B-Tree in Executor LRU Cache @@ -255,7 +261,7 @@ public class BlockIndexStore extends AbstractBlockIndexStoreCache { } } if (exceptionOccurred) { - LOGGER.error("Block B-Tree loading failed. Clearing the access count of the loaded blocks."); + LOGGER.error("Block B-tree loading failed. Clearing the access count of the loaded blocks."); // in case of any failure clear the access count for the valid loaded blocks clearAccessCountForLoadedBlocks(loadedBlockArray); throw new IndexBuilderException("Block B-tree loading failed", exceptionRef); @@ -331,4 +337,45 @@ public class BlockIndexStore extends AbstractBlockIndexStoreCache { } } } + + /** + * remove TableBlocks executer level If Horizontal Compaction Done + * @param queryModel + */ + public void removeTableBlocksIfHorizontalCompactionDone(QueryModel queryModel) { + // get the invalid segments blocks details + Map invalidBlocksVO = queryModel.getInvalidBlockVOForSegmentId(); + if (!invalidBlocksVO.isEmpty()) { + UpdateVO updateMetadata; + Iterator> itr = invalidBlocksVO.entrySet().iterator(); + String blockTimestamp = null; + while (itr.hasNext()) { + Map.Entry entry = itr.next(); + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(queryModel.getAbsoluteTableIdentifier(), + entry.getKey()); + List blockInfos = segmentIdToBlockListMap + .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); + if (null != blockInfos) { + for (BlockInfo blockInfo : blockInfos) { + // reading the updated block names from status manager instance + blockTimestamp = blockInfo.getBlockUniqueName() + .substring(blockInfo.getBlockUniqueName().lastIndexOf('-') + 1, + blockInfo.getBlockUniqueName().length()); + updateMetadata = entry.getValue(); + if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(Long.parseLong(blockTimestamp))) { + Long blockTimeStamp = Long.parseLong(blockTimestamp); + if (blockTimeStamp > updateMetadata.getFactTimestamp() && ( + updateMetadata.getUpdateDeltaStartTimestamp() != null + && blockTimeStamp < updateMetadata.getUpdateDeltaStartTimestamp())) { + String lruCacheKey = + getLruCacheKey(queryModel.getAbsoluteTableIdentifier(), blockInfo); + lruCache.remove(lruCacheKey); + } + } + } + } + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java index eb8aea2..08bd85f 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java @@ -39,8 +39,12 @@ import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.carbon.path.CarbonTablePath; import org.apache.carbondata.core.carbon.path.CarbonTablePath.DataFileUtil; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.scan.model.QueryModel; + + /** * Class to handle loading, unloading,clearing,storing of the table http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java b/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java index 4961bc6..ad744c4 100644 --- a/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java +++ b/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java @@ -30,6 +30,8 @@ public class UpdateVO implements Serializable { private Long updateDeltaStartTimestamp; + private String segmentId; + public Long getLatestUpdateTimestamp() { return latestUpdateTimestamp; } @@ -95,4 +97,12 @@ public class UpdateVO implements Serializable { } return latestUpdateTimestamp; } + + public String getSegmentId() { + return segmentId; + } + + public void setSegmentId(String segmentId) { + this.segmentId = segmentId; + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java index 0104eab..dc8320f 100644 --- a/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java @@ -883,6 +883,7 @@ public class SegmentUpdateStatusManager { UpdateVO range = new UpdateVO(); for (LoadMetadataDetails segment : segmentDetails) { if (segment.getLoadName().equalsIgnoreCase(segmentId)) { + range.setSegmentId(segmentId); range.setFactTimestamp(segment.getLoadStartTime()); if (!segment.getUpdateDeltaStartTimestamp().isEmpty() && !segment .getUpdateDeltaEndTimestamp().isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java index 89ac225..0afefa4 100644 --- a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java +++ b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java @@ -21,6 +21,7 @@ package org.apache.carbondata.scan.model; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,6 +33,8 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColu import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.update.UpdateVO; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.scan.expression.ColumnExpression; import org.apache.carbondata.scan.expression.Expression; @@ -102,6 +105,7 @@ public class QueryModel implements Serializable { * or compacted */ private List invalidSegmentIds; + private Map invalidSegmentBlockIdMap = new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); public QueryModel() { tableBlockInfos = new ArrayList(); @@ -348,4 +352,13 @@ public class QueryModel implements Serializable { public void setVectorReader(boolean vectorReader) { this.vectorReader = vectorReader; } +public void setInvalidBlockForSegmentId(List invalidSegmentTimestampList) { + for (UpdateVO anUpdateVO : invalidSegmentTimestampList) { + this.invalidSegmentBlockIdMap.put(anUpdateVO.getSegmentId(), anUpdateVO); + } + } + + public Map getInvalidBlockVOForSegmentId() { + return invalidSegmentBlockIdMap; + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index 6950e54..8f94ae0 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -18,14 +18,11 @@ */ package org.apache.carbondata.hadoop; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - +import org.apache.carbondata.common.iudprocessor.iuddata.BlockMappingVO; +import org.apache.carbondata.common.iudprocessor.iuddata.DeleteDeltaDataUtil; +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.ColumnarFormatVersion; import org.apache.carbondata.core.carbon.datastore.DataRefNode; @@ -33,11 +30,7 @@ import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder; import org.apache.carbondata.core.carbon.datastore.IndexKey; import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier; import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier; -import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; -import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos; -import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; -import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper; -import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.carbon.datastore.block.*; import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder; import org.apache.carbondata.core.carbon.datastore.impl.btree.BlockBTreeLeafNode; import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable; @@ -60,14 +53,12 @@ import org.apache.carbondata.hadoop.util.BlockLevelTraverser; import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; import org.apache.carbondata.hadoop.util.SchemaReader; -import org.apache.carbondata.lcm.status.SegmentStatusManager; import org.apache.carbondata.scan.expression.Expression; import org.apache.carbondata.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.scan.filter.FilterUtil; import org.apache.carbondata.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.scan.model.CarbonQueryPlan; import org.apache.carbondata.scan.model.QueryModel; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -82,6 +73,10 @@ import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.util.StringUtils; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.*; + /** * Carbon Input format class representing one carbon table @@ -99,6 +94,11 @@ public class CarbonInputFormat extends FileInputFormat { private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; /** + * Filter resolver reference to hold filter resolver object for a query + */ + private FilterResolverIntf filterResolver; + + /** * It is optional, if user does not set then it reads from store * * @param configuration @@ -147,6 +147,24 @@ public class CarbonInputFormat extends FileInputFormat { } } + /** + * It sets the resolved filter expression + * + * @param configuration + * @param filterResolver + */ + public void setFilterPredicates(Configuration configuration, + FilterResolverIntf filterResolver) { + try { + if (filterResolver == null) { + return; + } + this.filterResolver = filterResolver; + } catch (Exception e) { + throw new RuntimeException("Error while setting filter expression to Job", e); + } + } + public static void setColumnProjection(Configuration configuration, CarbonProjection projection) { if (projection == null || projection.isEmpty()) { return; @@ -195,6 +213,47 @@ public class CarbonInputFormat extends FileInputFormat { } /** + * @return updateExtension + */ + private String[] getSegmentsFromConfiguration(JobContext job) throws IOException { + String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, ""); + // if no segments + if (segmentString.trim().isEmpty()) { + return new String[0]; + } + + String[] segments = segmentString.split(","); + String[] segmentIds = new String[segments.length]; + int i = 0; + try { + for (; i < segments.length; i++) { + segmentIds[i] = segments[i]; + } + } catch (NumberFormatException e) { + throw new IOException("segment no:" + segments[i] + " should be integer"); + } + return segmentIds; + } + + /** + * Below method will be used to set the segments details if + * segments are not added in the configuration + * + * @param job + * @param absoluteTableIdentifier + * @throws IOException + */ + private void addSegmentsIfEmpty(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier) + throws IOException { + if (getSegmentsFromConfiguration(job).length == 0) { + // Get the valid segments from the carbon store. + SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = + new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments(); + setSegmentsToAccess(job.getConfiguration(), validAndInvalidSegments.getValidSegments()); + } + } + + /** * {@inheritDoc} * Configurations FileInputFormat.INPUT_DIR * are used to get table path to read. @@ -207,11 +266,14 @@ public class CarbonInputFormat extends FileInputFormat { AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); CacheClient cacheClient = new CacheClient(identifier.getStorePath()); List invalidSegments = new ArrayList<>(); + List invalidTimestampsList = new ArrayList<>(); // get all valid segments and set them into the configuration if (getSegmentsToAccess(job).length == 0) { - SegmentStatusManager.SegmentStatus segments = - SegmentStatusManager.getSegmentStatus(identifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); + SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = + segmentStatusManager.getValidAndInvalidSegments(); + SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier); setSegmentsToAccess(job.getConfiguration(), segments.getValidSegments()); if (segments.getValidSegments().size() == 0) { return new ArrayList<>(0); @@ -219,6 +281,9 @@ public class CarbonInputFormat extends FileInputFormat { // remove entry in the segment index if there are invalid segments invalidSegments.addAll(segments.getInvalidSegments()); + for (String invalidSegmentId : invalidSegments) { + invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId)); + } if (invalidSegments.size() > 0) { List invalidSegmentsIds = new ArrayList<>(invalidSegments.size()); @@ -227,11 +292,17 @@ public class CarbonInputFormat extends FileInputFormat { } cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds); } + } catch (Exception ex) { + throw new IOException(ex); } // process and resolve the expression Expression filter = getFilterPredicates(job.getConfiguration()); CarbonTable carbonTable = getCarbonTable(job.getConfiguration()); + // this will be null in case of corrupt schema file. + if(null == carbonTable){ + throw new IOException("Missing/Corrupt schema file for table."); + } CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier); // do block filtering and get split @@ -241,6 +312,7 @@ public class CarbonInputFormat extends FileInputFormat { if (invalidSegments.size() > 0) { for (InputSplit split : splits) { ((CarbonInputSplit) split).setInvalidSegments(invalidSegments); + ((CarbonInputSplit) split).setInvalidTimestampRange(invalidTimestampsList); } } return splits; @@ -322,7 +394,7 @@ public class CarbonInputFormat extends FileInputFormat { throws IndexBuilderException, IOException, CarbonUtilException { QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); QueryStatistic statistic = new QueryStatistic(); - Map segmentIndexMap = + Map segmentIndexMap = getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, updateStatusManager); List resultFilterredBlocks = new LinkedList(); @@ -450,7 +522,6 @@ public class CarbonInputFormat extends FileInputFormat { statusManager.getUpdatedTasksDetailsForSegment(segmentId, updateStatusManager); } } - // if segment tree is not loaded, load the segment tree if (segmentIndexMap == null || isSegmentUpdated) { // if the segment is updated only the updated blocks TableInfo instance has to be @@ -578,14 +649,6 @@ public class CarbonInputFormat extends FileInputFormat { @Override public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { Configuration configuration = taskAttemptContext.getConfiguration(); - QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext); - CarbonReadSupport readSupport = getReadSupportClass(configuration); - return new CarbonRecordReader(queryModel, readSupport); - } - - public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) - throws IOException { - Configuration configuration = taskAttemptContext.getConfiguration(); CarbonTable carbonTable = getCarbonTable(configuration); AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(configuration); @@ -608,10 +671,12 @@ public class CarbonInputFormat extends FileInputFormat { queryModel.setInvalidSegmentIds(invalidSegments); } } - return queryModel; + + CarbonReadSupport readSupport = getReadSupportClass(configuration); + return new CarbonRecordReader(queryModel, readSupport); } - public CarbonReadSupport getReadSupportClass(Configuration configuration) { + private CarbonReadSupport getReadSupportClass(Configuration configuration) { String readSupportClass = configuration.get(CARBON_READ_SUPPORT); //By default it uses dictionary decoder read class CarbonReadSupport readSupport = null; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index a7aa0a1..cabbff9 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -33,6 +33,7 @@ import org.apache.carbondata.core.carbon.datastore.block.Distributable; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.path.CarbonTablePath; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.update.UpdateVO; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.hadoop.internal.index.Block; @@ -70,6 +71,8 @@ public class CarbonInputSplit extends FileSplit private Map blockStorageIdMap = new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + private List invalidTimestampsList; + public CarbonInputSplit() { segmentId = null; taskId = "0"; @@ -182,6 +185,14 @@ public class CarbonInputSplit extends FileSplit this.invalidSegments = invalidSegments; } + public void setInvalidTimestampRange(List invalidTimestamps) { + invalidTimestampsList = invalidTimestamps; + } + + public List getInvalidTimestampRange() { + return invalidTimestampsList; + } + /** * returns the number of blocklets *