carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-1429] Add a value based compression for decimal data type when decimal is stored as Int or Long
Date Fri, 15 Sep 2017 09:24:36 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 302ef2f56 -> 6f204376f


[CARBONDATA-1429] Add a value based compression for decimal data type when decimal is stored as Int or Long

Adds value-based compression for the decimal data type when a decimal is stored as Int or Long.

When decimal precision is <= 9, decimal values are stored in 4 bytes, but unlike other primitive data types they are not compressed further based on their min and max values. With this change, decimal data falling in the Integer range is further compressed to byte or short, based on its min and max values.
When decimal precision is <= 18, decimal values are stored in 8 bytes, but unlike other primitive data types they are not compressed further based on their min and max values. With this change, decimal data falling in the Long range is further compressed to byte, short or int, based on its min and max values.
Advantage: This reduces the storage space, and thereby the I/O time spent when reading and decompressing the data.

This closes #1297


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6f204376
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6f204376
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6f204376

Branch: refs/heads/master
Commit: 6f204376f880231c8f537052fe1b29008178aad8
Parents: 302ef2f
Author: manishgupta88 <tomanishgupta18@gmail.com>
Authored: Thu Aug 24 12:43:58 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Fri Sep 15 17:22:58 2017 +0800

----------------------------------------------------------------------
 .../core/datastore/page/ColumnPage.java         |  64 +++-
 .../core/datastore/page/DecimalColumnPage.java  | 109 +++++++
 .../core/datastore/page/LazyColumnPage.java     |  20 +-
 .../datastore/page/SafeDecimalColumnPage.java   | 227 ++++++++++++++
 .../datastore/page/SafeFixLengthColumnPage.java |   3 +-
 .../datastore/page/SafeVarLengthColumnPage.java |   5 +-
 .../datastore/page/UnsafeDecimalColumnPage.java | 296 +++++++++++++++++++
 .../page/UnsafeVarLengthColumnPage.java         |  52 +---
 .../datastore/page/VarLengthColumnPageBase.java |  52 +++-
 .../page/encoding/ColumnPageEncoderMeta.java    |  16 +-
 .../page/encoding/DefaultEncodingFactory.java   |  93 +++++-
 .../adaptive/AdaptiveDeltaIntegralCodec.java    |  21 +-
 .../adaptive/AdaptiveIntegralCodec.java         |  12 +-
 .../datatype/DecimalConverterFactory.java       |  74 +++--
 14 files changed, 931 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 0be409e..6c534d6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -22,7 +22,6 @@ import java.math.BigDecimal;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
@@ -31,7 +30,6 @@ import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsColle
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE;
@@ -61,8 +59,6 @@ public abstract class ColumnPage {
   // statistics collector for this column page
   private ColumnPageStatsCollector statsCollector;
 
-  DecimalConverterFactory.DecimalConverter decimalConverter;
-
   protected static final boolean unsafe = Boolean.parseBoolean(CarbonProperties.getInstance()
       .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
           CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING_DEFAULT));
@@ -75,12 +71,6 @@ public abstract class ColumnPage {
     this.dataType = dataType;
     this.pageSize = pageSize;
     this.nullBitSet = new BitSet(pageSize);
-    if (dataType == DECIMAL) {
-      assert (columnSpec.getColumnType() == ColumnType.MEASURE);
-      int precision = columnSpec.getPrecision();
-      int scale = columnSpec.getScale();
-      decimalConverter = DecimalConverterFactory.INSTANCE.getDecimalConverter(precision, scale);
-    }
   }
 
   public DataType getDataType() {
@@ -130,6 +120,19 @@ public abstract class ColumnPage {
     this.statsCollector = statsCollector;
   }
 
+  private static ColumnPage createDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
+      int pageSize) {
+    if (unsafe) {
+      try {
+        return new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize);
+      } catch (MemoryException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      return new SafeDecimalColumnPage(columnSpec, dataType, pageSize);
+    }
+  }
+
   private static ColumnPage createVarLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
       int pageSize) {
     if (unsafe) {
@@ -158,7 +161,9 @@ public abstract class ColumnPage {
 
   private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
       int pageSize) {
-    if (dataType.equals(BYTE_ARRAY) || dataType.equals(DECIMAL)) {
+    if (dataType.equals(DECIMAL)) {
+      return createDecimalPage(columnSpec, dataType, pageSize);
+    } else if (dataType.equals(BYTE_ARRAY)) {
       return createVarLengthPage(columnSpec, dataType, pageSize);
     } else {
       return createFixLengthPage(columnSpec, dataType, pageSize);
@@ -189,6 +194,8 @@ public abstract class ColumnPage {
           instance = new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize);
           break;
         case DECIMAL:
+          instance = new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize);
+          break;
         case STRING:
         case BYTE_ARRAY:
           instance =
@@ -631,14 +638,43 @@ public abstract class ColumnPage {
   }
 
   /**
-   * Decompress decimal data and create a column page
+   * Decompress data and create a decimal column page using the decompressed data
    */
   public static ColumnPage decompressDecimalPage(ColumnPageEncoderMeta meta, byte[] compressedData,
       int offset, int length) throws MemoryException {
     Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
     TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
-    byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
-    return newDecimalPage(columnSpec, lvEncodedBytes);
+    ColumnPage decimalPage = null;
+    switch (meta.getStoreDataType()) {
+      case BYTE:
+        byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
+        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), byteData.length);
+        decimalPage.setBytePage(byteData);
+        return decimalPage;
+      case SHORT:
+        short[] shortData = compressor.unCompressShort(compressedData, offset, length);
+        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), shortData.length);
+        decimalPage.setShortPage(shortData);
+        return decimalPage;
+      case SHORT_INT:
+        byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
+        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), shortIntData.length);
+        decimalPage.setShortIntPage(shortIntData);
+        return decimalPage;
+      case INT:
+        int[] intData = compressor.unCompressInt(compressedData, offset, length);
+        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), intData.length);
+        decimalPage.setIntPage(intData);
+        return decimalPage;
+      case LONG:
+        long[] longData = compressor.unCompressLong(compressedData, offset, length);
+        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), longData.length);
+        decimalPage.setLongPage(longData);
+        return decimalPage;
+      default:
+        byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
+        return newDecimalPage(columnSpec, lvEncodedBytes);
+    }
   }
 
   public BitSet getNullBits() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
new file mode 100644
index 0000000..2624223
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+
+/**
+ * Represent a columnar data in one page for one column of decimal data type
+ */
+public abstract class DecimalColumnPage extends VarLengthColumnPageBase {
+
+  /**
+   * decimal converter instance
+   */
+  DecimalConverterFactory.DecimalConverter decimalConverter;
+
+  DecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
+    super(columnSpec, dataType, pageSize);
+    decimalConverter = DecimalConverterFactory.INSTANCE
+        .getDecimalConverter(columnSpec.getPrecision(), columnSpec.getScale());
+  }
+
+  public DecimalConverterFactory.DecimalConverter getDecimalConverter() {
+    return decimalConverter;
+  }
+
+  @Override
+  public byte[] getBytePage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public short[] getShortPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte[] getShortIntPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public int[] getIntPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public long[] getLongPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public float[] getFloatPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public double[] getDoublePage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte[][] getByteArrayPage() {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void putDouble(int rowId, double value) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setFloatPage(float[] floatData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public void setDoublePage(double[] doubleData) {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 1e90387..4bdb252 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+
 /**
  * This is a decorator of column page, it performs decoding lazily (when caller calls getXXX
  * method to get the value from the page)
@@ -93,7 +95,23 @@ public class LazyColumnPage extends ColumnPage {
 
   @Override
   public BigDecimal getDecimal(int rowId) {
-    return columnPage.getDecimal(rowId);
+    DecimalConverterFactory.DecimalConverter decimalConverter =
+        ((DecimalColumnPage) columnPage).getDecimalConverter();
+    switch (columnPage.getDataType()) {
+      case BYTE:
+        return decimalConverter.getDecimal(converter.decodeLong(columnPage.getByte(rowId)));
+      case SHORT:
+        return decimalConverter.getDecimal(converter.decodeLong(columnPage.getShort(rowId)));
+      case SHORT_INT:
+        return decimalConverter.getDecimal(converter.decodeLong(columnPage.getShortInt(rowId)));
+      case INT:
+        return decimalConverter.getDecimal(converter.decodeLong(columnPage.getInt(rowId)));
+      case LONG:
+      case DECIMAL:
+        return columnPage.getDecimal(rowId);
+      default:
+        throw new RuntimeException("internal error: " + this.toString());
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
new file mode 100644
index 0000000..01d3d87
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Represent a columnar data in one page for one column of decimal data type
+ */
+public class SafeDecimalColumnPage extends DecimalColumnPage {
+
+  // Only one of following fields will be used
+  private byte[] byteData;
+  private short[] shortData;
+  private int[] intData;
+  private long[] longData;
+  private byte[] shortIntData;
+  private byte[][] byteArrayData;
+
+  SafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
+    super(columnSpec, dataType, pageSize);
+    byteArrayData = new byte[pageSize][];
+  }
+
+  @Override
+  public void setBytePage(byte[] byteData) {
+    this.byteData = byteData;
+  }
+
+  @Override
+  public void setShortPage(short[] shortData) {
+    this.shortData = shortData;
+  }
+
+  @Override
+  public void setShortIntPage(byte[] shortIntData) {
+    this.shortIntData = shortIntData;
+  }
+
+  @Override
+  public void setIntPage(int[] intData) {
+    this.intData = intData;
+  }
+
+  @Override
+  public void setLongPage(long[] longData) {
+    this.longData = longData;
+  }
+
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    byteArrayData = byteArray;
+  }
+
+  /**
+   * Set byte value at rowId
+   */
+  @Override
+  public void putByte(int rowId, byte value) {
+    byteData[rowId] = value;
+  }
+
+  /**
+   * Set short value at rowId
+   */
+  @Override
+  public void putShort(int rowId, short value) {
+    shortData[rowId] = value;
+  }
+
+  /**
+   * Set integer value at rowId
+   */
+  @Override
+  public void putInt(int rowId, int value) {
+    intData[rowId] = value;
+  }
+
+  /**
+   * Set long value at rowId
+   */
+  @Override
+  public void putLong(int rowId, long value) {
+    longData[rowId] = value;
+  }
+
+  @Override
+  void putBytesAtRow(int rowId, byte[] bytes) {
+    byteArrayData[rowId] = bytes;
+  }
+
+  @Override
+  public void putDecimal(int rowId, BigDecimal decimal) {
+    switch (decimalConverter.getDecimalConverterType()) {
+      case DECIMAL_INT:
+        if (null == intData) {
+          intData = new int[pageSize];
+        }
+        putInt(rowId, (int) decimalConverter.convert(decimal));
+        break;
+      case DECIMAL_LONG:
+        if (null == longData) {
+          longData = new long[pageSize];
+        }
+        putLong(rowId, (long) decimalConverter.convert(decimal));
+        break;
+      default:
+        putBytes(rowId, (byte[]) decimalConverter.convert(decimal));
+    }
+  }
+
+  @Override
+  public void putShortInt(int rowId, int value) {
+    byte[] converted = ByteUtil.to3Bytes(value);
+    System.arraycopy(converted, 0, shortIntData, rowId * 3, 3);
+  }
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    byteArrayData[rowId] = new byte[length];
+    System.arraycopy(bytes, offset, byteArrayData[rowId], 0, length);
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    return byteData[rowId];
+  }
+
+  @Override
+  public byte[] getBytes(int rowId) {
+    return byteArrayData[rowId];
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    return shortData[rowId];
+  }
+
+  @Override
+  public int getShortInt(int rowId) {
+    return ByteUtil.valueOf3Bytes(shortIntData, rowId * 3);
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    return intData[rowId];
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    return longData[rowId];
+  }
+
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    long value;
+    switch (dataType) {
+      case BYTE:
+        value = getByte(rowId);
+        break;
+      case SHORT:
+        value = getShort(rowId);
+        break;
+      case SHORT_INT:
+        value = getShortInt(rowId);
+        break;
+      case INT:
+        value = getInt(rowId);
+        break;
+      case LONG:
+        value = getLong(rowId);
+        break;
+      default:
+        byte[] bytes = byteArrayData[rowId];
+        return decimalConverter.getDecimal(bytes);
+    }
+    return decimalConverter.getDecimal(value);
+  }
+
+  @Override
+  public void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
+    System.arraycopy(byteArrayData[rowId], 0, dest, destOffset, length);
+  }
+
+  @Override
+  public void convertValue(ColumnPageValueConverter codec) {
+    switch (decimalConverter.getDecimalConverterType()) {
+      case DECIMAL_INT:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, intData[i]);
+        }
+        break;
+      case DECIMAL_LONG:
+        for (int i = 0; i < pageSize; i++) {
+          codec.encode(i, longData[i]);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "not support value conversion on " + dataType + " page");
+    }
+  }
+
+  @Override
+  public void freeMemory() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index 5e0e822..33d306d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -166,8 +166,7 @@ public class SafeFixLengthColumnPage extends ColumnPage {
     return doubleData[rowId];
   }
 
-  @Override
-  public BigDecimal getDecimal(int rowId) {
+  @Override public BigDecimal getDecimal(int rowId) {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index dde6132..b5daddb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -51,13 +51,12 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
   }
 
   @Override public void putDecimal(int rowId, BigDecimal decimal) {
-    putBytes(rowId, decimalConverter.convert(decimal));
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
   @Override
   public BigDecimal getDecimal(int rowId) {
-    byte[] bytes = byteArrayData[rowId];
-    return decimalConverter.getDecimal(bytes);
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
new file mode 100644
index 0000000..45fa7d8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Represents a columnar data for decimal data type column for one page
+ */
+public class UnsafeDecimalColumnPage extends DecimalColumnPage {
+
+  UnsafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
+      throws MemoryException {
+    super(columnSpec, dataType, pageSize);
+    capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
+    initMemory();
+  }
+
+  UnsafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
+      int capacity) throws MemoryException {
+    super(columnSpec, dataType, pageSize);
+    this.capacity = capacity;
+    initMemory();
+  }
+
+  private void initMemory() throws MemoryException {
+    switch (dataType) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        int size = pageSize << dataType.getSizeBits();
+        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
+        baseAddress = memoryBlock.getBaseObject();
+        baseOffset = memoryBlock.getBaseOffset();
+        break;
+      case SHORT_INT:
+        size = pageSize * 3;
+        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
+        baseAddress = memoryBlock.getBaseObject();
+        baseOffset = memoryBlock.getBaseOffset();
+        break;
+      case DECIMAL:
+        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
+        baseAddress = memoryBlock.getBaseObject();
+        baseOffset = memoryBlock.getBaseOffset();
+        break;
+      default:
+        throw new UnsupportedOperationException("invalid data type: " + dataType);
+    }
+  }
+
+  @Override
+  public void setBytePage(byte[] byteData) {
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(byteData, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseAddress, baseOffset,
+            byteData.length << byteBits);
+  }
+
+  @Override
+  public void setShortPage(short[] shortData) {
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(shortData, CarbonUnsafe.SHORT_ARRAY_OFFSET, baseAddress, baseOffset,
+            shortData.length << shortBits);
+  }
+
+  @Override
+  public void setShortIntPage(byte[] shortIntData) {
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(shortIntData, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseAddress, baseOffset,
+            shortIntData.length);
+  }
+
+  @Override
+  public void setIntPage(int[] intData) {
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(intData, CarbonUnsafe.INT_ARRAY_OFFSET, baseAddress, baseOffset,
+            intData.length << intBits);
+  }
+
+  @Override
+  public void setLongPage(long[] longData) {
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(longData, CarbonUnsafe.LONG_ARRAY_OFFSET, baseAddress, baseOffset,
+            longData.length << longBits);
+  }
+
+  @Override
+  public void setByteArrayPage(byte[][] byteArray) {
+    if (totalLength != 0) {
+      throw new IllegalStateException("page is not empty");
+    }
+    for (int i = 0; i < byteArray.length; i++) {
+      putBytes(i, byteArray[i]);
+    }
+  }
+
+  @Override
+  public void freeMemory() {
+    if (memoryBlock != null) {
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
+      memoryBlock = null;
+      baseAddress = null;
+      baseOffset = 0;
+    }
+  }
+
+  @Override
+  public void putByte(int rowId, byte value) {
+    long offset = rowId << byteBits;
+    CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putShort(int rowId, short value) {
+    long offset = rowId << shortBits;
+    CarbonUnsafe.getUnsafe().putShort(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putShortInt(int rowId, int value) {
+    byte[] data = ByteUtil.to3Bytes(value);
+    long offset = rowId * 3L;
+    CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset, data[0]);
+    CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset + 1, data[1]);
+    CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset + 2, data[2]);
+  }
+
+  @Override
+  public void putInt(int rowId, int value) {
+    long offset = rowId << intBits;
+    CarbonUnsafe.getUnsafe().putInt(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putLong(int rowId, long value) {
+    long offset = rowId << longBits;
+    CarbonUnsafe.getUnsafe().putLong(baseAddress, baseOffset + offset, value);
+  }
+
+  @Override
+  public void putBytesAtRow(int rowId, byte[] bytes) {
+    putBytes(rowId, bytes, 0, bytes.length);
+  }
+
+  @Override
+  public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+    try {
+      ensureMemory(length);
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
+    CarbonUnsafe.getUnsafe().copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET + offset, baseAddress,
+        baseOffset + rowOffset[rowId], length);
+  }
+
+  @Override
+  public void putDecimal(int rowId, BigDecimal decimal) {
+    switch (decimalConverter.getDecimalConverterType()) {
+      case DECIMAL_INT:
+        putInt(rowId, (int) decimalConverter.convert(decimal));
+        break;
+      case DECIMAL_LONG:
+        putLong(rowId, (long) decimalConverter.convert(decimal));
+        break;
+      default:
+        putBytes(rowId, (byte[]) decimalConverter.convert(decimal));
+    }
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    long offset = rowId << byteBits;
+    return CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public byte[] getBytes(int rowId) {
+    int length = rowOffset[rowId + 1] - rowOffset[rowId];
+    byte[] bytes = new byte[length];
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId],
+        bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+    return bytes;
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    long offset = rowId << shortBits;
+    return CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public int getShortInt(int rowId) {
+    long offset = rowId * 3L;
+    byte[] data = new byte[3];
+    data[0] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset);
+    data[1] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset + 1);
+    data[2] = CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset + 2);
+    return ByteUtil.valueOf3Bytes(data, 0);
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    long offset = rowId << intBits;
+    return CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    long offset = rowId << longBits;
+    return CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset);
+  }
+
+  @Override
+  public BigDecimal getDecimal(int rowId) {
+    long value;
+    switch (dataType) {
+      case BYTE:
+        value = getByte(rowId);
+        break;
+      case SHORT:
+        value = getShort(rowId);
+        break;
+      case SHORT_INT:
+        value = getShortInt(rowId);
+        break;
+      case INT:
+        value = getInt(rowId);
+        break;
+      case LONG:
+        value = getLong(rowId);
+        break;
+      default:
+        int length = rowOffset[rowId + 1] - rowOffset[rowId];
+        byte[] bytes = new byte[length];
+        CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], bytes,
+            CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+        return decimalConverter.getDecimal(bytes);
+    }
+    return decimalConverter.getDecimal(value);
+  }
+
+  @Override
+  void copyBytes(int rowId, byte[] dest, int destOffset, int length) {
+    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], dest,
+        CarbonUnsafe.BYTE_ARRAY_OFFSET + destOffset, length);
+  }
+
+  @Override
+  public void convertValue(ColumnPageValueConverter codec) {
+    convertValueForDecimalType(codec);
+  }
+
+  private void convertValueForDecimalType(ColumnPageValueConverter codec) {
+    switch (decimalConverter.getDecimalConverterType()) {
+      case DECIMAL_INT:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = i << intBits;
+          codec.encode(i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset));
+        }
+        break;
+      case DECIMAL_LONG:
+        for (int i = 0; i < pageSize; i++) {
+          long offset = i << longBits;
+          codec.encode(i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset));
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "not support value conversion on " + dataType + " page");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index 85b9b9f..c9737a4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -21,35 +21,15 @@ import java.math.BigDecimal;
 
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
-// This extension uses unsafe memory to store page data, for variable length data type (string,
-// decimal)
+/**
+ * This extension uses unsafe memory to store page data, for variable length data type (string)
+ */
 public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
-  // memory allocated by Unsafe
-  private MemoryBlock memoryBlock;
-
-  // base address of memoryBlock
-  private Object baseAddress;
-
-  // base offset of memoryBlock
-  private long baseOffset;
-
-  // size of the allocated memory, in bytes
-  private int capacity;
-
-  // default size for each row, grows as needed
-  private static final int DEFAULT_ROW_SIZE = 8;
-
-  private static final double FACTOR = 1.25;
-
-  private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
-
   /**
    * create a page
    */
@@ -84,23 +64,6 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
     }
   }
 
-  /**
-   * reallocate memory if capacity length than current size + request size
-   */
-  private void ensureMemory(int requestSize) throws MemoryException {
-    if (totalLength + requestSize > capacity) {
-      int newSize = 2 * capacity;
-      MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize);
-      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset,
-          newBlock.getBaseObject(), newBlock.getBaseOffset(), capacity);
-      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
-      memoryBlock = newBlock;
-      baseAddress = newBlock.getBaseObject();
-      baseOffset = newBlock.getBaseOffset();
-      capacity = newSize;
-    }
-  }
-
   @Override
   public void putBytesAtRow(int rowId, byte[] bytes) {
     putBytes(rowId, bytes, 0, bytes.length);
@@ -128,17 +91,12 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
   }
 
   @Override public void putDecimal(int rowId, BigDecimal decimal) {
-    putBytes(rowId, decimalConverter.convert(decimal));
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
   @Override
   public BigDecimal getDecimal(int rowId) {
-    int length = rowOffset[rowId + 1] - rowOffset[rowId];
-    byte[] bytes = new byte[length];
-    CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId],
-        bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-
-    return decimalConverter.getDecimal(bytes);
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 9338bbc..83e6ef0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -22,21 +22,48 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
+import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE;
 import static org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL;
 
 public abstract class VarLengthColumnPageBase extends ColumnPage {
 
+  static final int byteBits = BYTE.getSizeBits();
+  static final int shortBits = DataType.SHORT.getSizeBits();
+  static final int intBits = DataType.INT.getSizeBits();
+  static final int longBits = DataType.LONG.getSizeBits();
+  // default size for each row, grows as needed
+  static final int DEFAULT_ROW_SIZE = 8;
+
+  static final double FACTOR = 1.25;
+
+  final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+
+  // memory allocated by Unsafe
+  MemoryBlock memoryBlock;
+
+  // base address of memoryBlock
+  Object baseAddress;
+
   // the offset of row in the unsafe memory, its size is pageSize + 1
   int[] rowOffset;
 
   // the length of bytes added in the page
   int totalLength;
 
+  // base offset of memoryBlock
+  long baseOffset;
+
+  // size of the allocated memory, in bytes
+  int capacity;
   VarLengthColumnPageBase(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
     super(columnSpec, dataType, pageSize);
     rowOffset = new int[pageSize + 1];
@@ -116,9 +143,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
     VarLengthColumnPageBase page;
     if (unsafe) {
-      page = new UnsafeVarLengthColumnPage(columnSpec, DECIMAL, rowId);
+      page = new UnsafeDecimalColumnPage(columnSpec, DECIMAL, rowId);
     } else {
-      page = new SafeVarLengthColumnPage(columnSpec, DECIMAL, rowId);
+      page = new SafeDecimalColumnPage(columnSpec, DECIMAL, rowId);
     }
 
     // set total length and rowOffset in page
@@ -159,9 +186,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     VarLengthColumnPageBase page;
     int inputDataLength = offset;
     if (unsafe) {
-      page = new UnsafeVarLengthColumnPage(columnSpec, DECIMAL, numRows, inputDataLength);
+      page = new UnsafeDecimalColumnPage(columnSpec, DECIMAL, numRows, inputDataLength);
     } else {
-      page = new SafeVarLengthColumnPage(columnSpec, dataType, numRows);
+      page = new SafeDecimalColumnPage(columnSpec, dataType, numRows);
     }
 
     // set total length and rowOffset in page
@@ -330,4 +357,21 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   public void convertValue(ColumnPageValueConverter codec) {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
+
+  /**
+   * reallocate memory, to at least current size + request size, if capacity cannot hold both
+   */
+  protected void ensureMemory(int requestSize) throws MemoryException {
+    if (totalLength + requestSize > capacity) {
+      int newSize = Math.max(2 * capacity, totalLength + requestSize);
+      MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize);
+      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset,
+          newBlock.getBaseObject(), newBlock.getBaseOffset(), capacity);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
+      memoryBlock = newBlock;
+      baseAddress = newBlock.getBaseObject();
+      baseOffset = newBlock.getBaseOffset();
+      capacity = newSize;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index 87eb77a..422ce67 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -161,8 +161,8 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
         out.writeDouble(0d); // unique value is obsoleted, maintain for compatibility
         break;
       case DECIMAL:
-        byte[] maxAsBytes = getMaxAsBytes();
-        byte[] minAsBytes = getMinAsBytes();
+        byte[] maxAsBytes = getMaxAsBytes(columnSpec.getSchemaDataType());
+        byte[] minAsBytes = getMinAsBytes(columnSpec.getSchemaDataType());
         byte[] unique = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
         out.writeShort((short) maxAsBytes.length);
         out.write(maxAsBytes);
@@ -232,20 +232,20 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
     }
   }
 
-  public byte[] getMaxAsBytes() {
-    return getValueAsBytes(getMaxValue());
+  public byte[] getMaxAsBytes(DataType dataType) {
+    return getValueAsBytes(getMaxValue(), dataType);
   }
 
-  public byte[] getMinAsBytes() {
-    return getValueAsBytes(getMinValue());
+  public byte[] getMinAsBytes(DataType dataType) {
+    return getValueAsBytes(getMinValue(), dataType);
   }
 
   /**
    * convert value to byte array
    */
-  private byte[] getValueAsBytes(Object value) {
+  private byte[] getValueAsBytes(Object value, DataType dataType) {
     ByteBuffer b;
-    switch (storeDataType) {
+    switch (dataType) {
       case BYTE:
         b = ByteBuffer.allocate(8);
         b.putLong((byte) value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index f08444b..ce16ad5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -17,10 +17,13 @@
 
 package org.apache.carbondata.core.datastore.page.encoding;
 
+import java.math.BigDecimal;
+
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
@@ -31,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.Direc
 import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.HighCardDictDimensionIndexCodec;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 
 /**
  * Default factory will select encoding base on column page data type and statistics
@@ -113,10 +117,11 @@ public class DefaultEncodingFactory extends EncodingFactory {
       case INT:
       case LONG:
         return selectCodecByAlgorithmForIntegral(stats).createEncoder(null);
+      case DECIMAL:
+        return createEncoderForDecimalDataTypeMeasure(columnPage);
       case FLOAT:
       case DOUBLE:
         return selectCodecByAlgorithmForFloating(stats).createEncoder(null);
-      case DECIMAL:
       case BYTE_ARRAY:
         return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
       default:
@@ -124,6 +129,19 @@ public class DefaultEncodingFactory extends EncodingFactory {
     }
   }
 
+  private ColumnPageEncoder createEncoderForDecimalDataTypeMeasure(ColumnPage columnPage) {
+    DecimalConverterFactory.DecimalConverterType decimalConverterType =
+        ((DecimalColumnPage) columnPage).getDecimalConverter().getDecimalConverterType();
+    switch (decimalConverterType) {
+      case DECIMAL_INT:
+      case DECIMAL_LONG:
+        return selectCodecByAlgorithmForDecimal(columnPage.getStatistics(), decimalConverterType)
+            .createEncoder(null);
+      default:
+        return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
+    }
+  }
+
   private static DataType fitLongMinMax(long max, long min) {
     if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) {
       return DataType.BYTE;
@@ -155,6 +173,35 @@ public class DefaultEncodingFactory extends EncodingFactory {
     }
   }
 
+  private static DataType fitMinMaxForDecimalType(DataType dataType, Object max, Object min,
+      DecimalConverterFactory.DecimalConverterType decimalConverterType) {
+    long maxValue = ((BigDecimal) max).unscaledValue().longValue();
+    long minValue = ((BigDecimal) min).unscaledValue().longValue();
+    switch (decimalConverterType) {
+      case DECIMAL_INT:
+        return fitLongMinMax((int) maxValue, (int) minValue);
+      case DECIMAL_LONG:
+        return fitLongMinMax(maxValue, minValue);
+      default:
+        throw new RuntimeException("internal error: " + dataType);
+    }
+  }
+
+  private static DataType fitDeltaForDecimalType(DataType dataType, Object max, Object min,
+      DecimalConverterFactory.DecimalConverterType decimalConverterType) {
+    long maxValue = ((BigDecimal) max).unscaledValue().longValue();
+    long minValue = ((BigDecimal) min).unscaledValue().longValue();
+    switch (decimalConverterType) {
+      case DECIMAL_INT:
+        long value = maxValue - minValue;
+        return compareMinMaxAndSelectDataType(value);
+      case DECIMAL_LONG:
+        return DataType.LONG;
+      default:
+        throw new RuntimeException("internal error: " + dataType);
+    }
+  }
+
   // fit the long input value into minimum data type
   private static DataType fitDelta(DataType dataType, Object max, Object min) {
     // use long data type to calculate delta to avoid overflow
@@ -177,6 +224,10 @@ public class DefaultEncodingFactory extends EncodingFactory {
       default:
         throw new RuntimeException("internal error: " + dataType);
     }
+    return compareMinMaxAndSelectDataType(value);
+  }
+
+  private static DataType compareMinMaxAndSelectDataType(long value) {
     if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
       return DataType.BYTE;
     } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
@@ -204,8 +255,10 @@ public class DefaultEncodingFactory extends EncodingFactory {
     } else {
       deltaDataType = fitDelta(stats.getDataType(), stats.getMax(), stats.getMin());
     }
-    if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) ==
-        srcDataType.getSizeInBytes()) {
+    // in case of decimal data type check if the decimal converter type is Int or Long and based on
+    // that get size in bytes
+    if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) == srcDataType
+        .getSizeInBytes()) {
       // no effect to use adaptive or delta, use compression only
       return new DirectCompressCodec(stats.getDataType());
     }
@@ -247,4 +300,38 @@ public class DefaultEncodingFactory extends EncodingFactory {
     }
   }
 
+  /**
+   * choose between adaptive encoder or delta adaptive encoder, based on whose target data type
+   * size is smaller for decimal data type
+   */
+  static ColumnPageCodec selectCodecByAlgorithmForDecimal(SimpleStatsResult stats,
+      DecimalConverterFactory.DecimalConverterType decimalConverterType) {
+    DataType srcDataType = stats.getDataType();
+    DataType adaptiveDataType =
+        fitMinMaxForDecimalType(stats.getDataType(), stats.getMax(), stats.getMin(),
+            decimalConverterType);
+    DataType deltaDataType;
+
+    if (adaptiveDataType == DataType.LONG) {
+      deltaDataType = DataType.LONG;
+    } else {
+      deltaDataType = fitDeltaForDecimalType(stats.getDataType(), stats.getMax(), stats.getMin(),
+          decimalConverterType);
+    }
+    // in case of decimal data type check if the decimal converter type is Int or Long and based on
+    // that get size in bytes
+    if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) == srcDataType
+        .getSizeInBytes()) {
+      // no effect to use adaptive or delta, use compression only
+      return new DirectCompressCodec(stats.getDataType());
+    }
+    if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
+      // choose adaptive encoding
+      return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats);
+    } else {
+      // choose delta adaptive encoding
+      return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index ad327f7..383670a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,8 @@ import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.format.Encoding;
 
+import static org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL;
+
 /**
  * Codec for integer (byte, short, int, long) data type and floating data type (in case of
  * scale is 0).
@@ -65,6 +68,9 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       case DOUBLE:
         this.max = (long) (double) stats.getMax();
         break;
+      case DECIMAL:
+        this.max = ((BigDecimal) stats.getMax()).unscaledValue().longValue();
+        break;
       default:
         // this codec is for integer type only
         throw new UnsupportedOperationException(
@@ -111,13 +117,18 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
     };
   }
 
-  @Override
-  public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
+  @Override public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
     return new ColumnPageDecoder() {
-      @Override
-      public ColumnPage decode(byte[] input, int offset, int length)
+      @Override public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(meta, input, offset, length);
+        ColumnPage page = null;
+        switch (meta.getSchemaDataType()) {
+          case DECIMAL:
+            page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+            break;
+          default:
+            page = ColumnPage.decompress(meta, input, offset, length);
+        }
         return LazyColumnPage.newPage(page, converter);
       }
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index 6df2e64..c7c10a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -92,7 +92,14 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
       @Override
       public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(meta, input, offset, length);
+        ColumnPage page = null;
+        switch (meta.getSchemaDataType()) {
+          case DECIMAL:
+            page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+            break;
+          default:
+            page = ColumnPage.decompress(meta, input, offset, length);
+        }
         return LazyColumnPage.newPage(page, converter);
       }
     };
@@ -151,6 +158,9 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
         case INT:
           encodedPage.putInt(rowId, (int) value);
           break;
+        case LONG:
+          encodedPage.putLong(rowId, (long) value);
+          break;
         default:
           throw new RuntimeException("internal error: " + debugInfo());
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f204376/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index 0343e38..9dbc9b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.core.metadata.datatype;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
@@ -53,37 +52,50 @@ public final class DecimalConverterFactory {
     return data;
   }
 
+  public enum DecimalConverterType {
+    DECIMAL_LV(-1), DECIMAL_INT(4), DECIMAL_LONG(8), DECIMAL_UNSCALED(-1);
+
+    private int sizeInBytes;
+
+    DecimalConverterType(int sizeInBytes) {
+      this.sizeInBytes = sizeInBytes;
+    }
+
+    public int getSizeInBytes() {
+      return sizeInBytes;
+    }
+
+  }
+
   public interface DecimalConverter {
 
-    byte[] convert(BigDecimal decimal);
+    Object convert(BigDecimal decimal);
 
-    BigDecimal getDecimal(byte[] bytes);
+    BigDecimal getDecimal(Object valueToBeConverted);
 
     void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId);
 
     int getSize();
 
+    DecimalConverterType getDecimalConverterType();
+
   }
 
   public class DecimalIntConverter implements DecimalConverter {
 
-    private ByteBuffer buffer = ByteBuffer.allocate(4);
-
     private int scale;
 
     public DecimalIntConverter(int precision, int scale) {
       this.scale = scale;
     }
 
-    @Override public byte[] convert(BigDecimal decimal) {
+    @Override public Object convert(BigDecimal decimal) {
       long longValue = decimal.unscaledValue().longValue();
-      buffer.putInt(0, (int) longValue);
-      return buffer.array().clone();
+      return (int) longValue;
     }
 
-    @Override public BigDecimal getDecimal(byte[] bytes) {
-      long unscaled = getUnscaledLong(bytes);
-      return BigDecimal.valueOf(unscaled, scale);
+    @Override public BigDecimal getDecimal(Object valueToBeConverted) {
+      return BigDecimal.valueOf(((Number) valueToBeConverted).longValue(), scale);
     }
 
     @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) {
@@ -94,6 +106,10 @@ public final class DecimalConverterFactory {
     @Override public int getSize() {
       return 4;
     }
+
+    @Override public DecimalConverterType getDecimalConverterType() {
+      return DecimalConverterType.DECIMAL_INT;
+    }
   }
 
   private long getUnscaledLong(byte[] bytes) {
@@ -112,23 +128,19 @@ public final class DecimalConverterFactory {
 
   public class DecimalLongConverter implements DecimalConverter {
 
-    private ByteBuffer buffer = ByteBuffer.allocate(8);
-
     private int scale;
 
     public DecimalLongConverter(int precision, int scale) {
       this.scale = scale;
     }
 
-    @Override public byte[] convert(BigDecimal decimal) {
+    @Override public Object convert(BigDecimal decimal) {
       long longValue = decimal.unscaledValue().longValue();
-      buffer.putLong(0, longValue);
-      return buffer.array().clone();
+      return longValue;
     }
 
-    @Override public BigDecimal getDecimal(byte[] bytes) {
-      long unscaled = getUnscaledLong(bytes);
-      return BigDecimal.valueOf(unscaled, scale);
+    @Override public BigDecimal getDecimal(Object valueToBeConverted) {
+      return BigDecimal.valueOf((Long) valueToBeConverted, scale);
     }
 
     @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) {
@@ -139,6 +151,10 @@ public final class DecimalConverterFactory {
     @Override public int getSize() {
       return 8;
     }
+
+    @Override public DecimalConverterType getDecimalConverterType() {
+      return DecimalConverterType.DECIMAL_LONG;
+    }
   }
 
   public class DecimalUnscaledConverter implements DecimalConverter {
@@ -155,7 +171,7 @@ public final class DecimalConverterFactory {
       this.numBytes = minBytesForPrecision[precision];
     }
 
-    @Override public byte[] convert(BigDecimal decimal) {
+    @Override public Object convert(BigDecimal decimal) {
       byte[] bytes = decimal.unscaledValue().toByteArray();
       byte[] fixedLengthBytes = null;
       if (bytes.length == numBytes) {
@@ -181,8 +197,8 @@ public final class DecimalConverterFactory {
       return value;
     }
 
-    @Override public BigDecimal getDecimal(byte[] bytes) {
-      BigInteger bigInteger = new BigInteger(bytes);
+    @Override public BigDecimal getDecimal(Object valueToBeConverted) {
+      BigInteger bigInteger = new BigInteger((byte[]) valueToBeConverted);
       return new BigDecimal(bigInteger, scale);
     }
 
@@ -193,18 +209,22 @@ public final class DecimalConverterFactory {
     @Override public int getSize() {
       return numBytes;
     }
+
+    @Override public DecimalConverterType getDecimalConverterType() {
+      return DecimalConverterType.DECIMAL_UNSCALED;
+    }
   }
 
   public static class LVBytesDecimalConverter implements DecimalConverter {
 
     public static LVBytesDecimalConverter INSTANCE = new LVBytesDecimalConverter();
 
-    @Override public byte[] convert(BigDecimal decimal) {
+    @Override public Object convert(BigDecimal decimal) {
       return DataTypeUtil.bigDecimalToByte(decimal);
     }
 
-    @Override public BigDecimal getDecimal(byte[] bytes) {
-      return DataTypeUtil.byteToBigDecimal(bytes);
+    @Override public BigDecimal getDecimal(Object valueToBeConverted) {
+      return DataTypeUtil.byteToBigDecimal((byte[]) valueToBeConverted);
     }
 
     @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) {
@@ -214,6 +234,10 @@ public final class DecimalConverterFactory {
     @Override public int getSize() {
       return -1;
     }
+
+    @Override public DecimalConverterType getDecimalConverterType() {
+      return DecimalConverterType.DECIMAL_LV;
+    }
   }
 
   public DecimalConverter getDecimalConverter(int precision, int scale) {


Mime
View raw message