carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [39/42] carbondata git commit: Fixed Synchronization issue and improve IUD performance
Date Thu, 15 Jun 2017 11:50:44 GMT
Fixed Synchronization issue and improve IUD performance


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/da952e82
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/da952e82
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/da952e82

Branch: refs/heads/branch-1.1
Commit: da952e82b443839e9c8b7fdeebaed092d3232652
Parents: bbf5dc1
Author: kumarvishal <kumarvishal.1802@gmail.com>
Authored: Mon Jun 12 16:06:24 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Jun 15 13:32:15 2017 +0530

----------------------------------------------------------------------
 .../core/datastore/block/AbstractIndex.java     |  41 ++++++++
 .../core/datastore/block/TableBlockInfo.java    |  22 +++-
 .../core/mutate/CarbonUpdateUtil.java           |  16 +++
 .../core/mutate/DeleteDeltaBlockletDetails.java |  15 +--
 .../carbondata/core/mutate/DeleteDeltaVo.java   |  60 +++++++++++
 .../reader/CarbonDeleteFilesDataReader.java     |  47 +++++++++
 .../impl/DictionaryBasedResultCollector.java    |  11 +-
 .../collector/impl/RawBasedResultCollector.java |   7 +-
 ...structureBasedDictionaryResultCollector.java |   7 +-
 .../RestructureBasedRawResultCollector.java     |   7 +-
 .../executor/impl/AbstractQueryExecutor.java    |   9 +-
 .../scan/executor/infos/BlockExecutionInfo.java |  56 ++++++----
 .../scan/executor/infos/DeleteDeltaInfo.java    |  82 +++++++++++++++
 .../core/scan/result/AbstractScannedResult.java |  61 +++++++----
 .../AbstractDetailQueryResultIterator.java      | 103 ++++++++++++++++++-
 .../scan/scanner/AbstractBlockletScanner.java   |   9 --
 .../core/scan/scanner/impl/FilterScanner.java   |  10 --
 .../SegmentUpdateStatusManager.java             |  29 ++++--
 .../datastore/SegmentTaskIndexStoreTest.java    |   2 +-
 .../core/datastore/block/BlockInfoTest.java     |  12 +--
 .../datastore/block/TableBlockInfoTest.java     |  32 +++---
 .../core/datastore/block/TableTaskInfoTest.java |   8 +-
 .../carbondata/core/util/CarbonUtilTest.java    |   4 +-
 .../core/util/DataFileFooterConverterTest.java  |   8 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |  11 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  39 +++++--
 .../internal/index/impl/InMemoryBTreeIndex.java |   5 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   3 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../carbon/datastore/BlockIndexStoreTest.java   |  28 ++---
 31 files changed, 574 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
index b538dc3..4d0e56d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
@@ -17,11 +17,13 @@
 package org.apache.carbondata.core.datastore.block;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
 
 public abstract class AbstractIndex implements Cacheable {
 
@@ -51,6 +53,16 @@ public abstract class AbstractIndex implements Cacheable {
   protected long memorySize;
 
   /**
+   * last fetch delete deltaFile timestamp
+   */
+  private long deleteDeltaTimestamp;
+
+  /**
+   * map of blockletidAndPageId to
+   * deleted rows
+   */
+  private Map<String, DeleteDeltaVo> deletedRowsMap;
+  /**
    * @return the segmentProperties
    */
   public SegmentProperties getSegmentProperties() {
@@ -124,4 +136,33 @@ public abstract class AbstractIndex implements Cacheable {
   public void setMemorySize(long memorySize) {
     this.memorySize = memorySize;
   }
+
+  /**
+   * @return latest deleted delta timestamp
+   */
+  public long getDeleteDeltaTimestamp() {
+    return deleteDeltaTimestamp;
+  }
+
+  /**
+   * set the latest delete delta timestamp
+   * @param deleteDeltaTimestamp
+   */
+  public void setDeleteDeltaTimestamp(long deleteDeltaTimestamp) {
+    this.deleteDeltaTimestamp = deleteDeltaTimestamp;
+  }
+
+  /**
+   * @return the deleted record for block map
+   */
+  public Map<String, DeleteDeltaVo> getDeletedRowsMap() {
+    return deletedRowsMap;
+  }
+
+  /**
+   * @param deletedRowsMap
+   */
+  public void setDeletedRowsMap(Map<String, DeleteDeltaVo> deletedRowsMap) {
+    this.deletedRowsMap = deletedRowsMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 8fbaa4a..44347cf 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -72,14 +72,20 @@ public class TableBlockInfo implements Distributable, Serializable {
   private Map<String, String> blockStorageIdMap =
           new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
+  /**
+   * delete delta files path for this block
+   */
+  private String[] deletedDeltaFilePath;
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
-      long blockLength, ColumnarFormatVersion version) {
+      long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
     this.blockOffset = blockOffset;
     this.segmentId = segmentId;
     this.locations = locations;
     this.blockLength = blockLength;
     this.version = version;
+    this.deletedDeltaFilePath = deletedDeltaFilePath;
   }
 
   /**
@@ -93,8 +99,9 @@ public class TableBlockInfo implements Distributable, Serializable {
    * @param blockletInfos
    */
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
-      long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version) {
-    this(filePath, blockOffset, segmentId, locations, blockLength, version);
+      long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version,
+      String[] deletedDeltaFilePath) {
+    this(filePath, blockOffset, segmentId, locations, blockLength, version, deletedDeltaFilePath);
     this.blockletInfos = blockletInfos;
   }
 
@@ -112,8 +119,9 @@ public class TableBlockInfo implements Distributable, Serializable {
    */
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
       long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version,
-      Map<String, String> blockStorageIdMap) {
-    this(filePath, blockOffset, segmentId, locations, blockLength, blockletInfos, version);
+      Map<String, String> blockStorageIdMap, String[] deletedDeltaFilePath) {
+    this(filePath, blockOffset, segmentId, locations, blockLength, blockletInfos, version,
+        deletedDeltaFilePath);
     this.blockStorageIdMap = blockStorageIdMap;
   }
 
@@ -307,4 +315,8 @@ public class TableBlockInfo implements Distributable, Serializable {
   public void setBlockStorageIdMap(Map<String, String> blockStorageIdMap) {
     this.blockStorageIdMap = blockStorageIdMap;
   }
+
+  public String[] getDeletedDeltaFilePath() {
+    return deletedDeltaFilePath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index fef5905..b5a632f 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -800,4 +800,20 @@ public class CarbonUpdateUtil {
 
   }
 
+  /**
+   * Below method will be used to get the latest delete delta file timestamp
+   * @param deleteDeltaFiles
+   * @return latest delete delta file time stamp
+   */
+  public static long getLatestDeleteDeltaTimestamp(String[] deleteDeltaFiles) {
+    long latestTimestamp = 0;
+    for (int i = 0; i < deleteDeltaFiles.length; i++) {
+      long convertTimeStampToLong = Long.parseLong(
+          CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(deleteDeltaFiles[i]));
+      if (latestTimestamp < convertTimeStampToLong) {
+        latestTimestamp = convertTimeStampToLong;
+      }
+    }
+    return latestTimestamp;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
index 7df5f22..0f54f3a 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
@@ -21,9 +21,6 @@ import java.io.Serializable;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
 /**
  * This class stores the blocklet details of delete delta file
  */
@@ -35,12 +32,6 @@ public class DeleteDeltaBlockletDetails implements Serializable {
 
   private Set<Integer> deletedRows;
 
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DeleteDeltaBlockletDetails.class.getName());
-
   public DeleteDeltaBlockletDetails(String id, Integer pageId) {
     this.id = id;
     deletedRows = new TreeSet<Integer>();
@@ -84,7 +75,11 @@ public class DeleteDeltaBlockletDetails implements Serializable {
   }
 
   @Override public int hashCode() {
-    return id.hashCode();
+    return id.hashCode() + pageId.hashCode();
+  }
+
+  public String getBlockletKey() {
+    return this.id + '_' + this.pageId;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
new file mode 100644
index 0000000..d68e4e9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.mutate;
+
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Class which keep the information about the rows
+ * while got deleted
+ */
+public class DeleteDeltaVo {
+
+  /**
+   * deleted rows bitset
+   */
+  private BitSet bitSet;
+
+  public DeleteDeltaVo() {
+    bitSet = new BitSet();
+  }
+
+  /**
+   * Below method will be used to insert the rows
+   * which are deleted
+   *
+   * @param data
+   */
+  public void insertData(Set<Integer> data) {
+    Iterator<Integer> iterator = data.iterator();
+    while (iterator.hasNext()) {
+      bitSet.set(iterator.next());
+    }
+  }
+
+  /**
+   * below method will be used to check the row is deleted or not
+   *
+   * @param counter
+   * @return
+   */
+  public boolean containsRow(int counter) {
+    return bitSet.get(counter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
index e689566..417ad29 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -35,6 +36,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
 import org.apache.carbondata.core.mutate.DeleteDeltaBlockletDetails;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 
@@ -120,7 +122,52 @@ public class CarbonDeleteFilesDataReader {
       }
     }
     return pageIdDeleteRowsMap;
+  }
 
+  /**
+   * Below method will be used to read the delete delta files
+   * and get the map of blockletid and page id mapping to deleted
+   * rows
+   *
+   * @param deltaFiles delete delta files array
+   * @return map of blockletid_pageid to deleted rows
+   */
+  public Map<String, DeleteDeltaVo> getDeletedRowsDataVo(String[] deltaFiles) {
+    List<Future<DeleteDeltaBlockDetails>> taskSubmitList = new ArrayList<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
+    for (final String deltaFile : deltaFiles) {
+      taskSubmitList.add(executorService.submit(new Callable<DeleteDeltaBlockDetails>() {
+        @Override public DeleteDeltaBlockDetails call() throws IOException {
+          CarbonDeleteDeltaFileReaderImpl deltaFileReader =
+              new CarbonDeleteDeltaFileReaderImpl(deltaFile, FileFactory.getFileType(deltaFile));
+          return deltaFileReader.readJson();
+        }
+      }));
+    }
+    try {
+      executorService.shutdown();
+      executorService.awaitTermination(30, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      LOGGER.error("Error while reading the delete delta files : " + e.getMessage());
+    }
+    Map<String, DeleteDeltaVo> pageIdToBlockLetVo = new HashMap<>();
+    List<DeleteDeltaBlockletDetails> blockletDetails = null;
+    for (int i = 0; i < taskSubmitList.size(); i++) {
+      try {
+        blockletDetails = taskSubmitList.get(i).get().getBlockletDetails();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+      for (DeleteDeltaBlockletDetails blockletDetail : blockletDetails) {
+        DeleteDeltaVo deleteDeltaVo = pageIdToBlockLetVo.get(blockletDetail.getBlockletKey());
+        if (null == deleteDeltaVo) {
+          deleteDeltaVo = new DeleteDeltaVo();
+          pageIdToBlockLetVo.put(blockletDetail.getBlockletKey(), deleteDeltaVo);
+        }
+        deleteDeltaVo.insertData(blockletDetail.getDeletedRows());
+      }
+    }
+    return pageIdToBlockLetVo;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index d4d16d0..dba92ad 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -17,9 +17,11 @@
 package org.apache.carbondata.core.scan.collector.impl;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 
-import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -90,8 +92,6 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
     int[] surrogateResult;
     String[] noDictionaryKeys;
     byte[][] complexTypeKeyArray;
-    BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
-        scannedResult.getDeleteDeltaDataCache();
     while (scannedResult.hasNext() && rowCounter < batchSize) {
       Object[] row = new Object[queryDimensions.length + queryMeasures.length];
       if (isDimensionExists) {
@@ -108,8 +108,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
       } else {
         scannedResult.incrementCounter();
       }
-      if (null != deleteDeltaDataCache && deleteDeltaDataCache
-          .contains(scannedResult.getCurrentRowId(), scannedResult.getCurrentPageCounter())) {
+      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
         continue;
       }
       fillMeasureData(scannedResult, row);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
index 478dc8c..3e82257 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
@@ -20,7 +20,6 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.QueryMeasure;
@@ -54,15 +53,11 @@ public class RawBasedResultCollector extends AbstractScannedResultCollector {
   @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
-    BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
-        scannedResult.getDeleteDeltaDataCache();
     // scan the record and add to list
     int rowCounter = 0;
     while (scannedResult.hasNext() && rowCounter < batchSize) {
       scanResultAndGetData(scannedResult);
-      if (null != deleteDeltaDataCache && deleteDeltaDataCache
-          .contains(scannedResult.getCurrentRowId(),
-              scannedResult.getCurrentPageCounter())) {
+      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
         continue;
       }
       prepareRow(scannedResult, listBasedResult, queryMeasures);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
index 4fa1494..8f89760 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.result.AbstractScannedResult;
@@ -50,8 +49,6 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe
     int[] surrogateResult;
     String[] noDictionaryKeys;
     byte[][] complexTypeKeyArray;
-    BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
-        scannedResult.getDeleteDeltaDataCache();
     Map<Integer, GenericQueryType> comlexDimensionInfoMap =
         tableBlockExecutionInfos.getComlexDimensionInfoMap();
     while (scannedResult.hasNext() && rowCounter < batchSize) {
@@ -80,9 +77,7 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe
       } else {
         scannedResult.incrementCounter();
       }
-      if (null != deleteDeltaDataCache && deleteDeltaDataCache
-          .contains(scannedResult.getCurrentRowId(),
-              scannedResult.getCurrentPageCounter())) {
+      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
         continue;
       }
       fillMeasureData(scannedResult, row);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index 2de74fa..479a684 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -21,7 +21,6 @@ import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
@@ -152,15 +151,11 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
   @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
-    BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
-        scannedResult.getDeleteDeltaDataCache();
     // scan the record and add to list
     int rowCounter = 0;
     while (scannedResult.hasNext() && rowCounter < batchSize) {
       scanResultAndGetData(scannedResult);
-      if (null != deleteDeltaDataCache && deleteDeltaDataCache
-          .contains(scannedResult.getCurrentRowId(),
-              scannedResult.getCurrentPageCounter())) {
+      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
         continue;
       }
       // re-fill dictionary and no dictionary key arrays for the newly added columns

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 2a5c342..ba7530d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -193,7 +193,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
           getBlockExecutionInfoForBlock(queryModel, queryProperties.dataBlocks.get(i),
               queryModel.getTableBlockInfos().get(i).getBlockletInfos().getStartBlockletNumber(),
               queryModel.getTableBlockInfos().get(i).getBlockletInfos().getNumberOfBlockletToScan(),
-              queryModel.getTableBlockInfos().get(i).getFilePath()));
+              queryModel.getTableBlockInfos().get(i).getFilePath(),
+              queryModel.getTableBlockInfos().get(i).getDeletedDeltaFilePath()));
     }
     if (null != queryModel.getStatisticsRecorder()) {
       QueryStatistic queryStatistic = new QueryStatistic();
@@ -214,7 +215,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
    * @throws QueryExecutionException any failure during block info creation
    */
   protected BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
-      AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan, String filePath)
+      AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan, String filePath,
+      String[] deleteDeltaFiles)
       throws QueryExecutionException {
     BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
     SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
@@ -232,6 +234,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
             queryModel.getAbsoluteTableIdentifier().getCarbonTableIdentifier()).getFactDir()
         .length() + 1;
     blockExecutionInfo.setBlockId(filePath.substring(tableFactPathLength));
+    blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles);
     blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
     blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);
     blockExecutionInfo.setQueryDimensions(currentBlockQueryDimensions
@@ -360,8 +363,6 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     // setting the no dictionary column block indexes
     blockExecutionInfo.setNoDictionaryBlockIndexes(ArrayUtils.toPrimitive(
         noDictionaryColumnBlockIndex.toArray(new Integer[noDictionaryColumnBlockIndex.size()])));
-    // setting column id to dictionary mapping
-    blockExecutionInfo.setColumnIdToDcitionaryMapping(queryProperties.columnToDictionayMapping);
     // setting each column value size
     blockExecutionInfo.setEachColumnValueSize(segmentProperties.getEachDimColumnValueSize());
     blockExecutionInfo.setComplexColumnParentBlockIndexes(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index b294b58..7d08dda 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -18,12 +18,12 @@ package org.apache.carbondata.core.scan.executor.infos;
 
 import java.util.Map;
 
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.model.QueryDimension;
@@ -101,12 +101,6 @@ public class BlockExecutionInfo {
   private int[] projectionListMeasureIndexes;
 
   /**
-   * this will be used to update the older block fixed length keys with the
-   * new block fixed length key
-   */
-  private KeyStructureInfo keyStructureInfo;
-
-  /**
    * first block from which query execution will start
    */
   private DataRefNode firstDataBlock;
@@ -146,12 +140,6 @@ public class BlockExecutionInfo {
   private Map<Integer, KeyStructureInfo> columnGroupToKeyStructureInfo;
 
   /**
-   * mapping of dictionary dimension to its dictionary mapping which will be
-   * used to get the actual data from dictionary for aggregation, sorting
-   */
-  private Map<String, Dictionary> columnIdToDcitionaryMapping;
-
-  /**
    * filter tree to execute the filter
    */
   private FilterExecuter filterExecuterTree;
@@ -230,6 +218,13 @@ public class BlockExecutionInfo {
    */
   private AbsoluteTableIdentifier absoluteTableIdentifier;
 
+  /**
+   * delete delta file path
+   */
+  private String[] deleteDeltaFilePath;
+
+  private Map<String, DeleteDeltaVo> deletedRecordsMap;
+
   public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
     return absoluteTableIdentifier;
   }
@@ -484,13 +479,6 @@ public class BlockExecutionInfo {
     this.columnGroupToKeyStructureInfo = columnGroupToKeyStructureInfo;
   }
 
-  /**
-   * @param columnIdToDcitionaryMapping the columnIdToDcitionaryMapping to set
-   */
-  public void setColumnIdToDcitionaryMapping(Map<String, Dictionary> columnIdToDcitionaryMapping) {
-    this.columnIdToDcitionaryMapping = columnIdToDcitionaryMapping;
-  }
-
   public boolean isRawRecordDetailQuery() {
     return isRawRecordDetailQuery;
   }
@@ -643,4 +631,32 @@ public class BlockExecutionInfo {
     this.projectionListMeasureIndexes = projectionListMeasureIndexes;
   }
 
+  /**
+   * @return delete delta files
+   */
+  public String[] getDeleteDeltaFilePath() {
+    return deleteDeltaFilePath;
+  }
+
+  /**
+   * set the delete delta files
+   * @param deleteDeltaFilePath
+   */
+  public void setDeleteDeltaFilePath(String[] deleteDeltaFilePath) {
+    this.deleteDeltaFilePath = deleteDeltaFilePath;
+  }
+
+  /**
+   * @return deleted record map
+   */
+  public Map<String, DeleteDeltaVo> getDeletedRecordsMap() {
+    return deletedRecordsMap;
+  }
+
+  /**
+   * @param deletedRecordsMap
+   */
+  public void setDeletedRecordsMap(Map<String, DeleteDeltaVo> deletedRecordsMap) {
+    this.deletedRecordsMap = deletedRecordsMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DeleteDeltaInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DeleteDeltaInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DeleteDeltaInfo.java
new file mode 100644
index 0000000..52fa529
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DeleteDeltaInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.executor.infos;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+
+/**
+ * class to hold information about delete delta files
+ */
+public class DeleteDeltaInfo {
+
+  /**
+   * delete delta files
+   */
+  private String[] deleteDeltaFile;
+
+  /**
+   * latest delete delta file timestamp
+   */
+  private long latestDeleteDeltaFileTimestamp;
+
+  public DeleteDeltaInfo(String[] deleteDeltaFile) {
+    this.deleteDeltaFile = deleteDeltaFile;
+    this.latestDeleteDeltaFileTimestamp =
+        CarbonUpdateUtil.getLatestDeleteDeltaTimestamp(deleteDeltaFile);
+  }
+
+  public String[] getDeleteDeltaFile() {
+    return deleteDeltaFile;
+  }
+
+  public long getLatestDeleteDeltaFileTimestamp() {
+    return latestDeleteDeltaFileTimestamp;
+  }
+
+  @Override public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + Arrays.hashCode(deleteDeltaFile);
+    result =
+        prime * result + (int) (latestDeleteDeltaFileTimestamp ^ (latestDeleteDeltaFileTimestamp
+            >>> 32));
+    return result;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    DeleteDeltaInfo other = (DeleteDeltaInfo) obj;
+    if (!Arrays.equals(deleteDeltaFile, other.deleteDeltaFile)) {
+      return false;
+    }
+    if (latestDeleteDeltaFileTimestamp != other.latestDeleteDeltaFileTimestamp) {
+      return false;
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
index 1dda1aa..c24b73c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
@@ -25,11 +25,13 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
+import org.apache.carbondata.core.mutate.TupleIdEnum;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
@@ -125,7 +127,20 @@ public abstract class AbstractScannedResult {
    */
   private int[] complexParentBlockIndexes;
 
-  protected BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache;
+  /**
+   * blockletid+pageumber to deleted reocrd map
+   */
+  private Map<String, DeleteDeltaVo> deletedRecordMap;
+
+  /**
+   * current page delete delta vo
+   */
+  private DeleteDeltaVo currentDeleteDeltaVo;
+
+  /**
+   * actual blocklet number
+   */
+  private String blockletNumber;
 
   public AbstractScannedResult(BlockExecutionInfo blockExecutionInfo) {
     this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize();
@@ -135,6 +150,7 @@ public abstract class AbstractScannedResult {
     this.complexParentIndexToQueryMap = blockExecutionInfo.getComlexDimensionInfoMap();
     this.complexParentBlockIndexes = blockExecutionInfo.getComplexColumnParentBlockIndexes();
     this.totalDimensionsSize = blockExecutionInfo.getQueryDimensions().length;
+    this.deletedRecordMap = blockExecutionInfo.getDeletedRecordsMap();
   }
 
   /**
@@ -393,6 +409,12 @@ public abstract class AbstractScannedResult {
    */
   public void setBlockletId(String blockletId) {
     this.blockletId = CarbonTablePath.getShortBlockId(blockletId);
+    blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, TupleIdEnum.BLOCKLET_ID);
+    // if deleted recors map is present for this block
+    // then get the first page deleted vo
+    if (null != deletedRecordMap) {
+      currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter);
+    }
   }
 
   /**
@@ -457,6 +479,9 @@ public abstract class AbstractScannedResult {
       pageCounter++;
       rowCounter = 0;
       currentRow = -1;
+      if (null != deletedRecordMap) {
+        currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + pageCounter + "");
+      }
       return hasNext();
     }
     return false;
@@ -629,21 +654,6 @@ public abstract class AbstractScannedResult {
   public abstract String[] getNoDictionaryKeyStringArray();
 
   /**
-   * @return BlockletLevelDeleteDeltaDataCache.
-   */
-  public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
-    return blockletDeleteDeltaCache;
-  }
-
-  /**
-   * @param blockletDeleteDeltaCache
-   */
-  public void setBlockletDeleteDeltaCache(
-      BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache) {
-    this.blockletDeleteDeltaCache = blockletDeleteDeltaCache;
-  }
-
-  /**
    * Mark the filtered rows in columnar batch. These rows will not be added to vector batches later.
    * @param columnarBatch
    * @param startRow
@@ -653,11 +663,11 @@ public abstract class AbstractScannedResult {
   public int markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size,
       int vectorOffset) {
     int rowsFiltered = 0;
-    if (blockletDeleteDeltaCache != null) {
+    if (currentDeleteDeltaVo != null) {
       int len = startRow + size;
       for (int i = startRow; i < len; i++) {
         int rowId = rowMapping != null ? rowMapping[pageCounter][i] : i;
-        if (blockletDeleteDeltaCache.contains(rowId, pageCounter)) {
+        if (currentDeleteDeltaVo.containsRow(rowId)) {
           columnarBatch.markFiltered(vectorOffset);
           rowsFiltered++;
         }
@@ -666,4 +676,17 @@ public abstract class AbstractScannedResult {
     }
     return rowsFiltered;
   }
+
+  /**
+   * Below method will be used to check row got deleted
+   *
+   * @param rowId
+   * @return is present in deleted row
+   */
+  public boolean containsDeletedRow(int rowId) {
+    if (null != currentDeleteDeltaVo) {
+      return currentDeleteDeltaVo.containsRow(rowId);
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index a0823af..92e9594 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -18,6 +18,8 @@ package org.apache.carbondata.core.scan.result.iterator;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.carbondata.common.CarbonIterator;
@@ -27,9 +29,13 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.DataRefNodeFinder;
 import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
+import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.executor.infos.DeleteDeltaInfo;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator;
 import org.apache.carbondata.core.scan.processor.impl.DataBlockIteratorImpl;
@@ -53,6 +59,9 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(AbstractDetailQueryResultIterator.class.getName());
 
+  private static final Map<DeleteDeltaInfo, Object> deleteDeltaToLockObjectMap =
+      new ConcurrentHashMap<>();
+
   protected ExecutorService execService;
   /**
    * execution info of the block
@@ -77,7 +86,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
   /**
    * queryStatisticsModel to store query statistics object
    */
-  QueryStatisticsModel queryStatisticsModel;
+  private QueryStatisticsModel queryStatisticsModel;
 
   public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
       ExecutorService execService) {
@@ -105,13 +114,24 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
 
   private void intialiseInfos() {
     for (BlockExecutionInfo blockInfo : blockExecutionInfos) {
-      DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize());
+      Map<String, DeleteDeltaVo> deletedRowsMap = null;
+      DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize(),
+          blockInfo.getDataBlock().getSegmentProperties().getNumberOfSortColumns(),
+          blockInfo.getDataBlock().getSegmentProperties().getNumberOfNoDictSortColumns());
+      // if delete delta file is present
+      if (null != blockInfo.getDeleteDeltaFilePath() && 0 != blockInfo
+          .getDeleteDeltaFilePath().length) {
+        DeleteDeltaInfo deleteDeltaInfo = new DeleteDeltaInfo(blockInfo.getDeleteDeltaFilePath());
+        // read and get the delete detail block details
+        deletedRowsMap = getDeleteDeltaDetails(blockInfo.getDataBlock(), deleteDeltaInfo);
+        // set the deleted row to block execution info
+        blockInfo.setDeletedRecordsMap(deletedRowsMap);
+      }
       DataRefNode startDataBlock = finder
           .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
       while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
         startDataBlock = startDataBlock.getNextDataRefNode();
       }
-
       long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
       //if number of block is less than 0 then take end block.
       if (numberOfBlockToScan <= 0) {
@@ -124,6 +144,83 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
     }
   }
 
+  /**
+   * Below method will be used to get the delete delta rows for a block
+   *
+   * @param dataBlock       data block
+   * @param deleteDeltaInfo delete delta info
+   * @return blockid+pageid to deleted row mapping
+   */
+  private Map<String, DeleteDeltaVo> getDeleteDeltaDetails(AbstractIndex dataBlock,
+      DeleteDeltaInfo deleteDeltaInfo) {
+    // if datablock deleted delta timestamp is more then the current delete delta files timestamp
+    // then return the current deleted rows
+    if (dataBlock.getDeleteDeltaTimestamp() >= deleteDeltaInfo
+        .getLatestDeleteDeltaFileTimestamp()) {
+      return dataBlock.getDeletedRowsMap();
+    }
+    CarbonDeleteFilesDataReader carbonDeleteDeltaFileReader = null;
+    // get the lock object so in case of concurrent query only one task will read the delete delta
+    // files other tasks will wait
+    Object lockObject = deleteDeltaToLockObjectMap.get(deleteDeltaInfo);
+    // if lock object is null then add a lock object
+    if (null == lockObject) {
+      synchronized (deleteDeltaToLockObjectMap) {
+        // double checking
+        lockObject = deleteDeltaToLockObjectMap.get(deleteDeltaInfo);
+        if (null == lockObject) {
+          lockObject = new Object();
+          deleteDeltaToLockObjectMap.put(deleteDeltaInfo, lockObject);
+        }
+      }
+    }
+    // double checking to check the deleted rows is already present or not
+    if (dataBlock.getDeleteDeltaTimestamp() < deleteDeltaInfo.getLatestDeleteDeltaFileTimestamp()) {
+      // if not then acquire the lock
+      synchronized (lockObject) {
+        // check the timestamp again
+        if (dataBlock.getDeleteDeltaTimestamp() < deleteDeltaInfo
+            .getLatestDeleteDeltaFileTimestamp()) {
+          // read the delete delta files
+          carbonDeleteDeltaFileReader = new CarbonDeleteFilesDataReader();
+          Map<String, DeleteDeltaVo> deletedRowsMap = carbonDeleteDeltaFileReader
+              .getDeletedRowsDataVo(deleteDeltaInfo.getDeleteDeltaFile());
+          setDeltedDeltaBoToDataBlock(deleteDeltaInfo, deletedRowsMap, dataBlock);
+          // remove the lock
+          deleteDeltaToLockObjectMap.remove(deleteDeltaInfo);
+          return deletedRowsMap;
+        } else {
+          return dataBlock.getDeletedRowsMap();
+        }
+      }
+    } else {
+      return dataBlock.getDeletedRowsMap();
+    }
+  }
+
+  /**
+   * Below method will be used to set deleted records map to data block
+   * based on latest delta file timestamp
+   *
+   * @param deleteDeltaInfo
+   * @param deletedRecordsMap
+   * @param dataBlock
+   */
+  private void setDeltedDeltaBoToDataBlock(DeleteDeltaInfo deleteDeltaInfo,
+      Map<String, DeleteDeltaVo> deletedRecordsMap, AbstractIndex dataBlock) {
+    // check if timestamp of data block is less than the latest delete delta timestamp
+    // then update the delete delta details and timestamp in data block
+    if (dataBlock.getDeleteDeltaTimestamp() < deleteDeltaInfo.getLatestDeleteDeltaFileTimestamp()) {
+      synchronized (dataBlock) {
+        if (dataBlock.getDeleteDeltaTimestamp() < deleteDeltaInfo
+            .getLatestDeleteDeltaFileTimestamp()) {
+          dataBlock.setDeletedRowsMap(deletedRecordsMap);
+          dataBlock.setDeleteDeltaTimestamp(deleteDeltaInfo.getLatestDeleteDeltaFileTimestamp());
+        }
+      }
+    }
+  }
+
   @Override public boolean hasNext() {
     if ((dataBlockIterator != null && dataBlockIterator.hasNext())) {
       return true;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
index 0fb9782..f3d1336 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
@@ -23,8 +23,6 @@ import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
-import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -114,13 +112,6 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
       }
     }
     scannedResult.setNumberOfRows(numberOfRows);
-    // loading delete data cache in blockexecutioninfo instance
-    DeleteDeltaCacheLoaderIntf deleteCacheLoader =
-        new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(),
-            blocksChunkHolder.getDataBlock(), blockExecutionInfo.getAbsoluteTableIdentifier());
-    deleteCacheLoader.loadDeleteDeltaFileDataToCache();
-    scannedResult
-        .setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache());
     scannedResult.setRawColumnChunks(dimensionRawColumnChunks);
     // adding statistics for carbon scan time
     QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
index 8f14b85..e710e40 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
@@ -26,8 +26,6 @@ import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
-import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
@@ -198,17 +196,9 @@ public class FilterScanner extends AbstractBlockletScanner {
         indexesGroup[k] = indexes;
       }
     }
-    // loading delete data cache in blockexecutioninfo instance
-    DeleteDeltaCacheLoaderIntf deleteCacheLoader =
-        new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(),
-            blocksChunkHolder.getDataBlock(), blockExecutionInfo.getAbsoluteTableIdentifier());
-    deleteCacheLoader.loadDeleteDeltaFileDataToCache();
-    scannedResult
-        .setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache());
     FileHolder fileReader = blocksChunkHolder.getFileReader();
     int[][] allSelectedDimensionBlocksIndexes =
         blockExecutionInfo.getAllSelectedDimensionBlocksIndexes();
-
     long dimensionReadTime = System.currentTimeMillis();
     DimensionRawColumnChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock()
         .getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 6fab563..5e6e8de 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -261,7 +261,22 @@ public class SegmentUpdateStatusManager {
     return dataReader.getDeleteDataFromAllFiles(deltaFiles, blockletId);
   }
 
-
+  /**
+   * Below method will be used to get all the delete delta files based on block name
+   *
+   * @param blockFilePath actual block filePath
+   * @return all delete delta files
+   * @throws Exception
+   */
+  public String[] getDeleteDeltaFilePath(String blockFilePath) throws Exception {
+    int tableFactPathLength = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier()).getFactDir().length() + 1;
+    String blockame = blockFilePath.substring(tableFactPathLength);
+    String tupleId = CarbonTablePath.getShortBlockId(blockame);
+    return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+        .toArray(new String[0]);
+  }
 
   /**
    * Returns all delta file paths of specified block
@@ -291,11 +306,8 @@ public class SegmentUpdateStatusManager {
       //blockName without timestamp
       final String blockNameFromTuple =
           blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-"));
-      SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray =
-          readLoadMetadata();
-      return getDeltaFiles(file, blockNameFromTuple, listOfSegmentUpdateDetailsArray, extension,
+      return getDeltaFiles(file, blockNameFromTuple, extension,
           segment);
-
     } catch (Exception ex) {
       String errorMsg = "Invalid tuple id " + tupleId;
       LOG.error(errorMsg);
@@ -345,12 +357,11 @@ public class SegmentUpdateStatusManager {
    * @param extension
    * @return
    */
-  public List<String> getDeltaFiles(CarbonFile blockDir, final String blockNameFromTuple,
-      SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray,
+  private List<String> getDeltaFiles(CarbonFile blockDir, final String blockNameFromTuple,
       final String extension,
       String segment) {
-    List<String> deleteFileList = null;
-    for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) {
+    List<String> deleteFileList = new ArrayList<>();
+    for (SegmentUpdateDetails block : updateDetails) {
       if (block.getBlockName().equalsIgnoreCase(blockNameFromTuple) && block.getSegmentName()
           .equalsIgnoreCase(segment) && !CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
         final long deltaStartTimestamp = getStartTimeOfDeltaFile(extension, block);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java
index c66398c..982fb50 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java
@@ -62,7 +62,7 @@ public class SegmentTaskIndexStoreTest {
         <TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
             createCache(CacheType.DRIVER_BTREE, "");
     tableBlockInfo = new TableBlockInfo("file", 0L, "SG100", locations, 10L,
-        ColumnarFormatVersion.valueOf(version));
+        ColumnarFormatVersion.valueOf(version), null);
     absoluteTableIdentifier = new AbsoluteTableIdentifier("/tmp",
         new CarbonTableIdentifier("testdatabase", "testtable", "TB100"));
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java
index 08c22ec..1b7f106 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java
@@ -27,7 +27,7 @@ public class BlockInfoTest {
   static BlockInfo blockInfo;
 
   @BeforeClass public static void setup() {
-    blockInfo = new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1));
+    blockInfo = new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1, null));
   }
 
   @Test public void hashCodeTest() {
@@ -43,7 +43,7 @@ public class BlockInfoTest {
 
   @Test public void equalsTestWithSimilarObject() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1));
+        new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1, null));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (res);
   }
@@ -60,28 +60,28 @@ public class BlockInfoTest {
 
   @Test public void equalsTestWithDifferentSegmentId() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "diffSegmentId", null, 6, ColumnarFormatVersion.V1));
+        new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "diffSegmentId", null, 6, ColumnarFormatVersion.V1, null));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDifferentOffset() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("/filePath.carbondata", 62, "segmentId", null, 6, ColumnarFormatVersion.V1));
+        new BlockInfo(new TableBlockInfo("/filePath.carbondata", 62, "segmentId", null, 6, ColumnarFormatVersion.V1, null));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDifferentBlockLength() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1));
+        new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1, null));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffFilePath() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("/diffFilePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1));
+        new BlockInfo(new TableBlockInfo("/diffFilePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1, null));
     Boolean res = blockInfoTest.equals(blockInfo);
     assert (!res);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
index 840287e..f4553a6 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
@@ -33,8 +33,8 @@ public class TableBlockInfoTest {
   static TableBlockInfo tableBlockInfos;
 
   @BeforeClass public static void setup() {
-    tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1);
-    tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1);
+    tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1, null);
+    tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1, null);
   }
 
   @Test public void equalTestWithSameObject() {
@@ -43,7 +43,7 @@ public class TableBlockInfoTest {
   }
 
   @Test public void equalTestWithSimilarObject() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1, null);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (res);
   }
@@ -59,52 +59,52 @@ public class TableBlockInfoTest {
   }
 
   @Test public void equlsTestWithDiffSegmentId() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "diffsegmentId", null, 6, ColumnarFormatVersion.V1);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "diffsegmentId", null, 6, ColumnarFormatVersion.V1, null);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equlsTestWithDiffBlockOffset() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1, null);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffBlockLength() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 4, ColumnarFormatVersion.V1);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 4, ColumnarFormatVersion.V1, null);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffBlockletNumber() {
     TableBlockInfo tableBlockInfoTest =
-        new TableBlockInfo("filepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
+        new TableBlockInfo("filepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null);
     Boolean res = tableBlockInfos.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffFilePath() {
     TableBlockInfo tableBlockInfoTest =
-        new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
+        new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null);
     Boolean res = tableBlockInfos.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void compareToTestForSegmentId() {
     TableBlockInfo tableBlockInfo =
-        new TableBlockInfo("difffilepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
+        new TableBlockInfo("difffilepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
     int expectedResult = 2;
     assertEquals(res, expectedResult);
 
     TableBlockInfo tableBlockInfo1 =
-        new TableBlockInfo("difffilepath", 6, "6", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
+        new TableBlockInfo("difffilepath", 6, "6", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null);
     int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
     int expectedResult1 = -1;
     assertEquals(res1, expectedResult1);
 
     TableBlockInfo tableBlockInfo2 =
-        new TableBlockInfo("difffilepath", 6, "4", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
+        new TableBlockInfo("difffilepath", 6, "4", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null);
     int res2 = tableBlockInfos.compareTo(tableBlockInfo2);
     int expectedresult2 = 1;
     assertEquals(res2, expectedresult2);
@@ -129,18 +129,18 @@ public class TableBlockInfoTest {
 
     };
 
-    TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3, ColumnarFormatVersion.V1);
+    TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3, ColumnarFormatVersion.V1, null);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
     int expectedResult = 7;
     assertEquals(res, expectedResult);
 
-    TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3, ColumnarFormatVersion.V1);
+    TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3, ColumnarFormatVersion.V1, null);
     int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
     int expectedResult1 = 1;
     assertEquals(res1, expectedResult1);
 
     TableBlockInfo tableBlockInfoTest =
-        new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1);
+        new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1, null);
     int res2 = tableBlockInfos.compareTo(tableBlockInfoTest);
     int expectedResult2 = -1;
     assertEquals(res2, expectedResult2);
@@ -148,13 +148,13 @@ public class TableBlockInfoTest {
 
   @Test public void compareToTestWithStartBlockletNo() {
     TableBlockInfo tableBlockInfo =
-        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
+        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
     int expectedresult =-1;
     assertEquals(res, expectedresult);
 
     TableBlockInfo tableBlockInfo1 =
-        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 1, 2), ColumnarFormatVersion.V1);
+        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 1, 2), ColumnarFormatVersion.V1, null);
     int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
     int expectedresult1 = 1;
     assertEquals(res1, expectedresult1);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java
index 52c56d3..ccc7af6 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java
@@ -33,10 +33,10 @@ public class TableTaskInfoTest {
     tableBlockInfoList = new ArrayList<>(5);
 
     String[] locations = { "loc1", "loc2", "loc3" };
-    tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1));
+    tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1, null));
 
     String[] locs = { "loc4", "loc5" };
-    tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", locs, 6, ColumnarFormatVersion.V1));
+    tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", locs, 6, ColumnarFormatVersion.V1, null));
 
     tableTaskInfo = new TableTaskInfo("taskId", tableBlockInfoList);
   }
@@ -67,10 +67,10 @@ public class TableTaskInfoTest {
     List<TableBlockInfo> tableBlockInfoListTest = new ArrayList<>();
 
     String[] locations = { "loc1", "loc2", "loc3" };
-    tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1));
+    tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1, null));
 
     String[] locations1 = { "loc1", "loc2", "loc3" };
-    tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, "segmentID", locations1, 6, ColumnarFormatVersion.V1));
+    tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, "segmentID", locations1, 6, ColumnarFormatVersion.V1, null));
 
     List<String> res = TableTaskInfo.maxNoNodes(tableBlockInfoListTest);
     assert (res.equals(locs));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index 9adf4d4..badf63e 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -516,7 +516,7 @@ public class CarbonUtilTest {
       }
     };
     TableBlockInfo info =
-        new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1);
+        new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1, null);
 
     assertEquals(CarbonUtil.readMetadatFile(info).getVersionId().number(), 1);
   }
@@ -525,7 +525,7 @@ public class CarbonUtilTest {
   public void testToReadMetadatFileWithException()
       throws Exception {
     TableBlockInfo info =
-        new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1);
+        new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1, null);
     CarbonUtil.readMetadatFile(info);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
index 83c7fa4..8161fae 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
@@ -142,12 +142,14 @@ public class DataFileFooterConverterTest {
       }
     };
     String[] arr = { "a", "b", "c" };
-    TableBlockInfo tableBlockInfo = new TableBlockInfo("/file.carbondata", 3, "id", arr, 3, ColumnarFormatVersion.V1);
+    String fileName = "/part-0-0_batchno0-0-1495074251740.carbondata";
+    TableBlockInfo tableBlockInfo = new TableBlockInfo(fileName, 3, "id", arr, 3, ColumnarFormatVersion.V1, null);
     tableBlockInfo.getBlockletInfos().setNoOfBlockLets(3);
     List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
     tableBlockInfoList.add(tableBlockInfo);
+    String idxFileName = "0_batchno0-0-1495074251740.carbonindex";
     List<DataFileFooter> dataFileFooterList =
-        dataFileFooterConverter.getIndexInfo("indexfile", tableBlockInfoList);
+        dataFileFooterConverter.getIndexInfo(idxFileName, tableBlockInfoList);
     byte[] exp = dataFileFooterList.get(0).getBlockletIndex().getBtreeIndex().getStartKey();
     byte[] res = "1".getBytes();
     for (int i = 0; i < exp.length; i++) {
@@ -244,7 +246,7 @@ public class DataFileFooterConverterTest {
     segmentInfo.setNumberOfColumns(segmentInfo1.getNum_cols());
     dataFileFooter.setNumberOfRows(3);
     dataFileFooter.setSegmentInfo(segmentInfo);
-    TableBlockInfo info = new TableBlockInfo("/file.carbondata", 1, "0", new String[0], 1, ColumnarFormatVersion.V1);
+    TableBlockInfo info = new TableBlockInfo("/file.carbondata", 1, "0", new String[0], 1, ColumnarFormatVersion.V1, null);
     DataFileFooter result = dataFileFooterConverter.readDataFileFooter(info);
     assertEquals(result.getNumberOfRows(), 3);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index cda34e4..5d9bbe7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -323,10 +323,17 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
             updateStatusManager)) {
           continue;
         }
+        String[] deleteDeltaFilePath = null;
+        try {
+          deleteDeltaFilePath =
+              updateStatusManager.getDeleteDeltaFilePath(tableBlockInfo.getFilePath());
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
         result.add(new CarbonInputSplit(segmentNo, new Path(tableBlockInfo.getFilePath()),
             tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
             tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
-            tableBlockInfo.getVersion()));
+            tableBlockInfo.getVersion(), deleteDeltaFilePath));
       }
     }
     return result;
@@ -429,7 +436,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
             new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
                 tableSegmentUniqueIdentifier.getSegmentId(), carbonInputSplit.getLocations(),
                 carbonInputSplit.getLength(), blockletInfos, carbonInputSplit.getVersion(),
-                carbonInputSplit.getBlockStorageIdMap()));
+                carbonInputSplit.getBlockStorageIdMap(), carbonInputSplit.getDeleteDeltaFiles()));
       }
     }
     return tableBlockInfoList;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 08661a2..631bc2c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -72,6 +72,11 @@ public class CarbonInputSplit extends FileSplit
 
   private List<UpdateVO> invalidTimestampsList;
 
+  /**
+   * list of delete delta files for split
+   */
+  private String[] deleteDeltaFiles;
+
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
@@ -82,7 +87,7 @@ public class CarbonInputSplit extends FileSplit
   }
 
   private CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
-      ColumnarFormatVersion version) {
+      ColumnarFormatVersion version, String[] deleteDeltaFiles) {
     super(path, start, length, locations);
     this.segmentId = segmentId;
     String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
@@ -93,11 +98,12 @@ public class CarbonInputSplit extends FileSplit
     this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName());
     this.invalidSegments = new ArrayList<>();
     this.version = version;
+    this.deleteDeltaFiles = deleteDeltaFiles;
   }
 
   public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
-      int numberOfBlocklets, ColumnarFormatVersion version) {
-    this(segmentId, path, start, length, locations, version);
+      int numberOfBlocklets, ColumnarFormatVersion version, String[] deleteDeltaFiles) {
+    this(segmentId, path, start, length, locations, version, deleteDeltaFiles);
     this.numberOfBlocklets = numberOfBlocklets;
   }
 
@@ -113,8 +119,9 @@ public class CarbonInputSplit extends FileSplit
    * @param blockStorageIdMap
    */
   public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
-      int numberOfBlocklets, ColumnarFormatVersion version, Map<String, String> blockStorageIdMap) {
-    this(segmentId, path, start, length, locations, numberOfBlocklets, version);
+      int numberOfBlocklets, ColumnarFormatVersion version, Map<String, String> blockStorageIdMap,
+      String[] deleteDeltaFiles) {
+    this(segmentId, path, start, length, locations, numberOfBlocklets, version, deleteDeltaFiles);
     this.blockStorageIdMap = blockStorageIdMap;
   }
 
@@ -122,7 +129,7 @@ public class CarbonInputSplit extends FileSplit
       ColumnarFormatVersion version)
       throws IOException {
     return new CarbonInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
-        split.getLocations(), version);
+        split.getLocations(), version, null);
   }
 
   public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> splitList) {
@@ -133,7 +140,8 @@ public class CarbonInputSplit extends FileSplit
       try {
         tableBlockInfoList.add(
             new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
-                split.getLocations(), split.getLength(), blockletInfos, split.getVersion()));
+                split.getLocations(), split.getLength(), blockletInfos, split.getVersion(),
+                split.getDeleteDeltaFiles()));
       } catch (IOException e) {
         throw new RuntimeException("fail to get location of split: " + split, e);
       }
@@ -147,7 +155,7 @@ public class CarbonInputSplit extends FileSplit
     try {
       return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
           inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
-          blockletInfos, inputSplit.getVersion());
+          blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
     } catch (IOException e) {
       throw new RuntimeException("fail to get location of split: " + inputSplit, e);
     }
@@ -167,6 +175,11 @@ public class CarbonInputSplit extends FileSplit
     for (int i = 0; i < numInvalidSegment; i++) {
       invalidSegments.add(in.readUTF());
     }
+    int numberOfDeleteDeltaFiles = in.readInt();
+    deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
+    for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
+      deleteDeltaFiles[i] = in.readUTF();
+    }
   }
 
   @Override public void write(DataOutput out) throws IOException {
@@ -178,6 +191,12 @@ public class CarbonInputSplit extends FileSplit
     for (String invalidSegment : invalidSegments) {
       out.writeUTF(invalidSegment);
     }
+    out.writeInt(null != deleteDeltaFiles ? deleteDeltaFiles.length : 0);
+    if (null != deleteDeltaFiles) {
+      for (int i = 0; i < deleteDeltaFiles.length; i++) {
+        out.writeUTF(deleteDeltaFiles[i]);
+      }
+    }
   }
 
   public List<String> getInvalidSegments() {
@@ -287,4 +306,8 @@ public class CarbonInputSplit extends FileSplit
   public Map<String, String> getBlockStorageIdMap() {
     return blockStorageIdMap;
   }
+
+  public String[] getDeleteDeltaFiles() {
+    return deleteDeltaFiles;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
index 7ba6133..f9dc178 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
@@ -90,7 +90,7 @@ class InMemoryBTreeIndex implements Index {
       result.add(new CarbonInputSplit(segment.getId(), new Path(tableBlockInfo.getFilePath()),
           tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
           tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
-          tableBlockInfo.getVersion()));
+          tableBlockInfo.getVersion(), null));
     }
     return result;
   }
@@ -142,7 +142,8 @@ class InMemoryBTreeIndex implements Index {
       tableBlockInfoList.add(
           new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
               segment.getId(), carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
-              blockletInfos, carbonInputSplit.getVersion()));
+              blockletInfos, carbonInputSplit.getVersion(),
+              carbonInputSplit.getDeleteDeltaFiles()));
     }
     return tableBlockInfoList;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 4ebbf60..2898870 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -300,7 +300,8 @@ class CarbonMergerRDD[K, V](
       carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter(entry => {
         val blockInfo = new TableBlockInfo(entry.getPath.toString,
           entry.getStart, entry.getSegmentId,
-          entry.getLocations, entry.getLength, entry.getVersion
+          entry.getLocations, entry.getLength, entry.getVersion,
+          updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
         )
         !CarbonUtil
           .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 3d2e35b..dfea7d7 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -564,7 +564,7 @@ object CarbonDataRDDFactory {
             val fileSplit = inputSplit.asInstanceOf[FileSplit]
             new TableBlockInfo(fileSplit.getPath.toString,
               fileSplit.getStart, "1",
-              fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1
+              fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
             ).asInstanceOf[Distributable]
           }
           // group blocks to nodes, tasks

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index cab78fe..96a8062 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -577,7 +577,7 @@ object CarbonDataRDDFactory {
             val fileSplit = inputSplit.asInstanceOf[FileSplit]
             new TableBlockInfo(fileSplit.getPath.toString,
               fileSplit.getStart, "1",
-              fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1
+              fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
             ).asInstanceOf[Distributable]
           }
           // group blocks to nodes, tasks


Mime
View raw message