carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [16/50] [abbrv] carbondata git commit: [CARBONDATA-1858][PARTITION] Support querying data from partition table.
Date Sun, 24 Dec 2017 13:26:12 GMT
[CARBONDATA-1858][PARTITION] Support querying data from partition table.

For a partition table, first use the session catalog to prune the partitions. Then, using that partition information, the datamap reads the partition map file to find the index files and the corresponding blocklets to prune.

This closes #1672


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b8a02f39
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b8a02f39
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b8a02f39

Branch: refs/heads/fgdatamap
Commit: b8a02f391d9ff543abb54c7b58733cf9a4d7beaf
Parents: e6497e1
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Mon Dec 18 22:37:59 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Tue Dec 19 23:17:55 2017 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/TableDataMap.java   |  10 +-
 .../carbondata/core/datamap/dev/DataMap.java    |  11 +
 .../indexstore/BlockletDataMapIndexStore.java   |  37 ++--
 .../blockletindex/BlockletDataMap.java          | 117 +++++++---
 .../blockletindex/BlockletDataMapModel.java     |  11 +-
 .../core/metadata/PartitionMapFileStore.java    |  60 ++++--
 .../apache/carbondata/core/util/CarbonUtil.java |  14 +-
 .../core/util/path/CarbonTablePath.java         |   2 +-
 .../carbondata/core/util/CarbonUtilTest.java    |   4 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |   4 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  32 ++-
 .../hadoop/api/DistributableDataMapFormat.java  |  11 +-
 .../StandardPartitionTableQueryTestCase.scala   | 216 +++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   6 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |  18 ++
 .../sql/CarbonDatasourceHadoopRelation.scala    |   7 +-
 .../management/CarbonInsertIntoCommand.scala    |  24 +--
 .../management/CarbonLoadDataCommand.scala      | 168 ++++++++-------
 .../table/CarbonCreateTableCommand.scala        |   5 +-
 .../datasources/CarbonFileFormat.scala          |   6 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |  81 +++++--
 .../spark/sql/optimizer/CarbonFilters.scala     |  18 +-
 22 files changed, 647 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 1d5c978..61d2243 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -64,14 +64,14 @@ public final class TableDataMap implements OperationEventListener {
    * @param filterExp
    * @return
    */
-  public List<ExtendedBlocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp)
-      throws IOException {
+  public List<ExtendedBlocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp,
+      List<String> partitions) throws IOException {
     List<ExtendedBlocklet> blocklets = new ArrayList<>();
     for (String segmentId : segmentIds) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
       List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
       for (DataMap dataMap : dataMaps) {
-        pruneBlocklets.addAll(dataMap.prune(filterExp));
+        pruneBlocklets.addAll(dataMap.prune(filterExp, partitions));
       }
       blocklets.addAll(addSegmentId(blockletDetailsFetcher
           .getExtendedBlocklets(pruneBlocklets, segmentId), segmentId));
@@ -118,12 +118,12 @@ public final class TableDataMap implements OperationEventListener {
    * @return
    */
   public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
-      FilterResolverIntf filterExp) throws IOException {
+      FilterResolverIntf filterExp, List<String> partitions) throws IOException {
     List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
     List<Blocklet> blocklets = new ArrayList<>();
     List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable);
     for (DataMap dataMap : dataMaps) {
-      blocklets.addAll(dataMap.prune(filterExp));
+      blocklets.addAll(dataMap.prune(filterExp, partitions));
     }
     for (Blocklet blocklet: blocklets) {
       ExtendedBlocklet detailedBlocklet =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index e7c30a9..16be1ac 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -42,6 +42,17 @@ public interface DataMap {
    */
   List<Blocklet> prune(FilterResolverIntf filterExp);
 
+  // TODO Move this method to Abstract class
+  /**
+   * Prune the datamap with filter expression and partition information. It returns the list of
+   * blocklets where these filters can exist.
+   *
+   * @param filterExp
+   * @return
+   */
+  List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions);
+
+  // TODO Move this method to Abstract class
   /**
    * Validate whether the current segment needs to be fetching the required data
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 4d90a08..59af50b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.PartitionMapFileStore;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
@@ -71,12 +72,14 @@ public class BlockletDataMapIndexStore
     BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
     if (dataMap == null) {
       try {
+        String segmentPath = CarbonTablePath
+            .getSegmentPath(identifier.getAbsoluteTableIdentifier().getTablePath(),
+                identifier.getSegmentId());
         SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
-        indexFileStore.readAllIIndexOfSegment(
-            CarbonTablePath.getSegmentPath(
-                identifier.getAbsoluteTableIdentifier().getTablePath(),
-                identifier.getSegmentId()));
-        dataMap = loadAndGetDataMap(identifier, indexFileStore);
+        indexFileStore.readAllIIndexOfSegment(segmentPath);
+        PartitionMapFileStore partitionFileStore = new PartitionMapFileStore();
+        partitionFileStore.readAllPartitionsOfSegment(segmentPath);
+        dataMap = loadAndGetDataMap(identifier, indexFileStore, partitionFileStore);
       } catch (MemoryException e) {
         LOGGER.error("memory exception when loading datamap: " + e.getMessage());
         throw new RuntimeException(e.getMessage(), e);
@@ -102,18 +105,26 @@ public class BlockletDataMapIndexStore
       }
       if (missedIdentifiers.size() > 0) {
         Map<String, SegmentIndexFileStore> segmentIndexFileStoreMap = new HashMap<>();
+        Map<String, PartitionMapFileStore> partitionFileStoreMap = new HashMap<>();
         for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
           SegmentIndexFileStore indexFileStore =
               segmentIndexFileStoreMap.get(identifier.getSegmentId());
+          PartitionMapFileStore partitionFileStore =
+              partitionFileStoreMap.get(identifier.getSegmentId());
+          String segmentPath = CarbonTablePath
+              .getSegmentPath(identifier.getAbsoluteTableIdentifier().getTablePath(),
+                  identifier.getSegmentId());
           if (indexFileStore == null) {
-            String segmentPath = CarbonTablePath
-                .getSegmentPath(identifier.getAbsoluteTableIdentifier().getTablePath(),
-                    identifier.getSegmentId());
             indexFileStore = new SegmentIndexFileStore();
             indexFileStore.readAllIIndexOfSegment(segmentPath);
             segmentIndexFileStoreMap.put(identifier.getSegmentId(), indexFileStore);
           }
-          blockletDataMaps.add(loadAndGetDataMap(identifier, indexFileStore));
+          if (partitionFileStore == null) {
+            partitionFileStore = new PartitionMapFileStore();
+            partitionFileStore.readAllPartitionsOfSegment(segmentPath);
+            partitionFileStoreMap.put(identifier.getSegmentId(), partitionFileStore);
+          }
+          blockletDataMaps.add(loadAndGetDataMap(identifier, indexFileStore, partitionFileStore));
         }
       }
     } catch (Throwable e) {
@@ -160,7 +171,8 @@ public class BlockletDataMapIndexStore
    */
   private BlockletDataMap loadAndGetDataMap(
       TableBlockIndexUniqueIdentifier identifier,
-      SegmentIndexFileStore indexFileStore)
+      SegmentIndexFileStore indexFileStore,
+      PartitionMapFileStore partitionFileStore)
       throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
         identifier.getUniqueTableSegmentIdentifier();
@@ -168,11 +180,12 @@ public class BlockletDataMapIndexStore
     if (lock == null) {
       lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
     }
-    BlockletDataMap dataMap = null;
+    BlockletDataMap dataMap;
     synchronized (lock) {
       dataMap = new BlockletDataMap();
       dataMap.init(new BlockletDataMapModel(identifier.getFilePath(),
-          indexFileStore.getFileData(identifier.getCarbonIndexFileName())));
+          indexFileStore.getFileData(identifier.getCarbonIndexFileName()),
+          partitionFileStore.getPartitions(identifier.getCarbonIndexFileName())));
       lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap,
           dataMap.getMemorySize());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 1dc9b7a..70bae32 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -99,7 +99,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private UnsafeMemoryDMStore unsafeMemoryDMStore;
 
-  private UnsafeMemoryDMStore unsafeMemoryTaskMinMaxDMStore;
+  private UnsafeMemoryDMStore unsafeMemorySummaryDMStore;
 
   private SegmentProperties segmentProperties;
 
@@ -113,13 +113,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
     List<DataFileFooter> indexInfo = fileFooterConverter
         .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData());
+    DataMapRowImpl summaryRow = null;
     for (DataFileFooter fileFooter : indexInfo) {
       List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
       if (segmentProperties == null) {
         columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
         segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
         createSchema(segmentProperties);
-        createTaskMinMaxSchema(segmentProperties);
+        createSummarySchema(segmentProperties, blockletDataMapInfo.getPartitions());
       }
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
       if (fileFooter.getBlockletList() == null || fileFooter.getBlockletList().size() == 0) {
@@ -128,28 +129,28 @@ public class BlockletDataMap implements DataMap, Cacheable {
         fileFooter = CarbonUtil.readMetadatFile(blockInfo);
       }
 
-      loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath());
+      summaryRow = loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow);
     }
     if (unsafeMemoryDMStore != null) {
       unsafeMemoryDMStore.finishWriting();
     }
-    if (null != unsafeMemoryTaskMinMaxDMStore) {
-      unsafeMemoryTaskMinMaxDMStore.finishWriting();
+    if (null != unsafeMemorySummaryDMStore) {
+      addTaskSummaryRowToUnsafeMemoryStore(summaryRow, blockletDataMapInfo.getPartitions());
+      unsafeMemorySummaryDMStore.finishWriting();
     }
     LOGGER.info(
         "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + (
             System.currentTimeMillis() - startTime));
   }
 
-  private void loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties,
-      String filePath) {
+  private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
+      SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow) {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     List<BlockletInfo> blockletList = fileFooter.getBlockletList();
     CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
-    DataMapRow taskMinMaxRow = null;
     // Add one row to maintain task level min max for segment pruning
-    if (!blockletList.isEmpty()) {
-      taskMinMaxRow = new DataMapRowImpl(unsafeMemoryTaskMinMaxDMStore.getSchema());
+    if (!blockletList.isEmpty() && summaryRow == null) {
+      summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
     }
     for (int index = 0; index < blockletList.size(); index++) {
       DataMapRow row = new DataMapRowImpl(schema);
@@ -164,16 +165,16 @@ public class BlockletDataMap implements DataMap, Cacheable {
       byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
       row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
       // compute and set task level min values
-      addTaskMinMaxValues(taskMinMaxRow, minMaxLen,
-          unsafeMemoryTaskMinMaxDMStore.getSchema()[taskMinMaxOrdinal], minValues,
+      addTaskMinMaxValues(summaryRow, minMaxLen,
+          unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
           TASK_MIN_VALUES_INDEX, true);
       ordinal++;
       taskMinMaxOrdinal++;
       byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
       row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
       // compute and set task level max values
-      addTaskMinMaxValues(taskMinMaxRow, minMaxLen,
-          unsafeMemoryTaskMinMaxDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
+      addTaskMinMaxValues(summaryRow, minMaxLen,
+          unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
           TASK_MAX_VALUES_INDEX, false);
       ordinal++;
 
@@ -205,17 +206,31 @@ public class BlockletDataMap implements DataMap, Cacheable {
         throw new RuntimeException(e);
       }
     }
-    // write the task level min/max row to unsafe memory store
-    if (null != taskMinMaxRow) {
-      addTaskMinMaxRowToUnsafeMemoryStore(taskMinMaxRow);
-    }
+
+    return summaryRow;
   }
 
-  private void addTaskMinMaxRowToUnsafeMemoryStore(DataMapRow taskMinMaxRow) {
-    try {
-      unsafeMemoryTaskMinMaxDMStore.addIndexRowToUnsafe(taskMinMaxRow);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+  private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow,
+      List<String> partitions) {
+    // write the task summary info to unsafe memory store
+    if (null != summaryRow) {
+      if (partitions != null && partitions.size() > 0) {
+        CarbonRowSchema[] minSchemas =
+            ((CarbonRowSchema.StructCarbonRowSchema) unsafeMemorySummaryDMStore.getSchema()[2])
+                .getChildSchemas();
+        DataMapRow partitionRow = new DataMapRowImpl(minSchemas);
+        for (int i = 0; i < partitions.size(); i++) {
+          partitionRow
+              .setByteArray(partitions.get(i).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS),
+                  i);
+        }
+        summaryRow.setRow(partitionRow, 2);
+      }
+      try {
+        unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 
@@ -379,10 +394,21 @@ public class BlockletDataMap implements DataMap, Cacheable {
         new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
   }
 
-  private void createTaskMinMaxSchema(SegmentProperties segmentProperties) throws MemoryException {
+  private void createSummarySchema(SegmentProperties segmentProperties, List<String> partitions)
+      throws MemoryException {
     List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>(2);
     getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
-    unsafeMemoryTaskMinMaxDMStore = new UnsafeMemoryDMStore(
+    if (partitions != null && partitions.size() > 0) {
+      CarbonRowSchema[] mapSchemas = new CarbonRowSchema[partitions.size()];
+      for (int i = 0; i < mapSchemas.length; i++) {
+        mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
+      }
+      CarbonRowSchema mapSchema =
+          new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
+              mapSchemas);
+      taskMinMaxSchemas.add(mapSchema);
+    }
+    unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore(
         taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
   }
 
@@ -412,8 +438,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
   public boolean isScanRequired(FilterResolverIntf filterExp) {
     FilterExecuter filterExecuter =
         FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
-    for (int i = 0; i < unsafeMemoryTaskMinMaxDMStore.getRowCount(); i++) {
-      DataMapRow unsafeRow = unsafeMemoryTaskMinMaxDMStore.getUnsafeRow(i);
+    for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) {
+      DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i);
       boolean isScanRequired = FilterExpressionProcessor
           .isScanRequired(filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
               getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
@@ -491,6 +517,20 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return blocklets;
   }
 
+  @Override public List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions) {
+    List<String> storedPartitions = getPartitions();
+    if (storedPartitions != null && storedPartitions.size() > 0 && filterExp != null) {
+      boolean found = false;
+      if (partitions != null && partitions.size() > 0) {
+        found = partitions.containsAll(storedPartitions);
+      }
+      if (!found) {
+        return new ArrayList<>();
+      }
+    }
+    return prune(filterExp);
+  }
+
   /**
    * select the blocks based on column min and max value
    *
@@ -664,6 +704,19 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return dataMapRow;
   }
 
+  private List<String> getPartitions() {
+    List<String> partitions = new ArrayList<>();
+    DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+    if (unsafeRow.getColumnCount() > 2) {
+      DataMapRow row = unsafeRow.getRow(2);
+      for (int i = 0; i < row.getColumnCount(); i++) {
+        partitions.add(
+            new String(row.getByteArray(i), CarbonCommonConstants.DEFAULT_CHARSET_CLASS));
+      }
+    }
+    return partitions;
+  }
+
   @Override
   public void clear() {
     if (unsafeMemoryDMStore != null) {
@@ -672,9 +725,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
       segmentProperties = null;
     }
     // clear task min/max unsafe memory
-    if (null != unsafeMemoryTaskMinMaxDMStore) {
-      unsafeMemoryTaskMinMaxDMStore.freeMemory();
-      unsafeMemoryTaskMinMaxDMStore = null;
+    if (null != unsafeMemorySummaryDMStore) {
+      unsafeMemorySummaryDMStore.freeMemory();
+      unsafeMemorySummaryDMStore = null;
     }
   }
 
@@ -694,8 +747,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
     if (unsafeMemoryDMStore != null) {
       memoryUsed += unsafeMemoryDMStore.getMemoryUsed();
     }
-    if (null != unsafeMemoryTaskMinMaxDMStore) {
-      memoryUsed += unsafeMemoryTaskMinMaxDMStore.getMemoryUsed();
+    if (null != unsafeMemorySummaryDMStore) {
+      memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed();
     }
     return memoryUsed;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index 7ffa32d..c98ef33 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -16,18 +16,27 @@
  */
 package org.apache.carbondata.core.indexstore.blockletindex;
 
+import java.util.List;
+
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 
 public class BlockletDataMapModel extends DataMapModel {
 
   private byte[] fileData;
 
-  public BlockletDataMapModel(String filePath, byte[] fileData) {
+  private List<String> partitions;
+
+  public BlockletDataMapModel(String filePath, byte[] fileData, List<String> partitions) {
     super(filePath);
     this.fileData = fileData;
+    this.partitions = partitions;
   }
 
   public byte[] getFileData() {
     return fileData;
   }
+
+  public List<String> getPartitions() {
+    return partitions;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
index 4b58e75..853c729 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -46,6 +46,7 @@ import com.google.gson.Gson;
  */
 public class PartitionMapFileStore {
 
+  private Map<String, List<String>> partitionMap = new HashMap<>();
   /**
    * Write partitionmapp file to the segment folder with indexfilename and corresponding partitions.
    *
@@ -105,33 +106,38 @@ public class PartitionMapFileStore {
    * @throws IOException
    */
   public void mergePartitionMapFiles(String segmentPath) throws IOException {
+    CarbonFile[] partitionFiles = getPartitionFiles(segmentPath);
+    if (partitionFiles != null && partitionFiles.length > 1) {
+      PartitionMapper partitionMapper = null;
+      for (CarbonFile file : partitionFiles) {
+        PartitionMapper localMapper = readPartitionMap(file.getAbsolutePath());
+        if (partitionMapper == null && localMapper != null) {
+          partitionMapper = localMapper;
+        }
+        if (localMapper != null) {
+          partitionMapper = partitionMapper.merge(localMapper);
+        }
+      }
+      if (partitionMapper != null) {
+        String path = segmentPath + "/" + "mergedpartitions" + CarbonTablePath.PARTITION_MAP_EXT;
+        writePartitionFile(partitionMapper, path);
+        for (CarbonFile file : partitionFiles) {
+          FileFactory.deleteAllCarbonFilesOfDir(file);
+        }
+      }
+    }
+  }
+
+  private CarbonFile[] getPartitionFiles(String segmentPath) {
     CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
     if (carbonFile.exists()) {
-      CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      return carbonFile.listFiles(new CarbonFileFilter() {
         @Override public boolean accept(CarbonFile file) {
           return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT);
         }
       });
-      if (carbonFiles != null && carbonFiles.length > 1) {
-        PartitionMapper partitionMapper = null;
-        for (CarbonFile file : carbonFiles) {
-          PartitionMapper localMapper = readPartitionMap(file.getAbsolutePath());
-          if (partitionMapper == null && localMapper != null) {
-            partitionMapper = localMapper;
-          }
-          if (localMapper != null) {
-            partitionMapper = partitionMapper.merge(localMapper);
-          }
-        }
-        if (partitionMapper != null) {
-          String path = segmentPath + "/" + "mergedpartitions" + CarbonTablePath.PARTITION_MAP_EXT;
-          writePartitionFile(partitionMapper, path);
-          for (CarbonFile file : carbonFiles) {
-            FileFactory.deleteAllCarbonFilesOfDir(file);
-          }
-        }
-      }
     }
+    return null;
   }
 
   /**
@@ -167,6 +173,20 @@ public class PartitionMapFileStore {
     return partitionMapper;
   }
 
+  public void readAllPartitionsOfSegment(String segmentPath) {
+    CarbonFile[] partitionFiles = getPartitionFiles(segmentPath);
+    if (partitionFiles != null && partitionFiles.length > 0) {
+      for (CarbonFile file : partitionFiles) {
+        PartitionMapper partitionMapper = readPartitionMap(file.getAbsolutePath());
+        partitionMap.putAll(partitionMapper.getPartitionMap());
+      }
+    }
+  }
+
+  public List<String> getPartitions(String indexFileName) {
+    return partitionMap.get(indexFileName);
+  }
+
   public static class PartitionMapper implements Serializable {
 
     private static final long serialVersionUID = 3582245668420401089L;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 910efea..bd01772 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1391,23 +1391,23 @@ public final class CarbonUtil {
   }
 
   /**
-   * Below method will be used to get the list of segment in
+   * Below method will be used to get the list of values in
    * comma separated string format
    *
-   * @param segmentList
+   * @param values
    * @return comma separated segment string
    */
-  public static String getSegmentString(List<String> segmentList) {
-    if (segmentList.isEmpty()) {
+  public static String convertToString(List<String> values) {
+    if (values == null || values.isEmpty()) {
       return "";
     }
     StringBuilder segmentStringbuilder = new StringBuilder();
-    for (int i = 0; i < segmentList.size() - 1; i++) {
-      String segmentNo = segmentList.get(i);
+    for (int i = 0; i < values.size() - 1; i++) {
+      String segmentNo = values.get(i);
       segmentStringbuilder.append(segmentNo);
       segmentStringbuilder.append(",");
     }
-    segmentStringbuilder.append(segmentList.get(segmentList.size() - 1));
+    segmentStringbuilder.append(values.get(values.size() - 1));
     return segmentStringbuilder.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 6be1f4c..c33c0a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -42,10 +42,10 @@ public class CarbonTablePath extends Path {
   private static final String FACT_DIR = "Fact";
   private static final String SEGMENT_PREFIX = "Segment_";
   private static final String PARTITION_PREFIX = "Part";
-  private static final String CARBON_DATA_EXT = ".carbondata";
   private static final String DATA_PART_PREFIX = "part-";
   private static final String BATCH_PREFIX = "_batchno";
 
+  public static final String CARBON_DATA_EXT = ".carbondata";
   public static final String INDEX_FILE_EXT = ".carbonindex";
   public static final String MERGE_INDEX_FILE_EXT = ".carbonindexmerge";
   public static final String PARTITION_MAP_EXT = ".partitionmap";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index e653137..2296641 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -716,13 +716,13 @@ public class CarbonUtilTest {
     List<String> list = new ArrayList<>();
     list.add("1");
     list.add("2");
-    String segments = CarbonUtil.getSegmentString(list);
+    String segments = CarbonUtil.convertToString(list);
     assertEquals(segments, "1,2");
   }
 
   @Test public void testToGetSegmentStringWithEmptySegmentList() {
     List<String> list = new ArrayList<>();
-    String segments = CarbonUtil.getSegmentString(list);
+    String segments = CarbonUtil.convertToString(list);
     assertEquals(segments, "");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 1803a12..3e8ede2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -299,14 +299,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
     configuration
-        .set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
+        .set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
   }
 
   /**
    * Set list of files to access
    */
   public static void setFilesToAccess(Configuration configuration, List<String> validFiles) {
-    configuration.set(CarbonInputFormat.INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
+    configuration.set(CarbonInputFormat.INPUT_FILES, CarbonUtil.convertToString(validFiles));
   }
 
   private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 9e8e22d..e60a588 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -127,6 +127,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
   public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
   public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
+  public static final String PARTITIONS_TO_PRUNE =
+      "mapreduce.input.carboninputformat.partitions.to.prune";
 
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
@@ -257,7 +259,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    * Set list of segments to access
    */
   public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
-    configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
+    configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
   }
 
   /**
@@ -290,10 +292,28 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
   /**
+   * set list of partitions to prune
+   */
+  public static void setPartitionsToPrune(Configuration configuration, List<String> partitions) {
+    configuration.set(
+        CarbonTableInputFormat.PARTITIONS_TO_PRUNE, CarbonUtil.convertToString(partitions));
+  }
+
+  /**
+   * get list of partitions to prune
+   */
+  public static List<String> getPartitionsToPrune(Configuration configuration) {
+    String partitionString = configuration.get(PARTITIONS_TO_PRUNE, "");
+    if (partitionString.trim().isEmpty()) {
+      return null;
+    }
+    return Arrays.asList(partitionString.split(","));
+  }
+  /**
    * Set list of files to access
    */
   public static void setFilesToAccess(Configuration configuration, List<String> validFiles) {
-    configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
+    configuration.set(INPUT_FILES, CarbonUtil.convertToString(validFiles));
   }
 
   public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
@@ -710,14 +730,16 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME,
             BlockletDataMapFactory.class.getName());
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
+    List<String> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
     if (dataMapJob != null) {
       DistributableDataMapFormat datamapDstr =
           new DistributableDataMapFormat(absoluteTableIdentifier, BlockletDataMap.NAME,
-              segmentIds, BlockletDataMapFactory.class.getName());
+              segmentIds, partitionsToPrune,
+              BlockletDataMapFactory.class.getName());
       prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
     } else {
-      prunedBlocklets = blockletMap.prune(segmentIds, resolver);
+      prunedBlocklets = blockletMap.prune(segmentIds, resolver, partitionsToPrune);
     }
 
     List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
@@ -908,7 +930,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     // TODO: currently only batch segment is supported, add support for streaming table
     List<String> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments());
 
-    List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null);
+    List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, null);
     for (ExtendedBlocklet blocklet : blocklets) {
       String blockName = blocklet.getPath();
       blockName = CarbonTablePath.getCarbonDataFileName(blockName);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index 1b875bc..96eec6f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -53,12 +53,15 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   private String className;
 
+  private List<String> partitions;
+
   public DistributableDataMapFormat(AbsoluteTableIdentifier identifier,
-      String dataMapName, List<String> validSegments, String className) {
+      String dataMapName, List<String> validSegments, List<String> partitions, String className) {
     this.identifier = identifier;
     this.dataMapName = dataMapName;
     this.validSegments = validSegments;
     this.className = className;
+    this.partitions = partitions;
   }
 
   public static void setFilterExp(Configuration configuration, FilterResolverIntf filterExp)
@@ -102,9 +105,9 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
         TableDataMap dataMap = DataMapStoreManager.getInstance()
             .getDataMap(identifier, distributable.getDataMapName(),
                 distributable.getDataMapFactoryClass());
-        blockletIterator =
-            dataMap.prune(distributable, getFilterExp(taskAttemptContext.getConfiguration()))
-                .iterator();
+        blockletIterator = dataMap.prune(
+            distributable, getFilterExp(taskAttemptContext.getConfiguration()), partitions)
+            .iterator();
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
new file mode 100644
index 0000000..570951a
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.BatchedDataSourceScanExec
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    dropTable
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
+    sql(
+      """
+        | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+  }
+
+  test("querying on partition table for int partition column") {
+    sql(
+      """
+        | CREATE TABLE partitionone (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    val frame = sql(
+      "select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno," +
+      " deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary " +
+      "from partitionone where empno=11 order by empno")
+
+    verifyPartitionInfo(frame, Seq("empno=11"))
+
+    checkAnswer(frame,
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where empno=11 order by empno"))
+
+  }
+
+  test("querying on partition table for string partition column") {
+    sql(
+      """
+        | CREATE TABLE partitiontwo (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiontwo OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    val frame = sql(
+      "select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno," +
+      " deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary " +
+      "from partitiontwo where deptname='network' and projectcode=928478")
+    verifyPartitionInfo(frame, Seq("deptname=network"))
+
+    val frame1 = sql(
+      "select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno," +
+      " deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary " +
+      "from partitiontwo where projectcode=928478")
+    checkAnswer(frame1,
+      sql( "select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno," +
+           " deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary " +
+           "from originTable where projectcode=928478"))
+    verifyPartitionInfo(frame1, Seq("deptname=network","deptname=security","deptname=protocol","deptname=Learning","deptname=configManagement"))
+
+    val frame2 = sql("select distinct deptname from partitiontwo")
+
+    verifyPartitionInfo(frame2, Seq("deptname=network","deptname=security","deptname=protocol","deptname=Learning","deptname=configManagement"))
+
+    checkAnswer(frame,
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where where deptname='network' and projectcode=928478 order by empno"))
+
+  }
+
+  test("querying on partition table for more partition columns") {
+    sql(
+      """
+        | CREATE TABLE partitionmany (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    val frame = sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionmany where doj='2007-01-17 00:00:00'")
+    verifyPartitionInfo(frame, Seq("deptname=network","doj=2007-01-17 00:00:00","projectcode=928478"))
+    checkAnswer(frame,
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where doj='2007-01-17 00:00:00'"))
+
+  }
+
+  test("querying on partition table for date partition column") {
+    sql(
+      """
+        | CREATE TABLE partitiondate (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,doj Timestamp,projectcode int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (projectenddate Date)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondate OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    val frame = sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiondate where projectenddate = cast('2016-11-30' as date)")
+    verifyPartitionInfo(frame, Seq("projectenddate=2016-11-30"))
+    checkAnswer(frame,
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where projectenddate = cast('2016-11-30' as date)"))
+
+  }
+
+  test("querying on partition table for date partition column on insert query") {
+    sql(
+      """
+        | CREATE TABLE partitiondateinsert (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (projectenddate Date,doj Timestamp)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""")
+    val frame = sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiondateinsert where projectenddate = cast('2016-11-30' as date)")
+    verifyPartitionInfo(frame, Seq("projectenddate=2016-11-30","doj=2015-12-01 00:00:00"))
+    checkAnswer(frame,
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where projectenddate = cast('2016-11-30' as date)"))
+
+    val frame1 = sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiondateinsert where doj>'2006-01-17 00:00:00'")
+    verifyPartitionInfo(frame1,
+      Seq("projectenddate=2016-06-29" ,
+          "doj=2010-12-29 00:00:00"   ,
+          "doj=2015-12-01 00:00:00"   ,
+          "projectenddate=2016-11-12" ,
+          "projectenddate=2016-12-29" ,
+          "doj=2011-11-09 00:00:00"   ,
+          "doj=2009-07-07 00:00:00"   ,
+          "projectenddate=2016-05-29" ,
+          "doj=2012-10-14 00:00:00"   ,
+          "projectenddate=2016-11-30" ,
+           "projectenddate=2016-11-15",
+           "doj=2015-05-12 00:00:00"  ,
+           "doj=2013-09-22 00:00:00"  ,
+           "doj=2008-05-29 00:00:00"  ,
+           "doj=2014-08-15 00:00:00",
+           "projectenddate=2016-12-30"))
+    checkAnswer(frame1,
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where doj>'2006-01-17 00:00:00'"))
+
+  }
+
+
+  private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
+    val plan = frame.queryExecution.sparkPlan
+    val scanRDD = plan collect {
+      case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] => b.rdd
+        .asInstanceOf[CarbonScanRDD]
+    }
+    assert(scanRDD.nonEmpty)
+    assert(!partitionNames.map(f => scanRDD.head.partitionNames.exists(_.equals(f))).exists(!_))
+  }
+
+  override def afterAll = {
+    dropTable
+  }
+
+  def dropTable = {
+    sql("drop table if exists originTable")
+    sql("drop table if exists originMultiLoads")
+    sql("drop table if exists partitionone")
+    sql("drop table if exists partitiontwo")
+    sql("drop table if exists partitionmany")
+    sql("drop table if exists partitiondate")
+    sql("drop table if exists partitiondateinsert")
+    sql("drop table if exists staticpartitionone")
+    sql("drop table if exists singlepasspartitionone")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index cc7f757..e58bfd4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -61,7 +61,8 @@ class CarbonScanRDD(
     identifier: AbsoluteTableIdentifier,
     @transient serializedTableInfo: Array[Byte],
     @transient tableInfo: TableInfo,
-    inputMetricsStats: InitInputMetrics)
+    inputMetricsStats: InitInputMetrics,
+    @transient val partitionNames: Seq[String])
   extends CarbonRDDWithTableInfo[InternalRow](sc, Nil, serializedTableInfo) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
@@ -332,6 +333,9 @@ class CarbonScanRDD(
     CarbonTableInputFormat.setTableInfo(conf, tableInfo)
     CarbonTableInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
     CarbonTableInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
+    if (partitionNames != null) {
+      CarbonTableInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
+    }
     createInputFormat(conf)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 653e6f3..f78412b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -22,8 +22,10 @@ import java.text.SimpleDateFormat
 import java.util
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.DataTypeInfo
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField}
@@ -152,6 +154,22 @@ object CarbonScalaUtil {
     }
   }
 
+  def convertToUTF8String(value: String,
+      dataType: DataType,
+      timeStampFormat: SimpleDateFormat,
+      dateFormat: SimpleDateFormat): UTF8String = {
+    dataType match {
+      case TimestampType =>
+        UTF8String.fromString(
+          DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000))
+      case DateType =>
+        UTF8String.fromString(
+          DateTimeUtils.dateToString(
+            (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt))
+      case _ => UTF8String.fromString(value)
+    }
+  }
+
   /**
    * This method will validate a column for its data type and check whether the column data type
    * can be modified and update if conditions are met

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 19b96a8..148fca8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -65,7 +65,9 @@ case class CarbonDatasourceHadoopRelation(
 
   override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
 
-  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
+  def buildScan(requiredColumns: Array[String],
+      filters: Array[Filter],
+      partitions: Seq[String]): RDD[InternalRow] = {
     val filterExpression: Option[Expression] = filters.flatMap { filter =>
       CarbonFilters.createCarbonFilter(schema, filter)
     }.reduceOption(new AndExpression(_, _))
@@ -80,7 +82,8 @@ case class CarbonDatasourceHadoopRelation(
       identifier,
       carbonTable.getTableInfo.serialize(),
       carbonTable.getTableInfo,
-      inputMetricsStats)
+      inputMetricsStats,
+      partitions)
   }
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 5142ef2..810b10f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -34,18 +34,18 @@ case class CarbonInsertIntoCommand(
     val df = Dataset.ofRows(sparkSession, child)
     val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
     val load = CarbonLoadDataCommand(
-      Some(relation.carbonRelation.databaseName),
-      relation.carbonRelation.tableName,
-      null,
-      Seq(),
-      scala.collection.immutable.Map("fileheader" -> header),
-      overwrite,
-      null,
-      Some(df),
-      None,
-      None,
-      Map.empty,
-      partition).run(sparkSession)
+      databaseNameOp = Some(relation.carbonRelation.databaseName),
+      tableName = relation.carbonRelation.tableName,
+      factPathFromUser = null,
+      dimFilesPath = Seq(),
+      options = scala.collection.immutable.Map("fileheader" -> header),
+      isOverwriteTable = overwrite,
+      inputSqlString = null,
+      dataFrame = Some(df),
+      updateModel = None,
+      tableInfoOp = None,
+      internalOptions = Map.empty,
+      partition = partition).run(sparkSession)
     // updating relation metadata. This is in case of auto detect high cardinality
     relation.carbonRelation.metaData =
       CarbonSparkUtil.createSparkMeta(relation.carbonRelation.carbonTable)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index c94f698..a883735 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -34,12 +34,13 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel}
 import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
@@ -363,7 +364,7 @@ case class CarbonLoadDataCommand(
 
     if (carbonTable.isHivePartitionTable) {
       try {
-        loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
+        loadDataWithPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
       } finally {
         server match {
           case Some(dictServer) =>
@@ -378,7 +379,8 @@ case class CarbonLoadDataCommand(
         }
       }
     } else {
-      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+      CarbonDataRDDFactory.loadCarbonData(
+        sparkSession.sqlContext,
         carbonLoadModel,
         columnar,
         partitionStatus,
@@ -417,9 +419,10 @@ case class CarbonLoadDataCommand(
         dictionaryDataFrame)
     }
     if (table.isHivePartitionTable) {
-      loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
+      loadDataWithPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
     } else {
-      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+      CarbonDataRDDFactory.loadCarbonData(
+        sparkSession.sqlContext,
         carbonLoadModel,
         columnar,
         partitionStatus,
@@ -432,7 +435,17 @@ case class CarbonLoadDataCommand(
     }
   }
 
-  private def loadStandardPartition(sparkSession: SparkSession,
+  /**
+   * Loads the data in a hive partition way. This method uses InsertIntoTable command to load data
+   * into the partitioned table. The table relation would be converted to HadoopFsRelation to let
+   * Spark handle the partitioning.
+   * @param sparkSession
+   * @param carbonLoadModel
+   * @param hadoopConf
+   * @param dataFrame
+   * @return
+   */
+  private def loadDataWithPartition(sparkSession: SparkSession,
       carbonLoadModel: CarbonLoadModel,
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame]) = {
@@ -443,15 +456,14 @@ case class CarbonLoadDataCommand(
     val relation = logicalPlan.collect {
       case l: LogicalRelation => l
       case c: CatalogRelation => c
-    }.head.asInstanceOf[LogicalPlan]
+    }.head
 
+    // Converts the data to carbon understandable format. The timestamp/date format data needs to
+    // be converted to hive standard format to let spark understand the data to partition.
     val query: LogicalPlan = if (dataFrame.isDefined) {
-      val timeStampformatString = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-          CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
       val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-      val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
-        .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
       val dateFormat = new SimpleDateFormat(dateFormatString)
       val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
       val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
@@ -479,31 +491,81 @@ case class CarbonLoadDataCommand(
       LogicalRDD(attributes, rdd)(sparkSession)
 
     } else {
+      var timeStampformatString = carbonLoadModel.getTimestampformat
+      if (timeStampformatString.isEmpty) {
+        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+      }
+      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+      var dateFormatString = carbonLoadModel.getDateFormat
+      if (dateFormatString.isEmpty) {
+        dateFormatString = carbonLoadModel.getDefaultDateFormat
+      }
+      val dateFormat = new SimpleDateFormat(dateFormatString)
       // input data from csv files. Convert to logical plan
       CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
       hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
       val jobConf = new JobConf(hadoopConf)
       SparkHadoopUtil.get.addCredentials(jobConf)
-      val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
-        sparkSession.sparkContext,
-        classOf[CSVInputFormat],
-        classOf[NullWritable],
-        classOf[StringArrayWritable],
-        jobConf
-      ).map(f => InternalRow.fromSeq(f._2.get().map(UTF8String.fromString)))
-
       val attributes =
         StructType(carbonLoadModel.getCsvHeaderColumns.map(
           StructField(_, StringType))).toAttributes
+      val rowDataTypes = attributes.map { attribute =>
+        relation.output.find(_.name.equalsIgnoreCase(attribute.name)) match {
+          case Some(attr) => attr.dataType
+          case _ => StringType
+        }
+      }
+      val len = rowDataTypes.length
+      val rdd =
+        new NewHadoopRDD[NullWritable, StringArrayWritable](
+          sparkSession.sparkContext,
+          classOf[CSVInputFormat],
+          classOf[NullWritable],
+          classOf[StringArrayWritable],
+          jobConf).map{ case (key, value) =>
+            val data = new Array[Any](len)
+            var i = 0
+            while (i < len) {
+              // TODO find a way to avoid double conversion of date and time.
+              data(i) = CarbonScalaUtil.convertToUTF8String(
+                value.get()(i),
+                rowDataTypes(i),
+                timeStampFormat,
+                dateFormat)
+              i = i + 1
+            }
+            InternalRow.fromSeq(data)
+        }
+
       // Only select the required columns
       Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
         LogicalRDD(attributes, rdd)(sparkSession))
     }
     val convertRelation = relation match {
       case l: LogicalRelation =>
-        convertToLogicalRelation(l, isOverwriteTable, carbonLoadModel, sparkSession)
+        convertToLogicalRelation(
+          l.catalogTable.get,
+          l.output,
+          l.relation.sizeInBytes,
+          isOverwriteTable,
+          carbonLoadModel,
+          sparkSession)
       case c: CatalogRelation =>
-        convertToLogicalRelation(c, isOverwriteTable, carbonLoadModel, sparkSession)
+        val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable(
+          "tableMeta",
+          relation).asInstanceOf[CatalogTable]
+        // TODO need to find a way to avoid double lookup
+        val sizeInBytes =
+          CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
+            catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
+        val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
+        convertToLogicalRelation(
+          catalogTable,
+          c.output,
+          sizeInBytes,
+          isOverwriteTable,
+          carbonLoadModel,
+          sparkSession)
     }
     Dataset.ofRows(
       sparkSession,
@@ -516,74 +578,26 @@ case class CarbonLoadDataCommand(
   }
 
   private def convertToLogicalRelation(
-      relation: LogicalRelation,
+      catalogTable: CatalogTable,
+      output: Seq[Attribute],
+      sizeInBytes: Long,
       overWrite: Boolean,
       loadModel: CarbonLoadModel,
       sparkSession: SparkSession): LogicalRelation = {
-    val catalogTable = relation.catalogTable.get
     val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
     val metastoreSchema = StructType(StructType.fromAttributes(
-      relation.output).fields.map(_.copy(dataType = StringType)))
+      output).fields.map(_.copy(dataType = StringType)))
     val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
     val catalog = new CatalogFileIndex(
-      sparkSession, catalogTable, relation.relation.sizeInBytes)
+      sparkSession, catalogTable, sizeInBytes)
     if (lazyPruningEnabled) {
       catalog
     } else {
       catalog.filterPartitions(Nil) // materialize all the partitions in memory
     }
     val partitionSchema =
-      StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(f =>
-      metastoreSchema.fields.find(_.name.equalsIgnoreCase(f.getColumnName))).map(_.get))
-
-
-    val dataSchema =
-      StructType(metastoreSchema
-        .filterNot(field => partitionSchema.contains(field.name)))
-    val options = new mutable.HashMap[String, String]()
-    options ++= catalogTable.storage.properties
-    options += (("overwrite", overWrite.toString))
-    options += (("onepass", loadModel.getUseOnePass.toString))
-    options += (("dicthost", loadModel.getDictionaryServerHost))
-    options += (("dictport", loadModel.getDictionaryServerPort.toString))
-    val hdfsRelation = HadoopFsRelation(
-      location = catalog,
-      partitionSchema = partitionSchema,
-      dataSchema = dataSchema,
-      bucketSpec = catalogTable.bucketSpec,
-      fileFormat = new CarbonFileFormat,
-      options = options.toMap)(sparkSession = sparkSession)
-
-    CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
-      hdfsRelation.schema.toAttributes,
-      Some(catalogTable))
-  }
-
-  private def convertToLogicalRelation(
-      relation: CatalogRelation,
-      overWrite: Boolean,
-      loadModel: CarbonLoadModel,
-      sparkSession: SparkSession): LogicalRelation = {
-    val catalogTable =
-      CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", relation).asInstanceOf[CatalogTable]
-    val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
-    val metastoreSchema = StructType(StructType.fromAttributes(
-      relation.output).fields.map(_.copy(dataType = StringType)))
-    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
-    // TODO nedd to find a way to avoid double lookup
-    val sizeInBytes =
-      CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
-        catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
-    val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
-    if (lazyPruningEnabled) {
-      catalog
-    } else {
-      catalog.filterPartitions(Nil) // materialize all the partitions in memory
-    }
-    val partitionSchema =
-      StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(f =>
-        metastoreSchema.fields.find(_.name.equalsIgnoreCase(f.getColumnName))).map(_.get))
-
+      StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(field =>
+      metastoreSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get))
 
     val dataSchema =
       StructType(metastoreSchema

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index a42d974..f86b35f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -29,6 +29,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
@@ -94,7 +95,9 @@ case class CarbonCreateTableCommand(
           val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation)
           sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
           val partitionInfo = tableInfo.getFactTable.getPartitionInfo
-          val partitionString = if (partitionInfo != null) {
+          val partitionString =
+            if (partitionInfo != null &&
+                partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
             s" PARTITIONED BY (${partitionInfo.getColumnSchemaList.asScala.map(
               _.getColumnName).mkString(",")})"
           } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 99ce7fa..3eed726 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -72,7 +72,6 @@ with Serializable {
     sparkSession.sessionState.conf.setConfString(
       "spark.sql.sources.commitProtocolClass",
       "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
-
     job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
     var table = CarbonEnv.getCarbonTable(
       TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
@@ -98,6 +97,9 @@ with Serializable {
       model,
       conf
     )
+    // Set the standard date/time format which is supported by spark/hive.
+    model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
     model.setPartitionId("0")
     model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
     model.setDictionaryServerHost(options.getOrElse("dicthost", null))
@@ -139,7 +141,7 @@ with Serializable {
       }
 
       override def getFileExtension(context: TaskAttemptContext): String = {
-        ".carbondata"
+        CarbonTablePath.CARBON_DATA_EXT
       }
 
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 7f9bdf7..1e71714 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.optimizer.{CarbonDecoderRelation, CarbonFilters}
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
@@ -60,8 +60,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
           l,
           projects,
           filters,
-          (a, f, needDecoder) => toCatalystRDD(l, a, relation.buildScan(
-            a.map(_.name).toArray, f), needDecoder)) :: Nil
+          (a, f, needDecoder, p) => toCatalystRDD(l, a, relation.buildScan(
+            a.map(_.name).toArray, f, p), needDecoder)) :: Nil
       case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
         if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) ||
             !CarbonDictionaryDecoder.
@@ -130,6 +130,44 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       table.carbonTable.getTableInfo.serialize())
   }
 
+  /**
+   * Converts to physical RDD of carbon after pushing down applicable filters.
+   * @param relation
+   * @param projects
+   * @param filterPredicates
+   * @param scanBuilder
+   * @return
+   */
+  private def pruneFilterProject(
+      relation: LogicalRelation,
+      projects: Seq[NamedExpression],
+      filterPredicates: Seq[Expression],
+      scanBuilder: (Seq[Attribute], Array[Filter],
+        ArrayBuffer[AttributeReference], Seq[String]) => RDD[InternalRow]) = {
+    val names = relation.catalogTable.get.partitionColumnNames
+    // Get the current partitions from table.
+    var partitions: Seq[String] = null
+    if (names.nonEmpty) {
+      val partitionSet = AttributeSet(names
+        .map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get))
+      val partitionKeyFilters =
+        ExpressionSet(ExpressionSet(filterPredicates).filter(_.references.subsetOf(partitionSet)))
+      partitions =
+        CarbonFilters.getPartitions(
+          partitionKeyFilters.toSeq,
+          SparkSession.getActiveSession.get,
+          relation.catalogTable.get.identifier)
+    }
+    pruneFilterProjectRaw(
+      relation,
+      projects,
+      filterPredicates,
+      partitions,
+      (requestedColumns, _, pushedFilters, a, p) => {
+        scanBuilder(requestedColumns, pushedFilters.toArray, a, p)
+      })
+  }
+
   private[this] def toCatalystRDD(
       relation: LogicalRelation,
       output: Seq[Attribute],
@@ -146,27 +184,13 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     }
   }
 
-  protected def pruneFilterProject(
-      relation: LogicalRelation,
-      projects: Seq[NamedExpression],
-      filterPredicates: Seq[Expression],
-      scanBuilder: (Seq[Attribute], Array[Filter],
-        ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
-    pruneFilterProjectRaw(
-      relation,
-      projects,
-      filterPredicates,
-      (requestedColumns, _, pushedFilters, a) => {
-        scanBuilder(requestedColumns, pushedFilters.toArray, a)
-      })
-  }
-
   protected def pruneFilterProjectRaw(
       relation: LogicalRelation,
       rawProjects: Seq[NamedExpression],
       filterPredicates: Seq[Expression],
+      partitions: Seq[String],
       scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
-        ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
+        ArrayBuffer[AttributeReference], Seq[String]) => RDD[InternalRow]) = {
     val projects = rawProjects.map {p =>
       p.transform {
         case CustomDeterministicExpression(exp) => exp
@@ -263,6 +287,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       }
       val scan = getDataSourceScan(relation,
         updateProject,
+        partitions,
         scanBuilder,
         candidatePredicates,
         pushedFilters,
@@ -298,6 +323,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
       val scan = getDataSourceScan(relation,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]],
+        partitions,
         scanBuilder,
         candidatePredicates,
         pushedFilters,
@@ -311,10 +337,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     }
   }
 
-  def getDataSourceScan(relation: LogicalRelation,
+  private def getDataSourceScan(relation: LogicalRelation,
       output: Seq[Attribute],
+      partitions: Seq[String],
       scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
-        ArrayBuffer[AttributeReference]) => RDD[InternalRow],
+        ArrayBuffer[AttributeReference], Seq[String]) => RDD[InternalRow],
       candidatePredicates: Seq[Expression],
       pushedFilters: Seq[Filter],
       metadata: Map[String, String],
@@ -325,14 +352,22 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         needDecoder.isEmpty) {
       BatchedDataSourceScanExec(
         output,
-        scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+        scanBuilder(updateRequestedColumns,
+          candidatePredicates,
+          pushedFilters,
+          needDecoder,
+          partitions),
         relation.relation,
         getPartitioning(table.carbonTable, updateRequestedColumns),
         metadata,
         relation.catalogTable.map(_.identifier), relation)
     } else {
       RowDataSourceScanExec(output,
-        scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+        scanBuilder(updateRequestedColumns,
+          candidatePredicates,
+          pushedFilters,
+          needDecoder,
+          partitions),
         relation.relation,
         getPartitioning(table.carbonTable, updateRequestedColumns),
         metadata,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b8a02f39/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 7408594..a63b358 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -17,18 +17,14 @@
 
 package org.apache.spark.sql.optimizer
 
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.CastExpressionOptimization
-import org.apache.spark.sql.CarbonBoundReference
-import org.apache.spark.sql.CastExpr
-import org.apache.spark.sql.SparkUnknownExpression
-import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
+import org.apache.spark.sql.catalyst.TableIdentifier
 
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
@@ -395,4 +391,14 @@ object CarbonFilters {
       case _ => expressions
     }
   }
+
+  def getPartitions(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier): Seq[String] = {
+    val partitions =
+      sparkSession.sessionState.catalog.listPartitionsByFilter(identifier, partitionFilters)
+    partitions.toList.flatMap { partition =>
+      partition.spec.seq.map{case (column, value) => column + "=" + value}
+    }.toSet.toSeq
+  }
 }


Mime
View raw message