carbondata-commits mailing list archives

From gvram...@apache.org
Subject [15/22] incubator-carbondata git commit: IUD Integration to query flow
Date Fri, 06 Jan 2017 13:57:15 GMT
IUD Integration to query flow


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d0b4a981
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d0b4a981
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d0b4a981

Branch: refs/heads/master
Commit: d0b4a981d80dce839a7339f4c02192f26051acb8
Parents: 427b202
Author: Venkata Ramana G <g.ramana.v@gmail.com>
Authored: Mon Jan 2 11:41:37 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Fri Jan 6 19:16:29 2017 +0530

----------------------------------------------------------------------
 .../core/carbon/datastore/BlockIndexStore.java  |  49 +++++++-
 .../carbon/datastore/SegmentTaskIndexStore.java |   4 +
 .../apache/carbondata/core/update/UpdateVO.java |  10 ++
 .../SegmentUpdateStatusManager.java             |   1 +
 .../carbondata/scan/model/QueryModel.java       |  13 ++
 .../carbondata/hadoop/CarbonInputFormat.java    | 123 ++++++++++++++-----
 .../carbondata/hadoop/CarbonInputSplit.java     |  11 ++
 7 files changed, 181 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
index e431805..cdaedd8 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -22,7 +22,9 @@ package org.apache.carbondata.core.carbon.datastore;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
@@ -43,7 +45,11 @@ import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;
 import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.update.CarbonUpdateUtil;
+import org.apache.carbondata.core.update.UpdateVO;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.scan.model.QueryModel;
+
 
 /**
  * This class is used to load the B-Tree in Executor LRU Cache
@@ -255,7 +261,7 @@ public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> {
       }
     }
     if (exceptionOccurred) {
-      LOGGER.error("Block B-Tree loading failed. Clearing the access count of the loaded blocks.");
+      LOGGER.error("Block B-tree loading failed. Clearing the access count of the loaded blocks.");
       // in case of any failure clear the access count for the valid loaded blocks
       clearAccessCountForLoadedBlocks(loadedBlockArray);
       throw new IndexBuilderException("Block B-tree loading failed", exceptionRef);
@@ -331,4 +337,45 @@ public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> {
       }
     }
   }
+
+  /**
+   * Removes table blocks from the executor-level LRU cache if horizontal compaction is done.
+   * @param queryModel query model carrying the invalid-segment block details
+   */
+  public void removeTableBlocksIfHorizontalCompactionDone(QueryModel queryModel) {
+    // get the invalid segments blocks details
+    Map<String, UpdateVO> invalidBlocksVO = queryModel.getInvalidBlockVOForSegmentId();
+    if (!invalidBlocksVO.isEmpty()) {
+      UpdateVO updateMetadata;
+      Iterator<Map.Entry<String, UpdateVO>> itr = invalidBlocksVO.entrySet().iterator();
+      String blockTimestamp = null;
+      while (itr.hasNext()) {
+        Map.Entry<String, UpdateVO> entry = itr.next();
+        TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+            new TableSegmentUniqueIdentifier(queryModel.getAbsoluteTableIdentifier(),
+                entry.getKey());
+        List<BlockInfo> blockInfos = segmentIdToBlockListMap
+            .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+        if (null != blockInfos) {
+          for (BlockInfo blockInfo : blockInfos) {
+            // reading the updated block names from status manager instance
+            blockTimestamp = blockInfo.getBlockUniqueName()
+                .substring(blockInfo.getBlockUniqueName().lastIndexOf('-') + 1,
+                    blockInfo.getBlockUniqueName().length());
+            updateMetadata = entry.getValue();
+            if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(Long.parseLong(blockTimestamp))) {
+              Long blockTimeStamp = Long.parseLong(blockTimestamp);
+              if (blockTimeStamp > updateMetadata.getFactTimestamp() && (
+                  updateMetadata.getUpdateDeltaStartTimestamp() != null
+                      && blockTimeStamp < updateMetadata.getUpdateDeltaStartTimestamp())) {
+                String lruCacheKey =
+                    getLruCacheKey(queryModel.getAbsoluteTableIdentifier(), blockInfo);
+                lruCache.remove(lruCacheKey);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }

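For readers of this hunk: the eviction rule above says a cached block's B-tree entry is removed once the block's name-suffix timestamp has outlived the max query timeout, is newer than the segment's fact-load timestamp, and is older than the update-delta start timestamp, i.e. the block was superseded by a horizontal compaction. Below is an illustrative, self-contained sketch of that predicate, not part of this commit; the class name, the stand-in for CarbonUpdateUtil.isMaxQueryTimeoutExceeded(), and the sample values are all assumptions.

public final class HorizontalCompactionEvictionCheck {

  // Stand-in for CarbonUpdateUtil.isMaxQueryTimeoutExceeded() (assumption):
  // the timeout is exceeded once the block timestamp is older than a window.
  private static boolean maxQueryTimeoutExceeded(long blockTimestamp, long nowMillis,
      long maxQueryTimeoutMillis) {
    return nowMillis - blockTimestamp > maxQueryTimeoutMillis;
  }

  // True when a cached block belongs to an update delta that a later
  // horizontal compaction replaced, so its B-tree entry can be evicted.
  public static boolean shouldEvict(long blockTimestamp, long factTimestamp,
      Long updateDeltaStartTimestamp, long nowMillis, long maxQueryTimeoutMillis) {
    return maxQueryTimeoutExceeded(blockTimestamp, nowMillis, maxQueryTimeoutMillis)
        && blockTimestamp > factTimestamp
        && updateDeltaStartTimestamp != null
        && blockTimestamp < updateDeltaStartTimestamp;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long factTs = now - 10_000_000L;     // hypothetical segment load time
    long blockTs = factTs + 1_000L;      // block written shortly after the load
    long deltaStartTs = factTs + 2_000L; // a later update delta superseded it
    System.out.println(shouldEvict(blockTs, factTs, deltaStartTs, now, 3_600_000L)); // true
  }
}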
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
index eb8aea2..08bd85f 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -39,8 +39,12 @@ import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath.DataFileUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.scan.model.QueryModel;
+
+
 
 /**
 * Class to handle loading, unloading, clearing, and storing of the table

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java b/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
index 4961bc6..ad744c4 100644
--- a/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
+++ b/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
@@ -30,6 +30,8 @@ public class UpdateVO implements Serializable {
 
   private Long updateDeltaStartTimestamp;
 
+  private String segmentId;
+
   public Long getLatestUpdateTimestamp() {
     return latestUpdateTimestamp;
   }
@@ -95,4 +97,12 @@ public class UpdateVO implements Serializable {
     }
     return latestUpdateTimestamp;
   }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
 }

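A hypothetical usage sketch of the new segmentId accessors, not from this patch: carrying the segment id lets an UpdateVO key a per-segment map, which is how QueryModel consumes it later in this commit. UpdateVO's implicit no-arg constructor is assumed.

import java.util.HashMap;
import java.util.Map;

import org.apache.carbondata.core.update.UpdateVO;

public final class UpdateVOSegmentIdExample {
  public static void main(String[] args) {
    UpdateVO range = new UpdateVO();
    range.setSegmentId("2"); // setter introduced by this commit

    // key per-segment update metadata by segment id
    Map<String, UpdateVO> bySegment = new HashMap<>();
    bySegment.put(range.getSegmentId(), range);
    System.out.println(bySegment.containsKey("2")); // prints: true
  }
}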
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java
b/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java
index 0104eab..dc8320f 100644
--- a/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java
@@ -883,6 +883,7 @@ public class SegmentUpdateStatusManager {
     UpdateVO range = new UpdateVO();
     for (LoadMetadataDetails segment : segmentDetails) {
       if (segment.getLoadName().equalsIgnoreCase(segmentId)) {
+        range.setSegmentId(segmentId);
         range.setFactTimestamp(segment.getLoadStartTime());
         if (!segment.getUpdateDeltaStartTimestamp().isEmpty() && !segment
             .getUpdateDeltaEndTimestamp().isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
index 89ac225..0afefa4 100644
--- a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.scan.model;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -32,6 +33,8 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColu
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.update.UpdateVO;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.scan.expression.ColumnExpression;
 import org.apache.carbondata.scan.expression.Expression;
@@ -102,6 +105,7 @@ public class QueryModel implements Serializable {
    * or compacted
    */
   private List<String> invalidSegmentIds;
+  private Map<String, UpdateVO> invalidSegmentBlockIdMap = new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
   public QueryModel() {
     tableBlockInfos = new ArrayList<TableBlockInfo>();
@@ -348,4 +352,13 @@ public class QueryModel implements Serializable {
   public void setVectorReader(boolean vectorReader) {
     this.vectorReader = vectorReader;
   }
+  public void setInvalidBlockForSegmentId(List<UpdateVO> invalidSegmentTimestampList) {
+    for (UpdateVO anUpdateVO : invalidSegmentTimestampList) {
+      this.invalidSegmentBlockIdMap.put(anUpdateVO.getSegmentId(), anUpdateVO);
+    }
+  }
+
+  public Map<String, UpdateVO> getInvalidBlockVOForSegmentId() {
+    return invalidSegmentBlockIdMap;
+  }
 }

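An illustrative round trip through the two methods added above, not part of the commit: the driver feeds per-segment invalid timestamp ranges into the QueryModel, and executor-side code reads the map back (as BlockIndexStore does earlier in this commit) to decide which cached blocks to drop.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.update.UpdateVO;
import org.apache.carbondata.scan.model.QueryModel;

public final class QueryModelInvalidBlocksExample {
  public static void main(String[] args) {
    UpdateVO range = new UpdateVO();
    range.setSegmentId("1"); // segment invalidated by an update/compaction

    List<UpdateVO> invalidRanges = new ArrayList<>();
    invalidRanges.add(range);

    QueryModel queryModel = new QueryModel();
    queryModel.setInvalidBlockForSegmentId(invalidRanges);

    Map<String, UpdateVO> map = queryModel.getInvalidBlockVOForSegmentId();
    System.out.println(map.keySet()); // prints: [1]
  }
}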
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 6950e54..8f94ae0 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -18,14 +18,11 @@
  */
 package org.apache.carbondata.hadoop;
 
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
+import org.apache.carbondata.common.iudprocessor.iuddata.BlockMappingVO;
+import org.apache.carbondata.common.iudprocessor.iuddata.DeleteDeltaDataUtil;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.DataRefNode;
@@ -33,11 +30,7 @@ import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder;
 import org.apache.carbondata.core.carbon.datastore.IndexKey;
 import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper;
-import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.datastore.block.*;
 import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder;
 import org.apache.carbondata.core.carbon.datastore.impl.btree.BlockBTreeLeafNode;
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
@@ -60,14 +53,12 @@ import org.apache.carbondata.hadoop.util.BlockLevelTraverser;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 import org.apache.carbondata.hadoop.util.SchemaReader;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
 import org.apache.carbondata.scan.expression.Expression;
 import org.apache.carbondata.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.scan.filter.FilterUtil;
 import org.apache.carbondata.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.scan.model.QueryModel;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -82,6 +73,10 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.util.StringUtils;
 
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.*;
+
 
 /**
  * Carbon Input format class representing one carbon table
@@ -99,6 +94,11 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
 
   /**
+   * Filter resolver to hold the resolved filter expression of a query
+   */
+  private FilterResolverIntf filterResolver;
+
+  /**
    * It is optional, if user does not set then it reads from store
    *
    * @param configuration
@@ -147,6 +147,24 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     }
   }
 
+  /**
+   * It sets the resolved filter expression
+   *
+   * @param configuration
+   * @param filterResolver
+   */
+  public void setFilterPredicates(Configuration configuration,
+                                  FilterResolverIntf filterResolver) {
+    try {
+      if (filterResolver == null) {
+        return;
+      }
+      this.filterResolver = filterResolver;
+    } catch (Exception e) {
+      throw new RuntimeException("Error while setting filter expression to Job", e);
+    }
+  }
+
  public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
     if (projection == null || projection.isEmpty()) {
       return;
@@ -195,6 +213,47 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
   /**
+   * @return the segment ids set in the job configuration, or an empty array
+   */
+  private String[] getSegmentsFromConfiguration(JobContext job) throws IOException {
+    String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
+    // if no segments
+    if (segmentString.trim().isEmpty()) {
+      return new String[0];
+    }
+
+    String[] segments = segmentString.split(",");
+    String[] segmentIds = new String[segments.length];
+    int i = 0;
+    try {
+      for (; i < segments.length; i++) {
+        segmentIds[i] = String.valueOf(Integer.parseInt(segments[i])); // validate numeric id
+      }
+    } catch (NumberFormatException e) {
+      throw new IOException("segment no:" + segments[i] + " should be an integer", e);
+    }
+    return segmentIds;
+  }
+
+  /**
+   * Sets the segment details if segments have not
+   * already been added in the configuration.
+   *
+   * @param job
+   * @param absoluteTableIdentifier
+   * @throws IOException
+   */
+  private void addSegmentsIfEmpty(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier)
+          throws IOException {
+    if (getSegmentsFromConfiguration(job).length == 0) {
+      // Get the valid segments from the carbon store.
+      SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
+              new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
+      setSegmentsToAccess(job.getConfiguration(), validAndInvalidSegments.getValidSegments());
+    }
+  }
+
+  /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR
    * are used to get table path to read.
@@ -207,11 +266,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
     CacheClient cacheClient = new CacheClient(identifier.getStorePath());
     List<String> invalidSegments = new ArrayList<>();
+    List<UpdateVO> invalidTimestampsList = new ArrayList<>();
 
     // get all valid segments and set them into the configuration
     if (getSegmentsToAccess(job).length == 0) {
-      SegmentStatusManager.SegmentStatus segments =
-          SegmentStatusManager.getSegmentStatus(identifier);
+      SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+      SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+              segmentStatusManager.getValidAndInvalidSegments();
+      SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
       setSegmentsToAccess(job.getConfiguration(), segments.getValidSegments());
       if (segments.getValidSegments().size() == 0) {
         return new ArrayList<>(0);
@@ -219,6 +281,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
 
       // remove entry in the segment index if there are invalid segments
       invalidSegments.addAll(segments.getInvalidSegments());
+      for (String invalidSegmentId : invalidSegments) {
+        invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+      }
       if (invalidSegments.size() > 0) {
         List<TableSegmentUniqueIdentifier> invalidSegmentsIds
             = new ArrayList<>(invalidSegments.size());
@@ -227,11 +292,17 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         }
         cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
       }
+    } catch (Exception ex) {
+      throw new IOException(ex);
     }
 
     // process and resolve the expression
     Expression filter = getFilterPredicates(job.getConfiguration());
     CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+    // this will be null in case of a corrupt schema file.
+    if (null == carbonTable) {
+      throw new IOException("Missing/Corrupt schema file for table.");
+    }
     CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
     FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
     // do block filtering and get split
@@ -241,6 +312,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     if (invalidSegments.size() > 0) {
       for (InputSplit split : splits) {
         ((CarbonInputSplit) split).setInvalidSegments(invalidSegments);
+        ((CarbonInputSplit) split).setInvalidTimestampRange(invalidTimestampsList);
       }
     }
     return splits;
@@ -322,7 +394,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       throws IndexBuilderException, IOException, CarbonUtilException {
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
-    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
+    Map<String, AbstractIndex> segmentIndexMap =
         getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, updateStatusManager);
 
     List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
@@ -450,7 +522,6 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
                 statusManager.getUpdatedTasksDetailsForSegment(segmentId, updateStatusManager);
       }
     }
-
     // if segment tree is not loaded, load the segment tree
     if (segmentIndexMap == null || isSegmentUpdated) {
       // if the segment is updated only the updated blocks TableInfo instance has to be
@@ -578,14 +649,6 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
-    QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext);
-    CarbonReadSupport readSupport = getReadSupportClass(configuration);
-    return new CarbonRecordReader<T>(queryModel, readSupport);
-  }
-
-  public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-      throws IOException {
-    Configuration configuration = taskAttemptContext.getConfiguration();
     CarbonTable carbonTable = getCarbonTable(configuration);
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(configuration);
 
@@ -608,10 +671,12 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         queryModel.setInvalidSegmentIds(invalidSegments);
       }
     }
-    return queryModel;
+
+    CarbonReadSupport readSupport = getReadSupportClass(configuration);
+    return new CarbonRecordReader<T>(queryModel, readSupport);
   }
 
-  public CarbonReadSupport getReadSupportClass(Configuration configuration) {
+  private CarbonReadSupport getReadSupportClass(Configuration configuration) {
     String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
     //By default it uses dictionary decoder read class
     CarbonReadSupport readSupport = null;

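To summarize the getSplits() change above: for every invalid segment, the invalid timestamp range is looked up and collected so it can be stamped onto each CarbonInputSplit. The sketch below is not from the patch; it only factors out the added loop, and assumes a SegmentUpdateStatusManager constructed from the table identifier as in the hunk.

import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.update.UpdateVO;
import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;

public final class InvalidTimestampCollector {
  // Mirrors the loop added to getSplits(): one UpdateVO per invalid segment.
  public static List<UpdateVO> collect(List<String> invalidSegments,
      SegmentUpdateStatusManager updateStatusManager) {
    List<UpdateVO> invalidTimestampsList = new ArrayList<>();
    for (String invalidSegmentId : invalidSegments) {
      invalidTimestampsList.add(
          updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
    }
    return invalidTimestampsList;
  }
}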
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d0b4a981/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index a7aa0a1..cabbff9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.carbon.datastore.block.Distributable;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.update.UpdateVO;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.hadoop.internal.index.Block;
 
@@ -70,6 +71,8 @@ public class CarbonInputSplit extends FileSplit
   private Map<String, String> blockStorageIdMap =
           new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
+  private List<UpdateVO> invalidTimestampsList;
+
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
@@ -182,6 +185,14 @@ public class CarbonInputSplit extends FileSplit
     this.invalidSegments = invalidSegments;
   }
 
+  public void setInvalidTimestampRange(List<UpdateVO> invalidTimestamps) {
+    invalidTimestampsList = invalidTimestamps;
+  }
+
+  public List<UpdateVO> getInvalidTimestampRange() {
+    return invalidTimestampsList;
+  }
+
   /**
    * returns the number of blocklets
    *

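Finally, an illustrative round trip through the new CarbonInputSplit accessors, not part of the commit: the driver stamps the invalid timestamp ranges onto each split in getSplits(), and executor-side code can read them back when building the QueryModel.

import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.update.UpdateVO;
import org.apache.carbondata.hadoop.CarbonInputSplit;

public final class CarbonInputSplitTimestampExample {
  public static void main(String[] args) {
    UpdateVO range = new UpdateVO();
    range.setSegmentId("0");

    List<UpdateVO> invalidTimestamps = new ArrayList<>();
    invalidTimestamps.add(range);

    CarbonInputSplit split = new CarbonInputSplit(); // no-arg ctor shown above
    split.setInvalidTimestampRange(invalidTimestamps);
    System.out.println(split.getInvalidTimestampRange().size()); // prints: 1
  }
}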
