carbondata-commits mailing list archives

From chenliang...@apache.org
Subject carbondata git commit: [CARBONDATA-1418] Use CarbonTableInputFormat for creating the Splits and QueryModel
Date Tue, 29 Aug 2017 02:47:22 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master cac650f01 -> 9c83bd18e


[CARBONDATA-1418] Use CarbonTableInputFormat for creating the Splits and QueryModel

Refactored code to use CarbonTableInputFormat for creating the splits and the QueryModel (see the sketch after the change summary below).

This closes #1294


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9c83bd18
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9c83bd18
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9c83bd18

Branch: refs/heads/master
Commit: 9c83bd18e82de9734666cab297de7b21b5f517ac
Parents: cac650f
Author: Bhavya <bhavya@knoldus.com>
Authored: Mon Aug 28 23:21:58 2017 +0530
Committer: chenliang613 <chenliang613@apache.org>
Committed: Tue Aug 29 10:47:11 2017 +0800

----------------------------------------------------------------------
 .../carbondata/presto/CarbondataRecordSet.java  |  16 +-
 .../presto/CarbondataRecordSetProvider.java     | 265 ++------
 .../carbondata/presto/PrestoFilterUtil.java     | 239 +++++++
 .../presto/impl/CarbonLocalInputSplit.java      |  47 +-
 .../presto/impl/CarbonTableReader.java          | 676 +++----------------
 5 files changed, 435 insertions(+), 808 deletions(-)
----------------------------------------------------------------------
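
For orientation, a minimal sketch of the flow this patch converges on, distilled from the hunks below; it is not part of the commit itself. The class and package names are hypothetical, and tableInfo, carbonTablePath, filter and projection stand for objects the connector already has; every CarbonTableInputFormat call mirrors one introduced in the diff. The coordinator configures the input format and asks it for splits, a worker rebuilds the QueryModel for the split it is handed.

package org.apache.carbondata.presto.examples;  // hypothetical package, sketch only

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;

public final class InputFormatFlowSketch {

  // Coordinator and worker steps shown in one method for brevity;
  // assumes the table has at least one split.
  static QueryModel splitsAndModel(String carbonTablePath, TableInfo tableInfo,
      Expression filter, CarbonProjection projection)
      throws IOException, InterruptedException {
    Configuration conf = new Configuration();
    conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");  // set to "" as in this patch
    conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
    CarbonTableInputFormat.setTableInfo(conf, tableInfo);
    CarbonTableInputFormat.setFilterPredicates(conf, filter);     // may be null (no pushdown)
    CarbonTableInputFormat.setColumnProjection(conf, projection);

    CarbonTableInputFormat<Object> format = new CarbonTableInputFormat<>();
    JobConf jobConf = new JobConf(conf);

    // Coordinator side: enumerate the splits once per query.
    List<InputSplit> splits = format.getSplits(Job.getInstance(jobConf));

    // Worker side: rebuild the QueryModel for the split it was handed.
    TaskAttemptContextImpl context =
        new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
    return format.getQueryModel((CarbonInputSplit) splits.get(0), context);
  }
}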


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c83bd18/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index 435b008..4294403 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -33,6 +33,8 @@ import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.BatchResult;
 import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.presto.impl.CarbonLocalInputSplit;
 
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorSession;
@@ -77,18 +79,12 @@ public class CarbondataRecordSet implements RecordSet {
    * get data blocks via Carbondata QueryModel API
    */
   @Override public RecordCursor cursor() {
-    List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
-
-    tableBlockInfoList.add(new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
-        split.getLocalInputSplit().getStart(), split.getLocalInputSplit().getSegmentId(),
-        split.getLocalInputSplit().getLocations().toArray(new String[0]),
-        split.getLocalInputSplit().getLength(), new BlockletInfos(),
-        //blockletInfos,
-        ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion()), null));
+    CarbonLocalInputSplit carbonLocalInputSplit = split.getLocalInputSplit();
+    List<CarbonInputSplit> splitList = new ArrayList<>(1);
+    splitList.add(CarbonLocalInputSplit.convertSplit(carbonLocalInputSplit));
+    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
     queryModel.setTableBlockInfos(tableBlockInfoList);
-
     queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
-
     try {
 
       Tuple3[] dict = readSupport

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c83bd18/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 8aacf88..0c7b77f 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -17,45 +17,40 @@
 
 package org.apache.carbondata.presto;
 
-import org.apache.carbondata.core.util.DataTypeConverterImpl;
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.service.impl.PathFactory;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.impl.CarbonLocalInputSplit;
 import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
+
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.RecordSet;
 import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-import com.facebook.presto.spi.predicate.Domain;
-import com.facebook.presto.spi.predicate.Range;
-import com.facebook.presto.spi.predicate.TupleDomain;
-import com.facebook.presto.spi.type.*;
 import com.google.common.collect.ImmutableList;
-import io.airlift.slice.Slice;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.expression.ColumnExpression;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.expression.LiteralExpression;
-import org.apache.carbondata.core.scan.expression.conditional.*;
-import org.apache.carbondata.core.scan.expression.logical.AndExpression;
-import org.apache.carbondata.core.scan.expression.logical.OrExpression;
-import org.apache.carbondata.core.scan.filter.SingleTableProvider;
-import org.apache.carbondata.core.scan.filter.TableProvider;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
 
-import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.carbondata.presto.Types.checkType;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
+import static org.apache.carbondata.presto.Types.checkType;
 
 public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
 
@@ -70,31 +65,19 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
 
   @Override public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle,
       ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
-    requireNonNull(split, "split is null");
-    requireNonNull(columns, "columns is null");
 
     CarbondataSplit carbondataSplit =
         checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
-    checkArgument(carbondataSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+    checkArgument(carbondataSplit.getConnectorId().equals(connectorId),
+        "split is not for this connector");
 
-    StringBuffer targetColsBuffer = new StringBuffer();
-    String targetCols = "";
+    CarbonProjection carbonProjection = new CarbonProjection();
     // Convert all columns handles
     ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
     for (ColumnHandle handle : columns) {
       handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
-      targetColsBuffer.append(((CarbondataColumnHandle) handle).getColumnName()).append(",");
-    }
-
-    // Build column projection(check the column order)
-    if (targetColsBuffer.length() > 0) {
-      targetCols = targetColsBuffer.substring(0, targetCols.length() - 1);
-    }
-    else
-    {
-      targetCols = null;
+      carbonProjection.addColumn(((CarbondataColumnHandle) handle).getColumnName());
     }
-    //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
 
     CarbonTableCacheModel tableCacheModel =
         carbonTableReader.getCarbonCache(carbondataSplit.getSchemaTableName());
@@ -104,173 +87,47 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
 
     // Build Query Model
     CarbonTable targetTable = tableCacheModel.carbonTable;
-    CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
-    QueryModel queryModel =
-        QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(),
-            queryPlan, targetTable, new DataTypeConverterImpl());
 
-    // Push down filter
-    fillFilter2QueryModel(queryModel, carbondataSplit.getConstraints(), targetTable);
-
-    // Return new record set
-    return new CarbondataRecordSet(targetTable, session, carbondataSplit,
-        handles.build(), queryModel);
-  }
-
-  // Build filter for QueryModel
-  private void fillFilter2QueryModel(QueryModel queryModel,
-      TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
-
-    //queryModel.setFilterExpressionResolverTree(new FilterResolverIntf());
-
-    //Build Predicate Expression
-    ImmutableList.Builder<Expression> filters = ImmutableList.builder();
-
-    Domain domain = null;
-
-    for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
-
-      // Build ColumnExpresstion for Expresstion(Carbondata)
-      CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
-      Type type = cdch.getColumnType();
-
-      DataType coltype = spi2CarbondataTypeMapper(cdch);
-      Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
-
-      domain = originalConstraint.getDomains().get().get(c);
-      checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
-
-      if (domain.getValues().isNone()) {
-      }
-
-      if (domain.getValues().isAll()) {
-      }
-
-      List<Object> singleValues = new ArrayList<>();
-      List<Expression> disjuncts = new ArrayList<>();
-      for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
-        if (range.isSingleValue()) {
-          singleValues.add(range.getLow().getValue());
-        } else {
-          List<Expression> rangeConjuncts = new ArrayList<>();
-          if (!range.getLow().isLowerUnbounded()) {
-            Object value = convertDataByType(range.getLow().getValue(), type);
-            switch (range.getLow().getBound()) {
-              case ABOVE:
-                if (type == TimestampType.TIMESTAMP) {
-                  //todo not now
-                } else {
-                  GreaterThanExpression greater = new GreaterThanExpression(colExpression,
-                          new LiteralExpression(value, coltype));
-                  rangeConjuncts.add(greater);
-                }
-                break;
-              case EXACTLY:
-                GreaterThanEqualToExpression greater =
-                        new GreaterThanEqualToExpression(colExpression,
-                                new LiteralExpression(value, coltype));
-                rangeConjuncts.add(greater);
-                break;
-              case BELOW:
-                throw new IllegalArgumentException("Low marker should never use BELOW bound");
-              default:
-                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
-            }
-          }
-          if (!range.getHigh().isUpperUnbounded()) {
-            Object value = convertDataByType(range.getHigh().getValue(), type);
-            switch (range.getHigh().getBound()) {
-              case ABOVE:
-                throw new IllegalArgumentException("High marker should never use ABOVE bound");
-              case EXACTLY:
-                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
-                        new LiteralExpression(value, coltype));
-                rangeConjuncts.add(less);
-                break;
-              case BELOW:
-                LessThanExpression less2 =
-                        new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
-                rangeConjuncts.add(less2);
-                break;
-              default:
-                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
-            }
-          }
-          disjuncts.addAll(rangeConjuncts);
-        }
-      }
-      if (singleValues.size() == 1) {
-        Expression ex = null;
-        if (coltype.equals(DataType.STRING)) {
-          ex = new EqualToExpression(colExpression,
-              new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
-        } else if (coltype.equals(DataType.TIMESTAMP) || coltype.equals(DataType.DATE)) {
-          Long value = (Long) singleValues.get(0) * 1000;
-          ex = new EqualToExpression(colExpression,
-              new LiteralExpression(value , coltype));
-        } else ex = new EqualToExpression(colExpression,
-            new LiteralExpression(singleValues.get(0), coltype));
-        filters.add(ex);
-      } else if (singleValues.size() > 1) {
-        ListExpression candidates = null;
-        List<Expression> exs = singleValues.stream().map((a) -> {
-          return new LiteralExpression(convertDataByType(a, type), coltype);
-        }).collect(Collectors.toList());
-        candidates = new ListExpression(exs);
-
-        if (candidates != null) filters.add(new InExpression(colExpression, candidates));
-      } else if (disjuncts.size() > 0) {
-        if (disjuncts.size() > 1) {
-          Expression finalFilters = new OrExpression(disjuncts.get(0), disjuncts.get(1));
-          if (disjuncts.size() > 2) {
-            for (int i = 2; i < disjuncts.size(); i++) {
-              filters.add(new AndExpression(finalFilters, disjuncts.get(i)));
-            }
-          }
-        } else if (disjuncts.size() == 1) filters.add(disjuncts.get(0));
-      }
+    QueryModel queryModel = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+      String carbonTablePath = PathFactory.getInstance()
+          .getCarbonTablePath(targetTable.getAbsoluteTableIdentifier().getStorePath(),
+              targetTable.getCarbonTableIdentifier(), null).getPath();
+
+      conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+      JobConf jobConf = new JobConf(conf);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(jobConf, tableCacheModel.carbonTable,
+              PrestoFilterUtil.getFilters(targetTable.getFactTableName().hashCode()),
+              carbonProjection);
+      TaskAttemptContextImpl hadoopAttemptContext =
+          new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
+      CarbonInputSplit carbonInputSplit =
+          CarbonLocalInputSplit.convertSplit(carbondataSplit.getLocalInputSplit());
+      queryModel = carbonTableInputFormat.getQueryModel(carbonInputSplit, hadoopAttemptContext);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to get the Query Model ", e);
     }
-
-    Expression finalFilters;
-    List<Expression> tmp = filters.build();
-    if (tmp.size() > 1) {
-      finalFilters = new OrExpression(tmp.get(0), tmp.get(1));
-      if (tmp.size() > 2) {
-        for (int i = 2; i < tmp.size(); i++) {
-          finalFilters = new OrExpression(finalFilters, tmp.get(i));
-        }
-      }
-    } else if (tmp.size() == 1) finalFilters = tmp.get(0);
-    else return;
-
-    TableProvider tableProvider = new SingleTableProvider(carbonTable);
-    // todo set into QueryModel
-    CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable);
-    queryModel.setFilterExpressionResolverTree(CarbonInputFormatUtil
-        .resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier(), tableProvider));
+    return new CarbondataRecordSet(targetTable, session, carbondataSplit, handles.build(),
+        queryModel);
   }
 
-  public static DataType spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) {
-    Type colType = carbondataColumnHandle.getColumnType();
-    if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
-    else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
-    else if (colType == IntegerType.INTEGER) return DataType.INT;
-    else if (colType == BigintType.BIGINT) return DataType.LONG;
-    else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
-    else if (colType == VarcharType.VARCHAR) return DataType.STRING;
-    else if (colType == DateType.DATE) return DataType.DATE;
-    else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
-    else if (colType == DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
-        carbondataColumnHandle.getScale())) return DataType.DECIMAL;
-    else return DataType.STRING;
-  }
+  private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
+      CarbonTable carbonTable, Expression filterExpression, CarbonProjection projection) {
 
-  public Object convertDataByType(Object rawdata, Type type) {
-    if (type.equals(IntegerType.INTEGER)) return Integer.valueOf(rawdata.toString());
-    else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
-    else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
-    else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+    CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+    try {
+      CarbonTableInputFormat
+          .setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath()));
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create the CarbonTableInputFormat", e);
+    }
+    CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
+    CarbonTableInputFormat.setColumnProjection(conf, projection);
 
-    return rawdata;
+    return format;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c83bd18/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
new file mode 100644
index 0000000..f665d9d
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.expression.conditional.GreaterThanEqualToExpression;
+import org.apache.carbondata.core.scan.expression.conditional.GreaterThanExpression;
+import org.apache.carbondata.core.scan.expression.conditional.InExpression;
+import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression;
+import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression;
+import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.BigintType;
+import com.facebook.presto.spi.type.BooleanType;
+import com.facebook.presto.spi.type.DateType;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.DoubleType;
+import com.facebook.presto.spi.type.IntegerType;
+import com.facebook.presto.spi.type.SmallintType;
+import com.facebook.presto.spi.type.TimestampType;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.VarcharType;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * PrestoFilterUtil create the carbonData Expression from the presto-domain
+ */
+public class PrestoFilterUtil {
+
+  private static Map<Integer, Expression> filterMap = new HashMap<>();
+
+  private static DataType Spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) {
+    Type colType = carbondataColumnHandle.getColumnType();
+    if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
+    else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
+    else if (colType == IntegerType.INTEGER) return DataType.INT;
+    else if (colType == BigintType.BIGINT) return DataType.LONG;
+    else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
+    else if (colType == VarcharType.VARCHAR) return DataType.STRING;
+    else if (colType == DateType.DATE) return DataType.DATE;
+    else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    else if (colType == DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
+        carbondataColumnHandle.getScale())) return DataType.DECIMAL;
+    else return DataType.STRING;
+  }
+
+  /**
+   * Convert presto-TupleDomain predication into Carbon scan express condition
+   *
+   * @param originalConstraint presto-TupleDomain
+   * @return
+   */
+  static Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint) {
+    ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+    Domain domain;
+
+    for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+      // Build ColumnExpression for Expression(Carbondata)
+      CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+      Type type = cdch.getColumnType();
+
+      DataType coltype = Spi2CarbondataTypeMapper(cdch);
+      Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
+
+      domain = originalConstraint.getDomains().get().get(c);
+      checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+      List<Object> singleValues = new ArrayList<>();
+      List<Expression> disjuncts = new ArrayList<>();
+      for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+        if (range.isSingleValue()) {
+          Object value = ConvertDataByType(range.getLow().getValue(), type);
+          singleValues.add(value);
+        } else {
+          List<Expression> rangeConjuncts = new ArrayList<>();
+          if (!range.getLow().isLowerUnbounded()) {
+            Object value = ConvertDataByType(range.getLow().getValue(), type);
+            switch (range.getLow().getBound()) {
+              case ABOVE:
+                if (type == TimestampType.TIMESTAMP) {
+                  //todo not now
+                } else {
+                  GreaterThanExpression greater = new GreaterThanExpression(colExpression,
+                      new LiteralExpression(value, coltype));
+                  rangeConjuncts.add(greater);
+                }
+                break;
+              case EXACTLY:
+                GreaterThanEqualToExpression greater =
+                    new GreaterThanEqualToExpression(colExpression,
+                        new LiteralExpression(value, coltype));
+                rangeConjuncts.add(greater);
+                break;
+              case BELOW:
+                throw new IllegalArgumentException("Low marker should never use BELOW bound");
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
+            }
+          }
+          if (!range.getHigh().isUpperUnbounded()) {
+            Object value = ConvertDataByType(range.getHigh().getValue(), type);
+            switch (range.getHigh().getBound()) {
+              case ABOVE:
+                throw new IllegalArgumentException("High marker should never use ABOVE bound");
+              case EXACTLY:
+                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
+                    new LiteralExpression(value, coltype));
+                rangeConjuncts.add(less);
+                break;
+              case BELOW:
+                LessThanExpression less2 =
+                    new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+                rangeConjuncts.add(less2);
+                break;
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
+            }
+          }
+          disjuncts.addAll(rangeConjuncts);
+        }
+      }
+      if (singleValues.size() == 1) {
+        Expression ex;
+        if (coltype.equals(DataType.STRING)) {
+          ex = new EqualToExpression(colExpression,
+              new LiteralExpression(singleValues.get(0), coltype));
+        } else if (coltype.equals(DataType.TIMESTAMP) || coltype.equals(DataType.DATE)) {
+          Long value = (Long) singleValues.get(0);
+          ex = new EqualToExpression(colExpression, new LiteralExpression(value, coltype));
+        } else ex = new EqualToExpression(colExpression,
+            new LiteralExpression(singleValues.get(0), coltype));
+        filters.add(ex);
+      } else if (singleValues.size() > 1) {
+        ListExpression candidates = null;
+        List<Expression> exs = singleValues.stream()
+            .map((a) -> new LiteralExpression(ConvertDataByType(a, type), coltype))
+            .collect(Collectors.toList());
+        candidates = new ListExpression(exs);
+
+        filters.add(new InExpression(colExpression, candidates));
+      } else if (disjuncts.size() > 0) {
+        if (disjuncts.size() > 1) {
+          Expression finalFilters = new OrExpression(disjuncts.get(0), disjuncts.get(1));
+          if (disjuncts.size() > 2) {
+            for (int i = 2; i < disjuncts.size(); i++) {
+              filters.add(new AndExpression(finalFilters, disjuncts.get(i)));
+            }
+          } else {
+            filters.add(finalFilters);
+          }
+        } else if (disjuncts.size() == 1) filters.add(disjuncts.get(0));
+      }
+    }
+
+    Expression finalFilters;
+    List<Expression> tmp = filters.build();
+    if (tmp.size() > 1) {
+      finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+      if (tmp.size() > 2) {
+        for (int i = 2; i < tmp.size(); i++) {
+          finalFilters = new OrExpression(finalFilters, tmp.get(i));
+        }
+      }
+    } else if (tmp.size() == 1) finalFilters = tmp.get(0);
+    else return null;
+    return finalFilters;
+  }
+
+  private static Object ConvertDataByType(Object rawdata, Type type) {
+    if (type.equals(IntegerType.INTEGER)) return new Integer((rawdata.toString()));
+    else if (type.equals(BigintType.BIGINT)) return rawdata;
+    else if (type.equals(VarcharType.VARCHAR)) {
+      if (rawdata instanceof Slice) {
+        return ((Slice) rawdata).toStringUtf8();
+      } else {
+        return rawdata;
+      }
+
+    } else if (type.equals(BooleanType.BOOLEAN)) return rawdata;
+    else if (type.equals(DateType.DATE)) {
+      Calendar c = Calendar.getInstance();
+      c.setTime(new Date(0));
+      c.add(Calendar.DAY_OF_YEAR, ((Long) rawdata).intValue());
+      Date date = c.getTime();
+      return date.getTime() * 1000;
+    }
+
+    return rawdata;
+  }
+
+  /**
+   * get the filters from key
+   */
+  static Expression getFilters(Integer key) {
+    return filterMap.get(key);
+  }
+
+  static void setFilter(Integer tableId, Expression filter) {
+    filterMap.put(tableId, filter);
+  }
+}
\ No newline at end of file
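
A hedged usage note, not part of this diff: parseFilterExpression and setFilter are presumably invoked from the split manager (outside this patch) when Presto hands over the TupleDomain constraint, keyed by the fact-table name hash so that the lookup PrestoFilterUtil.getFilters(targetTable.getFactTableName().hashCode()) in CarbondataRecordSetProvider above finds the same expression. A minimal same-package sketch; constraint and factTableName are placeholders supplied by the caller.

package org.apache.carbondata.presto;  // same package: the new methods are package-private

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.predicate.TupleDomain;

import org.apache.carbondata.core.scan.expression.Expression;

final class FilterRegistrationSketch {

  // Sketch only: convert the Presto constraint and register it under the key
  // the record-set provider uses for its lookup.
  static void registerFilter(TupleDomain<ColumnHandle> constraint, String factTableName) {
    Expression carbonFilter = PrestoFilterUtil.parseFilterExpression(constraint);
    if (carbonFilter != null) {
      PrestoFilterUtil.setFilter(factTableName.hashCode(), carbonFilter);
    }
  }
}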

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c83bd18/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index f0a8428..e209b97 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -19,9 +19,15 @@ package org.apache.carbondata.presto.impl;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
 
 import java.util.List;
 
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+
 /**
  * CarbonLocalInputSplit represents a block, it contains a set of blocklet.
  */
@@ -34,6 +40,10 @@ public class CarbonLocalInputSplit {
   private long length; // the length of the block.
   private List<String> locations;// locations are the locations for different replicas.
   private short version;
+  private String[] deleteDeltaFiles;
+
+
+  private String detailInfo;
 
   /**
    * Number of BlockLets in a block
@@ -68,12 +78,29 @@ public class CarbonLocalInputSplit {
     return numberOfBlocklets;
   }
 
+  @JsonProperty public String[] getDeleteDeltaFiles() {
+    return deleteDeltaFiles;
+  }
+
+  @JsonProperty public String getDetailInfo() {
+    return detailInfo;
+  }
+
+  public void setDetailInfo(BlockletDetailInfo blockletDetailInfo) {
+    Gson gson = new Gson();
+    detailInfo = gson.toJson(blockletDetailInfo);
+
+  }
+
   @JsonCreator public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId,
       @JsonProperty("path") String path, @JsonProperty("start") long start,
       @JsonProperty("length") long length, @JsonProperty("locations") List<String> locations,
       @JsonProperty("numberOfBlocklets") int numberOfBlocklets/*,
                                  @JsonProperty("tableBlockInfo") TableBlockInfo tableBlockInfo*/,
-      @JsonProperty("version") short version) {
+      @JsonProperty("version") short version,
+      @JsonProperty("deleteDeltaFiles") String[] deleteDeltaFiles,
+      @JsonProperty("detailInfo") String detailInfo
+      ) {
     this.path = path;
     this.start = start;
     this.length = length;
@@ -82,5 +109,23 @@ public class CarbonLocalInputSplit {
     this.numberOfBlocklets = numberOfBlocklets;
     //this.tableBlockInfo = tableBlockInfo;
     this.version = version;
+    this.deleteDeltaFiles = deleteDeltaFiles;
+    this.detailInfo = detailInfo;
+
   }
+
+  public static  CarbonInputSplit convertSplit(CarbonLocalInputSplit carbonLocalInputSplit) {
+    CarbonInputSplit inputSplit = new CarbonInputSplit(carbonLocalInputSplit.getSegmentId(),
+        new Path(carbonLocalInputSplit.getPath()), carbonLocalInputSplit.getStart(),
+        carbonLocalInputSplit.getLength(), carbonLocalInputSplit.getLocations()
+        .toArray(new String[carbonLocalInputSplit.getLocations().size()]),
+        carbonLocalInputSplit.getNumberOfBlocklets(), ColumnarFormatVersion.valueOf(carbonLocalInputSplit.getVersion()),
+        carbonLocalInputSplit.getDeleteDeltaFiles());
+    Gson gson = new Gson();
+    BlockletDetailInfo blockletDetailInfo = gson.fromJson(carbonLocalInputSplit.detailInfo, BlockletDetailInfo.class);
+    inputSplit.setDetailInfo(blockletDetailInfo);
+    return inputSplit;
+  }
+
+
 }
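
For context, again not part of the patch: the coordinator serialises the new BlockletDetailInfo as a Gson string so CarbonLocalInputSplit stays a flat Jackson-friendly bean, and a worker turns it back into a CarbonInputSplit with convertSplit. A sketch of that round trip, assuming the CarbonInputSplit comes from CarbonTableInputFormat.getSplits as in CarbonTableReader below; the class name is hypothetical, the calls mirror the diff.

package org.apache.carbondata.presto.impl;  // sketch only

import java.io.IOException;
import java.util.Arrays;

import org.apache.carbondata.hadoop.CarbonInputSplit;

import com.google.gson.Gson;

final class SplitRoundTripSketch {

  // Coordinator: wrap a CarbonInputSplit into the flat CarbonLocalInputSplit;
  // the BlockletDetailInfo travels as a Gson string in detailInfo.
  static CarbonLocalInputSplit wrap(CarbonInputSplit split) throws IOException {
    Gson gson = new Gson();
    return new CarbonLocalInputSplit(split.getSegmentId(), split.getPath().toString(),
        split.getStart(), split.getLength(), Arrays.asList(split.getLocations()),
        split.getNumberOfBlocklets(), split.getVersion().number(),
        split.getDeleteDeltaFiles(), gson.toJson(split.getDetailInfo()));
  }

  // Worker: rebuild the CarbonInputSplit, detail info included.
  static CarbonInputSplit unwrap(CarbonLocalInputSplit localSplit) {
    return CarbonLocalInputSplit.convertSplit(localSplit);
  }
}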

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c83bd18/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index d78f786..5c00026 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -20,105 +20,76 @@ package org.apache.carbondata.presto.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.DataRefNode;
-import org.apache.carbondata.core.datastore.DataRefNodeFinder;
-import org.apache.carbondata.core.datastore.IndexKey;
-import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
-import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
-import org.apache.carbondata.core.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.datastore.block.BlockletInfos;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
-import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.reader.ThriftReader;
 import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.SingleTableProvider;
-import org.apache.carbondata.core.scan.filter.TableProvider;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.service.impl.PathFactory;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.hadoop.CacheClient;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
+import com.facebook.presto.hadoop.$internal.com.google.gson.Gson;
 import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.thrift.TBase;
 
 import static java.util.Objects.requireNonNull;
-import com.facebook.presto.spi.TableNotFoundException;
 
-/** CarbonTableReader will be a facade of these utils
- *
+/**
+ * CarbonTableReader will be a facade of these utils
  * 1:CarbonMetadata,(logic table)
  * 2:FileFactory, (physic table file)
  * 3:CarbonCommonFactory, (offer some )
  * 4:DictionaryFactory, (parse dictionary util)
- *
  * Currently, it is mainly used to parse metadata of tables under
  * the configured carbondata-store path and filter the relevant
  * input splits with given query predicates.
  */
 public class CarbonTableReader {
 
+  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
   private CarbonTableConfig config;
-
   /**
    * The names of the tables under the schema (this.carbonFileList).
    */
   private List<SchemaTableName> tableList;
-
   /**
    * carbonFileList represents the store path of the schema, which is configured as carbondata-store
    * in the CarbonData catalog file ($PRESTO_HOME$/etc/catalog/carbondata.properties).
    */
   private CarbonFile carbonFileList;
   private FileFactory.FileType fileType;
-
   /**
    * A cache for Carbon reader, with this cache,
    * metadata of a table is only read from file system once.
@@ -132,11 +103,12 @@ public class CarbonTableReader {
 
   /**
    * For presto worker node to initialize the metadata cache of a table.
+   *
    * @param table the name of the table and schema.
    * @return
    */
   public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
-    if (!cc.containsKey(table)) {
+    if (!cc.containsKey(table) || cc.get(table) == null) {
       // if this table is not cached, try to read the metadata of the table and cache it.
       try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
           FileFactory.class.getClassLoader())) {
@@ -153,31 +125,29 @@ public class CarbonTableReader {
       parseCarbonMetadata(table);
     }
 
-    if (cc.containsKey(table)) return cc.get(table);
-    else return null;
+    if (cc.containsKey(table)) {
+      return cc.get(table);
+    } else {
+      return null;
+    }
   }
 
   /**
    * Return the schema names under a schema store path (this.carbonFileList).
+   *
    * @return
    */
   public List<String> getSchemaNames() {
     return updateSchemaList();
   }
 
-  // default PathFilter, accepts files in carbondata format (with .carbondata extension).
-  private static final PathFilter DefaultFilter = new PathFilter() {
-    @Override public boolean accept(Path path) {
-      return CarbonTablePath.isCarbonDataFile(path.getName());
-    }
-  };
-
   /**
    * Get the CarbonFile instance which represents the store path in the configuration, and assign it to
    * this.carbonFileList.
+   *
    * @return
    */
-  public boolean updateCarbonFile() {
+  private boolean updateCarbonFile() {
     if (carbonFileList == null) {
       fileType = FileFactory.getFileType(config.getStorePath());
       try {
@@ -191,20 +161,20 @@ public class CarbonTableReader {
 
   /**
    * Return the schema names under a schema store path (this.carbonFileList).
+   *
    * @return
    */
-  public List<String> updateSchemaList() {
+  private List<String> updateSchemaList() {
     updateCarbonFile();
 
     if (carbonFileList != null) {
-      List<String> schemaList =
-          Stream.of(carbonFileList.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
-      return schemaList;
+      return Stream.of(carbonFileList.listFiles()).map(CarbonFile::getName).collect(Collectors.toList());
     } else return ImmutableList.of();
   }
 
   /**
    * Get the names of the tables in the given schema.
+   *
    * @param schema name of the schema
    * @return
    */
@@ -215,20 +185,23 @@ public class CarbonTableReader {
 
   /**
    * Get the names of the tables in the given schema.
+   *
    * @param schemaName name of the schema
    * @return
    */
-  public Set<String> updateTableList(String schemaName) {
-    List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName()))
-        .collect(Collectors.toList());
+  private Set<String> updateTableList(String schemaName) {
+    List<CarbonFile> schema =
+        Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName()))
+            .collect(Collectors.toList());
     if (schema.size() > 0) {
-      return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
+      return Stream.of((schema.get(0)).listFiles()).map(CarbonFile::getName)
           .collect(Collectors.toSet());
     } else return ImmutableSet.of();
   }
 
   /**
    * Get the CarbonTable instance of the given table.
+   *
    * @param schemaTableName name of the given table.
    * @return
    */
@@ -250,7 +223,7 @@ public class CarbonTableReader {
    * and cache all the table names in this.tableList. Notice that whenever this method
    * is called, it clears this.tableList and populate the list by reading the files.
    */
-  public void updateSchemaTables() {
+  private void updateSchemaTables() {
     // update logic determine later
     if (carbonFileList == null) {
       updateSchemaList();
@@ -268,6 +241,7 @@ public class CarbonTableReader {
   /**
    * Find the table with the given name and build a CarbonTable instance for it.
    * This method should be called after this.updateSchemaTables().
+   *
    * @param schemaTableName name of the given table.
    * @return
    */
@@ -282,10 +256,11 @@ public class CarbonTableReader {
 
   /**
    * Read the metadata of the given table and cache it in this.cc (CarbonTableReader cache).
+   *
    * @param table name of the given table.
    * @return the CarbonTable instance which contains all the needed metadata for a table.
    */
-  public CarbonTable parseCarbonMetadata(SchemaTableName table) {
+  private CarbonTable parseCarbonMetadata(SchemaTableName table) {
     CarbonTable result = null;
     try {
       CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
@@ -295,14 +270,14 @@ public class CarbonTableReader {
 
       // Step 1: get store path of the table and cache it.
       String storePath = config.getStorePath();
-        // create table identifier. the table id is randomly generated.
+      // create table identifier. the table id is randomly generated.
       cache.carbonTableIdentifier =
           new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
               UUID.randomUUID().toString());
-        // get the store path of the table.
-      cache.carbonTablePath = PathFactory.getInstance()
-          .getCarbonTablePath(storePath, cache.carbonTableIdentifier, null);
-        // cache the table
+      // get the store path of the table.
+      cache.carbonTablePath =
+          PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier, null);
+      // cache the table
       cc.put(table, cache);
 
       //Step 2: read the metadata (tableInfo) of the table.
@@ -323,7 +298,7 @@ public class CarbonTableReader {
 
       // Step 3: convert format level TableInfo to code level TableInfo
       SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-        // wrapperTableInfo is the code level information of a table in carbondata core, different from the Thrift TableInfo.
+      // wrapperTableInfo is the code level information of a table in carbondata core, different from the Thrift TableInfo.
       TableInfo wrapperTableInfo = schemaConverter
           .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
               storePath);
@@ -344,542 +319,57 @@ public class CarbonTableReader {
     return result;
   }
 
-  /**
-   * Apply filters to the table and get valid input splits of the table.
-   * @param tableCacheModel the table
-   * @param filters the filters
-   * @return
-   * @throws Exception
-   */
-  public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
-      Expression filters) throws Exception {
-
-    // need apply filters to segment
-    FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
-    UpdateVO invalidBlockVOForSegmentId = null;
-    Boolean IUDTable = false;
-
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
-    CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
-    List<String> invalidSegments = new ArrayList<>();
-
-    // get all valid segments and set them into the configuration
-    SegmentUpdateStatusManager updateStatusManager =
-        new SegmentUpdateStatusManager(absoluteTableIdentifier);
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-    SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
-        segmentStatusManager.getValidAndInvalidSegments();
-
-    tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]);
-    if (segments.getValidSegments().size() == 0) {
-      return new ArrayList<>(0);
-    }
-
-    // remove entry in the segment index if there are invalid segments
-    invalidSegments.addAll(segments.getInvalidSegments());
-    if (invalidSegments.size() > 0) {
-      List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
-          new ArrayList<>(invalidSegments.size());
-      for (String segId : invalidSegments) {
-        invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId));
-      }
-      cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
-    }
-
-    TableProvider tableProvider = new SingleTableProvider(tableCacheModel.carbonTable);
-
-    // get filter for segment
-    CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable);
-    FilterResolverIntf filterInterface = CarbonInputFormatUtil
-        .resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier(),
-            tableProvider);
 
-    IUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
+  public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters)  {
     List<CarbonLocalInputSplit> result = new ArrayList<>();
-    // for each segment fetch blocks matching filter in Driver BTree
-    for (String segmentNo : tableCacheModel.segments) {
-
-      if (IUDTable) {
-        // update not being performed on this table.
-        invalidBlockVOForSegmentId =
-            updateStatusManager.getInvalidTimestampRange(segmentNo);
-      }
-
-      try {
-        List<DataRefNode> dataRefNodes =
-            getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier,
-                tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient,
-                updateStatusManager);
-        for (DataRefNode dataRefNode : dataRefNodes) {
-          BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
-          TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
-
-          if (IUDTable) {
-            if (CarbonUtil
-                .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
-                    invalidBlockVOForSegmentId, updateStatusManager)) {
-              continue;
-            }
-          }
-          result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
-              tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
-              Arrays.asList(tableBlockInfo.getLocations()),
-              tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
-              tableBlockInfo.getVersion().number()));
-        }
-      } catch (Exception ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-    cacheClient.close();
-    return result;
-  }
-
-  /**
-   * Get all the data blocks of a given segment.
-   * @param filterExpressionProcessor
-   * @param absoluteTableIdentifier
-   * @param tablePath
-   * @param resolver
-   * @param segmentId
-   * @param cacheClient
-   * @param updateStatusManager
-   * @return
-   * @throws IOException
-   */
-  private List<DataRefNode> getDataBlocksOfSegment(
-      FilterExpressionProcessor filterExpressionProcessor,
-      AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath,
-      FilterResolverIntf resolver, String segmentId, CacheClient cacheClient,
-      SegmentUpdateStatusManager updateStatusManager) throws IOException {
-    //DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance();
-    //QueryStatistic statistic = new QueryStatistic();
-
-    // read segment index
-    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
-        getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
-            updateStatusManager);
-
-    List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
-
-    if (null != segmentIndexMap) {
-      // build result
-      for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
-        List<DataRefNode> filterredBlocks;
-        // if no filter is given, get all blocks from Btree Index
-        if (null == resolver) {
-          filterredBlocks = getDataBlocksOfIndex(abstractIndex);
-        } else {
-          // apply filter and get matching blocks
-          filterredBlocks = filterExpressionProcessor
-              .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex,
-                  absoluteTableIdentifier);
-        }
-        resultFilterredBlocks.addAll(filterredBlocks);
-      }
-    }
-    //statistic.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
-    //recorder.recordStatisticsForDriver(statistic, "123456"/*job.getConfiguration().get("query.id")*/);
-    return resultFilterredBlocks;
-  }
-
-  private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
-      UpdateVO updateDetails) {
-    Long refreshedTime = segmentTaskIndexWrapper.getRefreshedTimeStamp();
-    Long updateTime = updateDetails.getLatestUpdateTimestamp();
-    if (null != refreshedTime && null != updateTime && updateTime > refreshedTime) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Build and load the B-trees of the segment.
-   * @param absoluteTableIdentifier
-   * @param tablePath
-   * @param segmentId
-   * @param cacheClient
-   * @param updateStatusManager
-   * @return
-   * @throws IOException
-   */
-  private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/
-      AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath, String segmentId,
-      CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
-    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
-    SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
-    UpdateVO updateDetails = null;
-    boolean isSegmentUpdated = false;
-    Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
-    TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
-        new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
-    segmentTaskIndexWrapper =
-        cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
-
-    // Until Updates or Deletes being performed on the table Invalid Blocks will not
-    // be formed. So it is unnecessary to get the InvalidTimeStampRange.
-    if (updateStatusManager.getUpdateStatusDetails().length != 0) {
-      updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
-    }
-
-    if (null != segmentTaskIndexWrapper) {
-      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
-      // IUD operations should be performed on the table in order to mark the segment as Updated.
-      // For Normal table no need to check for invalided blocks as there will be none of them.
-      if ((null != updateDetails) && isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
-        taskKeys = segmentIndexMap.keySet();
-        isSegmentUpdated = true;
-      }
-    }
-
-    // if segment tree is not loaded, load the segment tree
-    if (segmentIndexMap == null || isSegmentUpdated) {
-
-      List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
 
-      FileSystem fs =
-          getFileStatusOfSegments(new String[] { segmentId }, tablePath, fileStatusList);
-      List<InputSplit> splits = getSplit(fileStatusList, fs);
+    CarbonTable carbonTable = tableCacheModel.carbonTable;
+    TableInfo tableInfo = tableCacheModel.tableInfo;
+    Configuration config = new Configuration();
+    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+    String carbonTablePath = PathFactory.getInstance()
+        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier().getStorePath(),
+            carbonTable.getCarbonTableIdentifier(), null).getPath();
+    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
 
-      List<FileSplit> carbonSplits = new ArrayList<>();
-      for (InputSplit inputSplit : splits) {
-        FileSplit fileSplit = (FileSplit) inputSplit;
-        String segId = CarbonTablePath.DataPathUtil
-            .getSegmentId(fileSplit.getPath().toString());//how should the separator be added here??
-        if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
-          continue;
-        }
-        carbonSplits.add(fileSplit);
-      }
-
-      List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
-      for (FileSplit inputSplit : carbonSplits) {
-        if ((null == updateDetails) || (isValidBlockBasedOnUpdateDetails(
-            taskKeys, inputSplit, updateDetails, updateStatusManager, segmentId))) {
-
-          BlockletInfos blockletInfos = new BlockletInfos(0, 0,
-              0);//at this level we do not need blocklet info; is this a trick?
-          tableBlockInfoList.add(
-              new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(), segmentId,
-                  inputSplit.getLocations(), inputSplit.getLength(), blockletInfos,
-                  ColumnarFormatVersion
-                      .valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION), null/*new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)*/));//will this null cause an exception?
+    try {
+      CarbonTableInputFormat.setTableInfo(config, tableInfo);
+      CarbonTableInputFormat carbonTableInputFormat =
+          createInputFormat(config, carbonTable.getAbsoluteTableIdentifier(), filters);
+      JobConf jobConf = new JobConf(config);
+      Job job = Job.getInstance(jobConf);
+      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
+      CarbonInputSplit carbonInputSplit = null;
+      Gson gson = new Gson();
+      if (splits != null && splits.size() > 0) {
+        for (InputSplit inputSplit : splits) {
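+          // wrap each CarbonInputSplit in a Presto-serializable CarbonLocalInputSplit; block detail info is serialized with Gson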
+          carbonInputSplit = (CarbonInputSplit) inputSplit;
+          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
+              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
+              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
+              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
+              carbonInputSplit.getDeleteDeltaFiles(),
+              gson.toJson(carbonInputSplit.getDetailInfo())));
         }
       }
 
-      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
-      segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
-      // get Btree blocks for given segment
-      tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
-      tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
-      segmentTaskIndexWrapper =
-          cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
-      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+    } catch (IOException e) {
+      throw new RuntimeException("Error creating Splits from CarbonTableInputFormat", e);
     }
-    return segmentIndexMap;
-  }
-
-  private boolean isValidBlockBasedOnUpdateDetails(
-      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit,
-      UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
-    String taskID = null;
-    if (null != carbonInputSplit) {
-      if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
-        return false;
-      }
-
-      if (null == taskKeys) {
-        return true;
-      }
-
-      taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
-      String bucketNo =
-          CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
 
-      SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
-          new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
-
-      String blockTimestamp = carbonInputSplit.getPath().getName()
-          .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
-              carbonInputSplit.getPath().getName().lastIndexOf('.'));
-      if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
-          && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
-        if (!taskKeys.contains(taskBucketHolder)) {
-          return true;
-        }
-      }
-    }
-    return false;
+    return result;
   }
 
-  /**
-   * Get the input splits of a set of carbondata files.
-   * @param fileStatusList the file statuses of the set of carbondata files.
-   * @param targetSystem hdfs FileSystem
-   * @return
-   * @throws IOException
-   */
-  private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem)
+  private CarbonTableInputFormat<Object> createInputFormat(Configuration conf, AbsoluteTableIdentifier identifier, Expression filterExpression)
       throws IOException {
+    CarbonTableInputFormat<Object> format = new CarbonTableInputFormat<>();
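+    // push the table path (with local file prefix) and the filter expression down to the input format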
+    CarbonTableInputFormat.setTablePath(conf,
+        identifier.appendWithLocalPrefix(identifier.getTablePath()));
+    CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
 
-    Iterator split = fileStatusList.iterator();
-
-    List<InputSplit> splits = new ArrayList<>();
-
-    while (true) {
-      while (true) {
-        while (split.hasNext()) {
-          // file is a carbondata file
-          FileStatus file = (FileStatus) split.next();
-          Path path = file.getPath();
-          long length = file.getLen();
-          if (length != 0L) {
-            BlockLocation[] blkLocations;
-            if (file instanceof LocatedFileStatus) {
-              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
-            } else {
-              blkLocations = targetSystem.getFileBlockLocations(file, 0L, length);
-            }
-
-            if (this.isSplitable()) {
-              long blockSize1 = file.getBlockSize();
-              long splitSize = this.computeSplitSize(blockSize1, 1, Long.MAX_VALUE);
-
-              long bytesRemaining;
-              int blkIndex;
-              for (
-                  bytesRemaining = length;
-                  (double) bytesRemaining / (double) splitSize > 1.1D;// while more than one split remains.
-                  bytesRemaining -= splitSize) {
-                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
-                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize,
-                    blkLocations[blkIndex].getHosts()));
-              }
-
-              if (bytesRemaining != 0L) {
-                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
-                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining,
-                    blkLocations[blkIndex].getHosts()));
-              }
-            } else {
-              splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length,
-                  blkLocations[0].getHosts()));
-            }
-          } else {
-            splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length,
-                new String[0]));
-          }
-        }
-        return splits;
-      }
-    }
-
-  }
-
-  private String[] getValidPartitions() {
-    //TODO: has to Identify partitions by partition pruning
-    return new String[] { "0" };
-  }
-
-  /**
-   * Get all file statuses of the carbondata files with a segmentId in segmentsToConsider
-   * under the tablePath, and add them to the result.
-   * @param segmentsToConsider
-   * @param tablePath
-   * @param result
-   * @return the FileSystem instance used in this function.
-   * @throws IOException
-   */
-  private FileSystem getFileStatusOfSegments(String[] segmentsToConsider, CarbonTablePath tablePath,
-      List<FileStatus> result) throws IOException {
-    String[] partitionsToConsider = getValidPartitions();
-    if (partitionsToConsider.length == 0) {
-      throw new IOException("No partitions/data found");
-    }
-
-    FileSystem fs = null;
-
-    //PathFilter inputFilter = getDataFileFilter(job);
-
-    // get tokens for all the required FileSystem for table path
-        /*TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { tablePath },
-                job.getConfiguration());*/
-
-    //get all data files of valid partitions and segments
-    for (int i = 0; i < partitionsToConsider.length; ++i) {
-      String partition = partitionsToConsider[i];
-
-      for (int j = 0; j < segmentsToConsider.length; ++j) {
-        String segmentId = segmentsToConsider[j];
-        Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
-
-        try {
-          Configuration conf = new Configuration();
-          fs = segmentPath.getFileSystem(conf);
-
-          RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath);
-          while (iter.hasNext()) {
-            LocatedFileStatus stat = iter.next();
-            if (DefaultFilter.accept(stat.getPath())) {
-              if (stat.isDirectory()) {
-                // DefaultFilter accepts carbondata files.
-                addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
-              } else {
-                result.add(stat);
-              }
-            }
-          }
-        } catch (Exception ex) {
-          System.out.println(ex.toString());
-        }
-      }
-    }
-    return fs;
-  }
-
-  /**
-   * Get the FileStatus of all carbondata files under the path recursively,
-   * and add the file statuses into the result
-   * @param result
-   * @param fs
-   * @param path
-   * @param inputFilter the filter used to determine whether a path is a carbondata file
-   * @throws IOException
-   */
-  protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path,
-      PathFilter inputFilter) throws IOException {
-    RemoteIterator iter = fs.listLocatedStatus(path);
-
-    while (iter.hasNext()) {
-      LocatedFileStatus stat = (LocatedFileStatus) iter.next();
-      if (inputFilter.accept(stat.getPath())) {
-        if (stat.isDirectory()) {
-          this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-        } else {
-          result.add(stat);
-        }
-      }
-    }
-
+    return format;
   }
 
-  /**
-   * Get the data blocks of a b tree. the root node of the b tree is abstractIndex.dataRefNode.
-   * BTreeNode is a sub class of DataRefNode.
-   * @param abstractIndex
-   * @return
-   */
-  private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
-    List<DataRefNode> blocks = new LinkedList<DataRefNode>();
-    SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
 
-    try {
-      IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
-      IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
-
-      // Add all blocks of btree into result
-      DataRefNodeFinder blockFinder =
-          new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize(),
-              segmentProperties.getNumberOfSortColumns(),
-              segmentProperties.getNumberOfNoDictSortColumns());
-      DataRefNode startBlock =
-          blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey);
-      DataRefNode endBlock =
-          blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
-      while (startBlock != endBlock) {
-        blocks.add(startBlock);
-        startBlock = startBlock.getNextDataRefNode();
-      }
-      blocks.add(endBlock);
-
-    } catch (KeyGenException e) {
-      System.out.println("Could not generate start key" + e.getMessage());
-    }
-    return blocks;
-  }
-
-  private boolean isSplitable() {
-    try {
-      // Don't split the file if it is local file system
-      if (this.fileType == FileFactory.FileType.LOCAL) {
-        return false;
-      }
-    } catch (Exception e) {
-      return true;
-    }
-    return true;
-  }
-
-  private long computeSplitSize(long blockSize, long minSize, long maxSize) {
-    return Math.max(minSize, Math.min(maxSize, blockSize));
-  }
-
-  private FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
-    return new FileSplit(file, start, length, hosts);
-  }
-
-  private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
-    for (int i = 0; i < blkLocations.length; i++) {
-      // is the offset inside this block?
-      if ((blkLocations[i].getOffset() <= offset) && (offset
-          < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
-        return i;
-      }
-    }
-    BlockLocation last = blkLocations[blkLocations.length - 1];
-    long fileLength = last.getOffset() + last.getLength() - 1;
-    throw new IllegalArgumentException("Offset " + offset +
-        " is outside of file (0.." +
-        fileLength + ")");
-  }
-
-  /**
-   * get total number of rows. for count(*)
-   *
-   * @throws IOException
-   * @throws IndexBuilderException
-   */
-  public long getRowCount() throws IOException, IndexBuilderException {
-    long rowCount = 0;
-        /*AbsoluteTableIdentifier absoluteTableIdentifier = this.carbonTable.getAbsoluteTableIdentifier();
-
-        // no of core to load the blocks in driver
-        //addSegmentsIfEmpty(job, absoluteTableIdentifier);
-        int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
-        try {
-            numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
-                    .getProperty(CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT));
-        } catch (NumberFormatException e) {
-            numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
-        }
-        // creating a thread pool
-        ExecutorService threadPool = Executors.newFixedThreadPool(numberOfCores);
-        List<Future<Map<String, AbstractIndex>>> loadedBlocks =
-                new ArrayList<Future<Map<String, AbstractIndex>>>();
-        //for each segment fetch blocks matching filter in Driver BTree
-        for (String segmentNo : this.segmentList) {
-            // submitting the task
-            loadedBlocks
-                    .add(threadPool.submit(new BlocksLoaderThread(*//*job,*//* absoluteTableIdentifier, segmentNo)));
-        }
-        threadPool.shutdown();
-        try {
-            threadPool.awaitTermination(1, TimeUnit.HOURS);
-        } catch (InterruptedException e) {
-            throw new IndexBuilderException(e);
-        }
-        try {
-            // adding all the rows of the blocks to get the total row
-            // count
-            for (Future<Map<String, AbstractIndex>> block : loadedBlocks) {
-                for (AbstractIndex abstractIndex : block.get().values()) {
-                    rowCount += abstractIndex.getTotalNumberOfRows();
-                }
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            throw new IndexBuilderException(e);
-        }*/
-    return rowCount;
-  }
-}
+}
\ No newline at end of file
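For readers following the refactor, here is a minimal standalone sketch of the new split-enumeration path shown in the hunks above. It is illustrative only: the class and method names (SplitEnumerationSketch, enumerateSplits), the carbonTable/tableInfo/filter parameters, and the direct use of identifier.getTablePath() for INPUT_DIR (the patch resolves that path through PathFactory) are assumptions made to keep the example self-contained, and the imports assume the 1.2-era package layout used by this commit.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;

public class SplitEnumerationSketch {

  /** Enumerate the CarbonInputSplits of a table through CarbonTableInputFormat. */
  static List<CarbonInputSplit> enumerateSplits(CarbonTable carbonTable, TableInfo tableInfo,
      Expression filter) throws IOException {
    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();

    Configuration config = new Configuration();
    // no explicit segment restriction; the input format works out the segments to scan
    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
    // assumption: the table path is used directly here instead of going through PathFactory
    config.set(CarbonTableInputFormat.INPUT_DIR, identifier.getTablePath());
    CarbonTableInputFormat.setTableInfo(config, tableInfo);

    CarbonTableInputFormat<Object> format = new CarbonTableInputFormat<>();
    CarbonTableInputFormat.setTablePath(config,
        identifier.appendWithLocalPrefix(identifier.getTablePath()));
    CarbonTableInputFormat.setFilterPredicates(config, filter);

    // getSplits prunes blocks on the driver side and returns one CarbonInputSplit
    // per surviving block, each carrying segment id, path, offset and length
    Job job = Job.getInstance(new JobConf(config));
    List<CarbonInputSplit> result = new ArrayList<>();
    for (InputSplit split : format.getSplits(job)) {
      result.add((CarbonInputSplit) split);
    }
    return result;
  }
}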

