carbondata-commits mailing list archives

From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-1238] Decouple the datatype convert from Spark code in core module
Date Fri, 28 Jul 2017 16:31:11 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 742269079 -> 09f7cdd44


[CARBONDATA-1238] Decouple the datatype convert from Spark code in core module

Decouple the datatype conversion from Spark code in the core module:
1. Use Spark's decimal type (org.apache.spark.sql.types.Decimal.apply()) in the Spark engine,
and Java's BigDecimal in other engines.
2. Use org.apache.spark.unsafe.types.UTF8String in the Spark engine, and plain String in other
engines.
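
Illustratively, the same input value now maps to a different runtime type depending on which
converter the engine supplies. A minimal sketch using the two implementations introduced in this
patch (assumes both the core and spark-common modules are on the classpath; the demo class name
is hypothetical):

    import org.apache.carbondata.core.util.DataTypeConverter;
    import org.apache.carbondata.core.util.DataTypeConverterImpl;
    import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl;

    public class ConverterDemo {
      public static void main(String[] args) {
        DataTypeConverter javaConverter = new DataTypeConverterImpl();
        DataTypeConverter sparkConverter = new SparkDataTypeConverterImpl();
        // Non-Spark engines (Hive, Presto) receive plain Java types ...
        System.out.println(javaConverter.convertToDecimal("1.5").getClass());
        // -> class java.math.BigDecimal
        System.out.println(javaConverter.convertFromStringToUTF8String("abc").getClass());
        // -> class java.lang.String
        // ... while the Spark engine receives Spark's own types:
        System.out.println(sparkConverter.convertToDecimal("1.5").getClass());
        // -> class org.apache.spark.sql.types.Decimal
        System.out.println(sparkConverter.convertFromStringToUTF8String("abc").getClass());
        // -> class org.apache.spark.unsafe.types.UTF8String
      }
    }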

This closes #1197


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/09f7cdd4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/09f7cdd4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/09f7cdd4

Branch: refs/heads/master
Commit: 09f7cdd4480634407c09a0b4f515fda0d9c4911e
Parents: 7422690
Author: chenliang613 <chenliang613@apache.org>
Authored: Wed Jun 28 23:45:50 2017 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Sat Jul 29 00:30:55 2017 +0800

----------------------------------------------------------------------
 .../impl/AbstractScannedResultCollector.java    |   8 +-
 .../collector/impl/RawBasedResultCollector.java |   1 -
 .../executor/impl/AbstractQueryExecutor.java    |   4 +-
 .../scan/executor/infos/BlockExecutionInfo.java |   1 -
 .../carbondata/core/scan/model/QueryModel.java  |  14 ++-
 .../carbondata/core/util/DataTypeConverter.java |  27 +++++
 .../core/util/DataTypeConverterImpl.java        |  42 ++++++++
 .../carbondata/core/util/DataTypeUtil.java      |  60 +++++------
 .../complextypes/PrimitiveQueryTypeTest.java    |   1 -
 .../carbondata/core/util/DataTypeUtilTest.java  |  24 +----
 .../carbondata/hadoop/CarbonInputFormat.java    | 107 ++++++++++++-------
 .../hadoop/api/CarbonTableInputFormat.java      |  30 +++++-
 .../carbondata/hive/CarbonHiveRecordReader.java |   2 +-
 .../hive/MapredCarbonInputFormat.java           |   4 +-
 .../presto/CarbondataRecordSetProvider.java     |   4 +-
 .../spark/util/SparkDataTypeConverterImpl.java  |  49 +++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   3 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   6 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   1 +
 .../merger/CarbonCompactionExecutor.java        |   2 +
 20 files changed, 283 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
index 4dadcc2..ad9b773 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.scan.executor.infos.MeasureInfo;
 import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.result.AbstractScannedResult;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * It is not a collector it is just a scanned result holder.
@@ -77,8 +78,8 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
         // if not then get the default value and use that value in aggregation
         Object defaultValue = measureInfo.getDefaultValues()[i];
        if (null != defaultValue && measureInfo.getMeasureDataTypes()[i] == DataType.DECIMAL) {
-          // convert java big decimal to spark decimal type
-          defaultValue = org.apache.spark.sql.types.Decimal.apply((BigDecimal) defaultValue);
+          // convert data type as per the computing engine
+          defaultValue = DataTypeUtil.getDataTypeConverter().convertToDecimal(defaultValue);
         }
         msrValues[i + offset] = defaultValue;
       }
@@ -102,7 +103,8 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
             bigDecimalMsrValue =
                 bigDecimalMsrValue.setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
           }
-          return org.apache.spark.sql.types.Decimal.apply(bigDecimalMsrValue);
+          // convert data type as per the computing engine
+          return DataTypeUtil.getDataTypeConverter().convertToDecimal(bigDecimalMsrValue);
         default:
           return dataChunk.getColumnPage().getDouble(index);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
index 3e82257..a09676a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
@@ -26,7 +26,6 @@ import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.result.AbstractScannedResult;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 
-
 /**
  * It is not a collector it is just a scanned result holder.
  */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index aa11202..05d0d8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -66,6 +66,7 @@ import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 
@@ -284,7 +285,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     IndexKey startIndexKey = null;
     IndexKey endIndexKey = null;
     if (null != queryModel.getFilterExpressionResolverTree()) {
-      // loading the filter executer tree for filter evaluation
+      // loading the filter executor tree for filter evaluation
       blockExecutionInfo.setFilterExecuterTree(FilterUtil
           .getFilterExecuterTree(queryModel.getFilterExpressionResolverTree(), segmentProperties,
               blockExecutionInfo.getComlexDimensionInfoMap()));
@@ -405,6 +406,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         .toArray(new QueryDimension[queryModel.getQueryDimension().size()]));
     blockExecutionInfo.setActualQueryMeasures(queryModel.getQueryMeasures()
         .toArray(new QueryMeasure[queryModel.getQueryMeasures().size()]));
+    DataTypeUtil.setDataTypeConverter(queryModel.getConverter());
     return blockExecutionInfo;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index 7d08dda..adb6dc6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -224,7 +224,6 @@ public class BlockExecutionInfo {
   private String[] deleteDeltaFilePath;
 
   private Map<String, DeleteDeltaVo> deletedRecordsMap;
-
   public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
     return absoluteTableIdentifier;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 210ee11..7b6aef9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpress
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
 
 /**
  * Query model which will have all the detail
@@ -97,6 +98,8 @@ public class QueryModel implements Serializable {
 
   private boolean vectorReader;
 
+  private DataTypeConverter converter;
+
   /**
    * Invalid table blocks, which need to be removed from
    * memory, invalid blocks can be segment which are deleted
@@ -114,7 +117,7 @@ public class QueryModel implements Serializable {
   }
 
   public static QueryModel createModel(AbsoluteTableIdentifier absoluteTableIdentifier,
-      CarbonQueryPlan queryPlan, CarbonTable carbonTable) {
+      CarbonQueryPlan queryPlan, CarbonTable carbonTable, DataTypeConverter converter) {
     QueryModel queryModel = new QueryModel();
     String factTableName = carbonTable.getFactTableName();
     queryModel.setAbsoluteTableIdentifier(absoluteTableIdentifier);
@@ -123,6 +126,7 @@ public class QueryModel implements Serializable {
 
     queryModel.setForcedDetailRawQuery(queryPlan.isRawDetailQuery());
     queryModel.setQueryId(queryPlan.getQueryId());
+    queryModel.setConverter(converter);
     return queryModel;
   }
 
@@ -360,4 +364,12 @@ public class QueryModel implements Serializable {
   public Map<String,UpdateVO>  getInvalidBlockVOForSegmentId() {
     return  invalidSegmentBlockIdMap;
   }
+
+  public DataTypeConverter getConverter() {
+    return converter;
+  }
+
+  public void setConverter(DataTypeConverter converter) {
+    this.converter = converter;
+  }
 }
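
Callers now pick the converter when building the query model. A hypothetical fragment (identifier,
queryPlan and carbonTable are assumed to be constructed the same way the input formats below
construct them):

    // In the Spark engine this would be a SparkDataTypeConverterImpl instead.
    QueryModel queryModel = QueryModel.createModel(
        identifier, queryPlan, carbonTable, new DataTypeConverterImpl());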

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
new file mode 100644
index 0000000..8c9e058
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+public interface DataTypeConverter {
+
+  Object convertToDecimal(Object data);
+  Object convertFromByteToUTF8String(Object data);
+  byte[] convertFromStringToByte(Object data);
+  Object convertFromStringToUTF8String(Object data);
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
new file mode 100644
index 0000000..7c9b132
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.Serializable;
+
+public class DataTypeConverterImpl implements DataTypeConverter, Serializable {
+
+  private static final long serialVersionUID = -1718154403432354200L;
+
+  public Object convertToDecimal(Object data) {
+    java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data.toString());
+    return javaDecVal;
+  }
+
+  public Object convertFromByteToUTF8String(Object data) {
+    return data.toString();
+  }
+
+  public byte[] convertFromStringToByte(Object data) {
+    return data.toString().getBytes();
+  }
+
+  public Object convertFromStringToUTF8String(Object data) {
+    return data.toString();
+  }
+}
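
Both converter implementations are Serializable because the converter travels from the driver to
the tasks: inside the QueryModel, or as a serialized Configuration entry in the input formats
below. A minimal round-trip sketch with plain java.io serialization (demo class name is
hypothetical):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    import org.apache.carbondata.core.util.DataTypeConverter;
    import org.apache.carbondata.core.util.DataTypeConverterImpl;

    public class ConverterSerializationDemo {
      public static void main(String[] args) throws Exception {
        // Serialize the default converter, as the framework does when shipping it.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new DataTypeConverterImpl());
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        DataTypeConverter restored = (DataTypeConverter) in.readObject();
        System.out.println(restored.convertToDecimal("7")); // 7 (java.math.BigDecimal)
      }
    }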

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 39b8b3c..535d84c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -39,7 +39,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
-import org.apache.spark.unsafe.types.UTF8String;
+
 
 public final class DataTypeUtil {
 
@@ -84,6 +84,11 @@ public final class DataTypeUtil {
   }
 
   /**
+   * DataType converter for different computing engines
+   */
+  private static DataTypeConverter converter;
+
+  /**
    * This method will convert a given value to its specific type
    *
    * @param msrValue
@@ -276,7 +281,6 @@ public final class DataTypeUtil {
             LOGGER.error("Cannot convert value to Time/Long type value" + e.getMessage());
             return null;
           }
-
         case TIMESTAMP:
           if (data.isEmpty()) {
             return null;
@@ -292,10 +296,9 @@ public final class DataTypeUtil {
           if (data.isEmpty()) {
             return null;
           }
-          java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data);
-          return org.apache.spark.sql.types.Decimal.apply(javaDecVal);
+          return getDataTypeConverter().convertToDecimal(data);
         default:
-          return UTF8String.fromString(data);
+          return getDataTypeConverter().convertFromStringToUTF8String(data);
       }
     } catch (NumberFormatException ex) {
       LOGGER.error("Problem while converting data type" + data);
@@ -340,7 +343,7 @@ public final class DataTypeUtil {
     try {
       switch (actualDataType) {
         case STRING:
-          return UTF8String.fromBytes(dataInBytes);
+          return getDataTypeConverter().convertFromByteToUTF8String(dataInBytes);
         case BOOLEAN:
           return ByteUtil.toBoolean(dataInBytes);
         case SHORT:
@@ -435,9 +438,9 @@ public final class DataTypeUtil {
           if (dimension.getColumnSchema().getScale() > javaDecVal.scale()) {
             javaDecVal = javaDecVal.setScale(dimension.getColumnSchema().getScale());
           }
-          return org.apache.spark.sql.types.Decimal.apply(javaDecVal);
+          return getDataTypeConverter().convertToDecimal(javaDecVal);
         default:
-          return UTF8String.fromBytes(dataInBytes);
+          return getDataTypeConverter().convertFromByteToUTF8String(dataInBytes);
       }
     } catch (NumberFormatException ex) {
       String data = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
@@ -446,29 +449,6 @@ public final class DataTypeUtil {
     }
   }
 
-  public static Object getMeasureDataBasedOnDataType(Object data, DataType dataType) {
-
-    if (null == data) {
-      return null;
-    }
-    try {
-      switch (dataType) {
-        case DOUBLE:
-          return data;
-        case LONG:
-          return data;
-        case DECIMAL:
-          return org.apache.spark.sql.types.Decimal.apply((java.math.BigDecimal) data);
-        default:
-          return data;
-      }
-    } catch (NumberFormatException ex) {
-      LOGGER.error("Problem while converting data type" + data);
-      return null;
-    }
-
-  }
-
   /**
    * Below method will be used to basically to know whether any non parseable
    * data is present or not. if present then return null so that system can
@@ -654,7 +634,7 @@ public final class DataTypeUtil {
           java.math.BigDecimal javaDecVal = new java.math.BigDecimal(parsedValue);
           return bigDecimalToByte(javaDecVal);
         default:
-          return UTF8String.fromString(data).getBytes();
+          return getDataTypeConverter().convertFromStringToByte(data);
       }
     } catch (NumberFormatException ex) {
       LOGGER.error("Problem while converting data type" + data);
@@ -705,4 +685,20 @@ public final class DataTypeUtil {
     }
     return null;
   }
+
+  /**
+   * set the data type converter as per computing engine
+   * @param converterLocal
+   */
+  public static void setDataTypeConverter(DataTypeConverter converterLocal) {
+    converter = converterLocal;
+  }
+
+  public static DataTypeConverter getDataTypeConverter() {
+    if (converter == null) {
+      converter = new DataTypeConverterImpl();
+    }
+    return converter;
+  }
+
 }
\ No newline at end of file
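
The getter falls back lazily to the pure-Java default when no engine has registered a converter
(the unsynchronized lazy init is benign here, since both implementations are stateless). A small
sketch of the fallback in a fresh JVM (demo class name is hypothetical):

    import org.apache.carbondata.core.util.DataTypeConverter;
    import org.apache.carbondata.core.util.DataTypeUtil;

    public class FallbackDemo {
      public static void main(String[] args) {
        // Nothing registered yet: getDataTypeConverter() creates the Java default.
        DataTypeConverter c = DataTypeUtil.getDataTypeConverter();
        System.out.println(c.getClass().getSimpleName()); // DataTypeConverterImpl
        System.out.println(c.convertToDecimal("12.34"));  // 12.34 (java.math.BigDecimal)
      }
    }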

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
index b89c9dd..6beb81c 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
@@ -125,7 +125,6 @@ public class PrimitiveQueryTypeTest {
         return "2015-10-20 12:30:01";
       }
     };
-
     Object expectedValue = primitiveQueryTypeForTimeStampForIsDictionaryFalse
         .getDataBasedOnDataTypeFromSurrogates(surrogateData);
     Object actualValue = primitiveQueryTypeForTimeStampForIsDictionaryFalse

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
index f5d7dc7..a02bc10 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
@@ -21,7 +21,6 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
-import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.Test;
 
 import java.math.BigDecimal;
@@ -82,30 +81,13 @@ public class DataTypeUtilTest {
     assertEquals(getDataBasedOnDataType("0", DataType.LONG), 0L);
     java.math.BigDecimal javaDecVal = new java.math.BigDecimal(1);
     scala.math.BigDecimal scalaDecVal = new scala.math.BigDecimal(javaDecVal);
-    org.apache.spark.sql.types.Decimal expected =
-        new org.apache.spark.sql.types.Decimal().set(scalaDecVal);
-    assertEquals(getDataBasedOnDataType("1", DataType.DECIMAL), expected);
+    assertEquals(getDataBasedOnDataType("1", DataType.DECIMAL),
+        DataTypeUtil.getDataTypeConverter().convertToDecimal(scalaDecVal));
     assertEquals(getDataBasedOnDataType("default", DataType.NULL),
-        UTF8String.fromString("default"));
+        DataTypeUtil.getDataTypeConverter().convertFromStringToUTF8String("default"));
     assertEquals(getDataBasedOnDataType((String) null, DataType.NULL), null);
   }
 
-  @Test public void testGetMeasureDataBasedOnDataType() throws NumberFormatException {
-    assertEquals(getMeasureDataBasedOnDataType(new Long("1"), DataType.LONG), Long.parseLong("1"));
-    assertEquals(getMeasureDataBasedOnDataType(new Double("1"), DataType.DOUBLE),
-        Double.parseDouble("1"));
-    java.math.BigDecimal javaDecVal = new java.math.BigDecimal(1);
-    scala.math.BigDecimal scalaDecVal = new scala.math.BigDecimal(javaDecVal);
-    org.apache.spark.sql.types.Decimal expected =
-        new org.apache.spark.sql.types.Decimal().set(scalaDecVal);
-    assertEquals(
-            getMeasureDataBasedOnDataType(
-                    new java.math.BigDecimal(1),
-                    DataType.DECIMAL),
-            expected);
-    assertEquals(getMeasureDataBasedOnDataType("1", DataType.STRING), "1");
-  }
-
   @Test public void testGetMeasureValueBasedOnDataType() {
     ColumnSchema columnSchema = new ColumnSchema();
     CarbonMeasure carbonMeasure = new CarbonMeasure(columnSchema, 1);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 4bebf21..249543e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -64,6 +64,8 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
@@ -111,6 +113,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
   private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
+  private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
 
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
@@ -143,6 +146,30 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
   /**
+   * It is optional; if the user does not set a converter, the default DataTypeConverterImpl is used.
+   *
+   * @param configuration
+   * @param converter is the Data type converter for different computing engine
+   * @throws IOException
+   */
+  public static void setDataTypeConverter(Configuration configuration, DataTypeConverter converter)
+      throws IOException {
+    if (null != converter) {
+      configuration.set(CARBON_CONVERTER,
+          ObjectSerializationUtil.convertObjectToString(converter));
+    }
+  }
+
+  public static DataTypeConverter getDataTypeConverter(Configuration configuration)
+      throws IOException {
+    String converter = configuration.get(CARBON_CONVERTER);
+    if (converter == null) {
+      return new DataTypeConverterImpl();
+    }
+    return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
+  }
+
+  /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
  private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
@@ -163,12 +190,15 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       return this.carbonTable;
     }
   }
-
   public static void setTablePath(Configuration configuration, String tablePath)
       throws IOException {
     configuration.set(FileInputFormat.INPUT_DIR, tablePath);
   }
 
+  private static CarbonTablePath getTablePath(AbsoluteTableIdentifier absIdentifier) {
+    return CarbonStorePath.getCarbonTablePath(absIdentifier);
+  }
+
   /**
    * It sets unresolved filter expression.
    *
@@ -187,6 +217,19 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     }
   }
 
+  protected Expression getFilterPredicates(Configuration configuration) {
+    try {
+      String filterExprString = configuration.get(FILTER_PREDICATE);
+      if (filterExprString == null) {
+        return null;
+      }
+      Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+      return (Expression) filter;
+    } catch (IOException e) {
+      throw new RuntimeException("Error while reading filter expression", e);
+    }
+  }
+
  public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
     if (projection == null || projection.isEmpty()) {
       return;
@@ -212,8 +255,27 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     }
   }
 
-  private static CarbonTablePath getTablePath(AbsoluteTableIdentifier absIdentifier) {
-    return CarbonStorePath.getCarbonTablePath(absIdentifier);
+  public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
+    String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
+    //By default it uses dictionary decoder read class
+    CarbonReadSupport<T> readSupport = null;
+    if (readSupportClass != null) {
+      try {
+        Class<?> myClass = Class.forName(readSupportClass);
+        Constructor<?> constructor = myClass.getConstructors()[0];
+        Object object = constructor.newInstance();
+        if (object instanceof CarbonReadSupport) {
+          readSupport = (CarbonReadSupport) object;
+        }
+      } catch (ClassNotFoundException ex) {
+        LOG.error("Class " + readSupportClass + " not found", ex);
+      } catch (Exception ex) {
+        LOG.error("Error while creating " + readSupportClass, ex);
+      }
+    } else {
+      readSupport = new DictionaryDecodeReadSupport<>();
+    }
+    return readSupport;
   }
 
   /**
@@ -419,19 +481,6 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return result;
   }
 
-  protected Expression getFilterPredicates(Configuration configuration) {
-    try {
-      String filterExprString = configuration.get(FILTER_PREDICATE);
-      if (filterExprString == null) {
-        return null;
-      }
-      Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
-      return (Expression) filter;
-    } catch (IOException e) {
-      throw new RuntimeException("Error while reading filter expression", e);
-    }
-  }
-
   /**
    * get data blocks of given segment
    */
@@ -752,7 +801,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     // query plan includes projection column
     String projection = getColumnProjection(configuration);
     CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
-    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
+    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable,
+        getDataTypeConverter(configuration));
 
     // set the filter to the query model in order to filter blocklet before scan
     Expression filter = getFilterPredicates(configuration);
@@ -776,29 +826,6 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return queryModel;
   }
 
-  public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
-    String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
-    //By default it uses dictionary decoder read class
-    CarbonReadSupport<T> readSupport = null;
-    if (readSupportClass != null) {
-      try {
-        Class<?> myClass = Class.forName(readSupportClass);
-        Constructor<?> constructor = myClass.getConstructors()[0];
-        Object object = constructor.newInstance();
-        if (object instanceof CarbonReadSupport) {
-          readSupport = (CarbonReadSupport) object;
-        }
-      } catch (ClassNotFoundException ex) {
-        LOG.error("Class " + readSupportClass + "not found", ex);
-      } catch (Exception ex) {
-        LOG.error("Error while creating " + readSupportClass, ex);
-      }
-    } else {
-      readSupport = new DictionaryDecodeReadSupport<>();
-    }
-    return readSupport;
-  }
-
  @Override protected List<FileStatus> listStatus(JobContext job) throws IOException {
     List<FileStatus> result = new ArrayList<FileStatus>();
     String[] segmentsToConsider = getSegmentsToAccess(job);
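
The converter is an optional, job-level setting: the driver serializes it into the Configuration,
and task-side readers fall back to the Java default when the key is unset. A sketch of the round
trip (assumes the hadoop and spark-common modules are on the classpath; demo class name is
hypothetical):

    import org.apache.hadoop.conf.Configuration;

    import org.apache.carbondata.core.util.DataTypeConverter;
    import org.apache.carbondata.hadoop.CarbonInputFormat;
    import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl;

    public class ConverterConfigDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Driver side: optionally register the engine's converter.
        CarbonInputFormat.setDataTypeConverter(conf, new SparkDataTypeConverterImpl());
        // Task side: read it back; an unset key yields DataTypeConverterImpl.
        DataTypeConverter converter = CarbonInputFormat.getDataTypeConverter(conf);
        System.out.println(converter.getClass().getSimpleName()); // SparkDataTypeConverterImpl
      }
    }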

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 9e6e284..933b583 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -57,6 +57,8 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
@@ -102,6 +104,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
   private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
+  private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
 
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
@@ -443,7 +446,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     // query plan includes projection column
     String projection = getColumnProjection(configuration);
     CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
-    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
+    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable,
+        getDataTypeConverter(configuration));
 
     // set the filter to the query model in order to filter blocklet before scan
     Expression filter = getFilterPredicates(configuration);
@@ -573,4 +577,28 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     }
     return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
   }
+
+  /**
+   * It is optional; if the user does not set a converter, the default DataTypeConverterImpl is used.
+   *
+   * @param configuration
+   * @param converter is the Data type converter for different computing engine
+   * @throws IOException
+   */
+  public static void setDataTypeConverter(Configuration configuration, DataTypeConverter converter)
+      throws IOException {
+    if (null != converter) {
+      configuration.set(CARBON_CONVERTER,
+          ObjectSerializationUtil.convertObjectToString(converter));
+    }
+  }
+
+  public static DataTypeConverter getDataTypeConverter(Configuration configuration)
+      throws IOException {
+    String converter = configuration.get(CARBON_CONVERTER);
+    if (converter == null) {
+      return new DataTypeConverterImpl();
+    }
+    return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
index 2a92185..ae87d66 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -227,7 +227,7 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
         return new Text(obj.toString());
       case DECIMAL:
         return new HiveDecimalWritable(
-            HiveDecimal.create(((org.apache.spark.sql.types.Decimal) obj).toJavaBigDecimal()));
+            HiveDecimal.create((java.math.BigDecimal) obj));
     }
     throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory());
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 5ffc058..5190846 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.hadoop.CarbonInputFormat;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
@@ -127,7 +128,8 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
     String projection = getProjection(configuration, carbonTable,
         identifier.getCarbonTableIdentifier().getTableName());
     CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
-    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
+    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable,
+        new DataTypeConverterImpl());
     // set the filter to the query model in order to filter blocklet before scan
     Expression filter = getFilterPredicates(configuration);
     CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 4b7864d..532ab87 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.presto;
 
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 import com.facebook.presto.spi.ColumnHandle;
@@ -102,7 +103,8 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     CarbonTable targetTable = tableCacheModel.carbonTable;
     CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
     QueryModel queryModel =
-        QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
+        QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(),
+            queryPlan, targetTable, new DataTypeConverterImpl());
 
     // Push down filter
     fillFilter2QueryModel(queryModel, carbondataSplit.getConstraints(), targetTable);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
new file mode 100644
index 0000000..be459ac
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.util.DataTypeConverter;
+
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Convert java data type to spark data type
+ */
+public final class SparkDataTypeConverterImpl implements DataTypeConverter, Serializable {
+
+  private static final long serialVersionUID = -4379212832935070583L;
+
+  public Object convertToDecimal(Object data) {
+    java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data.toString());
+    return org.apache.spark.sql.types.Decimal.apply(javaDecVal);
+  }
+
+  public byte[] convertFromStringToByte(Object data) {
+    return UTF8String.fromString((String) data).getBytes();
+  }
+
+  public Object convertFromByteToUTF8String(Object data) {
+    return UTF8String.fromBytes((byte[]) data);
+  }
+
+  public Object convertFromStringToUTF8String(Object data) {
+    return UTF8String.fromString((String) data);
+  }
+}
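
A quick check of the byte-array paths, which the sketch at the top of this message does not
exercise (Spark must be on the classpath; demo class name is hypothetical):

    import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl;

    public class SparkConverterByteDemo {
      public static void main(String[] args) {
        SparkDataTypeConverterImpl c = new SparkDataTypeConverterImpl();
        // String -> UTF8String bytes and back.
        System.out.println(c.convertFromByteToUTF8String("abc".getBytes())); // abc
        System.out.println(new String(c.convertFromStringToByte("abc")));    // abc
      }
    }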

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index c383779..cd3144a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFa
 import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
-
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
 
 /**
  * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
@@ -255,6 +255,7 @@ class CarbonScanRDD(
  private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = {
     CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport)
     CarbonTableInputFormat.setTableInfo(conf, tableInfo)
+    CarbonTableInputFormat.setDataTypeConverter(conf, new SparkDataTypeConverterImpl)
     createInputFormat(conf)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 5d931b8..2c2e954 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -37,14 +37,16 @@ import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
 import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
 import org.apache.carbondata.spark.rdd.CarbonRDD
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
 
 private[sql] case class CarbonDatasourceHadoopRelation(
   sqlContext: SQLContext,
@@ -137,6 +139,8 @@ class CarbonHadoopFSRDD[V: ClassTag](
     val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)
     val job: Job = new Job(hadoopAttemptContext.getConfiguration)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, job)
+    CarbonInputFormat.setDataTypeConverter(hadoopAttemptContext.getConfiguration,
+      new SparkDataTypeConverterImpl)
     hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
     val reader =
       format.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 43f6a21..7e27d1b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.{CarbonRDD, CarbonRDDWithTableInfo}
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
 
 /**
  * It decodes the data.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09f7cdd4/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 94bfbe4..be3572c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -46,6 +46,7 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.BatchResult;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * Executor class for executing the query on the selected segments to be merged.
@@ -203,6 +204,7 @@ public class CarbonCompactionExecutor {
     model.setTableBlockInfos(blockList);
     model.setForcedDetailRawQuery(true);
     model.setFilterExpressionResolverTree(null);
+    model.setConverter(DataTypeUtil.getDataTypeConverter());
 
     List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 

