carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-1364] Added the blocklet info to index file and make the datamap distributable with job
Date Wed, 30 Aug 2017 07:25:59 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 03c1774f7 -> 1e21cd1cf


[CARBONDATA-1364] Added the blocklet info to index file and make the datamap distributable with job

1. Added the blocklet info to the carbonindex file, so the datamap is not required to read each carbondata file footer to get the blocklet information. This makes the datamap loading faster.

2. Made the datamap distributable and added the Spark job, so datamap pruning can happen in a distributed manner and the pruned blocklet list is sent to the driver.

This closes #1179


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1e21cd1c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1e21cd1c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1e21cd1c

Branch: refs/heads/master
Commit: 1e21cd1cfbaaf618457d3dcc02fea9f8b67f8d95
Parents: 03c1774
Author: Ravindra Pesala <ravi.pesala@gmail.com>
Authored: Tue Aug 29 23:31:13 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Wed Aug 30 15:25:21 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 +
 .../core/datamap/DataMapDistributable.java      |  38 ++++-
 .../core/datamap/DataMapStoreManager.java       |  16 ++-
 .../carbondata/core/datamap/TableDataMap.java   |  24 +++-
 .../carbondata/core/datamap/dev/DataMap.java    |   8 +-
 .../core/datamap/dev/DataMapFactory.java        |   6 +
 .../carbondata/core/indexstore/Blocklet.java    |  10 +-
 .../blockletindex/BlockletDataMap.java          |  34 +++--
 .../BlockletDataMapDistributable.java           |  41 ++++++
 .../blockletindex/BlockletDataMapFactory.java   |  58 ++++++--
 .../util/AbstractDataFileFooterConverter.java   |  16 ++-
 .../core/util/CarbonMetadataUtil.java           |  14 ++
 .../apache/carbondata/core/util/CarbonUtil.java |  27 ++++
 .../core/util/DataFileFooterConverterV3.java    |  31 +---
 .../hadoop/api/CarbonTableInputFormat.java      |  48 +++++--
 .../carbondata/hadoop/api/DataMapJob.java       |  33 +++++
 .../hadoop/api/DistributableDataMapFormat.java  | 143 +++++++++++++++++++
 .../sdv/generated/QueriesBasicTestCase.scala    |  13 +-
 .../QueriesIncludeDictionaryTestCase.scala      |   4 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |  13 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   5 +
 .../carbondata/spark/rdd/SparkDataMapJob.scala  | 113 +++++++++++++++
 .../processing/merger/CarbonDataMergerUtil.java |  62 --------
 .../writer/v3/CarbonFactDataWriterImplV3.java   |  78 ++++------
 24 files changed, 622 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 944abc4..6c116a7 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1347,6 +1347,10 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_UPDATE_SEGMENT_PARALLELISM_DEFAULT = "1";
 
+  public static final String USE_DISTRIBUTED_DATAMAP = "carbon.enable.distributed.datamap";
+
+  public static final String USE_DISTRIBUTED_DATAMAP_DEFAULT = "false";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
index 517f629..50af789 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
@@ -16,12 +16,18 @@
  */
 package org.apache.carbondata.core.datamap;
 
+import java.io.IOException;
+import java.io.Serializable;
+
 import org.apache.carbondata.core.datastore.block.Distributable;
 
+import org.apache.hadoop.mapreduce.InputSplit;
+
 /**
  * Distributable class for datamap.
  */
-public abstract class DataMapDistributable implements Distributable {
+public abstract class DataMapDistributable extends InputSplit
+    implements Distributable, Serializable {
 
   private String tablePath;
 
@@ -29,6 +35,10 @@ public abstract class DataMapDistributable implements Distributable {
 
   private String dataMapName;
 
+  private String[] locations;
+
+  private String dataMapFactoryClass;
+
   public String getTablePath() {
     return tablePath;
   }
@@ -53,4 +63,30 @@ public abstract class DataMapDistributable implements Distributable {
     this.dataMapName = dataMapName;
   }
 
+  public String getDataMapFactoryClass() {
+    return dataMapFactoryClass;
+  }
+
+  public void setDataMapFactoryClass(String dataMapFactoryClass) {
+    this.dataMapFactoryClass = dataMapFactoryClass;
+  }
+
+  public void setLocations(String[] locations) {
+    this.locations = locations;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    return locations;
+  }
+
+  @Override
+  public int compareTo(Distributable o) {
+    return 0;
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 54318b5..36f7662 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -56,15 +56,15 @@ public final class DataMapStoreManager {
    * @param factoryClass
    * @return
    */
-  public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName,
-      Class<? extends DataMapFactory> factoryClass) {
+  public synchronized TableDataMap getDataMap(AbsoluteTableIdentifier identifier,
+      String dataMapName, String factoryClass) {
     String table = identifier.uniqueName();
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
     TableDataMap dataMap;
     if (tableDataMaps == null) {
       dataMap = createAndRegisterDataMap(identifier, factoryClass, dataMapName);
     } else {
-      dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+      dataMap = getTableDataMap(dataMapName, tableDataMaps);
     }
     if (dataMap == null) {
       throw new RuntimeException("Datamap does not exist");
@@ -77,22 +77,24 @@ public final class DataMapStoreManager {
    * The datamap is created using datamap name, datamap factory class and table identifier.
    */
   public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
-      Class<? extends DataMapFactory> factoryClass, String dataMapName) {
+      String factoryClassName, String dataMapName) {
     String table = identifier.uniqueName();
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
     if (tableDataMaps == null) {
       tableDataMaps = new ArrayList<>();
       allDataMaps.put(table, tableDataMaps);
     }
-    TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps);
+    TableDataMap dataMap = getTableDataMap(dataMapName, tableDataMaps);
     if (dataMap != null) {
       throw new RuntimeException("Already datamap exists in that path with type " + dataMapName);
     }
 
     try {
+      Class<? extends DataMapFactory> factoryClass =
+          (Class<? extends DataMapFactory>) Class.forName(factoryClassName);
       DataMapFactory dataMapFactory = factoryClass.newInstance();
       dataMapFactory.init(identifier, dataMapName);
-      dataMap = new TableDataMap(dataMapName, dataMapFactory);
+      dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory);
     } catch (Exception e) {
       LOGGER.error(e);
       throw new RuntimeException(e);
@@ -101,7 +103,7 @@ public final class DataMapStoreManager {
     return dataMap;
   }
 
-  private TableDataMap getAbstractTableDataMap(String dataMapName,
+  private TableDataMap getTableDataMap(String dataMapName,
       List<TableDataMap> tableDataMaps) {
     TableDataMap dataMap = null;
     for (TableDataMap tableDataMap: tableDataMaps) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 5571538..66bb257 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.events.ChangeEvent;
 import org.apache.carbondata.core.events.EventListener;
 import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
@@ -33,6 +34,8 @@ import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
  */
 public final class TableDataMap implements EventListener {
 
+  private AbsoluteTableIdentifier identifier;
+
   private String dataMapName;
 
   private DataMapFactory dataMapFactory;
@@ -40,8 +43,9 @@ public final class TableDataMap implements EventListener {
   /**
    * It is called to initialize and load the required table datamap metadata.
    */
-  public TableDataMap(String dataMapName,
-      DataMapFactory dataMapFactory) {
+  public TableDataMap(AbsoluteTableIdentifier identifier,
+      String dataMapName, DataMapFactory dataMapFactory) {
+    this.identifier = identifier;
     this.dataMapName = dataMapName;
     this.dataMapFactory = dataMapFactory;
   }
@@ -83,10 +87,14 @@ public final class TableDataMap implements EventListener {
   public List<DataMapDistributable> toDistributable(List<String> segmentIds) throws IOException {
     List<DataMapDistributable> distributables = new ArrayList<>();
     for (String segmentsId : segmentIds) {
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId);
-      for (DataMap dataMap : dataMaps) {
-        distributables.add(dataMap.toDistributable());
+      List<DataMapDistributable> list = dataMapFactory.toDistributable(segmentsId);
+      for (DataMapDistributable distributable: list) {
+        distributable.setDataMapName(dataMapName);
+        distributable.setSegmentId(segmentsId);
+        distributable.setTablePath(identifier.getTablePath());
+        distributable.setDataMapFactoryClass(dataMapFactory.getClass().getName());
       }
+      distributables.addAll(list);
     }
     return distributables;
   }
@@ -100,7 +108,11 @@ public final class TableDataMap implements EventListener {
    * @return
    */
   public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) {
-    return dataMapFactory.getDataMap(distributable).prune(filterExp);
+    List<Blocklet> blocklets = dataMapFactory.getDataMap(distributable).prune(filterExp);
+    for (Blocklet blocklet: blocklets) {
+      blocklet.setSegmentId(distributable.getSegmentId());
+    }
+    return blocklets;
   }
 
   @Override public void fireEvent(ChangeEvent event) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 526572a..f6ea885 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.core.datamap.dev;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -32,7 +31,7 @@ public interface DataMap {
   /**
    * It is called to load the data map to memory or to initialize it.
    */
-  void init(String path) throws MemoryException, IOException;
+  void init(String filePath) throws MemoryException, IOException;
 
   /**
    * Prune the datamap with filter expression. It returns the list of
@@ -43,11 +42,6 @@ public interface DataMap {
    */
   List<Blocklet> prune(FilterResolverIntf filterExp);
 
-  /**
-   * Convert datamap to distributable object
-   * @return
-   */
-  DataMapDistributable toDistributable();
 
   /**
    * Clear complete index table and release memory.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 873457c..9796a77 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -51,6 +51,12 @@ public interface DataMapFactory {
   DataMap getDataMap(DataMapDistributable distributable);
 
   /**
+   * Get all distributable objects of a segmentid
+   * @return
+   */
+  List<DataMapDistributable> toDistributable(String segmentId);
+
+  /**
    *
    * @param event
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index c3a72f0..919a48d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -48,8 +48,8 @@ public class Blocklet implements Serializable {
     this.blockletId = blockletId;
   }
 
-  public Path getPath() {
-    return new Path(path);
+  public String getPath() {
+    return path;
   }
 
   public String getBlockletId() {
@@ -65,9 +65,9 @@ public class Blocklet implements Serializable {
   }
 
   public void updateLocations() throws IOException {
-    Path fspath = new Path(path);
-    FileSystem fs = fspath.getFileSystem(FileFactory.getConfiguration());
-    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(fspath);
+    Path path = new Path(this.path);
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
     LocatedFileStatus fileStatus = iter.next();
     location = fileStatus.getBlockLocations()[0].getHosts();
     length = fileStatus.getLen();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 928c78b..bd7a1d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -91,9 +90,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private int[] columnCardinality;
 
-  @Override public void init(String path) throws IOException, MemoryException {
+  @Override
+  public void init(String filePath) throws IOException, MemoryException {
+    long startTime = System.currentTimeMillis();
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path);
+    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(filePath);
     for (DataFileFooter fileFooter : indexInfo) {
       List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
       if (segmentProperties == null) {
@@ -102,13 +103,19 @@ public class BlockletDataMap implements DataMap, Cacheable {
         createSchema(segmentProperties);
       }
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
-      fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+      if (fileFooter.getBlockletList() == null || fileFooter.getBlockletList().size() == 0) {
+        LOGGER
+            .info("Reading carbondata file footer to get blocklet info " + blockInfo.getFilePath());
+        fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+      }
 
       loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
     }
     if (unsafeMemoryDMStore != null) {
       unsafeMemoryDMStore.finishWriting();
     }
+    LOGGER.info("Time taken to load blocklet datamap from file : " + filePath + "is " +
+        (System.currentTimeMillis() - startTime));
   }
 
   private void loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties,
@@ -215,7 +222,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
         new UnsafeMemoryDMStore(indexSchemas.toArray(new DataMapSchema[indexSchemas.size()]));
   }
 
-  @Override public List<Blocklet> prune(FilterResolverIntf filterExp) {
+  @Override
+  public List<Blocklet> prune(FilterResolverIntf filterExp) {
 
     // getting the start and end index key based on filter for hitting the
     // selected block reference nodes based on filter resolver tree.
@@ -419,26 +427,26 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return dataMapRow;
   }
 
-  @Override public void clear() {
+  @Override
+  public void clear() {
     unsafeMemoryDMStore.freeMemory();
     unsafeMemoryDMStore = null;
     segmentProperties = null;
   }
 
-  @Override public long getFileTimeStamp() {
+  @Override
+  public long getFileTimeStamp() {
     return 0;
   }
 
-  @Override public int getAccessCount() {
+  @Override
+  public int getAccessCount() {
     return 0;
   }
 
-  @Override public long getMemorySize() {
+  @Override
+  public long getMemorySize() {
     return unsafeMemoryDMStore.getMemoryUsed();
   }
 
-  @Override public DataMapDistributable toDistributable() {
-    // TODO
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
new file mode 100644
index 0000000..63f45b5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+
+/**
+ * This class contains required information to make the Blocklet datamap distributable.
+ * Each distributable object can represents one datamap.
+ * Using this object job like spark/MR can be launched and execute each distributable object as
+ * one datamap task.
+ */
+public class BlockletDataMapDistributable extends DataMapDistributable {
+
+  /**
+   * Relative file path from the segment folder.
+   */
+  private String filePath;
+
+  public BlockletDataMapDistributable(String indexFileName) {
+    this.filePath = indexFileName;
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index e189931..d6ddfa6 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -37,6 +37,11 @@ import org.apache.carbondata.core.events.ChangeEvent;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
 /**
  * Table map for blocklet
  */
@@ -67,14 +72,7 @@ public class BlockletDataMapFactory implements DataMapFactory {
         segmentMap.get(segmentId);
     if (tableBlockIndexUniqueIdentifiers == null) {
       tableBlockIndexUniqueIdentifiers = new ArrayList<>();
-      String path = identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId;
-      FileFactory.FileType fileType = FileFactory.getFileType(path);
-      CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
-      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-        @Override public boolean accept(CarbonFile file) {
-          return file.getName().endsWith(".carbonindex");
-        }
-      });
+      CarbonFile[] listFiles = getCarbonIndexFiles(segmentId);
       for (int i = 0; i < listFiles.length; i++) {
         tableBlockIndexUniqueIdentifiers.add(
             new TableBlockIndexUniqueIdentifier(identifier, segmentId, listFiles[i].getName()));
@@ -84,6 +82,38 @@ public class BlockletDataMapFactory implements DataMapFactory {
     return cache.getAll(tableBlockIndexUniqueIdentifiers);
   }
 
+  private CarbonFile[] getCarbonIndexFiles(String segmentId) {
+    String path = identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId;
+    CarbonFile carbonFile = FileFactory.getCarbonFile(path);
+    return carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().endsWith(".carbonindex");
+      }
+    });
+  }
+
+  @Override
+  public List<DataMapDistributable> toDistributable(String segmentId) {
+    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentId);
+    List<DataMapDistributable> distributables = new ArrayList<>();
+    for (int i = 0; i < carbonIndexFiles.length; i++) {
+      Path path = new Path(carbonIndexFiles[i].getPath());
+      try {
+        FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+        RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+        LocatedFileStatus fileStatus = iter.next();
+        String[] location = fileStatus.getBlockLocations()[0].getHosts();
+        BlockletDataMapDistributable distributable =
+            new BlockletDataMapDistributable(path.getName());
+        distributable.setLocations(location);
+        distributables.add(distributable);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return distributables;
+  }
+
   @Override
   public void clear(String segmentId) {
     List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
@@ -105,7 +135,17 @@ public class BlockletDataMapFactory implements DataMapFactory {
 
   @Override
   public DataMap getDataMap(DataMapDistributable distributable) {
-    return null;
+    BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
+    TableBlockIndexUniqueIdentifier uniqueIdentifier =
+        new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(),
+            mapDistributable.getFilePath());
+    DataMap dataMap;
+    try {
+      dataMap = cache.get(uniqueIdentifier);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return dataMap;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index f4f2693..19ead44 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.datastore.block.BlockInfo;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
 import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
@@ -150,8 +151,9 @@ public abstract class AbstractDataFileFooterConverter {
         dataFileFooter = new DataFileFooter();
         TableBlockInfo tableBlockInfo = new TableBlockInfo();
         tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset());
-        tableBlockInfo.setVersion(
-            ColumnarFormatVersion.valueOf((short) readIndexHeader.getVersion()));
+        ColumnarFormatVersion version =
+            ColumnarFormatVersion.valueOf((short) readIndexHeader.getVersion());
+        tableBlockInfo.setVersion(version);
         int blockletSize = getBlockletSize(readBlockIndexInfo);
         tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
         tableBlockInfo.setFilePath(parentPath + "/" + readBlockIndexInfo.file_name);
@@ -160,6 +162,16 @@ public abstract class AbstractDataFileFooterConverter {
         dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
         dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo));
         dataFileFooter.setSegmentInfo(segmentInfo);
+        dataFileFooter.setVersionId(version);
+        if (readBlockIndexInfo.isSetBlocklet_info()) {
+          List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
+          BlockletInfo blockletInfo = new DataFileFooterConverterV3()
+              .getBlockletInfo(readBlockIndexInfo.getBlocklet_info(),
+                  CarbonUtil.getNumberOfDimensionColumns(columnSchemaList));
+          blockletInfo.setBlockletIndex(blockletIndex);
+          blockletInfoList.add(blockletInfo);
+          dataFileFooter.setBlockletList(blockletInfoList);
+        }
         dataFileFooters.add(dataFileFooter);
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index ff02bf7..ece97fe 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -273,11 +273,25 @@ public class CarbonMetadataUtil {
       blockIndex.setOffset(blockIndexInfo.getOffset());
       blockIndex.setFile_name(blockIndexInfo.getFileName());
       blockIndex.setBlock_index(getBlockletIndex(blockIndexInfo.getBlockletIndex()));
+      if (blockIndexInfo.getBlockletInfo() != null) {
+        blockIndex.setBlocklet_info(getBlocletInfo3(blockIndexInfo.getBlockletInfo()));
+      }
       thriftBlockIndexList.add(blockIndex);
     }
     return thriftBlockIndexList;
   }
 
+  private static BlockletInfo3 getBlocletInfo3(
+      org.apache.carbondata.core.metadata.blocklet.BlockletInfo blockletInfo) {
+    List<Long> dimensionChunkOffsets = blockletInfo.getDimensionChunkOffsets();
+    dimensionChunkOffsets.addAll(blockletInfo.getMeasureChunkOffsets());
+    List<Integer> dimensionChunksLength = blockletInfo.getDimensionChunksLength();
+    dimensionChunksLength.addAll(blockletInfo.getMeasureChunksLength());
+    return new BlockletInfo3(blockletInfo.getNumberOfRows(), dimensionChunkOffsets,
+        dimensionChunksLength, blockletInfo.getDimensionOffset(), blockletInfo.getMeasureOffsets(),
+        blockletInfo.getNumberOfPages());
+  }
+
   /**
    * return DataChunk3 that contains the input DataChunk2 list
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 5a1e40f..76d5dc7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -932,6 +932,33 @@ public final class CarbonUtil {
   }
 
   /**
+   * Below method will be used to get the number of dimension column
+   * in carbon column schema
+   *
+   * <p>Counting stops at the first non-dimension column (the loop breaks),
+   * so the schema is expected to list dimension columns before measures.
+   * Non-columnar dimensions that share a column group id are counted once
+   * per distinct group.
+   *
+   * @param columnSchemaList column schema list
+   * @return number of dimension column
+   */
+  public static int getNumberOfDimensionColumns(List<ColumnSchema> columnSchemaList) {
+    int numberOfDimensionColumns = 0;
+    // id of the last column group seen; -1 means none seen yet
+    int previousColumnGroupId = -1;
+    ColumnSchema columnSchema = null;
+    for (int i = 0; i < columnSchemaList.size(); i++) {
+      columnSchema = columnSchemaList.get(i);
+      if (columnSchema.isDimensionColumn() && columnSchema.isColumnar()) {
+        // columnar dimensions are stored individually: count each one
+        numberOfDimensionColumns++;
+      } else if (columnSchema.isDimensionColumn()) {
+        // non-columnar (column group) dimension: count one per distinct group
+        if (previousColumnGroupId != columnSchema.getColumnGroupId()) {
+          previousColumnGroupId = columnSchema.getColumnGroupId();
+          numberOfDimensionColumns++;
+        }
+      } else {
+        // first non-dimension column reached; stop counting
+        break;
+      }
+    }
+    return numberOfDimensionColumns;
+  }
+
+  /**
    * The method calculate the B-Tree metadata size.
    *
    * @param tableBlockInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index ccb8b29..214e217 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -76,7 +76,7 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
     List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
     for (int i = 0; i < leaf_node_infos_Thrift.size(); i++) {
       BlockletInfo blockletInfo = getBlockletInfo(leaf_node_infos_Thrift.get(i),
-          getNumberOfDimensionColumns(columnSchemaList));
+          CarbonUtil.getNumberOfDimensionColumns(columnSchemaList));
       blockletInfo.setBlockletIndex(blockletIndexList.get(i));
       blockletInfoList.add(blockletInfo);
     }
@@ -103,7 +103,7 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
    * @param blockletInfoThrift blocklet info of the thrift
    * @return blocklet info wrapper
    */
-  private BlockletInfo getBlockletInfo(
+  public BlockletInfo getBlockletInfo(
       org.apache.carbondata.format.BlockletInfo3 blockletInfoThrift, int numberOfDimensionColumns) {
     BlockletInfo blockletInfo = new BlockletInfo();
     List<Long> dimensionColumnChunkOffsets =
@@ -127,32 +127,5 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
     return blockletInfo;
   }
 
-  /**
-   * Below method will be used to get the number of dimension column
-   * in carbon column schema
-   *
-   * @param columnSchemaList column schema list
-   * @return number of dimension column
-   */
-  private int getNumberOfDimensionColumns(List<ColumnSchema> columnSchemaList) {
-    int numberOfDimensionColumns = 0;
-    int previousColumnGroupId = -1;
-    ColumnSchema columnSchema = null;
-    for (int i = 0; i < columnSchemaList.size(); i++) {
-      columnSchema = columnSchemaList.get(i);
-      if (columnSchema.isDimensionColumn() && columnSchema.isColumnar()) {
-        numberOfDimensionColumns++;
-      } else if (columnSchema.isDimensionColumn()) {
-        if (previousColumnGroupId != columnSchema.getColumnGroupId()) {
-          previousColumnGroupId = columnSchema.getColumnGroupId();
-          numberOfDimensionColumns++;
-        }
-      } else {
-        break;
-      }
-    }
-    return numberOfDimensionColumns;
-  }
-
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index c69e19f..f271517 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -21,13 +21,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
@@ -109,6 +103,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
   private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
+  private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
 
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
@@ -170,6 +165,24 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
   }
 
+
+  /**
+   * Serializes the given datamap job into the configuration so that it can be
+   * picked up later on the read path. A null job leaves the configuration
+   * untouched.
+   */
+  public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
+      throws IOException {
+    if (dataMapJob == null) {
+      return;
+    }
+    String serializedJob = ObjectSerializationUtil.convertObjectToString(dataMapJob);
+    configuration.set(DATA_MAP_DSTR, serializedJob);
+  }
+
+  /**
+   * Deserializes the datamap job stored by {@code setDataMapJob}; returns null
+   * when no job was configured.
+   */
+  private static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
+    String serializedJob = configuration.get(DATA_MAP_DSTR);
+    if (serializedJob == null) {
+      return null;
+    }
+    return (DataMapJob) ObjectSerializationUtil.convertStringToObject(serializedJob);
+  }
+
   /**
    * It sets unresolved filter expression.
    *
@@ -254,7 +267,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
     TableDataMap blockletMap =
         DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
-            BlockletDataMapFactory.class);
+            BlockletDataMapFactory.class.getName());
     List<String> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
@@ -486,8 +499,18 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
 
     TableDataMap blockletMap = DataMapStoreManager.getInstance()
-        .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
-    List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);
+        .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME,
+            BlockletDataMapFactory.class.getName());
+    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
+    List<Blocklet> prunedBlocklets;
+    if (dataMapJob != null) {
+      DistributableDataMapFormat datamapDstr =
+          new DistributableDataMapFormat(absoluteTableIdentifier, BlockletDataMap.NAME,
+              segmentIds, BlockletDataMapFactory.class.getName());
+      prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+    } else {
+      prunedBlocklets = blockletMap.prune(segmentIds, resolver);
+    }
 
     List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
     int partitionIndex = 0;
@@ -531,7 +554,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     blocklet.updateLocations();
     org.apache.carbondata.hadoop.CarbonInputSplit split =
         org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
-            new FileSplit(blocklet.getPath(), 0, blocklet.getLength(), blocklet.getLocations()),
+            new FileSplit(new Path(blocklet.getPath()), 0, blocklet.getLength(),
+                blocklet.getLocations()),
             ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()));
     split.setDetailInfo(blocklet.getDetailInfo());
     return split;
@@ -652,7 +676,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier)
       throws IOException, KeyGenException {
     TableDataMap blockletMap = DataMapStoreManager.getInstance()
-        .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class);
+        .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
         new SegmentStatusManager(identifier).getValidAndInvalidSegments();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
new file mode 100644
index 0000000..e33c356
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.api;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * Distributable datamap job that executes the {@link DistributableDataMapFormat} in the
+ * cluster: it prunes the datamaps distributably and returns the final blocklet list.
+ */
+public interface DataMapJob extends Serializable {
+
+  /**
+   * Executes the distributed pruning described by the given input format and
+   * returns the blocklets that survive pruning, gathered from all tasks.
+   *
+   * @param dataMapFormat describes the datamap, table and segments to prune
+   * @param resolverIntf filter shipped to the executors for pruning
+   * @return pruned blocklet list
+   */
+  List<Blocklet> execute(DistributableDataMapFormat dataMapFormat, FilterResolverIntf resolverIntf);
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
new file mode 100644
index 0000000..653e33f
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.api;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Input format for datamaps, it makes the datamap pruning distributable.
+ *
+ * <p>Each input split is a {@link DataMapDistributable} describing one unit of
+ * pruning work; the record reader prunes that unit on the executor and emits
+ * the surviving blocklets. The filter travels to the executors as a serialized
+ * string inside the job configuration.
+ */
+public class DistributableDataMapFormat extends FileInputFormat<Void, Blocklet> implements
+    Serializable {
+
+  // configuration key under which the serialized FilterResolverIntf is stored
+  private static final String FILTER_EXP = "mapreduce.input.distributed.datamap.filter";
+
+  private AbsoluteTableIdentifier identifier;
+
+  private String dataMapName;
+
+  private List<String> validSegments;
+
+  // fully qualified class name of the DataMapFactory implementation
+  private String className;
+
+  public DistributableDataMapFormat(AbsoluteTableIdentifier identifier,
+      String dataMapName, List<String> validSegments, String className) {
+    this.identifier = identifier;
+    this.dataMapName = dataMapName;
+    this.validSegments = validSegments;
+    this.className = className;
+  }
+
+  /**
+   * Serializes the filter into the configuration so executors can read it
+   * back. A null filter leaves the configuration untouched.
+   */
+  public static void setFilterExp(Configuration configuration, FilterResolverIntf filterExp)
+      throws IOException {
+    if (filterExp != null) {
+      String string = ObjectSerializationUtil.convertObjectToString(filterExp);
+      configuration.set(FILTER_EXP, string);
+    }
+  }
+
+  /** Deserializes the filter stored by {@link #setFilterExp}; null when absent. */
+  private static FilterResolverIntf getFilterExp(Configuration configuration) throws IOException {
+    String filterString = configuration.get(FILTER_EXP);
+    if (filterString != null) {
+      Object toObject = ObjectSerializationUtil.convertStringToObject(filterString);
+      return (FilterResolverIntf) toObject;
+    }
+    return null;
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    // one split per distributable pruning unit produced by the datamap
+    TableDataMap dataMap =
+        DataMapStoreManager.getInstance().getDataMap(identifier, dataMapName, className);
+    List<DataMapDistributable> distributables = dataMap.toDistributable(validSegments);
+    List<InputSplit> inputSplits = new ArrayList<>(distributables.size());
+    inputSplits.addAll(distributables);
+    return inputSplits;
+  }
+
+  @Override
+  public RecordReader<Void, Blocklet> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+    return new RecordReader<Void, Blocklet>() {
+      // blocklets surviving the prune of this split
+      private Iterator<Blocklet> blockletIterator;
+      private Blocklet currBlocklet;
+
+      @Override
+      public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+          throws IOException, InterruptedException {
+        DataMapDistributable distributable = (DataMapDistributable)inputSplit;
+        // the datamap is looked up (and loaded if necessary) on the executor side
+        AbsoluteTableIdentifier identifier =
+            AbsoluteTableIdentifier.fromTablePath(distributable.getTablePath());
+        TableDataMap dataMap = DataMapStoreManager.getInstance()
+            .getDataMap(identifier, distributable.getDataMapName(),
+                distributable.getDataMapFactoryClass());
+        blockletIterator =
+            dataMap.prune(distributable, getFilterExp(taskAttemptContext.getConfiguration()))
+                .iterator();
+      }
+
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        boolean hasNext = blockletIterator.hasNext();
+        if (hasNext) {
+          currBlocklet = blockletIterator.next();
+        }
+        return hasNext;
+      }
+
+      @Override
+      public Void getCurrentKey() throws IOException, InterruptedException {
+        // the key is unused for this format
+        return null;
+      }
+
+      @Override
+      public Blocklet getCurrentValue() throws IOException, InterruptedException {
+        return currBlocklet;
+      }
+
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        // progress is not tracked for datamap pruning
+        return 0;
+      }
+
+      @Override
+      public void close() throws IOException {
+        // nothing to release
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesBasicTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesBasicTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesBasicTestCase.scala
index 3ca982c..d696ace 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesBasicTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesBasicTestCase.scala
@@ -5857,8 +5857,8 @@ class QueriesBasicTestCase extends QueryTest with BeforeAndAfterAll {
   //TC_025
   test("TC_025", Include) {
 
-    checkAnswer(s"""select channelsId, sum(channelsId+channelsId) Total from Carbon_automation group by  channelsId order by Total""",
-      s"""select channelsId, sum(channelsId+channelsId) Total from Carbon_automation_hive group by  channelsId order by Total""", "QueriesBasicTestCase_TC_025")
+    checkAnswer(s"""select channelsId, sum(channelsId+channelsId) Total from Carbon_automation group by  channelsId order by channelsId,Total""",
+      s"""select channelsId, sum(channelsId+channelsId) Total from Carbon_automation_hive group by  channelsId order by channelsId,Total""", "QueriesBasicTestCase_TC_025")
 
   }
 
@@ -7898,15 +7898,6 @@ class QueriesBasicTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
 
-  //TC_313
-  test("TC_313", Include) {
-
-    checkAnswer(s"""select ActiveOperatorId, sum(imeiupdown) as total, count(distinct AMSize) as AMSize_count from (select AMSize, t1.gamePointId+ t1.contractNumber as imeiupdown, if((t1.gamePointId+ t1.contractNumber)>100, '>50', if((t1.gamePointId+t1.contractNumber)>100,'50~10',if((t1.gamePointId+t1.contractNumber)>100, '10~1','<1'))) as ActiveOperatorId from Carbon_automation t1) t2 group by ActiveOperatorId""",
-      s"""select ActiveOperatorId, sum(imeiupdown) as total, count(distinct AMSize) as AMSize_count from (select AMSize, t1.gamePointId+ t1.contractNumber as imeiupdown, if((t1.gamePointId+ t1.contractNumber)>100, '>50', if((t1.gamePointId+t1.contractNumber)>100,'50~10',if((t1.gamePointId+t1.contractNumber)>100, '10~1','<1'))) as ActiveOperatorId from Carbon_automation_hive t1) t2 group by ActiveOperatorId""", "QueriesBasicTestCase_TC_313")
-
-  }
-
-
   //TC_314
   test("TC_314", Include) {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesIncludeDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesIncludeDictionaryTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesIncludeDictionaryTestCase.scala
index fbcc74d..2a99d13 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesIncludeDictionaryTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesIncludeDictionaryTestCase.scala
@@ -3068,8 +3068,8 @@ class QueriesIncludeDictionaryTestCase extends QueryTest with BeforeAndAfterAll
   //VMALL_DICTIONARY_INCLUDE_371
   test("VMALL_DICTIONARY_INCLUDE_371", Include) {
 
-    checkAnswer(s"""select imei,deviceInformationId,MAC,deviceColor from VMALL_DICTIONARY_INCLUDE where  Latest_DAY  not like '1234567890123480.0000000000' order by deviceInformationId limit 5""",
-      s"""select imei,deviceInformationId,MAC,deviceColor from VMALL_DICTIONARY_INCLUDE_hive where  Latest_DAY  not like '1234567890123480.0000000000' order by deviceInformationId limit 5""", "QueriesIncludeDictionaryTestCase_VMALL_DICTIONARY_INCLUDE_371")
+    checkAnswer(s"""select imei,deviceInformationId,MAC,deviceColor from VMALL_DICTIONARY_INCLUDE where  Latest_DAY  not like '1234567890123480.0000000000' order by imei,deviceInformationId limit 5""",
+      s"""select imei,deviceInformationId,MAC,deviceColor from VMALL_DICTIONARY_INCLUDE_hive where  Latest_DAY  not like '1234567890123480.0000000000' order by imei,deviceInformationId limit 5""", "QueriesIncludeDictionaryTestCase_VMALL_DICTIONARY_INCLUDE_371")
 
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index ee9d7ab..e8ec6ad 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -52,6 +52,15 @@ class C2DataMapFactory() extends DataMapFactory {
   override def createWriter(segmentId: String): DataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
 
   override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, FilterType.EQUALTO)
+
+  /**
+   * Get all distributable objects of a segmentid
+   *
+   * @return never returns normally: this test factory does not support
+   *         distribution, and ??? throws NotImplementedError if invoked
+   */
+  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = {
+    ???
+  }
 }
 
 class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
@@ -76,7 +85,7 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
     // register datamap writer
     DataMapStoreManager.getInstance().createAndRegisterDataMap(
       AbsoluteTableIdentifier.from(storeLocation, "default", "carbon1"),
-      classOf[C2DataMapFactory],
+      classOf[C2DataMapFactory].getName,
       "test")
 
     val df = buildTestData(33000)
@@ -103,7 +112,7 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
     // register datamap writer
     DataMapStoreManager.getInstance().createAndRegisterDataMap(
       AbsoluteTableIdentifier.from(storeLocation, "default", "carbon2"),
-      classOf[C2DataMapFactory],
+      classOf[C2DataMapFactory].getName,
       "test")
 
     CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 2a1a781..0035c44 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -275,6 +275,11 @@ class CarbonScanRDD(
       identifier.appendWithLocalPrefix(identifier.getTablePath))
     CarbonTableInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonTableInputFormat.setColumnProjection(conf, columnProjection)
+    if (CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+        CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
+      CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+    }
     format
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
new file mode 100644
index 0000000..f9e4f8d
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.rdd
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledException}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.indexstore.Blocklet
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
+
+/**
+ * Spark implementation of [[DataMapJob]]: runs the datamap pruning
+ * distributably and gathers the surviving blocklets back to the driver.
+ */
+class SparkDataMapJob extends DataMapJob {
+
+  override def execute(dataMapFormat: DistributableDataMapFormat,
+      resolverIntf: FilterResolverIntf): util.List[Blocklet] = {
+    // run the prune RDD on the cluster, then collect the pruned blocklets
+    val prunedBlocklets =
+      new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, resolverIntf).collect()
+    prunedBlocklets.toList.asJava
+  }
+}
+
+/** Partition wrapping a single datamap input split (one pruning unit). */
+class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) extends Partition {
+  override def index: Int = idx
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+/**
+ * RDD to prune the datamaps across spark cluster
+ *
+ * @param sc spark context
+ * @param dataMapFormat input format describing the datamap and segments to prune
+ * @param resolverIntf filter shipped to each task for pruning
+ */
+class DataMapPruneRDD(sc: SparkContext,
+    dataMapFormat: DistributableDataMapFormat,
+    resolverIntf: FilterResolverIntf)
+  extends CarbonRDD[(Blocklet)](sc, Nil) {
+
+  // stable id used to build hadoop TaskAttemptIDs for the record readers
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
+
+  override def internalCompute(split: Partition,
+      context: TaskContext): Iterator[Blocklet] = {
+    // NOTE(review): removed unused locals (logger and load-status constant)
+    // that the original declared and never read
+    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+    val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+    val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
+    // ship the filter to this task through the hadoop configuration
+    DistributableDataMapFormat.setFilterExp(attemptContext.getConfiguration, resolverIntf)
+    val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
+    reader.initialize(inputSplit, attemptContext)
+    val iter = new Iterator[Blocklet] {
+
+      // havePair tracks whether a value fetched by hasNext is still unconsumed
+      private var havePair = false
+      private var finished = false
+
+      override def hasNext: Boolean = {
+        if (context.isInterrupted) {
+          throw new TaskKilledException
+        }
+        if (!finished && !havePair) {
+          finished = !reader.nextKeyValue
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): Blocklet = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        val value = reader.getCurrentValue
+        value
+      }
+    }
+    iter
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    // one RDD partition per input split produced by the datamap format
+    val job = Job.getInstance(new Configuration())
+    val splits = dataMapFormat.getSplits(job)
+    splits.asScala.zipWithIndex.map(f => new DataMapRDDPartition(id, f._2, f._1)).toArray
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 063eaad..43d456f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -37,9 +37,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
-import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -1300,64 +1298,4 @@ public final class CarbonDataMergerUtil {
     return true;
   }
 
-  /**
-   * This will update the property of segments as major compacted.
-   * @param model
-   * @param changedSegDetails
-   */
-  public static void updateMajorCompactionPropertyInSegment(CarbonLoadModel model,
-      List<LoadMetadataDetails> changedSegDetails,
-      List<LoadMetadataDetails> preservedSegment) throws Exception {
-
-    String metadataPath = model.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-            model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-    LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metadataPath);
-    List<LoadMetadataDetails> originalList = Arrays.asList(details);
-    for (LoadMetadataDetails segment : changedSegDetails) {
-      if (preservedSegment.contains(segment)) {
-        continue;
-      }
-      originalList.get(originalList.indexOf(segment)).setMajorCompacted("true");
-
-    }
-
-
-    ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
-            model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(),
-            LockUsage.TABLE_STATUS_LOCK);
-
-    try {
-      if (carbonTableStatusLock.lockWithRetries()) {
-        LOGGER.info(
-            "Acquired lock for the table " + model.getDatabaseName() + "." + model.getTableName()
-                        + " for table status updation ");
-        CarbonTablePath carbonTablePath = CarbonStorePath
-                .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                        absoluteTableIdentifier.getCarbonTableIdentifier());
-
-        segmentStatusManager.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(),
-                originalList.toArray(new LoadMetadataDetails[originalList.size()]));
-      } else {
-        LOGGER.error(
-                "Could not able to obtain lock for table" + model.getDatabaseName() + "." + model
-                        .getTableName() + "for table status updation");
-        throw new Exception("Failed to update the MajorCompactionStatus.");
-      }
-    } catch (IOException e) {
-      LOGGER.error("Error while writing metadata");
-      throw new Exception("Failed to update the MajorCompactionStatus." + e.getMessage());
-    } finally {
-      if (carbonTableStatusLock.unlock()) {
-        LOGGER.info(
-                "Table unlocked successfully after table status updation" + model.getDatabaseName()
-                        + "." + model.getTableName());
-      } else {
-        LOGGER.error("Unable to unlock Table lock for table" + model.getDatabaseName() + "." + model
-                .getTableName() + " during table status updation");
-      }
-    }
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1e21cd1c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 742b25a..e0d4b73 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -29,13 +29,14 @@ import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
-import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverterV3;
 import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.FileFooter3;
 import org.apache.carbondata.processing.store.TablePage;
@@ -298,62 +299,41 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
   /**
    * Below method will be used to fill the block info details
    *
-   * @param numberOfRows    number of rows in file
+   * @param numberOfRows       number of rows in file
    * @param carbonDataFileName The name of carbonData file
-   * @param currentPosition current offset
+   * @param currentPosition    current offset
    */
   @Override
   protected void fillBlockIndexInfoDetails(long numberOfRows, String carbonDataFileName,
       long currentPosition) {
-    byte[][] currentMinValue = new byte[blockletIndex.get(0).min_max_index.max_values.size()][];
-    byte[][] currentMaxValue = new byte[blockletIndex.get(0).min_max_index.max_values.size()][];
-    for (int i = 0; i < currentMaxValue.length; i++) {
-      currentMinValue[i] = blockletIndex.get(0).min_max_index.getMin_values().get(i).array();
-      currentMaxValue[i] = blockletIndex.get(0).min_max_index.getMax_values().get(i).array();
+    int i = 0;
+    DataFileFooterConverterV3 converterV3 = new DataFileFooterConverterV3();
+    for (org.apache.carbondata.format.BlockletIndex index : blockletIndex) {
+      BlockletInfo3 blockletInfo3 = blockletMetadata.get(i);
+      BlockletInfo blockletInfo = converterV3.getBlockletInfo(blockletInfo3,
+          dataWriterVo.getSegmentProperties().getDimensions().size());
+      BlockletBTreeIndex bTreeIndex = new BlockletBTreeIndex(index.b_tree_index.getStart_key(),
+          index.b_tree_index.getEnd_key());
+      BlockletMinMaxIndex minMaxIndex = new BlockletMinMaxIndex();
+      minMaxIndex.setMinValues(toByteArray(index.getMin_max_index().getMin_values()));
+      minMaxIndex.setMaxValues(toByteArray(index.getMin_max_index().getMax_values()));
+      org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex bIndex =
+          new org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex(bTreeIndex,
+              minMaxIndex);
+      BlockIndexInfo biInfo =
+          new BlockIndexInfo(numberOfRows, carbonDataFileName, currentPosition, bIndex,
+              blockletInfo);
+      blockIndexInfoList.add(biInfo);
+      i++;
     }
-    byte[] minValue = null;
-    byte[] maxValue = null;
-    int measureStartIndex = currentMinValue.length - dataWriterVo.getMeasureCount();
-    for (int i = 1; i < blockletIndex.size(); i++) {
-      for (int j = 0; j < measureStartIndex; j++) {
-        minValue = blockletIndex.get(i).min_max_index.getMin_values().get(j).array();
-        maxValue = blockletIndex.get(i).min_max_index.getMax_values().get(j).array();
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue) > 0) {
-          currentMinValue[j] = minValue.clone();
-        }
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMaxValue[j], maxValue) < 0) {
-          currentMaxValue[j] = maxValue.clone();
-        }
-      }
-      int measureIndex = 0;
-      for (int j = measureStartIndex; j < currentMinValue.length; j++) {
-        minValue = blockletIndex.get(i).min_max_index.getMin_values().get(j).array();
-        maxValue = blockletIndex.get(i).min_max_index.getMax_values().get(j).array();
+  }
 
-        if (CarbonMetadataUtil.compareMeasureData(currentMinValue[j], minValue,
-            dataWriterVo.getSegmentProperties().getMeasures().get(measureIndex).getDataType())
-            > 0) {
-          currentMinValue[j] = minValue.clone();
-        }
-        if (CarbonMetadataUtil.compareMeasureData(currentMaxValue[j], maxValue,
-            dataWriterVo.getSegmentProperties().getMeasures().get(measureIndex).getDataType())
-            < 0) {
-          currentMaxValue[j] = maxValue.clone();
-        }
-        measureIndex++;
-      }
+  private byte[][] toByteArray(List<ByteBuffer> buffers) {
+    byte[][] arrays = new byte[buffers.size()][];
+    for (int i = 0; i < arrays.length; i++) {
+      arrays[i] = buffers.get(i).array();
     }
-    BlockletBTreeIndex btree =
-        new BlockletBTreeIndex(blockletIndex.get(0).b_tree_index.getStart_key(),
-            blockletIndex.get(blockletIndex.size() - 1).b_tree_index.getEnd_key());
-    BlockletMinMaxIndex minmax = new BlockletMinMaxIndex();
-    minmax.setMinValues(currentMinValue);
-    minmax.setMaxValues(currentMaxValue);
-    org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex blockletIndex =
-        new org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex(btree, minmax);
-    BlockIndexInfo blockIndexInfo =
-        new BlockIndexInfo(numberOfRows, carbonDataFileName, currentPosition, blockletIndex);
-    blockIndexInfoList.add(blockIndexInfo);
+    return arrays;
   }
 
   /**


Mime
View raw message