carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [29/56] [abbrv] incubator-carbondata git commit: Refactor org.carbondata.query package (#692)
Date Thu, 23 Jun 2016 14:16:17 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/filter/FilterUtil.java b/core/src/main/java/org/carbondata/scan/filter/FilterUtil.java
new file mode 100644
index 0000000..802682b
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/filter/FilterUtil.java
@@ -0,0 +1,1233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.scan.filter;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.cache.Cache;
+import org.carbondata.core.cache.CacheProvider;
+import org.carbondata.core.cache.CacheType;
+import org.carbondata.core.cache.dictionary.Dictionary;
+import org.carbondata.core.cache.dictionary.DictionaryChunksWrapper;
+import org.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.carbondata.core.cache.dictionary.ForwardDictionary;
+import org.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.carbondata.core.carbon.datastore.IndexKey;
+import org.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.carbondata.core.carbon.metadata.datatype.DataType;
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.keygenerator.KeyGenException;
+import org.carbondata.core.keygenerator.KeyGenerator;
+import org.carbondata.core.util.ByteUtil;
+import org.carbondata.core.util.CarbonProperties;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.core.util.CarbonUtilException;
+import org.carbondata.scan.executor.exception.QueryExecutionException;
+import org.carbondata.scan.expression.ColumnExpression;
+import org.carbondata.scan.expression.Expression;
+import org.carbondata.scan.expression.ExpressionResult;
+import org.carbondata.scan.expression.LiteralExpression;
+import org.carbondata.scan.expression.exception.FilterUnsupportedException;
+import org.carbondata.scan.filter.executer.AndFilterExecuterImpl;
+import org.carbondata.scan.filter.executer.ColGroupFilterExecuterImpl;
+import org.carbondata.scan.filter.executer.DimColumnExecuterFilterInfo;
+import org.carbondata.scan.filter.executer.ExcludeFilterExecuterImpl;
+import org.carbondata.scan.filter.executer.FilterExecuter;
+import org.carbondata.scan.filter.executer.IncludeFilterExecuterImpl;
+import org.carbondata.scan.filter.executer.OrFilterExecuterImpl;
+import org.carbondata.scan.filter.executer.RestructureFilterExecuterImpl;
+import org.carbondata.scan.filter.executer.RowLevelFilterExecuterImpl;
+import org.carbondata.scan.filter.executer.RowLevelRangeTypeExecuterFacory;
+import org.carbondata.scan.filter.intf.ExpressionType;
+import org.carbondata.scan.filter.intf.FilterExecuterType;
+import org.carbondata.scan.filter.intf.RowImpl;
+import org.carbondata.scan.filter.intf.RowIntf;
+import org.carbondata.scan.filter.resolver.FilterResolverIntf;
+import org.carbondata.scan.filter.resolver.RowLevelFilterResolverImpl;
+import org.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.carbondata.scan.util.DataTypeUtil;
+
+public final class FilterUtil {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(FilterUtil.class.getName());
+
+  private FilterUtil() {
+
+  }
+
+  /**
+   * Pattern used : Visitor Pattern
+   * Method will create filter executer tree based on the filter resolved tree,
+   * in this algorithm based on the resolver instance the executers will be visited
+   * and the resolved surrogates will be converted to keys
+   *
+   * @param filterExpressionResolverTree
+   * @param segmentProperties
+   * @return FilterExecuter instance
+   */
+  private static FilterExecuter createFilterExecuterTree(
+      FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties) {
+    FilterExecuterType filterExecuterType = filterExpressionResolverTree.getFilterExecuterType();
+    switch (filterExecuterType) {
+      case INCLUDE:
+        return getIncludeFilterExecuter(filterExpressionResolverTree.getDimColResolvedFilterInfo(),
+            segmentProperties);
+      case EXCLUDE:
+        return new ExcludeFilterExecuterImpl(
+            filterExpressionResolverTree.getDimColResolvedFilterInfo(),
+            segmentProperties.getDimensionKeyGenerator());
+      case OR:
+        return new OrFilterExecuterImpl(
+            createFilterExecuterTree(filterExpressionResolverTree.getLeft(), segmentProperties),
+            createFilterExecuterTree(filterExpressionResolverTree.getRight(), segmentProperties));
+      case AND:
+        return new AndFilterExecuterImpl(
+            createFilterExecuterTree(filterExpressionResolverTree.getLeft(), segmentProperties),
+            createFilterExecuterTree(filterExpressionResolverTree.getRight(), segmentProperties));
+      case RESTRUCTURE:
+        return new RestructureFilterExecuterImpl(
+            filterExpressionResolverTree.getDimColResolvedFilterInfo(),
+            segmentProperties.getDimensionKeyGenerator());
+      case ROWLEVEL_LESSTHAN:
+      case ROWLEVEL_LESSTHAN_EQUALTO:
+      case ROWLEVEL_GREATERTHAN_EQUALTO:
+      case ROWLEVEL_GREATERTHAN:
+        return RowLevelRangeTypeExecuterFacory
+            .getRowLevelRangeTypeExecuter(filterExecuterType, filterExpressionResolverTree);
+      case ROWLEVEL:
+      default:
+        return new RowLevelFilterExecuterImpl(
+            ((RowLevelFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelFilterResolverImpl) filterExpressionResolverTree).getMsrColEvalutorInfoList(),
+            ((RowLevelFilterResolverImpl) filterExpressionResolverTree).getFilterExpresion(),
+            ((RowLevelFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier());
+
+    }
+
+  }
+
+  /**
+   * It gives filter executer based on columnar or column group
+   *
+   * @param dimColResolvedFilterInfo
+   * @param segmentProperties
+   * @return
+   */
+  private static FilterExecuter getIncludeFilterExecuter(
+      DimColumnResolvedFilterInfo dimColResolvedFilterInfo, SegmentProperties segmentProperties) {
+
+    if (dimColResolvedFilterInfo.getDimension().isColumnar()) {
+      return new IncludeFilterExecuterImpl(dimColResolvedFilterInfo, segmentProperties);
+    } else {
+      return new ColGroupFilterExecuterImpl(dimColResolvedFilterInfo, segmentProperties);
+    }
+  }
+
+  /**
+   * This method will check if a given expression contains a column expression
+   * recursively.
+   *
+   * @return
+   */
+  public static boolean checkIfExpressionContainsColumn(Expression expression) {
+    if (expression instanceof ColumnExpression) {
+      return true;
+    }
+    for (Expression child : expression.getChildren()) {
+      if (checkIfExpressionContainsColumn(child)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * This method will check if a given expression contains a column expression
+   * recursively.
+   *
+   * @return
+   */
+  public static boolean checkIfExpressionContainsUnknownExp(Expression expression) {
+    if (expression.getFilterExpressionType() == ExpressionType.UNKNOWN) {
+      return true;
+    }
+    for (Expression child : expression.getChildren()) {
+      if (checkIfExpressionContainsUnknownExp(child)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * method will get the masked keys based on the keys generated from surrogates.
+   *
+   * @param ranges
+   * @param key
+   * @return byte[]
+   */
+  private static byte[] getMaskedKey(int[] ranges, byte[] key) {
+    byte[] maskkey = new byte[ranges.length];
+
+    for (int i = 0; i < maskkey.length; i++) {
+      maskkey[i] = key[ranges[i]];
+    }
+    return maskkey;
+  }
+
+  /**
+   * This method will return the ranges for the masked Bytes based on the key
+   * Generator.
+   *
+   * @param queryDimensionsOrdinal
+   * @param generator
+   * @return
+   */
+  private static int[] getRangesForMaskedByte(int queryDimensionsOrdinal, KeyGenerator generator) {
+    Set<Integer> integers = new TreeSet<Integer>();
+    int[] range = generator.getKeyByteOffsets(queryDimensionsOrdinal);
+    for (int j = range[0]; j <= range[1]; j++) {
+      integers.add(j);
+    }
+
+    int[] byteIndexs = new int[integers.size()];
+    int j = 0;
+    for (Iterator<Integer> iterator = integers.iterator(); iterator.hasNext(); ) {
+      Integer integer = iterator.next();
+      byteIndexs[j++] = integer.intValue();
+    }
+    return byteIndexs;
+  }
+
+  /**
+   * This method will get the no dictionary data based on filters and same
+   * will be in DimColumnFilterInfo
+   *
+   * @param tableIdentifier
+   * @param columnExpression
+   * @param evaluateResultListFinal
+   * @param isIncludeFilter
+   * @return DimColumnFilterInfo
+   */
+  public static DimColumnFilterInfo getNoDictionaryValKeyMemberForFilter(
+      AbsoluteTableIdentifier tableIdentifier, ColumnExpression columnExpression,
+      List<String> evaluateResultListFinal, boolean isIncludeFilter) {
+    List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
+    for (String result : evaluateResultListFinal) {
+      filterValuesList.add(result.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+    }
+
+    Comparator<byte[]> filterNoDictValueComaparator = new Comparator<byte[]>() {
+
+      @Override public int compare(byte[] filterMember1, byte[] filterMember2) {
+        // TODO Auto-generated method stub
+        return ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterMember1, filterMember2);
+      }
+
+    };
+    Collections.sort(filterValuesList, filterNoDictValueComaparator);
+    DimColumnFilterInfo columnFilterInfo = null;
+    if (filterValuesList.size() > 0) {
+      columnFilterInfo = new DimColumnFilterInfo();
+      columnFilterInfo.setIncludeFilter(isIncludeFilter);
+      columnFilterInfo.setFilterListForNoDictionaryCols(filterValuesList);
+
+    }
+    return columnFilterInfo;
+  }
+
+  /**
+   * Method will prepare the  dimfilterinfo instance by resolving the filter
+   * expression value to its respective surrogates.
+   *
+   * @param tableIdentifier
+   * @param columnExpression
+   * @param evaluateResultList
+   * @param isIncludeFilter
+   * @return
+   * @throws QueryExecutionException
+   */
+  public static DimColumnFilterInfo getFilterValues(AbsoluteTableIdentifier tableIdentifier,
+      ColumnExpression columnExpression, List<String> evaluateResultList, boolean isIncludeFilter)
+      throws QueryExecutionException, FilterUnsupportedException {
+    Dictionary forwardDictionary = null;
+    try {
+      // Reading the dictionary value from cache.
+      forwardDictionary =
+          getForwardDictionaryCache(tableIdentifier, columnExpression.getDimension());
+      return getFilterValues(columnExpression, evaluateResultList, forwardDictionary,
+          isIncludeFilter);
+    } finally {
+      CarbonUtil.clearDictionaryCache(forwardDictionary);
+    }
+  }
+
+  /**
+   * Method will prepare the  dimfilterinfo instance by resolving the filter
+   * expression value to its respective surrogates.
+   *
+   * @param columnExpression
+   * @param evaluateResultList
+   * @param forwardDictionary
+   * @param isIncludeFilter
+   * @return
+   * @throws QueryExecutionException
+   */
+  private static DimColumnFilterInfo getFilterValues(ColumnExpression columnExpression,
+      List<String> evaluateResultList, Dictionary forwardDictionary, boolean isIncludeFilter)
+      throws QueryExecutionException {
+    sortFilterModelMembers(columnExpression, evaluateResultList);
+    List<Integer> surrogates =
+        new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // Reading the dictionary value from cache.
+    getDictionaryValue(evaluateResultList, forwardDictionary, surrogates);
+    Collections.sort(surrogates);
+    DimColumnFilterInfo columnFilterInfo = null;
+    if (surrogates.size() > 0) {
+      columnFilterInfo = new DimColumnFilterInfo();
+      columnFilterInfo.setIncludeFilter(isIncludeFilter);
+      columnFilterInfo.setFilterList(surrogates);
+    }
+    return columnFilterInfo;
+  }
+
+  /**
+   * This API will get the Dictionary value for the respective filter member
+   * string.
+   *
+   * @param evaluateResultList filter value
+   * @param surrogates
+   * @throws QueryExecutionException
+   */
+  private static void getDictionaryValue(List<String> evaluateResultList,
+      Dictionary forwardDictionary, List<Integer> surrogates) throws QueryExecutionException {
+    ((ForwardDictionary) forwardDictionary)
+        .getSurrogateKeyByIncrementalSearch(evaluateResultList, surrogates);
+  }
+
+  /**
+   * This method will get all the members of column from the forward dictionary
+   * cache, this method will be basically used in row level filter resolver.
+   *
+   * @param tableIdentifier
+   * @param expression
+   * @param columnExpression
+   * @param isIncludeFilter
+   * @return DimColumnFilterInfo
+   * @throws QueryExecutionException
+   */
+  public static DimColumnFilterInfo getFilterListForAllValues(
+      AbsoluteTableIdentifier tableIdentifier, Expression expression,
+      final ColumnExpression columnExpression, boolean isIncludeFilter)
+      throws FilterUnsupportedException {
+    Dictionary forwardDictionary = null;
+    List<String> evaluateResultListFinal = new ArrayList<String>(20);
+    DictionaryChunksWrapper dictionaryWrapper = null;
+    try {
+      forwardDictionary =
+          getForwardDictionaryCache(tableIdentifier, columnExpression.getDimension());
+      dictionaryWrapper = forwardDictionary.getDictionaryChunks();
+      while (dictionaryWrapper.hasNext()) {
+        byte[] columnVal = dictionaryWrapper.next();
+        try {
+          RowIntf row = new RowImpl();
+          String stringValue =
+              new String(columnVal, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+          if (stringValue.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+            stringValue = null;
+          }
+          row.setValues(new Object[] { DataTypeUtil.getDataBasedOnDataType(stringValue,
+              columnExpression.getCarbonColumn().getDataType()) });
+          Boolean rslt = expression.evaluate(row).getBoolean();
+          if (null != rslt && !(rslt ^ isIncludeFilter)) {
+            if (null == stringValue) {
+              evaluateResultListFinal.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL);
+            } else {
+              evaluateResultListFinal.add(stringValue);
+            }
+          }
+        } catch (FilterUnsupportedException e) {
+          LOGGER.audit(e.getMessage());
+          throw new FilterUnsupportedException(e.getMessage());
+        }
+      }
+      return getFilterValues(columnExpression, evaluateResultListFinal, forwardDictionary,
+          isIncludeFilter);
+    } catch (QueryExecutionException e) {
+      throw new FilterUnsupportedException(e.getMessage());
+    } finally {
+      CarbonUtil.clearDictionaryCache(forwardDictionary);
+    }
+  }
+
+  private static void sortFilterModelMembers(final ColumnExpression columnExpression,
+      List<String> evaluateResultListFinal) {
+    Comparator<String> filterActualValueComaparator = new Comparator<String>() {
+
+      @Override public int compare(String filterMember1, String filterMember2) {
+        return compareFilterMembersBasedOnActualDataType(filterMember1, filterMember2,
+            columnExpression.getDataType());
+      }
+
+    };
+    Collections.sort(evaluateResultListFinal, filterActualValueComaparator);
+  }
+
+  /**
+   * Metahod will resolve the filter member to its respective surrogates by
+   * scanning the dictionary cache.
+   *
+   * @param tableIdentifier
+   * @param expression
+   * @param columnExpression
+   * @param isIncludeFilter
+   * @return
+   * @throws QueryExecutionException
+   */
+  public static DimColumnFilterInfo getFilterList(AbsoluteTableIdentifier tableIdentifier,
+      Expression expression, ColumnExpression columnExpression, boolean isIncludeFilter)
+      throws QueryExecutionException {
+    DimColumnFilterInfo resolvedFilterObject = null;
+    List<String> evaluateResultListFinal = new ArrayList<String>(20);
+    try {
+      List<ExpressionResult> evaluateResultList = expression.evaluate(null).getList();
+      for (ExpressionResult result : evaluateResultList) {
+        if (result.getString() == null) {
+          evaluateResultListFinal.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL);
+          continue;
+        }
+        evaluateResultListFinal.add(result.getString());
+      }
+
+      if (null != columnExpression.getCarbonColumn() && !columnExpression.getCarbonColumn()
+          .hasEncoding(Encoding.DICTIONARY)) {
+        resolvedFilterObject =
+            getNoDictionaryValKeyMemberForFilter(tableIdentifier, columnExpression,
+                evaluateResultListFinal, isIncludeFilter);
+      } else {
+        resolvedFilterObject =
+            getFilterValues(tableIdentifier, columnExpression, evaluateResultListFinal,
+                isIncludeFilter);
+      }
+    } catch (FilterUnsupportedException e) {
+      LOGGER.audit(e.getMessage());
+    }
+    return resolvedFilterObject;
+  }
+
+  /**
+   * Method will prepare the  dimfilterinfo instance by resolving the filter
+   * expression value to its respective surrogates in the scenario of restructure.
+   *
+   * @param expression
+   * @param columnExpression
+   * @param defaultValues
+   * @param defaultSurrogate
+   * @return
+   */
+  public static DimColumnFilterInfo getFilterListForRS(Expression expression,
+      ColumnExpression columnExpression, String defaultValues, int defaultSurrogate) {
+    List<Integer> filterValuesList = new ArrayList<Integer>(20);
+    DimColumnFilterInfo columnFilterInfo = null;
+    // List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
+    List<String> evaluateResultListFinal = new ArrayList<String>(20);
+    // KeyGenerator keyGenerator =
+    // KeyGeneratorFactory.getKeyGenerator(new int[] { defaultSurrogate });
+    try {
+      List<ExpressionResult> evaluateResultList = expression.evaluate(null).getList();
+      for (ExpressionResult result : evaluateResultList) {
+        if (result.getString() == null) {
+          evaluateResultListFinal.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL);
+          continue;
+        }
+        evaluateResultListFinal.add(result.getString());
+      }
+
+      for (int i = 0; i < evaluateResultListFinal.size(); i++) {
+        if (evaluateResultListFinal.get(i).equals(defaultValues)) {
+          filterValuesList.add(defaultSurrogate);
+          break;
+        }
+      }
+      if (filterValuesList.size() > 0) {
+        columnFilterInfo = new DimColumnFilterInfo();
+        columnFilterInfo.setFilterList(filterValuesList);
+      }
+    } catch (FilterUnsupportedException e) {
+      LOGGER.audit(e.getMessage());
+    }
+    return columnFilterInfo;
+  }
+
+  /**
+   * This method will get the member based on filter expression evaluation from the
+   * forward dictionary cache, this method will be basically used in restructure.
+   *
+   * @param expression
+   * @param columnExpression
+   * @param defaultValues
+   * @param defaultSurrogate
+   * @param isIncludeFilter
+   * @return
+   */
+  public static DimColumnFilterInfo getFilterListForAllMembersRS(Expression expression,
+      ColumnExpression columnExpression, String defaultValues, int defaultSurrogate,
+      boolean isIncludeFilter) {
+    List<Integer> filterValuesList = new ArrayList<Integer>(20);
+    List<String> evaluateResultListFinal = new ArrayList<String>(20);
+    DimColumnFilterInfo columnFilterInfo = null;
+
+    // KeyGenerator keyGenerator =
+    // KeyGeneratorFactory.getKeyGenerator(new int[] { defaultSurrogate });
+    try {
+      RowIntf row = new RowImpl();
+      if (defaultValues.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+        defaultValues = null;
+      }
+      row.setValues(new Object[] { DataTypeUtil.getDataBasedOnDataType(defaultValues,
+          columnExpression.getCarbonColumn().getDataType()) });
+      Boolean rslt = expression.evaluate(row).getBoolean();
+      if (null != rslt && !(rslt ^ isIncludeFilter)) {
+        if (null == defaultValues) {
+          evaluateResultListFinal.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL);
+        } else {
+          evaluateResultListFinal.add(defaultValues);
+        }
+      }
+    } catch (FilterUnsupportedException e) {
+      LOGGER.audit(e.getMessage());
+    }
+
+    if (null == defaultValues) {
+      defaultValues = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
+    }
+    columnFilterInfo = new DimColumnFilterInfo();
+    for (int i = 0; i < evaluateResultListFinal.size(); i++) {
+      if (evaluateResultListFinal.get(i).equals(defaultValues)) {
+        filterValuesList.add(defaultSurrogate);
+        break;
+      }
+    }
+    columnFilterInfo.setFilterList(filterValuesList);
+    return columnFilterInfo;
+  }
+
+  public static byte[][] getKeyArray(DimColumnFilterInfo dimColumnFilterInfo,
+      CarbonDimension carbonDimension, KeyGenerator blockLevelKeyGenerator) {
+    if (!carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
+      return dimColumnFilterInfo.getNoDictionaryFilterValuesList()
+          .toArray((new byte[dimColumnFilterInfo.getNoDictionaryFilterValuesList().size()][]));
+    }
+    int[] keys = new int[blockLevelKeyGenerator.getDimCount()];
+    List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
+    Arrays.fill(keys, 0);
+    int[] rangesForMaskedByte =
+        getRangesForMaskedByte((carbonDimension.getKeyOrdinal()), blockLevelKeyGenerator);
+    if (null != dimColumnFilterInfo) {
+      for (Integer surrogate : dimColumnFilterInfo.getFilterList()) {
+        try {
+          keys[carbonDimension.getKeyOrdinal()] = surrogate;
+          filterValuesList
+              .add(getMaskedKey(rangesForMaskedByte, blockLevelKeyGenerator.generateKey(keys)));
+        } catch (KeyGenException e) {
+          LOGGER.error(e.getMessage());
+        }
+      }
+    }
+    return filterValuesList.toArray(new byte[filterValuesList.size()][]);
+
+  }
+
+  /**
+   * Method will return the start key based on KeyGenerator for the respective
+   * filter resolved instance.
+   *
+   * @param dimColResolvedFilterInfo
+   * @param segmentProperties
+   * @return long[] start key
+   */
+  public static long[] getStartKey(DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
+      SegmentProperties segmentProperties, long[] startKey) {
+    Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter =
+        dimColResolvedFilterInfo.getDimensionResolvedFilterInstance();
+    for (Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+      List<DimColumnFilterInfo> values = entry.getValue();
+      if (null == values || !entry.getKey().hasEncoding(Encoding.DICTIONARY)) {
+        continue;
+      }
+      boolean isExcludePresent = false;
+      for (DimColumnFilterInfo info : values) {
+        if (!info.isIncludeFilter()) {
+          isExcludePresent = true;
+        }
+      }
+      if (isExcludePresent) {
+        continue;
+      }
+      getStartKeyBasedOnFilterResoverInfo(dimensionFilter, startKey);
+    }
+    return startKey;
+  }
+
+  /**
+   * Algorithm for getting the start key for a filter
+   * step 1: Iterate through each dimension and verify whether its not an exclude filter.
+   * step 2: Intialize start key with the first filter member value present in each filter model
+   * for the respective dimensions.
+   * step 3: since its a no dictionary start key there will only actual value so compare
+   * the first filter model value with respect to the dimension data type.
+   * step 4: The least value will be considered as the start key of dimension by comparing all
+   * its filter model.
+   * step 5: create a byte array of start key which comprises of least filter member value of
+   * all dimension and the indexes which will help to read the respective filter value.
+   *
+   * @param dimColResolvedFilterInfo
+   * @param segmentProperties
+   * @param setOfStartKeyByteArray
+   * @return
+   */
+  public static void getStartKeyForNoDictionaryDimension(
+      DimColumnResolvedFilterInfo dimColResolvedFilterInfo, SegmentProperties segmentProperties,
+      SortedMap<Integer, byte[]> setOfStartKeyByteArray) {
+    Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter =
+        dimColResolvedFilterInfo.getDimensionResolvedFilterInstance();
+    // step 1
+    for (Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+      if (!entry.getKey().hasEncoding(Encoding.DICTIONARY)) {
+        List<DimColumnFilterInfo> listOfDimColFilterInfo = entry.getValue();
+        if (null == listOfDimColFilterInfo) {
+          continue;
+        }
+        boolean isExcludePresent = false;
+        for (DimColumnFilterInfo info : listOfDimColFilterInfo) {
+          if (!info.isIncludeFilter()) {
+            isExcludePresent = true;
+          }
+        }
+        if (isExcludePresent) {
+          continue;
+        }
+        // step 2
+        byte[] noDictionaryStartKey =
+            listOfDimColFilterInfo.get(0).getNoDictionaryFilterValuesList().get(0);
+        if (setOfStartKeyByteArray.isEmpty()) {
+          setOfStartKeyByteArray.put(entry.getKey().getOrdinal(), noDictionaryStartKey);
+        } else if (null == setOfStartKeyByteArray.get(entry.getKey().getOrdinal())) {
+          setOfStartKeyByteArray.put(entry.getKey().getOrdinal(), noDictionaryStartKey);
+
+        } else if (ByteUtil.UnsafeComparer.INSTANCE
+            .compareTo(setOfStartKeyByteArray.get(entry.getKey().getOrdinal()),
+                noDictionaryStartKey) > 0) {
+          setOfStartKeyByteArray.put(entry.getKey().getOrdinal(), noDictionaryStartKey);
+        }
+      }
+    }
+  }
+
+  /**
+   * Algorithm for getting the end key for a filter
+   * step 1: Iterate through each dimension and verify whether its not an exclude filter.
+   * step 2: Initialize end key with the last filter member value present in each filter model
+   * for the respective dimensions.(Already filter models are sorted)
+   * step 3: since its a no dictionary end key there will only actual value so compare
+   * the last filter model value with respect to the dimension data type.
+   * step 4: The highest value will be considered as the end key of dimension by comparing all
+   * its filter model.
+   * step 5: create a byte array of end key which comprises of highest filter member value of
+   * all dimension and the indexes which will help to read the respective filter value.
+   *
+   * @param dimColResolvedFilterInfo
+   * @param segmentProperties
+   * @param setOfEndKeyByteArray
+   * @return end key array
+   */
+  public static void getEndKeyForNoDictionaryDimension(
+      DimColumnResolvedFilterInfo dimColResolvedFilterInfo, SegmentProperties segmentProperties,
+      SortedMap<Integer, byte[]> setOfEndKeyByteArray) {
+
+    Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter =
+        dimColResolvedFilterInfo.getDimensionResolvedFilterInstance();
+    // step 1
+    for (Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+      if (!entry.getKey().hasEncoding(Encoding.DICTIONARY)) {
+        List<DimColumnFilterInfo> listOfDimColFilterInfo = entry.getValue();
+        if (null == listOfDimColFilterInfo) {
+          continue;
+        }
+        boolean isExcludePresent = false;
+        for (DimColumnFilterInfo info : listOfDimColFilterInfo) {
+          if (!info.isIncludeFilter()) {
+            isExcludePresent = true;
+          }
+        }
+        if (isExcludePresent) {
+          continue;
+        }
+        // step 2
+        byte[] noDictionaryEndKey = listOfDimColFilterInfo.get(0).getNoDictionaryFilterValuesList()
+            .get(listOfDimColFilterInfo.get(0).getNoDictionaryFilterValuesList().size() - 1);
+        if (setOfEndKeyByteArray.isEmpty()) {
+          setOfEndKeyByteArray.put(entry.getKey().getOrdinal(), noDictionaryEndKey);
+        } else if (null == setOfEndKeyByteArray.get(entry.getKey().getOrdinal())) {
+          setOfEndKeyByteArray.put(entry.getKey().getOrdinal(), noDictionaryEndKey);
+
+        } else if (ByteUtil.UnsafeComparer.INSTANCE
+            .compareTo(setOfEndKeyByteArray.get(entry.getKey().getOrdinal()), noDictionaryEndKey)
+            < 0) {
+          setOfEndKeyByteArray.put(entry.getKey().getOrdinal(), noDictionaryEndKey);
+        }
+
+      }
+    }
+  }
+
+  /**
+   * Method will pack all the byte[] to a single byte[] value by appending the
+   * indexes of the byte[] value which needs to be read. this method will be mailny used
+   * in case of no dictionary dimension processing for filters.
+   *
+   * @param noDictionaryValKeyList
+   * @return packed key with its indexes added in starting and its actual values.
+   */
+  private static byte[] getKeyWithIndexesAndValues(List<byte[]> noDictionaryValKeyList) {
+    ByteBuffer[] buffArr = new ByteBuffer[noDictionaryValKeyList.size()];
+    int index = 0;
+    for (byte[] singleColVal : noDictionaryValKeyList) {
+      buffArr[index] = ByteBuffer.allocate(singleColVal.length);
+      buffArr[index].put(singleColVal);
+      buffArr[index++].rewind();
+    }
+    // byteBufer.
+    return CarbonUtil.packByteBufferIntoSingleByteArray(buffArr);
+
+  }
+
+  /**
+   * This method will fill the start key array  with the surrogate key present
+   * in filterinfo instance.
+   *
+   * @param dimensionFilter
+   * @param startKey
+   */
+  private static void getStartKeyBasedOnFilterResoverInfo(
+      Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter, long[] startKey) {
+    for (Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+      List<DimColumnFilterInfo> values = entry.getValue();
+      if (null == values) {
+        continue;
+      }
+      boolean isExcludePresent = false;
+      for (DimColumnFilterInfo info : values) {
+        if (!info.isIncludeFilter()) {
+          isExcludePresent = true;
+        }
+      }
+      if (isExcludePresent) {
+        continue;
+      }
+      for (DimColumnFilterInfo info : values) {
+        if (startKey[entry.getKey().getKeyOrdinal()] < info.getFilterList().get(0)) {
+          startKey[entry.getKey().getKeyOrdinal()] = info.getFilterList().get(0);
+        }
+      }
+    }
+  }
+
+  public static void getEndKey(Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter,
+      AbsoluteTableIdentifier tableIdentifier, long[] endKey, SegmentProperties segmentProperties)
+      throws QueryExecutionException {
+
+    List<CarbonDimension> updatedDimListBasedOnKeyGenerator =
+        getCarbonDimsMappedToKeyGenerator(segmentProperties.getDimensions());
+    for (int i = 0; i < endKey.length; i++) {
+      endKey[i] = getMaxValue(tableIdentifier, updatedDimListBasedOnKeyGenerator.get(i),
+          segmentProperties.getDimColumnsCardinality());
+    }
+    getEndKeyWithFilter(dimensionFilter, endKey);
+
+  }
+
+  private static List<CarbonDimension> getCarbonDimsMappedToKeyGenerator(
+      List<CarbonDimension> carbonDimensions) {
+    List<CarbonDimension> listOfCarbonDimPartOfKeyGen =
+        new ArrayList<CarbonDimension>(carbonDimensions.size());
+    for (CarbonDimension carbonDim : carbonDimensions) {
+      if (CarbonUtil.hasEncoding(carbonDim.getEncoder(), Encoding.DICTIONARY) || CarbonUtil
+          .hasEncoding(carbonDim.getEncoder(), Encoding.DIRECT_DICTIONARY)) {
+        listOfCarbonDimPartOfKeyGen.add(carbonDim);
+      }
+
+    }
+    return listOfCarbonDimPartOfKeyGen;
+  }
+
+  private static void getEndKeyWithFilter(
+      Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter, long[] endKey) {
+    for (Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+      List<DimColumnFilterInfo> values = entry.getValue();
+      if (null == values || !entry.getKey().hasEncoding(Encoding.DICTIONARY)) {
+        continue;
+      }
+      boolean isExcludeFilterPresent = false;
+      for (DimColumnFilterInfo info : values) {
+        if (!info.isIncludeFilter()) {
+          isExcludeFilterPresent = true;
+        }
+      }
+      if (isExcludeFilterPresent) {
+        continue;
+      }
+
+      for (DimColumnFilterInfo info : values) {
+        if (endKey[entry.getKey().getKeyOrdinal()] > info.getFilterList()
+            .get(info.getFilterList().size() - 1)) {
+          endKey[entry.getKey().getKeyOrdinal()] =
+              info.getFilterList().get(info.getFilterList().size() - 1);
+        }
+      }
+    }
+
+  }
+
+  /**
+   * This API will get the max value of surrogate key which will be used for
+   * determining the end key of particular btree.
+   *
+   * @param dimCarinality
+   * @throws QueryExecutionException
+   */
+  private static long getMaxValue(AbsoluteTableIdentifier tableIdentifier,
+      CarbonDimension carbonDimension, int[] dimCarinality) throws QueryExecutionException {
+    //    if (DataType.TIMESTAMP == carbonDimension.getDataType()) {
+    //      return Integer.MAX_VALUE;
+    //    }
+    // Get data from all the available slices of the cube
+    if (null != dimCarinality) {
+      return dimCarinality[carbonDimension.getKeyOrdinal()];
+    }
+    return -1;
+  }
+
+  /**
+   * @param tableIdentifier
+   * @param carbonDimension
+   * @return
+   * @throws QueryExecutionException
+   */
+  public static Dictionary getForwardDictionaryCache(AbsoluteTableIdentifier tableIdentifier,
+      CarbonDimension carbonDimension) throws QueryExecutionException {
+    DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
+        new DictionaryColumnUniqueIdentifier(tableIdentifier.getCarbonTableIdentifier(),
+            String.valueOf(carbonDimension.getColumnId()), carbonDimension.getDataType());
+    CacheProvider cacheProvider = CacheProvider.getInstance();
+    Cache forwardDictionaryCache =
+        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, tableIdentifier.getStorePath());
+    // get the forward dictionary object
+    Dictionary forwardDictionary = null;
+    try {
+      forwardDictionary = (Dictionary) forwardDictionaryCache.get(dictionaryColumnUniqueIdentifier);
+    } catch (CarbonUtilException e) {
+      throw new QueryExecutionException(e);
+    }
+    return forwardDictionary;
+  }
+
+  public static IndexKey createIndexKeyFromResolvedFilterVal(long[] startOrEndKey,
+      KeyGenerator keyGenerator, byte[] startOrEndKeyForNoDictDimension) {
+    IndexKey indexKey = null;
+    try {
+      indexKey =
+          new IndexKey(keyGenerator.generateKey(startOrEndKey), startOrEndKeyForNoDictDimension);
+    } catch (KeyGenException e) {
+      LOGGER.error(e.getMessage());
+    }
+    return indexKey;
+  }
+
+  /**
+   * API will create an filter executer tree based on the filter resolver
+   *
+   * @param filterExpressionResolverTree
+   * @param segmentProperties
+   * @return
+   */
+  public static FilterExecuter getFilterExecuterTree(
+      FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties) {
+    return createFilterExecuterTree(filterExpressionResolverTree, segmentProperties);
+  }
+
+  /**
+   * API will prepare the Keys from the surrogates of particular filter resolver
+   *
+   * @param filterValues
+   * @param blockKeyGenerator
+   * @param dimension
+   * @param dimColumnExecuterInfo
+   */
+  public static void prepareKeysFromSurrogates(DimColumnFilterInfo filterValues,
+      KeyGenerator blockKeyGenerator, CarbonDimension dimension,
+      DimColumnExecuterFilterInfo dimColumnExecuterInfo) {
+    byte[][] keysBasedOnFilter = getKeyArray(filterValues, dimension, blockKeyGenerator);
+    dimColumnExecuterInfo.setFilterKeys(keysBasedOnFilter);
+
+  }
+
+  /**
+   * method will create a default end key in case of no end key is been derived using existing
+   * filter or in case of non filter queries.
+   *
+   * @param segmentProperties
+   * @return
+   * @throws KeyGenException
+   */
+  public static IndexKey prepareDefaultEndIndexKey(SegmentProperties segmentProperties)
+      throws KeyGenException {
+    long[] dictionarySurrogateKey =
+        new long[segmentProperties.getDimensions().size() - segmentProperties
+            .getNumberOfNoDictionaryDimension()];
+    Arrays.fill(dictionarySurrogateKey, Long.MAX_VALUE);
+    IndexKey endIndexKey;
+    byte[] dictionaryendMdkey =
+        segmentProperties.getDimensionKeyGenerator().generateKey(dictionarySurrogateKey);
+    byte[] noDictionaryEndKeyBuffer = getNoDictionaryDefaultEndKey(segmentProperties);
+    endIndexKey = new IndexKey(dictionaryendMdkey, noDictionaryEndKeyBuffer);
+    return endIndexKey;
+  }
+
+  public static byte[] getNoDictionaryDefaultEndKey(SegmentProperties segmentProperties) {
+    // in case of non filter query when no dictionary columns are present we
+    // need to set the default end key, as for non filter query
+    // we need to get the last
+    // block of the btree so we are setting the max byte value in the end key
+    ByteBuffer noDictionaryEndKeyBuffer = ByteBuffer.allocate(
+        (segmentProperties.getNumberOfNoDictionaryDimension()
+            * CarbonCommonConstants.SHORT_SIZE_IN_BYTE) + segmentProperties
+            .getNumberOfNoDictionaryDimension());
+    // end key structure will be
+    //<Offset of first No Dictionary key in 2 Bytes><Offset of second No Dictionary key in 2 Bytes>
+    //<Offset of n No Dictionary key in 2 Bytes><first no dictionary column value>
+    // <second no dictionary column value> <N no dictionary column value>
+    //example if we have 2 no dictionary column
+    //<[0,4,0,5,127,127]>
+    short startPoint = (short) (segmentProperties.getNumberOfNoDictionaryDimension()
+        * CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+    for (int i = 0; i < segmentProperties.getNumberOfNoDictionaryDimension(); i++) {
+      noDictionaryEndKeyBuffer.putShort((startPoint));
+      startPoint++;
+    }
+    for (int i = 0; i < segmentProperties.getNumberOfNoDictionaryDimension(); i++) {
+      noDictionaryEndKeyBuffer.put((byte) 127);
+    }
+    return noDictionaryEndKeyBuffer.array();
+  }
+
+  /**
+   * method will create a default end key in case of no end key is been
+   * derived using existing filter or in case of non filter queries.
+   *
+   * @param segmentProperties
+   * @return
+   * @throws KeyGenException
+   */
+  public static IndexKey prepareDefaultStartIndexKey(SegmentProperties segmentProperties)
+      throws KeyGenException {
+    IndexKey startIndexKey;
+    long[] dictionarySurrogateKey =
+        new long[segmentProperties.getDimensions().size() - segmentProperties
+            .getNumberOfNoDictionaryDimension()];
+    byte[] dictionaryStartMdkey =
+        segmentProperties.getDimensionKeyGenerator().generateKey(dictionarySurrogateKey);
+    byte[] noDictionaryStartKeyArray = getNoDictionaryDefaultStartKey(segmentProperties);
+
+    startIndexKey = new IndexKey(dictionaryStartMdkey, noDictionaryStartKeyArray);
+    return startIndexKey;
+  }
+
+  public static byte[] getNoDictionaryDefaultStartKey(SegmentProperties segmentProperties) {
+    // in case of non filter query when no dictionary columns are present we
+    // need to set the default start key, as for non filter query we need to get the first
+    // block of the btree so we are setting the least byte value in the start key
+    ByteBuffer noDictionaryStartKeyBuffer = ByteBuffer.allocate(
+        (segmentProperties.getNumberOfNoDictionaryDimension()
+            * CarbonCommonConstants.SHORT_SIZE_IN_BYTE) + segmentProperties
+            .getNumberOfNoDictionaryDimension());
+    // end key structure will be
+    //<Offset of first No Dictionary key in 2 Bytes><Offset of second No Dictionary key in 2 Bytes>
+    //<Offset of n No Dictionary key in 2 Bytes><first no dictionary column value>
+    // <second no dictionary column value> <N no dictionary column value>
+    //example if we have 2 no dictionary column
+    //<[0,4,0,5,0,0]>
+    short startPoint = (short) (segmentProperties.getNumberOfNoDictionaryDimension()
+        * CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+    for (int i = 0; i < segmentProperties.getNumberOfNoDictionaryDimension(); i++) {
+      noDictionaryStartKeyBuffer.putShort((startPoint));
+      startPoint++;
+    }
+    for (int i = 0; i < segmentProperties.getNumberOfNoDictionaryDimension(); i++) {
+      noDictionaryStartKeyBuffer.put((byte) 0);
+    }
+    return noDictionaryStartKeyBuffer.array();
+  }
+
+  public static int compareFilterKeyBasedOnDataType(String dictionaryVal, String memberVal,
+      DataType dataType) {
+    try {
+      switch (dataType) {
+        case INT:
+
+          return Integer.compare((Integer.parseInt(dictionaryVal)), (Integer.parseInt(memberVal)));
+        case DOUBLE:
+          return Double
+              .compare((Double.parseDouble(dictionaryVal)), (Double.parseDouble(memberVal)));
+        case LONG:
+          return Long.compare((Long.parseLong(dictionaryVal)), (Long.parseLong(memberVal)));
+        case BOOLEAN:
+          return Boolean
+              .compare((Boolean.parseBoolean(dictionaryVal)), (Boolean.parseBoolean(memberVal)));
+        case TIMESTAMP:
+          SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+                  CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+          Date dateToStr;
+          Date dictionaryDate;
+          dateToStr = parser.parse(memberVal);
+          dictionaryDate = parser.parse(dictionaryVal);
+          return dictionaryDate.compareTo(dateToStr);
+
+        case DECIMAL:
+          java.math.BigDecimal javaDecValForDictVal = new java.math.BigDecimal(dictionaryVal);
+          java.math.BigDecimal javaDecValForMemberVal = new java.math.BigDecimal(memberVal);
+          return javaDecValForDictVal.compareTo(javaDecValForMemberVal);
+        default:
+          return -1;
+      }
+    } catch (Exception e) {
+      return -1;
+    }
+  }
+
+  /**
+   * method will set the start and end key for as per the filter resolver tree
+   * utilized visitor pattern inorder to populate the start and end key population.
+   *
+   * @param segmentProperties
+   * @param tableIdentifier
+   * @param filterResolver
+   * @param listOfStartEndKeys
+   * @throws QueryExecutionException
+   */
+  public static void traverseResolverTreeAndGetStartAndEndKey(SegmentProperties segmentProperties,
+      AbsoluteTableIdentifier tableIdentifier, FilterResolverIntf filterResolver,
+      List<IndexKey> listOfStartEndKeys) throws QueryExecutionException {
+    IndexKey searchStartKey = null;
+    IndexKey searchEndKey = null;
+    long[] startKey = new long[segmentProperties.getDimensionKeyGenerator().getDimCount()];
+    long[] endKey = new long[segmentProperties.getDimensionKeyGenerator().getDimCount()];
+    List<byte[]> listOfStartKeyByteArray =
+        new ArrayList<byte[]>(segmentProperties.getNumberOfNoDictionaryDimension());
+    List<byte[]> listOfEndKeyByteArray =
+        new ArrayList<byte[]>(segmentProperties.getNumberOfNoDictionaryDimension());
+    SortedMap<Integer, byte[]> setOfStartKeyByteArray = new TreeMap<Integer, byte[]>();
+    SortedMap<Integer, byte[]> setOfEndKeyByteArray = new TreeMap<Integer, byte[]>();
+    SortedMap<Integer, byte[]> defaultStartValues = new TreeMap<Integer, byte[]>();
+    SortedMap<Integer, byte[]> defaultEndValues = new TreeMap<Integer, byte[]>();
+    traverseResolverTreeAndPopulateStartAndEndKeys(filterResolver, tableIdentifier,
+        segmentProperties, startKey, setOfStartKeyByteArray, endKey, setOfEndKeyByteArray);
+    fillDefaultStartValue(defaultStartValues, segmentProperties);
+    fillDefaultEndValue(defaultEndValues, segmentProperties);
+    fillNullValuesStartIndexWithDefaultKeys(setOfStartKeyByteArray, segmentProperties);
+    fillNullValuesEndIndexWithDefaultKeys(setOfEndKeyByteArray, segmentProperties);
+    pruneStartAndEndKeys(setOfStartKeyByteArray, listOfStartKeyByteArray);
+    pruneStartAndEndKeys(setOfEndKeyByteArray, listOfEndKeyByteArray);
+
+    searchStartKey = FilterUtil
+        .createIndexKeyFromResolvedFilterVal(startKey, segmentProperties.getDimensionKeyGenerator(),
+            FilterUtil.getKeyWithIndexesAndValues(listOfStartKeyByteArray));
+
+    searchEndKey = FilterUtil
+        .createIndexKeyFromResolvedFilterVal(endKey, segmentProperties.getDimensionKeyGenerator(),
+            FilterUtil.getKeyWithIndexesAndValues(listOfEndKeyByteArray));
+    listOfStartEndKeys.add(searchStartKey);
+    listOfStartEndKeys.add(searchEndKey);
+
+  }
+
+  private static int compareFilterMembersBasedOnActualDataType(String filterMember1,
+      String filterMember2, org.carbondata.scan.expression.DataType dataType) {
+    try {
+      switch (dataType) {
+        case IntegerType:
+        case LongType:
+        case DoubleType:
+
+          if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(filterMember1)) {
+            return 1;
+          }
+          Double d1 = Double.parseDouble(filterMember1);
+          Double d2 = Double.parseDouble(filterMember2);
+          return d1.compareTo(d2);
+        case DecimalType:
+          if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(filterMember1)) {
+            return 1;
+          }
+          java.math.BigDecimal val1 = new BigDecimal(filterMember1);
+          java.math.BigDecimal val2 = new BigDecimal(filterMember2);
+          return val1.compareTo(val2);
+        case TimestampType:
+          if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(filterMember1)) {
+            return 1;
+          }
+          SimpleDateFormat parser = new SimpleDateFormat(CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+                  CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+          Date date1 = null;
+          Date date2 = null;
+          date1 = parser.parse(filterMember1);
+          date2 = parser.parse(filterMember2);
+          return date1.compareTo(date2);
+        case StringType:
+        default:
+          return filterMember1.compareTo(filterMember2);
+      }
+    } catch (Exception e) {
+      return -1;
+    }
+  }
+
+  private static void fillNullValuesStartIndexWithDefaultKeys(
+      SortedMap<Integer, byte[]> setOfStartKeyByteArray, SegmentProperties segmentProperties) {
+    List<CarbonDimension> allDimension = segmentProperties.getDimensions();
+    for (CarbonDimension dimension : allDimension) {
+      if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
+        continue;
+      }
+      if (null == setOfStartKeyByteArray.get(dimension.getOrdinal())) {
+        setOfStartKeyByteArray.put(dimension.getOrdinal(), new byte[] { 0 });
+      }
+
+    }
+  }
+
+  private static void fillNullValuesEndIndexWithDefaultKeys(
+      SortedMap<Integer, byte[]> setOfStartKeyByteArray, SegmentProperties segmentProperties) {
+    List<CarbonDimension> allDimension = segmentProperties.getDimensions();
+    for (CarbonDimension dimension : allDimension) {
+      if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
+        continue;
+      }
+      if (null == setOfStartKeyByteArray.get(dimension.getOrdinal())) {
+        setOfStartKeyByteArray.put(dimension.getOrdinal(), new byte[] { 127 });
+      }
+
+    }
+  }
+
+  private static void pruneStartAndEndKeys(SortedMap<Integer, byte[]> setOfStartKeyByteArray,
+      List<byte[]> listOfStartKeyByteArray) {
+    for (Map.Entry<Integer, byte[]> entry : setOfStartKeyByteArray.entrySet()) {
+      listOfStartKeyByteArray.add(entry.getValue());
+    }
+  }
+
+  private static void fillDefaultStartValue(SortedMap<Integer, byte[]> setOfStartKeyByteArray,
+      SegmentProperties segmentProperties) {
+    List<CarbonDimension> allDimension = segmentProperties.getDimensions();
+    for (CarbonDimension dimension : allDimension) {
+      if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
+        continue;
+      }
+      setOfStartKeyByteArray.put(dimension.getOrdinal(), new byte[] { 0 });
+    }
+
+  }
+
+  private static void fillDefaultEndValue(SortedMap<Integer, byte[]> setOfEndKeyByteArray,
+      SegmentProperties segmentProperties) {
+    List<CarbonDimension> allDimension = segmentProperties.getDimensions();
+    for (CarbonDimension dimension : allDimension) {
+      if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
+        continue;
+      }
+      setOfEndKeyByteArray.put(dimension.getOrdinal(), new byte[] { 127 });
+    }
+  }
+
+  private static void traverseResolverTreeAndPopulateStartAndEndKeys(
+      FilterResolverIntf filterResolverTree, AbsoluteTableIdentifier tableIdentifier,
+      SegmentProperties segmentProperties, long[] startKeys,
+      SortedMap<Integer, byte[]> setOfStartKeyByteArray, long[] endKeys,
+      SortedMap<Integer, byte[]> setOfEndKeyByteArray) throws QueryExecutionException {
+    if (null == filterResolverTree) {
+      return;
+    }
+    traverseResolverTreeAndPopulateStartAndEndKeys(filterResolverTree.getLeft(), tableIdentifier,
+        segmentProperties, startKeys, setOfStartKeyByteArray, endKeys, setOfEndKeyByteArray);
+
+    filterResolverTree.getStartKey(segmentProperties, startKeys, setOfStartKeyByteArray);
+    filterResolverTree.getEndKey(segmentProperties, tableIdentifier, endKeys, setOfEndKeyByteArray);
+
+    traverseResolverTreeAndPopulateStartAndEndKeys(filterResolverTree.getRight(), tableIdentifier,
+        segmentProperties, startKeys, setOfStartKeyByteArray, endKeys, setOfEndKeyByteArray);
+  }
+
+  /**
+   * Method will find whether the expression needs to be resolved, this can happen
+   * if the expression is exclude and data type is null(mainly in IS NOT NULL filter scenario)
+   * @param rightExp
+   * @param isIncludeFilter
+   * @return
+   */
+  public static boolean isExpressionNeedsToResolved(Expression rightExp, boolean isIncludeFilter) {
+    if (!isIncludeFilter && rightExp instanceof LiteralExpression && (
+        org.carbondata.scan.expression.DataType.NullType == ((LiteralExpression) rightExp)
+            .getLiteralExpDataType())) {
+      return true;
+    }
+    for (Expression child : rightExp.getChildren()) {
+      if (isExpressionNeedsToResolved(child, isIncludeFilter)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/filter/GenericQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/filter/GenericQueryType.java b/core/src/main/java/org/carbondata/scan/filter/GenericQueryType.java
new file mode 100644
index 0000000..30bb6c2
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/filter/GenericQueryType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.scan.filter;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.carbondata.core.datastorage.store.columnar.ColumnarKeyStoreDataHolder;
+import org.carbondata.scan.processor.BlocksChunkHolder;
+
+import org.apache.spark.sql.types.DataType;
+
+public interface GenericQueryType {
+
+  String getName();
+
+  void setName(String name);
+
+  String getParentname();
+
+  void setParentname(String parentname);
+
+  int getBlockIndex();
+
+  void setBlockIndex(int blockIndex);
+
+  void addChildren(GenericQueryType children);
+
+  void getAllPrimitiveChildren(List<GenericQueryType> primitiveChild);
+
+  int getSurrogateIndex();
+
+  void setSurrogateIndex(int surrIndex);
+
+  int getColsCount();
+
+  void setKeySize(int[] keyBlockSize);
+
+  int getKeyOrdinalForQuery();
+
+  void setKeyOrdinalForQuery(int keyOrdinalForQuery);
+
+  void parseBlocksAndReturnComplexColumnByteArray(
+      ColumnarKeyStoreDataHolder[] columnarKeyStoreDataHolder, int rowNumber,
+      DataOutputStream dataOutputStream) throws IOException;
+
+  DataType getSchemaType();
+
+  void parseAndGetResultBytes(ByteBuffer complexData, DataOutputStream dataOutput)
+      throws IOException;
+
+  void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/filter/executer/AndFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/filter/executer/AndFilterExecuterImpl.java b/core/src/main/java/org/carbondata/scan/filter/executer/AndFilterExecuterImpl.java
new file mode 100644
index 0000000..10ad66f
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/AndFilterExecuterImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.filter.executer;
+
+import java.util.BitSet;
+
+import org.carbondata.scan.expression.exception.FilterUnsupportedException;
+import org.carbondata.scan.processor.BlocksChunkHolder;
+
+public class AndFilterExecuterImpl implements FilterExecuter {
+
+  private FilterExecuter leftExecuter;
+  private FilterExecuter rightExecuter;
+
+  public AndFilterExecuterImpl(FilterExecuter leftExecuter, FilterExecuter rightExecuter) {
+    this.leftExecuter = leftExecuter;
+    this.rightExecuter = rightExecuter;
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException {
+    BitSet leftFilters = leftExecuter.applyFilter(blockChunkHolder);
+    if (leftFilters.isEmpty()) {
+      return leftFilters;
+    }
+    BitSet rightFilter = rightExecuter.applyFilter(blockChunkHolder);
+    if (rightFilter.isEmpty()) {
+      return rightFilter;
+    }
+    leftFilters.and(rightFilter);
+    return leftFilters;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet leftFilters = leftExecuter.isScanRequired(blockMaxValue, blockMinValue);
+    if (leftFilters.isEmpty()) {
+      return leftFilters;
+    }
+    BitSet rightFilter = rightExecuter.isScanRequired(blockMaxValue, blockMinValue);
+    if (rightFilter.isEmpty()) {
+      return rightFilter;
+    }
+    leftFilters.and(rightFilter);
+    return leftFilters;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/filter/executer/ColGroupFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/filter/executer/ColGroupFilterExecuterImpl.java b/core/src/main/java/org/carbondata/scan/filter/executer/ColGroupFilterExecuterImpl.java
new file mode 100644
index 0000000..187ed7a
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/ColGroupFilterExecuterImpl.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.filter.executer;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.carbondata.core.keygenerator.KeyGenException;
+import org.carbondata.core.util.ByteUtil;
+import org.carbondata.scan.executor.infos.KeyStructureInfo;
+import org.carbondata.scan.executor.util.QueryUtil;
+import org.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+
+/**
+ * It checks if filter is required on given block and if required, it does
+ * linear search on block data and set the bitset.
+ */
+public class ColGroupFilterExecuterImpl extends IncludeFilterExecuterImpl {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ColGroupFilterExecuterImpl.class.getName());
+
+  /**
+   * @param dimColResolvedFilterInfo
+   * @param segmentProperties
+   */
+  public ColGroupFilterExecuterImpl(DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
+      SegmentProperties segmentProperties) {
+    super(dimColResolvedFilterInfo, segmentProperties);
+  }
+
+  /**
+   * It fills BitSet with row index which matches filter key
+   */
+  protected BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+
+    try {
+      KeyStructureInfo keyStructureInfo = getKeyStructureInfo();
+      byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+      for (int i = 0; i < filterValues.length; i++) {
+        byte[] filterVal = filterValues[i];
+        for (int rowId = 0; rowId < numerOfRows; rowId++) {
+          byte[] colData = new byte[keyStructureInfo.getMaskByteRanges().length];
+          dimensionColumnDataChunk.fillChunkData(colData, 0, rowId, keyStructureInfo);
+          if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterVal, colData) == 0) {
+            bitSet.set(rowId);
+          }
+        }
+      }
+
+    } catch (Exception e) {
+      LOGGER.error(e);
+    }
+
+    return bitSet;
+  }
+
+  /**
+   * It is required for extracting column data from columngroup chunk
+   *
+   * @return
+   * @throws KeyGenException
+   */
+  private KeyStructureInfo getKeyStructureInfo() throws KeyGenException {
+    List<Integer> ordinals = new ArrayList<Integer>();
+    ordinals.add(dimColumnEvaluatorInfo.getColumnIndex());
+
+    int[] maskByteRanges = QueryUtil
+        .getMaskedByteRangeBasedOrdinal(ordinals, segmentProperties.getDimensionKeyGenerator());
+    byte[] maxKey =
+        QueryUtil.getMaxKeyBasedOnOrinal(ordinals, segmentProperties.getDimensionKeyGenerator());
+    int[] maksedByte = QueryUtil
+        .getMaskedByte(segmentProperties.getDimensionKeyGenerator().getKeySizeInBytes(),
+            maskByteRanges);
+    int blockMdkeyStartOffset = QueryUtil.getBlockMdKeyStartOffset(segmentProperties, ordinals);
+
+    KeyStructureInfo restructureInfos = new KeyStructureInfo();
+    restructureInfos.setKeyGenerator(segmentProperties.getDimensionKeyGenerator());
+    restructureInfos.setMaskByteRanges(maskByteRanges);
+    restructureInfos.setMaxKey(maxKey);
+    restructureInfos.setMaskedBytes(maksedByte);
+    restructureInfos.setBlockMdKeyStartOffset(blockMdkeyStartOffset);
+    return restructureInfos;
+  }
+
+  /**
+   * Check if scan is required on given block based on min and max value
+   */
+  public BitSet isScanRequired(byte[][] blkMaxVal, byte[][] blkMinVal) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
+    int[] cols = getAllColumns(columnIndex);
+    byte[] maxValue = getMinMaxData(cols, blkMaxVal[blockIndex], columnIndex);
+    byte[] minValue = getMinMaxData(cols, blkMinVal[blockIndex], columnIndex);
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // filter value should be in range of max and min value i.e
+      // max>filtervalue>min
+      // so filter-max should be negative
+      int maxCompare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], maxValue);
+      // and filter-min should be positive
+      int minCompare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], minValue);
+
+      // if any filter value is in range than this block needs to be
+      // scanned
+      if (maxCompare <= 0 && minCompare >= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+  }
+
+  /**
+   * It extract min and max data for given column from stored min max value
+   *
+   * @param cols
+   * @param minMaxData
+   * @param columnIndex
+   * @return
+   */
+  private byte[] getMinMaxData(int[] colGrpColumns, byte[] minMaxData, int columnIndex) {
+    int startIndex = 0;
+    int endIndex = 0;
+    for (int i = 0; i < colGrpColumns.length; i++) {
+      int[] byteRange =
+          segmentProperties.getDimensionKeyGenerator().getKeyByteOffsets(colGrpColumns[i]);
+      int colSize = 0;
+      for (int j = byteRange[0]; j <= byteRange[1]; j++) {
+        colSize++;
+      }
+      if (colGrpColumns[i] == columnIndex) {
+        endIndex = startIndex + colSize;
+        break;
+      }
+      startIndex += colSize;
+    }
+    byte[] data = new byte[endIndex - startIndex];
+    System.arraycopy(minMaxData, startIndex, data, 0, data.length);
+    return data;
+  }
+
+  /**
+   * It returns column groups which have provided column ordinal
+   *
+   * @param columnIndex
+   * @return column group array
+   */
+  private int[] getAllColumns(int columnIndex) {
+    int[][] colGroups = segmentProperties.getColumnGroups();
+    for (int i = 0; i < colGroups.length; i++) {
+      if (QueryUtil.searchInArray(colGroups[i], columnIndex)) {
+        return colGroups[i];
+      }
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/filter/executer/DimColumnExecuterFilterInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/filter/executer/DimColumnExecuterFilterInfo.java b/core/src/main/java/org/carbondata/scan/filter/executer/DimColumnExecuterFilterInfo.java
new file mode 100644
index 0000000..a81b4a1
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/DimColumnExecuterFilterInfo.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.filter.executer;
+
+public class DimColumnExecuterFilterInfo {
+
+  byte[][] filterKeys;
+
+  public void setFilterKeys(byte[][] filterKeys) {
+    this.filterKeys = filterKeys;
+  }
+
+  public byte[][] getFilterKeys() {
+    return filterKeys;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/carbondata/scan/filter/executer/ExcludeFilterExecuterImpl.java
new file mode 100644
index 0000000..6eb7d9a
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.filter.executer;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.carbondata.core.carbon.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.carbondata.core.keygenerator.KeyGenerator;
+import org.carbondata.core.util.ByteUtil;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.scan.filter.FilterUtil;
+import org.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.carbondata.scan.processor.BlocksChunkHolder;
+
+public class ExcludeFilterExecuterImpl implements FilterExecuter {
+
+  DimColumnResolvedFilterInfo dimColEvaluatorInfo;
+  DimColumnExecuterFilterInfo dimColumnExecuterInfo;
+
+  public ExcludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColEvaluatorInfo) {
+    this.dimColEvaluatorInfo = dimColEvaluatorInfo;
+  }
+
+  public ExcludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColEvaluatorInfo,
+      KeyGenerator blockKeyGenerator) {
+    this(dimColEvaluatorInfo);
+    dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
+    FilterUtil.prepareKeysFromSurrogates(dimColEvaluatorInfo.getFilterValues(), blockKeyGenerator,
+        dimColEvaluatorInfo.getDimension(), dimColumnExecuterInfo);
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder) {
+    if (null == blockChunkHolder.getDimensionDataChunk()[dimColEvaluatorInfo.getColumnIndex()]) {
+      blockChunkHolder.getDataBlock().getDimensionChunk(blockChunkHolder.getFileReader(),
+          dimColEvaluatorInfo.getColumnIndex());
+    }
+    if (null == blockChunkHolder.getDimensionDataChunk()[dimColEvaluatorInfo.getColumnIndex()]) {
+      blockChunkHolder.getDimensionDataChunk()[dimColEvaluatorInfo.getColumnIndex()] =
+          blockChunkHolder.getDataBlock().getDimensionChunk(blockChunkHolder.getFileReader(),
+              dimColEvaluatorInfo.getColumnIndex());
+    }
+    return getFilteredIndexes(
+        blockChunkHolder.getDimensionDataChunk()[dimColEvaluatorInfo.getColumnIndex()],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimColumnDataChunk, int numerOfRows) {
+    // For high cardinality dimensions.
+    if (dimColumnDataChunk.getAttributes().isNoDictionary()
+        && dimColumnDataChunk instanceof VariableLengthDimensionDataChunk) {
+      return setDirectKeyFilterIndexToBitSet((VariableLengthDimensionDataChunk) dimColumnDataChunk,
+          numerOfRows);
+    }
+    if (null != dimColumnDataChunk.getAttributes().getInvertedIndexes()
+        && dimColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimColumnDataChunk, numerOfRows);
+    }
+    return setFilterdIndexToBitSet((FixedLengthDimensionDataChunk) dimColumnDataChunk, numerOfRows);
+  }
+
+  private BitSet setDirectKeyFilterIndexToBitSet(
+      VariableLengthDimensionDataChunk dimColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    bitSet.flip(0, numerOfRows);
+    List<byte[]> listOfColumnarKeyBlockDataForNoDictionaryVal =
+        dimColumnDataChunk.getCompleteDataChunk();
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    int[] columnIndexArray = dimColumnDataChunk.getAttributes().getInvertedIndexes();
+    int[] columnReverseIndexArray = dimColumnDataChunk.getAttributes().getInvertedIndexesReverse();
+    for (int i = 0; i < filterValues.length; i++) {
+      byte[] filterVal = filterValues[i];
+      if (null != listOfColumnarKeyBlockDataForNoDictionaryVal) {
+
+        if (null != columnReverseIndexArray) {
+          for (int index : columnIndexArray) {
+            byte[] noDictionaryVal =
+                listOfColumnarKeyBlockDataForNoDictionaryVal.get(columnReverseIndexArray[index]);
+            if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterVal, noDictionaryVal) == 0) {
+              bitSet.flip(index);
+            }
+          }
+        } else if (null != columnIndexArray) {
+
+          for (int index : columnIndexArray) {
+            byte[] noDictionaryVal =
+                listOfColumnarKeyBlockDataForNoDictionaryVal.get(columnIndexArray[index]);
+            if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterVal, noDictionaryVal) == 0) {
+              bitSet.flip(index);
+            }
+          }
+        } else {
+          for (int index = 0;
+               index < listOfColumnarKeyBlockDataForNoDictionaryVal.size(); index++) {
+            if (ByteUtil.UnsafeComparer.INSTANCE
+                .compareTo(filterVal, listOfColumnarKeyBlockDataForNoDictionaryVal.get(index))
+                == 0) {
+              bitSet.flip(index);
+            }
+          }
+
+        }
+
+      }
+    }
+    return bitSet;
+
+  }
+
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimColumnDataChunk, int numerOfRows) {
+    int[] columnIndex = dimColumnDataChunk.getAttributes().getInvertedIndexes();
+    int startKey = 0;
+    int last = 0;
+    int startIndex = 0;
+    BitSet bitSet = new BitSet(numerOfRows);
+    bitSet.flip(0, numerOfRows);
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    for (int i = 0; i < filterValues.length; i++) {
+      startKey = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i]);
+      if (startKey == -1) {
+        continue;
+      }
+      bitSet.flip(columnIndex[startKey]);
+      last = startKey;
+      for (int j = startKey + 1; j < numerOfRows; j++) {
+        if (ByteUtil.UnsafeComparer.INSTANCE
+            .compareTo(dimColumnDataChunk.getCompleteDataChunk(), j * filterValues[i].length,
+                filterValues[i].length, filterValues[i], 0, filterValues[i].length) == 0) {
+          bitSet.flip(columnIndex[j]);
+          last++;
+        } else {
+          break;
+        }
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  private BitSet setFilterdIndexToBitSet(FixedLengthDimensionDataChunk dimColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int startKey = 0;
+    int last = 0;
+    bitSet.flip(0, numerOfRows);
+    int startIndex = 0;
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    for (int k = 0; k < filterValues.length; k++) {
+      startKey = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[k]);
+      if (startKey == -1) {
+        continue;
+      }
+      bitSet.flip(startKey);
+      last = startKey;
+      for (int j = startKey + 1; j < numerOfRows; j++) {
+        if (ByteUtil.UnsafeComparer.INSTANCE
+            .compareTo(dimColumnDataChunk.getCompleteDataChunk(), j * filterValues[k].length,
+                filterValues[k].length, filterValues[k], 0, filterValues[k].length) == 0) {
+          bitSet.flip(j);
+          last++;
+        } else {
+          break;
+        }
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    bitSet.flip(0, 1);
+    return bitSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/filter/executer/FilterExecuter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/filter/executer/FilterExecuter.java b/core/src/main/java/org/carbondata/scan/filter/executer/FilterExecuter.java
new file mode 100644
index 0000000..42f913b
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/FilterExecuter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.filter.executer;
+
+import java.util.BitSet;
+
+import org.carbondata.scan.expression.exception.FilterUnsupportedException;
+import org.carbondata.scan.processor.BlocksChunkHolder;
+
+public interface FilterExecuter {
+
+  /**
+   * API will apply filter based on resolver instance
+   *
+   * @return
+   * @throws FilterUnsupportedException
+   */
+  BitSet applyFilter(BlocksChunkHolder blocksChunkHolder) throws FilterUnsupportedException;
+
+  /**
+   * API will verify whether the block can be shortlisted based on block
+   * max and min key.
+   *
+   * @param blockMaxValue, maximum value of the
+   * @param blockMinValue
+   * @return BitSet
+   */
+  BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue);
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/carbondata/scan/filter/executer/IncludeFilterExecuterImpl.java
new file mode 100644
index 0000000..71567ea
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.filter.executer;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.carbondata.core.carbon.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.carbondata.core.util.ByteUtil;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.scan.filter.FilterUtil;
+import org.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.carbondata.scan.processor.BlocksChunkHolder;
+
+public class IncludeFilterExecuterImpl implements FilterExecuter {
+
+  protected DimColumnResolvedFilterInfo dimColumnEvaluatorInfo;
+  protected DimColumnExecuterFilterInfo dimColumnExecuterInfo;
+  protected SegmentProperties segmentProperties;
+
+  public IncludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColumnEvaluatorInfo,
+      SegmentProperties segmentProperties) {
+    this.dimColumnEvaluatorInfo = dimColumnEvaluatorInfo;
+    this.segmentProperties = segmentProperties;
+    dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
+    FilterUtil.prepareKeysFromSurrogates(dimColumnEvaluatorInfo.getFilterValues(),
+        segmentProperties.getDimensionKeyGenerator(), dimColumnEvaluatorInfo.getDimension(),
+        dimColumnExecuterInfo);
+
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder) {
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColumnEvaluatorInfo.getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  protected BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    if (dimensionColumnDataChunk.getAttributes().isNoDictionary()
+        && dimensionColumnDataChunk instanceof VariableLengthDimensionDataChunk) {
+      return setDirectKeyFilterIndexToBitSet(
+          (VariableLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    } else if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    }
+
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+  }
+
+  private BitSet setDirectKeyFilterIndexToBitSet(
+      VariableLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    List<byte[]> listOfColumnarKeyBlockDataForNoDictionaryVals =
+        dimensionColumnDataChunk.getCompleteDataChunk();
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    int[] columnIndexArray = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
+    int[] columnReverseIndexArray =
+        dimensionColumnDataChunk.getAttributes().getInvertedIndexesReverse();
+    for (int i = 0; i < filterValues.length; i++) {
+      byte[] filterVal = filterValues[i];
+      if (null != listOfColumnarKeyBlockDataForNoDictionaryVals) {
+        if (null != columnIndexArray) {
+          for (int index : columnIndexArray) {
+            byte[] noDictionaryVal =
+                listOfColumnarKeyBlockDataForNoDictionaryVals.get(columnReverseIndexArray[index]);
+            if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterVal, noDictionaryVal) == 0) {
+              bitSet.set(index);
+            }
+          }
+        } else if (null != columnReverseIndexArray) {
+          for (int index : columnReverseIndexArray) {
+            byte[] noDictionaryVal =
+                listOfColumnarKeyBlockDataForNoDictionaryVals.get(columnReverseIndexArray[index]);
+            if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterVal, noDictionaryVal) == 0) {
+              bitSet.set(index);
+            }
+          }
+        } else {
+          for (int index = 0;
+               index < listOfColumnarKeyBlockDataForNoDictionaryVals.size(); index++) {
+            if (ByteUtil.UnsafeComparer.INSTANCE
+                .compareTo(filterVal, listOfColumnarKeyBlockDataForNoDictionaryVals.get(index))
+                == 0) {
+              bitSet.set(index);
+            }
+          }
+        }
+      }
+    }
+    return bitSet;
+
+  }
+
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i]);
+      if (start == -1) {
+        continue;
+      }
+      bitSet.set(columnIndex[start]);
+      last = start;
+      for (int j = start + 1; j < numerOfRows; j++) {
+        if (ByteUtil.UnsafeComparer.INSTANCE
+            .compareTo(dimensionColumnDataChunk.getCompleteDataChunk(), j * filterValues[i].length,
+                filterValues[i].length, filterValues[i], 0, filterValues[i].length) == 0) {
+          bitSet.set(columnIndex[j]);
+          last++;
+        } else {
+          break;
+        }
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      FixedLengthDimensionDataChunk fixedDimensionChunk =
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk;
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k]);
+        if (start == -1) {
+          continue;
+        }
+        bitSet.set(start);
+        last = start;
+        for (int j = start + 1; j < numerOfRows; j++) {
+          if (ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(fixedDimensionChunk.getCompleteDataChunk(), j * filterValues[k].length,
+                  filterValues[k].length, filterValues[k], 0, filterValues[k].length) == 0) {
+            bitSet.set(j);
+            last++;
+          } else {
+            break;
+          }
+        }
+        startIndex = last;
+        if (startIndex >= numerOfRows) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
+
+  public BitSet isScanRequired(byte[][] blkMaxVal, byte[][] blkMinVal) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
+
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // filter value should be in range of max and min value i.e
+      // max>filtervalue>min
+      // so filter-max should be negative
+      int maxCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blkMaxVal[blockIndex]);
+      // and filter-min should be positive
+      int minCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blkMinVal[blockIndex]);
+
+      // if any filter value is in range than this block needs to be
+      // scanned
+      if (maxCompare <= 0 && minCompare >= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+  }
+
+}



Mime
View raw message