carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [2/2] carbondata git commit: [CARBONDATA-2224][File Level Reader Support] Refactoring of #2055
Date Mon, 19 Mar 2018 14:16:03 GMT
[CARBONDATA-2224][File Level Reader Support] Refactoring of #2055

Review comment fixes and refactoring of #2055

This closes #2069


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/99766b8a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/99766b8a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/99766b8a

Branch: refs/heads/carbonfile
Commit: 99766b8af0b021a0a06ffa893d09cc82774f9c66
Parents: 7a124ec
Author: Ajantha-Bhat <ajanthabhat@gmail.com>
Authored: Fri Mar 16 16:06:04 2018 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Mon Mar 19 22:15:45 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |   4 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   2 +-
 .../carbondata/examples/HadoopFileExample.scala |   8 +-
 .../hadoop/api/CarbonFileInputFormat.java       | 535 ++-----------------
 .../hadoop/api/CarbonInputFormat.java           | 534 ++++++++++++++++++
 .../hadoop/api/CarbonTableInputFormat.java      | 463 +---------------
 .../carbondata/hadoop/util/SchemaReader.java    |   8 +-
 ...FileInputFormatWithExternalCarbonTable.scala |  16 +-
 ...tCreateTableUsingSparkCarbonFileFormat.scala |  25 +-
 ...tSparkCarbonFileFormatWithSparkSession.scala |   4 +-
 .../spark/rdd/CarbonIUDMergerRDD.scala          |   6 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   6 +-
 .../spark/rdd/CarbonScanPartitionRDD.scala      |   4 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  60 +--
 .../org/apache/spark/util/PartitionUtils.scala  |   4 +-
 .../org/apache/spark/sql/CarbonCountStar.scala  |   4 +-
 .../command/mutation/DeleteExecution.scala      |   4 +-
 .../command/table/CarbonDropTableCommand.scala  |   2 +-
 .../datasources/SparkCarbonFileFormat.scala     |  15 +-
 .../sql/execution/strategy/DDLStrategy.scala    |   6 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   8 +-
 .../spark/sql/hive/CarbonSessionState.scala     |   2 +-
 .../spark/sql/hive/CarbonSessionState.scala     |   2 +-
 .../carbondata/streaming/StreamHandoffRDD.scala |  12 +-
 24 files changed, 669 insertions(+), 1065 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 278dc96..9e0d80a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -826,8 +826,8 @@ public class CarbonTable implements Serializable {
     return external != null && external.equalsIgnoreCase("true");
   }
 
-  public boolean isFileLevelExternalTable() {
-    String external = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal");
+  public boolean isFileLevelFormat() {
+    String external = tableInfo.getFactTable().getTableProperties().get("_filelevelformat");
     return external != null && external.equalsIgnoreCase("true");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ff49edf..855bdee 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2213,7 +2213,7 @@ public final class CarbonUtil {
    * @param schemaFilePath
    * @return
    */
-  public static org.apache.carbondata.format.TableInfo inferSchemaFileExternalTable(
+  public static org.apache.carbondata.format.TableInfo inferSchema(
       String carbonDataFilePath, AbsoluteTableIdentifier absoluteTableIdentifier,
       boolean schemaExists) throws IOException {
     TBaseCreator createTBase = new ThriftReader.TBaseCreator() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
index d75abc2..465e660 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.examples
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.CarbonProjection
 
 // scalastyle:off println
@@ -34,9 +34,9 @@ object HadoopFileExample {
     projection.addColumn("c1")  // column c1
     projection.addColumn("c3")  // column c3
     val conf = new Configuration()
-    CarbonTableInputFormat.setColumnProjection(conf, projection)
-    CarbonTableInputFormat.setDatabaseName(conf, "default")
-    CarbonTableInputFormat.setTableName(conf, "carbon1")
+    CarbonInputFormat.setColumnProjection(conf, projection)
+    CarbonInputFormat.setDatabaseName(conf, "default")
+    CarbonInputFormat.setTableName(conf, "carbon1")
 
     val sc = spark.sparkContext
     val input = sc.newAPIHadoopFile(s"${ExampleUtils.storeLocation}/default/carbon1",

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index b86b1cc..ff532b7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -17,11 +17,8 @@
 
 package org.apache.carbondata.hadoop.api;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.Serializable;
-import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.LinkedList;
@@ -30,21 +27,11 @@ import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.exception.InvalidConfigurationException;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
-import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.UpdateVO;
@@ -52,241 +39,58 @@ import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.SingleTableProvider;
 import org.apache.carbondata.core.scan.filter.TableProvider;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.stats.QueryStatistic;
-import org.apache.carbondata.core.stats.QueryStatisticsConstants;
-import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeConverter;
-import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 import org.apache.carbondata.hadoop.util.SchemaReader;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.security.TokenCache;
 
 /**
- * Input format of CarbonData file.
+ * InputFormat for reading carbondata files without table level metadata support,
+ * schema is inferred by the following steps:
+ * 1. read from the schema file if it exists
+ * 2. read from data file footer
  *
  * @param <T>
  */
 @InterfaceAudience.User
 @InterfaceStability.Evolving
-public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implements Serializable {
-
-  public static final String READ_SUPPORT_CLASS = "carbon.read.support.class";
-  // comma separated list of input segment numbers
-  public static final String INPUT_SEGMENT_NUMBERS =
-      "mapreduce.input.carboninputformat.segmentnumbers";
-  private static final String VALIDATE_INPUT_SEGMENT_IDs =
-      "mapreduce.input.carboninputformat.validsegments";
-  // comma separated list of input files
-  public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
-  private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
-  private static final Log LOG = LogFactory.getLog(CarbonFileInputFormat.class);
-  private static final String FILTER_PREDICATE =
-      "mapreduce.input.carboninputformat.filter.predicate";
-  private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
-  private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
-  private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
-  private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
-  private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
-  public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
-  public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
-  private static final String PARTITIONS_TO_PRUNE =
-      "mapreduce.input.carboninputformat.partitions.to.prune";
-  public static final String UPADTE_T =
-      "mapreduce.input.carboninputformat.partitions.to.prune";
+public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Serializable {
 
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
 
-  /**
-   * Set the `tableInfo` in `configuration`
-   */
-  public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
-      throws IOException {
-    if (null != tableInfo) {
-      configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
-    }
-  }
-
-  /**
-   * Get TableInfo object from `configuration`
-   */
-  private static TableInfo getTableInfo(Configuration configuration) throws IOException {
-    String tableInfoStr = configuration.get(TABLE_INFO);
-    if (tableInfoStr == null) {
-      return null;
+  protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+    CarbonTable carbonTableTemp;
+    if (carbonTable == null) {
+      // carbon table should be created either from deserialized table info (schema saved in
+      // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+      TableInfo tableInfo = getTableInfo(configuration);
+      CarbonTable localCarbonTable;
+      if (tableInfo != null) {
+        localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+      } else {
+        String schemaPath = CarbonTablePath
+            .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath());
+        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
+          TableInfo tableInfoInfer =
+              SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration));
+          localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer);
+        } else {
+          localCarbonTable =
+              SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration));
+        }
+      }
+      this.carbonTable = localCarbonTable;
+      return localCarbonTable;
     } else {
-      TableInfo output = new TableInfo();
-      output.readFields(
-          new DataInputStream(
-              new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
-      return output;
-    }
-  }
-
-
-  public static void setTablePath(Configuration configuration, String tablePath) {
-    configuration.set(FileInputFormat.INPUT_DIR, tablePath);
-  }
-
-  public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
-    configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
-  }
-
-
-  public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
-      throws IOException {
-    if (dataMapJob != null) {
-      String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
-      configuration.set(DATA_MAP_DSTR, toString);
-    }
-  }
-
-  public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
-    String jobString = configuration.get(DATA_MAP_DSTR);
-    if (jobString != null) {
-      return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
-    }
-    return null;
-  }
-
-  /**
-   * It sets unresolved filter expression.
-   *
-   * @param configuration
-   * @param filterExpression
-   */
-  public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
-    if (filterExpression == null) {
-      return;
-    }
-    try {
-      String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
-      configuration.set(FILTER_PREDICATE, filterString);
-    } catch (Exception e) {
-      throw new RuntimeException("Error while setting filter expression to Job", e);
-    }
-  }
-
-  public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
-    if (projection == null || projection.isEmpty()) {
-      return;
-    }
-    String[] allColumns = projection.getAllColumns();
-    StringBuilder builder = new StringBuilder();
-    for (String column : allColumns) {
-      builder.append(column).append(",");
-    }
-    String columnString = builder.toString();
-    columnString = columnString.substring(0, columnString.length() - 1);
-    configuration.set(COLUMN_PROJECTION, columnString);
-  }
-
-  public static String getColumnProjection(Configuration configuration) {
-    return configuration.get(COLUMN_PROJECTION);
-  }
-
-
-  /**
-   * Set list of segments to access
-   */
-  public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
-    configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
-  }
-
-  /**
-   * Set `CARBON_INPUT_SEGMENTS` from property to configuration
-   */
-  public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
-    String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
-    String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
-    String segmentNumbersFromProperty = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
-    if (!segmentNumbersFromProperty.trim().equals("*")) {
-      CarbonFileInputFormat
-          .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
-    }
-  }
-
-  /**
-   * set list of segment to access
-   */
-  public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
-    configuration.set(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
-  }
-
-  /**
-   * get list of segment to access
-   */
-  public static boolean getValidateSegmentsToAccess(Configuration configuration) {
-    return configuration.get(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true")
-        .equalsIgnoreCase("true");
-  }
-
-  /**
-   * set list of partitions to prune
-   */
-  public static void setPartitionsToPrune(Configuration configuration,
-      List<PartitionSpec> partitions) {
-    if (partitions == null) {
-      return;
-    }
-    try {
-      String partitionString =
-          ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
-      configuration.set(PARTITIONS_TO_PRUNE, partitionString);
-    } catch (Exception e) {
-      throw new RuntimeException("Error while setting patition information to Job", e);
-    }
-  }
-
-  /**
-   * get list of partitions to prune
-   */
-  private static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
-      throws IOException {
-    String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
-    if (partitionString != null) {
-      return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
-    }
-    return null;
-  }
-
-  public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
-      throws IOException {
-    String tablePath = configuration.get(INPUT_DIR, "");
-    try {
-      return AbsoluteTableIdentifier
-          .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
-    } catch (InvalidConfigurationException e) {
-      throw new IOException(e);
+      carbonTableTemp = this.carbonTable;
+      return carbonTableTemp;
     }
   }
 
@@ -306,8 +110,6 @@ public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implement
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
     }
-    //    TableDataMap blockletMap = DataMapStoreManager.getInstance()
-    //        .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
 
     if (getValidateSegmentsToAccess(job.getConfiguration())) {
       // get all valid segments and set them into the configuration
@@ -346,8 +148,6 @@ public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implement
     return null;
   }
 
-
-
   /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
@@ -404,279 +204,4 @@ public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implement
     }
     return result;
   }
-
-  protected Expression getFilterPredicates(Configuration configuration) {
-    try {
-      String filterExprString = configuration.get(FILTER_PREDICATE);
-      if (filterExprString == null) {
-        return null;
-      }
-      Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
-      return (Expression) filter;
-    } catch (IOException e) {
-      throw new RuntimeException("Error while reading filter expression", e);
-    }
-  }
-
-  /**
-   * get data blocks of given segment
-   */
-  private List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
-      AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
-      BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
-      List<Integer> oldPartitionIdList) throws IOException {
-
-    QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
-    QueryStatistic statistic = new QueryStatistic();
-
-    // get tokens for all the required FileSystem for table path
-    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
-        new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
-    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
-    DataMapExprWrapper dataMapExprWrapper =
-        DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
-    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
-    List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
-    List<ExtendedBlocklet> prunedBlocklets;
-    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
-      DistributableDataMapFormat datamapDstr =
-          new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
-              segmentIds, partitionsToPrune,
-              BlockletDataMapFactory.class.getName());
-      prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
-      // Apply expression on the blocklets.
-      prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
-    } else {
-      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
-    }
-
-    List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
-    int partitionIndex = 0;
-    List<Integer> partitionIdList = new ArrayList<>();
-    if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
-      partitionIdList = partitionInfo.getPartitionIds();
-    }
-    for (ExtendedBlocklet blocklet : prunedBlocklets) {
-      long partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
-          CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
-
-      // OldPartitionIdList is only used in alter table partition command because it change
-      // partition info first and then read data.
-      // For other normal query should use newest partitionIdList
-      if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
-        if (oldPartitionIdList != null) {
-          partitionIndex = oldPartitionIdList.indexOf((int)partitionId);
-        } else {
-          partitionIndex = partitionIdList.indexOf((int)partitionId);
-        }
-      }
-      if (partitionIndex != -1) {
-        // matchedPartitions variable will be null in two cases as follows
-        // 1. the table is not a partition table
-        // 2. the table is a partition table, and all partitions are matched by query
-        // for partition table, the task id of carbaondata file name is the partition id.
-        // if this partition is not required, here will skip it.
-        if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
-          CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
-          if (inputSplit != null) {
-            resultFilterredBlocks.add(inputSplit);
-          }
-        }
-      }
-    }
-    statistic
-        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
-    recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
-    return resultFilterredBlocks;
-  }
-
-  private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
-    CarbonInputSplit split =
-        CarbonInputSplit.from(blocklet.getSegmentId(),
-            blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0,
-                blocklet.getLength(), blocklet.getLocations()),
-            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
-            blocklet.getDataMapWriterPath());
-    split.setDetailInfo(blocklet.getDetailInfo());
-    return split;
-  }
-
-  @Override
-  public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
-      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-    Configuration configuration = taskAttemptContext.getConfiguration();
-    QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
-    CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
-    return new CarbonRecordReader<T>(queryModel, readSupport);
-  }
-
-  public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-      throws IOException {
-    Configuration configuration = taskAttemptContext.getConfiguration();
-    CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
-    TableProvider tableProvider = new SingleTableProvider(carbonTable);
-
-    // query plan includes projection column
-    String projectionString = getColumnProjection(configuration);
-    String[] projectionColumnNames = null;
-    if (projectionString != null) {
-      projectionColumnNames = projectionString.split(",");
-    }
-    QueryModel queryModel = carbonTable.createQueryWithProjection(
-        projectionColumnNames, getDataTypeConverter(configuration));
-
-    // set the filter to the query model in order to filter blocklet before scan
-    Expression filter = getFilterPredicates(configuration);
-    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
-    // getAllMeasures returns list of visible and invisible columns
-    boolean[] isFilterMeasures =
-        new boolean[carbonTable.getAllMeasures().size()];
-    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions,
-        isFilterMeasures);
-    queryModel.setIsFilterDimensions(isFilterDimensions);
-    queryModel.setIsFilterMeasures(isFilterMeasures);
-    FilterResolverIntf filterIntf = CarbonInputFormatUtil
-        .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
-    queryModel.setFilterExpressionResolverTree(filterIntf);
-
-    // update the file level index store if there are invalid segment
-    if (inputSplit instanceof CarbonMultiBlockSplit) {
-      CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
-      List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
-      if (invalidSegments.size() > 0) {
-        queryModel.setInvalidSegmentIds(invalidSegments);
-      }
-      List<UpdateVO> invalidTimestampRangeList =
-          split.getAllSplits().get(0).getInvalidTimestampRange();
-      if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
-        queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
-      }
-    }
-    return queryModel;
-  }
-
-  private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
-    CarbonTable carbonTableTemp;
-    if (carbonTable == null) {
-      // carbon table should be created either from deserialized table info (schema saved in
-      // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
-      TableInfo tableInfo = getTableInfo(configuration);
-      CarbonTable localCarbonTable;
-      if (tableInfo != null) {
-        localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo);
-      } else {
-        String schemaPath = CarbonTablePath
-            .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath());
-        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
-          TableInfo tableInfoInfer =
-              SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration));
-          localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer);
-        } else {
-          localCarbonTable =
-              SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration));
-        }
-      }
-      this.carbonTable = localCarbonTable;
-      return localCarbonTable;
-    } else {
-      carbonTableTemp = this.carbonTable;
-      return carbonTableTemp;
-    }
-  }
-
-
-  public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
-    String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
-    //By default it uses dictionary decoder read class
-    CarbonReadSupport<T> readSupport = null;
-    if (readSupportClass != null) {
-      try {
-        Class<?> myClass = Class.forName(readSupportClass);
-        Constructor<?> constructor = myClass.getConstructors()[0];
-        Object object = constructor.newInstance();
-        if (object instanceof CarbonReadSupport) {
-          readSupport = (CarbonReadSupport) object;
-        }
-      } catch (ClassNotFoundException ex) {
-        LOG.error("Class " + readSupportClass + "not found", ex);
-      } catch (Exception ex) {
-        LOG.error("Error while creating " + readSupportClass, ex);
-      }
-    } else {
-      readSupport = new DictionaryDecodeReadSupport<>();
-    }
-    return readSupport;
-  }
-
-  @Override
-  protected boolean isSplitable(JobContext context, Path filename) {
-    try {
-      // Don't split the file if it is local file system
-      FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
-      if (fileSystem instanceof LocalFileSystem) {
-        return false;
-      }
-    } catch (Exception e) {
-      return true;
-    }
-    return true;
-  }
-
-  /**
-   * return valid segment to access
-   */
-  private String[] getSegmentsToAccess(JobContext job) {
-    String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
-    if (segmentString.trim().isEmpty()) {
-      return new String[0];
-    }
-    return segmentString.split(",");
-  }
-
-  public static DataTypeConverter getDataTypeConverter(Configuration configuration)
-      throws IOException {
-    String converter = configuration.get(CARBON_CONVERTER);
-    if (converter == null) {
-      return new DataTypeConverterImpl();
-    }
-    return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
-  }
-
-  public static void setDatabaseName(Configuration configuration, String databaseName) {
-    if (null != databaseName) {
-      configuration.set(DATABASE_NAME, databaseName);
-    }
-  }
-
-  public static String getDatabaseName(Configuration configuration)
-      throws InvalidConfigurationException {
-    String databseName = configuration.get(DATABASE_NAME);
-    if (null == databseName) {
-      throw new InvalidConfigurationException("Database name is not set.");
-    }
-    return databseName;
-  }
-
-  public static void setTableName(Configuration configuration, String tableName) {
-    if (null != tableName) {
-      configuration.set(TABLE_NAME, tableName);
-    }
-  }
-
-  public static String getTableName(Configuration configuration)
-      throws InvalidConfigurationException {
-    String tableName = configuration.get(TABLE_NAME);
-    if (tableName == null) {
-      throw new InvalidConfigurationException("Table name is not set");
-    }
-    return tableName;
-  }
-
-  public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(
-      org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter)
-      throws IOException {
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
new file mode 100644
index 0000000..3cc9c5f
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.SingleTableProvider;
+import org.apache.carbondata.core.scan.filter.TableProvider;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+
+/**
+ * Base class for carbondata input format, there are two input format implementations:
+ * 1. CarbonFileInputFormat: for reading carbondata files without table level metadata support.
+ *
+ * 2. CarbonTableInputFormat: for reading carbondata files with table level metadata support,
+ * such as segment and explicit schema metadata.
+ *
+ * @param <T>
+ */
+public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
+  // comma separated list of input segment numbers
+  public static final String INPUT_SEGMENT_NUMBERS =
+      "mapreduce.input.carboninputformat.segmentnumbers";
+  private static final String VALIDATE_INPUT_SEGMENT_IDs =
+      "mapreduce.input.carboninputformat.validsegments";
+  // configuration key for the partition id list stored by setPartitionIdList
+  private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
+  private static final Log LOG = LogFactory.getLog(CarbonInputFormat.class);
+  private static final String FILTER_PREDICATE =
+      "mapreduce.input.carboninputformat.filter.predicate";
+  private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
+  private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
+  private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
+  private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
+  private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
+  public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
+  public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
+  private static final String PARTITIONS_TO_PRUNE =
+      "mapreduce.input.carboninputformat.partitions.to.prune";
+
+  /**
+   * Set the `tableInfo` in `configuration`
+   */
+  public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
+      throws IOException {
+    if (null != tableInfo) {
+      configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
+    }
+  }
+
+  /**
+   * Get TableInfo object from `configuration`
+   */
+  protected static TableInfo getTableInfo(Configuration configuration) throws IOException {
+    String tableInfoStr = configuration.get(TABLE_INFO);
+    if (tableInfoStr == null) {
+      return null;
+    } else {
+      TableInfo output = new TableInfo();
+      output.readFields(new DataInputStream(
+          new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
+      return output;
+    }
+  }
+
+  /**
+   * Get the cached CarbonTable or create it by TableInfo in `configuration`
+   */
+  protected abstract CarbonTable getOrCreateCarbonTable(Configuration configuration)
+      throws IOException;
+
+  public static void setTablePath(Configuration configuration, String tablePath) {
+    configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+  }
+
+  public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
+    configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
+  }
+
+  public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
+      throws IOException {
+    if (dataMapJob != null) {
+      String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
+      configuration.set(DATA_MAP_DSTR, toString);
+    }
+  }
+
+  public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
+    String jobString = configuration.get(DATA_MAP_DSTR);
+    if (jobString != null) {
+      return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
+    }
+    return null;
+  }
+
+  /**
+   * It sets unresolved filter expression.
+   *
+   * @param configuration
+   * @param filterExpression
+   */
+  public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
+    if (filterExpression == null) {
+      return;
+    }
+    try {
+      String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
+      configuration.set(FILTER_PREDICATE, filterString);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while setting filter expression to Job", e);
+    }
+  }
+
+  public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
+    if (projection == null || projection.isEmpty()) {
+      return;
+    }
+    String[] allColumns = projection.getAllColumns();
+    StringBuilder builder = new StringBuilder();
+    for (String column : allColumns) {
+      builder.append(column).append(",");
+    }
+    String columnString = builder.toString();
+    columnString = columnString.substring(0, columnString.length() - 1);
+    configuration.set(COLUMN_PROJECTION, columnString);
+  }
+
+  public static String getColumnProjection(Configuration configuration) {
+    return configuration.get(COLUMN_PROJECTION);
+  }
+
+  /**
+   * Set list of segments to access
+   */
+  public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
+    configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
+  }
+
+  /**
+   * Set `CARBON_INPUT_SEGMENTS` from property to configuration
+   */
+  public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
+    String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
+    String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
+    String segmentNumbersFromProperty = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
+    if (!segmentNumbersFromProperty.trim().equals("*")) {
+      CarbonInputFormat
+          .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
+    }
+  }
+
+  /**
+   * Set whether the segments to access should be validated
+   */
+  public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
+    configuration.set(CarbonInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
+  }
+
+  /**
+   * Get whether the segments to access should be validated (defaults to true when not set)
+   */
+  public static boolean getValidateSegmentsToAccess(Configuration configuration) {
+    return configuration.get(CarbonInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true")
+        .equalsIgnoreCase("true");
+  }
+
+  /**
+   * set list of partitions to prune
+   */
+  public static void setPartitionsToPrune(Configuration configuration,
+      List<PartitionSpec> partitions) {
+    if (partitions == null) {
+      return;
+    }
+    try {
+      String partitionString =
+          ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
+      configuration.set(PARTITIONS_TO_PRUNE, partitionString);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while setting patition information to Job" + partitions, e);
+    }
+  }
+
+  /**
+   * get list of partitions to prune
+   */
+  public static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
+      throws IOException {
+    String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
+    if (partitionString != null) {
+      return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
+    }
+    return null;
+  }
+
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+      throws IOException {
+    String tablePath = configuration.get(INPUT_DIR, "");
+    try {
+      return AbsoluteTableIdentifier
+          .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
+    } catch (InvalidConfigurationException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * Configurations FileInputFormat.INPUT_DIR
+   * are used to get table path to read.
+   *
+   * @param job
+   * @return List<InputSplit> list of CarbonInputSplit
+   * @throws IOException
+   */
+  @Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException;
+
+  protected Expression getFilterPredicates(Configuration configuration) {
+    try {
+      String filterExprString = configuration.get(FILTER_PREDICATE);
+      if (filterExprString == null) {
+        return null;
+      }
+      Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+      return (Expression) filter;
+    } catch (IOException e) {
+      throw new RuntimeException("Error while reading filter expression", e);
+    }
+  }
+
+  /**
+   * get data blocks of given segment
+   */
+  protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
+      AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+      BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
+      List<Integer> oldPartitionIdList) throws IOException {
+
+    QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+    QueryStatistic statistic = new QueryStatistic();
+
+    // get tokens for all the required FileSystem for table path
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+        new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+    DataMapExprWrapper dataMapExprWrapper =
+        DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
+    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
+    List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
+    List<ExtendedBlocklet> prunedBlocklets;
+    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
+      DistributableDataMapFormat datamapDstr =
+          new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper, segmentIds,
+              partitionsToPrune, BlockletDataMapFactory.class.getName());
+      prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+      // Apply expression on the blocklets.
+      prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
+    } else {
+      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+    }
+
+    List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
+    int partitionIndex = 0;
+    List<Integer> partitionIdList = new ArrayList<>();
+    if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+      partitionIdList = partitionInfo.getPartitionIds();
+    }
+    for (ExtendedBlocklet blocklet : prunedBlocklets) {
+      long partitionId = CarbonTablePath.DataFileUtil
+          .getTaskIdFromTaskNo(CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
+
+      // oldPartitionIdList is only used by the alter table partition command, because it
+      // changes the partition info first and then reads the data.
+      // Other, normal queries should use the newest partitionIdList.
+      if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+        if (oldPartitionIdList != null) {
+          partitionIndex = oldPartitionIdList.indexOf((int) partitionId);
+        } else {
+          partitionIndex = partitionIdList.indexOf((int) partitionId);
+        }
+      }
+      if (partitionIndex != -1) {
+        // matchedPartitions variable will be null in two cases as follows
+        // 1. the table is not a partition table
+        // 2. the table is a partition table, and all partitions are matched by query
+        // For a partition table, the task id in the carbondata file name is the partition id.
+        // If this partition is not required, it is skipped here.
+        if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
+          CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
+          if (inputSplit != null) {
+            resultFilterredBlocks.add(inputSplit);
+          }
+        }
+      }
+    }
+    statistic
+        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+    recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+    return resultFilterredBlocks;
+  }
+
+  private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
+    CarbonInputSplit split = CarbonInputSplit
+        .from(blocklet.getSegmentId(), blocklet.getBlockletId(),
+            new FileSplit(new Path(blocklet.getPath()), 0, blocklet.getLength(),
+                blocklet.getLocations()),
+            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
+            blocklet.getDataMapWriterPath());
+    split.setDetailInfo(blocklet.getDetailInfo());
+    return split;
+  }
+
+  @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
+    CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
+    return new CarbonRecordReader<T>(queryModel, readSupport);
+  }
+
+  public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException {
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
+    TableProvider tableProvider = new SingleTableProvider(carbonTable);
+
+    // query plan includes projection column
+    String projectionString = getColumnProjection(configuration);
+    String[] projectionColumnNames = null;
+    if (projectionString != null) {
+      projectionColumnNames = projectionString.split(",");
+    }
+    QueryModel queryModel = carbonTable
+        .createQueryWithProjection(projectionColumnNames, getDataTypeConverter(configuration));
+
+    // set the filter to the query model in order to filter blocklet before scan
+    Expression filter = getFilterPredicates(configuration);
+    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
+    // getAllMeasures returns list of visible and invisible columns
+    boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()];
+    CarbonInputFormatUtil
+        .processFilterExpression(filter, carbonTable, isFilterDimensions, isFilterMeasures);
+    queryModel.setIsFilterDimensions(isFilterDimensions);
+    queryModel.setIsFilterMeasures(isFilterMeasures);
+    FilterResolverIntf filterIntf = CarbonInputFormatUtil
+        .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
+    queryModel.setFilterExpressionResolverTree(filterIntf);
+
+    // update the file level index store if there are invalid segments
+    if (inputSplit instanceof CarbonMultiBlockSplit) {
+      CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+      List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+      if (invalidSegments.size() > 0) {
+        queryModel.setInvalidSegmentIds(invalidSegments);
+      }
+      List<UpdateVO> invalidTimestampRangeList =
+          split.getAllSplits().get(0).getInvalidTimestampRange();
+      if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
+        queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
+      }
+    }
+    return queryModel;
+  }
+
+  public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
+    String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
+    // By default (no read support class configured) DictionaryDecodeReadSupport is used
+    CarbonReadSupport<T> readSupport = null;
+    if (readSupportClass != null) {
+      try {
+        Class<?> myClass = Class.forName(readSupportClass);
+        Constructor<?> constructor = myClass.getConstructors()[0];
+        Object object = constructor.newInstance();
+        if (object instanceof CarbonReadSupport) {
+          readSupport = (CarbonReadSupport) object;
+        }
+      } catch (ClassNotFoundException ex) {
+        LOG.error("Class " + readSupportClass + "not found", ex);
+      } catch (Exception ex) {
+        LOG.error("Error while creating " + readSupportClass, ex);
+      }
+    } else {
+      readSupport = new DictionaryDecodeReadSupport<>();
+    }
+    return readSupport;
+  }
+
+  @Override protected boolean isSplitable(JobContext context, Path filename) {
+    try {
+      // Don't split the file if it is local file system
+      FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
+      if (fileSystem instanceof LocalFileSystem) {
+        return false;
+      }
+    } catch (Exception e) {
+      return true;
+    }
+    return true;
+  }
+
+  public static void setCarbonReadSupport(Configuration configuration,
+      Class<? extends CarbonReadSupport> readSupportClass) {
+    if (readSupportClass != null) {
+      configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
+    }
+  }
+
+  /**
+   * It is optional; if the user does not set it, getDataTypeConverter uses DataTypeConverterImpl
+   *
+   * @param configuration
+   * @param converter is the Data type converter for different computing engine
+   * @throws IOException
+   */
+  public static void setDataTypeConverter(Configuration configuration, DataTypeConverter converter)
+      throws IOException {
+    if (null != converter) {
+      configuration.set(CARBON_CONVERTER,
+          ObjectSerializationUtil.convertObjectToString(converter));
+    }
+  }
+
+  public static DataTypeConverter getDataTypeConverter(Configuration configuration)
+      throws IOException {
+    String converter = configuration.get(CARBON_CONVERTER);
+    if (converter == null) {
+      return new DataTypeConverterImpl();
+    }
+    return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
+  }
+
+  public static void setDatabaseName(Configuration configuration, String databaseName) {
+    if (null != databaseName) {
+      configuration.set(DATABASE_NAME, databaseName);
+    }
+  }
+
+  public static String getDatabaseName(Configuration configuration)
+      throws InvalidConfigurationException {
+    String databseName = configuration.get(DATABASE_NAME);
+    if (null == databseName) {
+      throw new InvalidConfigurationException("Database name is not set.");
+    }
+    return databseName;
+  }
+
+  public static void setTableName(Configuration configuration, String tableName) {
+    if (null != tableName) {
+      configuration.set(TABLE_NAME, tableName);
+    }
+  }
+
+  public static String getTableName(Configuration configuration)
+      throws InvalidConfigurationException {
+    String tableName = configuration.get(TABLE_NAME);
+    if (tableName == null) {
+      throw new InvalidConfigurationException("Table name is not set");
+    }
+    return tableName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index bcc487e..efe962d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -17,11 +17,8 @@
 
 package org.apache.carbondata.hadoop.api;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -31,21 +28,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -60,29 +50,15 @@ import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.SingleTableProvider;
 import org.apache.carbondata.core.scan.filter.TableProvider;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.stats.QueryStatistic;
-import org.apache.carbondata.core.stats.QueryStatisticsConstants;
-import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
 import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeConverter;
-import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 import org.apache.carbondata.hadoop.util.SchemaReader;
 
 import org.apache.commons.logging.Log;
@@ -91,80 +67,38 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.security.TokenCache;
 
 /**
- * Input format of CarbonData file.
+ * InputFormat for reading carbondata files with table level metadata support,
+ * such as segment and explicit schema metadata.
  *
  * @param <T>
  */
-public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
+public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
   // comma separated list of input segment numbers
   public static final String INPUT_SEGMENT_NUMBERS =
       "mapreduce.input.carboninputformat.segmentnumbers";
-  private static final String VALIDATE_INPUT_SEGMENT_IDs =
-      "mapreduce.input.carboninputformat.validsegments";
   // comma separated list of input files
   public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
   private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
   private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
-  private static final String FILTER_PREDICATE =
-      "mapreduce.input.carboninputformat.filter.predicate";
-  private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
-  private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
   private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
-  private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
   public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
   public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
-  private static final String PARTITIONS_TO_PRUNE =
-      "mapreduce.input.carboninputformat.partitions.to.prune";
-  public static final String UPADTE_T =
-      "mapreduce.input.carboninputformat.partitions.to.prune";
-
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
 
   /**
-   * Set the `tableInfo` in `configuration`
-   */
-  public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
-      throws IOException {
-    if (null != tableInfo) {
-      configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
-    }
-  }
-
-  /**
-   * Get TableInfo object from `configuration`
-   */
-  private static TableInfo getTableInfo(Configuration configuration) throws IOException {
-    String tableInfoStr = configuration.get(TABLE_INFO);
-    if (tableInfoStr == null) {
-      return null;
-    } else {
-      TableInfo output = new TableInfo();
-      output.readFields(
-          new DataInputStream(
-              new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
-      return output;
-    }
-  }
-
-  /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+  protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
     if (carbonTable == null) {
       // carbon table should be created either from deserialized table info (schema saved in
       // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
@@ -183,150 +117,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     }
   }
 
-  public static void setTablePath(Configuration configuration, String tablePath) {
-    configuration.set(FileInputFormat.INPUT_DIR, tablePath);
-  }
-
-  public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
-    configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
-  }
-
-
-  public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
-      throws IOException {
-    if (dataMapJob != null) {
-      String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
-      configuration.set(DATA_MAP_DSTR, toString);
-    }
-  }
-
-  private static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
-    String jobString = configuration.get(DATA_MAP_DSTR);
-    if (jobString != null) {
-      return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
-    }
-    return null;
-  }
-
-  /**
-   * It sets unresolved filter expression.
-   *
-   * @param configuration
-   * @param filterExpression
-   */
-  public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
-    if (filterExpression == null) {
-      return;
-    }
-    try {
-      String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
-      configuration.set(FILTER_PREDICATE, filterString);
-    } catch (Exception e) {
-      throw new RuntimeException("Error while setting filter expression to Job", e);
-    }
-  }
-
-  public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
-    if (projection == null || projection.isEmpty()) {
-      return;
-    }
-    String[] allColumns = projection.getAllColumns();
-    StringBuilder builder = new StringBuilder();
-    for (String column : allColumns) {
-      builder.append(column).append(",");
-    }
-    String columnString = builder.toString();
-    columnString = columnString.substring(0, columnString.length() - 1);
-    configuration.set(COLUMN_PROJECTION, columnString);
-  }
-
-  public static String getColumnProjection(Configuration configuration) {
-    return configuration.get(COLUMN_PROJECTION);
-  }
-
-  public static void setCarbonReadSupport(Configuration configuration,
-      Class<? extends CarbonReadSupport> readSupportClass) {
-    if (readSupportClass != null) {
-      configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
-    }
-  }
-
-  /**
-   * Set list of segments to access
-   */
-  public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
-    configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
-  }
-
-  /**
-   * Set `CARBON_INPUT_SEGMENTS` from property to configuration
-   */
-  public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
-    String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
-    String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
-    String segmentNumbersFromProperty = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
-    if (!segmentNumbersFromProperty.trim().equals("*")) {
-      CarbonTableInputFormat
-          .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
-    }
-  }
-
-  /**
-   * set list of segment to access
-   */
-  public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
-    configuration.set(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
-  }
-
-  /**
-   * get list of segment to access
-   */
-  public static boolean getValidateSegmentsToAccess(Configuration configuration) {
-    return configuration.get(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true")
-        .equalsIgnoreCase("true");
-  }
-
-  /**
-   * set list of partitions to prune
-   */
-  public static void setPartitionsToPrune(Configuration configuration,
-      List<PartitionSpec> partitions) {
-    if (partitions == null) {
-      return;
-    }
-    try {
-      String partitionString =
-          ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
-      configuration.set(PARTITIONS_TO_PRUNE, partitionString);
-    } catch (Exception e) {
-      throw new RuntimeException("Error while setting patition information to Job" + partitions, e);
-    }
-  }
-
-  /**
-   * get list of partitions to prune
-   */
-  public static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
-      throws IOException {
-    String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
-    if (partitionString != null) {
-      return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
-    }
-    return null;
-  }
-
-  public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
-      throws IOException {
-    String tablePath = configuration.get(INPUT_DIR, "");
-    try {
-      return AbsoluteTableIdentifier
-          .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
-    } catch (InvalidConfigurationException e) {
-      throw new IOException(e);
-    }
-  }
-
   /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR
@@ -362,8 +152,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
         return getSplitsOfStreaming(job, identifier, streamSegments);
       }
 
-
-
       List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments());
       if (filteredSegmentToAccess.size() == 0) {
         return new ArrayList<>(0);
@@ -716,195 +504,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     return result;
   }
 
-  protected Expression getFilterPredicates(Configuration configuration) {
-    try {
-      String filterExprString = configuration.get(FILTER_PREDICATE);
-      if (filterExprString == null) {
-        return null;
-      }
-      Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
-      return (Expression) filter;
-    } catch (IOException e) {
-      throw new RuntimeException("Error while reading filter expression", e);
-    }
-  }
-
-  /**
-   * get data blocks of given segment
-   */
-  private List<org.apache.carbondata.hadoop.CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
-      AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
-      BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
-      List<Integer> oldPartitionIdList) throws IOException {
-
-    QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
-    QueryStatistic statistic = new QueryStatistic();
-
-    // get tokens for all the required FileSystem for table path
-    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
-        new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
-    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
-    DataMapExprWrapper dataMapExprWrapper =
-        DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
-    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
-    List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
-    List<ExtendedBlocklet> prunedBlocklets;
-    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
-      DistributableDataMapFormat datamapDstr =
-          new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
-              segmentIds, partitionsToPrune,
-              BlockletDataMapFactory.class.getName());
-      prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
-      // Apply expression on the blocklets.
-      prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
-    } else {
-      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
-    }
-
-    List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
-    int partitionIndex = 0;
-    List<Integer> partitionIdList = new ArrayList<>();
-    if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
-      partitionIdList = partitionInfo.getPartitionIds();
-    }
-    for (ExtendedBlocklet blocklet : prunedBlocklets) {
-      long partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
-          CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
-
-      // OldPartitionIdList is only used in alter table partition command because it change
-      // partition info first and then read data.
-      // For other normal query should use newest partitionIdList
-      if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
-        if (oldPartitionIdList != null) {
-          partitionIndex = oldPartitionIdList.indexOf((int)partitionId);
-        } else {
-          partitionIndex = partitionIdList.indexOf((int)partitionId);
-        }
-      }
-      if (partitionIndex != -1) {
-        // matchedPartitions variable will be null in two cases as follows
-        // 1. the table is not a partition table
-        // 2. the table is a partition table, and all partitions are matched by query
-        // for partition table, the task id of carbaondata file name is the partition id.
-        // if this partition is not required, here will skip it.
-        if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
-          CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
-          if (inputSplit != null) {
-            resultFilterredBlocks.add(inputSplit);
-          }
-        }
-      }
-    }
-    statistic
-        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
-    recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
-    return resultFilterredBlocks;
-  }
-
-  private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
-    org.apache.carbondata.hadoop.CarbonInputSplit split =
-        org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
-            blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0,
-                blocklet.getLength(), blocklet.getLocations()),
-            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
-            blocklet.getDataMapWriterPath());
-    split.setDetailInfo(blocklet.getDetailInfo());
-    return split;
-  }
-
-  @Override
-  public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
-      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-    Configuration configuration = taskAttemptContext.getConfiguration();
-    QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
-    CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
-    return new CarbonRecordReader<T>(queryModel, readSupport);
-  }
-
-  public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-      throws IOException {
-    Configuration configuration = taskAttemptContext.getConfiguration();
-    CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
-    TableProvider tableProvider = new SingleTableProvider(carbonTable);
-
-    // query plan includes projection column
-    String projectionString = getColumnProjection(configuration);
-    String[] projectionColumnNames = null;
-    if (projectionString != null) {
-      projectionColumnNames = projectionString.split(",");
-    }
-    QueryModel queryModel = carbonTable.createQueryWithProjection(
-        projectionColumnNames, getDataTypeConverter(configuration));
-
-    // set the filter to the query model in order to filter blocklet before scan
-    Expression filter = getFilterPredicates(configuration);
-    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
-    // getAllMeasures returns list of visible and invisible columns
-    boolean[] isFilterMeasures =
-        new boolean[carbonTable.getAllMeasures().size()];
-    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions,
-        isFilterMeasures);
-    queryModel.setIsFilterDimensions(isFilterDimensions);
-    queryModel.setIsFilterMeasures(isFilterMeasures);
-    FilterResolverIntf filterIntf = CarbonInputFormatUtil
-        .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
-    queryModel.setFilterExpressionResolverTree(filterIntf);
-
-    // update the file level index store if there are invalid segment
-    if (inputSplit instanceof CarbonMultiBlockSplit) {
-      CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
-      List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
-      if (invalidSegments.size() > 0) {
-        queryModel.setInvalidSegmentIds(invalidSegments);
-      }
-      List<UpdateVO> invalidTimestampRangeList =
-          split.getAllSplits().get(0).getInvalidTimestampRange();
-      if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
-        queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
-      }
-    }
-    return queryModel;
-  }
-
-  public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
-    String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
-    //By default it uses dictionary decoder read class
-    CarbonReadSupport<T> readSupport = null;
-    if (readSupportClass != null) {
-      try {
-        Class<?> myClass = Class.forName(readSupportClass);
-        Constructor<?> constructor = myClass.getConstructors()[0];
-        Object object = constructor.newInstance();
-        if (object instanceof CarbonReadSupport) {
-          readSupport = (CarbonReadSupport) object;
-        }
-      } catch (ClassNotFoundException ex) {
-        LOG.error("Class " + readSupportClass + "not found", ex);
-      } catch (Exception ex) {
-        LOG.error("Error while creating " + readSupportClass, ex);
-      }
-    } else {
-      readSupport = new DictionaryDecodeReadSupport<>();
-    }
-    return readSupport;
-  }
-
-  @Override
-  protected boolean isSplitable(JobContext context, Path filename) {
-    try {
-      // Don't split the file if it is local file system
-      FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
-      if (fileSystem instanceof LocalFileSystem) {
-        return false;
-      }
-    } catch (Exception e) {
-      return true;
-    }
-    return true;
-  }
-
   /**
    * return valid segment to access
    */
@@ -969,58 +568,4 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
 
     return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
   }
-
-  /**
-   * It is optional, if user does not set then it reads from store
-   *
-   * @param configuration
-   * @param converter is the Data type converter for different computing engine
-   * @throws IOException
-   */
-  public static void setDataTypeConverter(Configuration configuration, DataTypeConverter converter)
-      throws IOException {
-    if (null != converter) {
-      configuration.set(CARBON_CONVERTER,
-          ObjectSerializationUtil.convertObjectToString(converter));
-    }
-  }
-
-  public static DataTypeConverter getDataTypeConverter(Configuration configuration)
-      throws IOException {
-    String converter = configuration.get(CARBON_CONVERTER);
-    if (converter == null) {
-      return new DataTypeConverterImpl();
-    }
-    return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
-  }
-
-  public static void setDatabaseName(Configuration configuration, String databaseName) {
-    if (null != databaseName) {
-      configuration.set(DATABASE_NAME, databaseName);
-    }
-  }
-
-  public static String getDatabaseName(Configuration configuration)
-      throws InvalidConfigurationException {
-    String databseName = configuration.get(DATABASE_NAME);
-    if (null == databseName) {
-      throw new InvalidConfigurationException("Database name is not set.");
-    }
-    return databseName;
-  }
-
-  public static void setTableName(Configuration configuration, String tableName) {
-    if (null != tableName) {
-      configuration.set(TABLE_NAME, tableName);
-    }
-  }
-
-  public static String getTableName(Configuration configuration)
-      throws InvalidConfigurationException {
-    String tableName = configuration.get(TABLE_NAME);
-    if (tableName == null) {
-      throw new InvalidConfigurationException("Table name is not set");
-    }
-    return tableName;
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index ab7c333..9df59e6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -87,11 +87,11 @@ public class SchemaReader {
     // Convert the ColumnSchema -> TableSchema -> TableInfo.
     // Return the TableInfo.
     org.apache.carbondata.format.TableInfo tableInfo =
-        CarbonUtil.inferSchemaFileExternalTable(identifier.getTablePath(), identifier, false);
+        CarbonUtil.inferSchema(identifier.getTablePath(), identifier, false);
     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-    TableInfo wrapperTableInfo = schemaConverter
-        .fromExternalToWrapperTableInfo(tableInfo, identifier.getDatabaseName(),
-            identifier.getTableName(), identifier.getTablePath());
+    TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+        tableInfo, identifier.getDatabaseName(), identifier.getTableName(),
+        identifier.getTablePath());
     return wrapperTableInfo;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 8b1f63f..7841a23 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -105,14 +105,14 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
   }
 
   //TO DO, need to remove segment dependency and tableIdentifier Dependency
-  test("read carbondata files (sdk Writer Output) using the Carbonfile ") {
+  test("read carbondata files (sdk Writer Output) using the carbonfile ") {
     buildTestData(false)
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
 
-    //new provider Carbonfile
+    //new provider carbonfile
     sql(
-      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION
          |'$writerPath' """.stripMargin)
 
     sql("Describe formatted sdkOutputTable").show(false)
@@ -152,7 +152,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
 
     //data source file format
     sql(
-      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION
          |'$writerPath' """.stripMargin)
 
     val exception = intercept[MalformedCarbonCommandException]
@@ -176,7 +176,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
 
     //data source file format
     sql(
-      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION
          |'$writerPath' """.stripMargin)
 
     //org.apache.spark.SparkException: Index file not present to read the carbondata file
@@ -192,7 +192,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
     cleanTestData()
   }
 
-
+  // TODO: Make the sparkCarbonFileFormat to work without index file
   test("Read sdk writer output file without Carbondata file should fail") {
     buildTestData(false)
     deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
@@ -202,7 +202,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
     val exception = intercept[Exception] {
       //    data source file format
     sql(
-      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION
          |'$writerPath' """.stripMargin)
     }
     assert(exception.getMessage()
@@ -225,7 +225,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
     val exception = intercept[Exception] {
       //data source file format
       sql(
-      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION
          |'$writerPath' """.stripMargin)
 
       sql("select * from sdkOutputTable").show(false)


Mime
View raw message