carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [4/7] carbondata git commit: [CARBONDATA-1098] Change page statistics use exact type and use column page in writer
Date Fri, 28 Jul 2017 06:10:04 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java
deleted file mode 100644
index c954a33..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.compression;
-
-public interface Compression {
-  byte[] compress(byte[] input);
-  byte[] decompress(byte[] input);
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
index 2e8eff2..6b3a365 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
@@ -37,7 +37,7 @@ public abstract class AdaptiveCompressionCodec implements ColumnPageCodec {
   protected final Compressor compressor;
 
   // statistics of this page, can be used by subclass
-  protected final ColumnPageStatsVO stats;
+  protected final SimpleStatsResult stats;
 
   // the data type used for storage
   protected final DataType targetDataType;
@@ -46,7 +46,7 @@ public abstract class AdaptiveCompressionCodec implements ColumnPageCodec {
   protected final DataType srcDataType;
 
   protected AdaptiveCompressionCodec(DataType srcDataType, DataType targetDataType,
-      ColumnPageStatsVO stats, Compressor compressor) {
+      SimpleStatsResult stats, Compressor compressor) {
     this.stats = stats;
     this.srcDataType = srcDataType;
     this.targetDataType = targetDataType;
@@ -55,7 +55,7 @@ public abstract class AdaptiveCompressionCodec implements ColumnPageCodec {
 
   public abstract String getName();
 
-  public abstract byte[] encode(ColumnPage input) throws MemoryException, IOException;
+  public abstract EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException;
 
   public abstract ColumnPage decode(byte[] input, int offset, int length) throws MemoryException;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
deleted file mode 100644
index fe15ba7..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.LazyColumnPage;
-import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * Codec for integer (byte, short, int, long) data type page.
- * This codec will do type casting on page data to make storage minimum.
- */
-class AdaptiveIntegerCodec extends AdaptiveCompressionCodec {
-
-  private ColumnPage encodedPage;
-
-  public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
-      ColumnPageStatsVO stats, Compressor compressor) {
-    return new AdaptiveIntegerCodec(srcDataType, targetDataType, stats, compressor);
-  }
-
-  private AdaptiveIntegerCodec(DataType srcDataType, DataType targetDataType,
-      ColumnPageStatsVO stats, Compressor compressor) {
-    super(srcDataType, targetDataType, stats, compressor);
-  }
-
-  @Override
-  public String getName() {
-    return "AdaptiveIntegerCodec";
-  }
-
-  @Override
-  public byte[] encode(ColumnPage input) throws MemoryException, IOException {
-    encodedPage = ColumnPage
-        .newPage(targetDataType, input.getPageSize(), stats.getScale(), stats.getPrecision());
-    input.encode(codec);
-    byte[] result = encodedPage.compress(compressor);
-    encodedPage.freeMemory();
-    return result;
-  }
-
-  @Override
-  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
-    if (srcDataType.equals(targetDataType)) {
-      return ColumnPage
-          .decompress(compressor, targetDataType, input, offset, length, stats.getScale(),
-              stats.getPrecision());
-    } else {
-      ColumnPage page = ColumnPage
-          .decompress(compressor, targetDataType, input, offset, length, stats.getScale(),
-              stats.getPrecision());
-      return LazyColumnPage.newPage(page, codec);
-    }
-  }
-
-  // encoded value = (type cast page value to target data type)
-  private PrimitiveCodec codec = new PrimitiveCodec() {
-    @Override
-    public void encode(int rowId, byte value) {
-      switch (targetDataType) {
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public void encode(int rowId, short value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte) value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public void encode(int rowId, int value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte) value);
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short) value);
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public void encode(int rowId, long value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte) value);
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short) value);
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int) value);
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int) value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public void encode(int rowId, float value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte) value);
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short) value);
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int) value);
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int) value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public void encode(int rowId, double value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte) value);
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short) value);
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int) value);
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int) value);
-          break;
-        case LONG:
-          encodedPage.putLong(rowId, (long) value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public long decodeLong(byte value) {
-      return value;
-    }
-
-    @Override
-    public long decodeLong(short value) {
-      return value;
-    }
-
-    @Override
-    public long decodeLong(int value) {
-      return value;
-    }
-
-    @Override
-    public double decodeDouble(byte value) {
-      return value;
-    }
-
-    @Override
-    public double decodeDouble(short value) {
-      return value;
-    }
-
-    @Override
-    public double decodeDouble(int value) {
-      return value;
-    }
-
-    @Override
-    public double decodeDouble(long value) {
-      return value;
-    }
-
-    @Override
-    public double decodeDouble(float value) {
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-
-    @Override
-    public double decodeDouble(double value) {
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-  };
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegralCodec.java
new file mode 100644
index 0000000..ed8d734
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegralCodec.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.CodecMetaFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Codec for integer (byte, short, int, long) data type page.
+ * This codec will do type casting on page data to make storage minimum.
+ */
+class AdaptiveIntegralCodec extends AdaptiveCompressionCodec {
+
+  private ColumnPage encodedPage;
+
+  public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
+      SimpleStatsResult stats, Compressor compressor) {
+    return new AdaptiveIntegralCodec(srcDataType, targetDataType, stats, compressor);
+  }
+
+  private AdaptiveIntegralCodec(DataType srcDataType, DataType targetDataType,
+      SimpleStatsResult stats, Compressor compressor) {
+    super(srcDataType, targetDataType, stats, compressor);
+  }
+
+  @Override
+  public String getName() {
+    return "AdaptiveIntegralCodec";
+  }
+
+  @Override
+  public EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException {
+    encodedPage = ColumnPage
+        .newPage(targetDataType, input.getPageSize(), stats.getScale(), stats.getPrecision());
+    input.encode(codec);
+    byte[] result = encodedPage.compress(compressor);
+    encodedPage.freeMemory();
+    return new EncodedMeasurePage(input.getPageSize(), result,
+        CodecMetaFactory.createMeta(stats, targetDataType),
+        ((SimpleStatsResult)input.getStatistics()).getNullBits());
+  }
+
+  @Override
+  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
+    ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+    return LazyColumnPage.newPage(page, codec);
+  }
+
+  // encoded value = (type cast page value to target data type)
+  private PrimitiveCodec codec = new PrimitiveCodec() {
+    @Override
+    public void encode(int rowId, byte value) {
+      switch (targetDataType) {
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int) value);
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int) value);
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int) value);
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int) value);
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public long decodeLong(byte value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+  };
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
index 36d5989..a77bf69 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
@@ -34,11 +34,9 @@ public interface ColumnPageCodec {
   String getName();
 
   /**
-   * apply a column page and output encoded byte array
-   * @param input column page to apply
-   * @return encoded data
+   * encode a column page and return the encoded data
    */
-  byte[] encode(ColumnPage input) throws MemoryException, IOException;
+  EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException;
 
   /**
    * decode byte array from offset to a column page
@@ -48,4 +46,5 @@ public interface ColumnPageCodec {
    * @return decoded data
    */
   ColumnPage decode(byte[] input, int offset, int length) throws MemoryException;
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
index 659dc2a..d2d3a44 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding;
 
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
@@ -32,29 +32,64 @@ public class DefaultEncodingStrategy extends EncodingStrategy {
   private static final int THREE_BYTES_MAX = (int) Math.pow(2, 23) - 1;
   private static final int THREE_BYTES_MIN = - THREE_BYTES_MAX - 1;
 
-  // fit the long input value into minimum data type
-  private static DataType fitDataType(long value) {
-    if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
+  private DataType fitLongMinMax(long max, long min) {
+    if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) {
       return DataType.BYTE;
-    } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
+    } else if (max <= Short.MAX_VALUE && min >= Short.MIN_VALUE) {
       return DataType.SHORT;
-    } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) {
+    } else if (max <= THREE_BYTES_MAX && min >= THREE_BYTES_MIN) {
       return DataType.SHORT_INT;
-    } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
+    } else if (max <= Integer.MAX_VALUE && min >= Integer.MIN_VALUE) {
       return DataType.INT;
     } else {
       return DataType.LONG;
     }
   }
 
-  private DataType fitDataType(long max, long min) {
-    if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) {
+  private DataType fitMinMax(DataType dataType, Object max, Object min) {
+    switch (dataType) {
+      case BYTE:
+        return fitLongMinMax((byte) max, (byte) min);
+      case SHORT:
+        return fitLongMinMax((short) max, (short) min);
+      case INT:
+        return fitLongMinMax((int) max, (int) min);
+      case LONG:
+        return fitLongMinMax((long) max, (long) min);
+      case DOUBLE:
+        return DataType.DOUBLE;
+      default:
+        throw new RuntimeException("internal error: " + dataType);
+    }
+  }
+
+  // fit the long input value into minimum data type
+  private DataType fitDelta(DataType dataType, Object max, Object min) {
+    // use long data type to calculate delta to avoid overflow
+    long value;
+    switch (dataType) {
+      case BYTE:
+        value = (long)(byte) max - (long)(byte) min;
+        break;
+      case SHORT:
+        value = (long)(short) max - (long)(short) min;
+        break;
+      case INT:
+        value = (long)(int) max - (long)(int) min;
+        break;
+      case LONG:
+        // TODO: add overflow detection and return delta type
+        return DataType.LONG;
+      default:
+        throw new RuntimeException("internal error: " + dataType);
+    }
+    if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
       return DataType.BYTE;
-    } else if (max <= Short.MAX_VALUE && min >= Short.MIN_VALUE) {
+    } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
       return DataType.SHORT;
-    } else if (max <= THREE_BYTES_MAX && min >= THREE_BYTES_MIN) {
+    } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) {
       return DataType.SHORT_INT;
-    } else if (max <= Integer.MAX_VALUE && min >= Integer.MIN_VALUE) {
+    } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
       return DataType.INT;
     } else {
       return DataType.LONG;
@@ -63,10 +98,9 @@ public class DefaultEncodingStrategy extends EncodingStrategy {
 
   // choose between adaptive encoder or delta adaptive encoder, based on whose target data type
   // size is smaller
-  @Override
-  ColumnPageCodec newCodecForIntegerType(ColumnPageStatsVO stats) {
+  @Override ColumnPageCodec newCodecForIntegralType(SimpleStatsResult stats) {
     DataType srcDataType = stats.getDataType();
-    DataType adaptiveDataType = fitDataType((long)stats.getMax(), (long)stats.getMin());
+    DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin());
     DataType deltaDataType;
 
     // TODO: this handling is for data compatibility, change to Override check when implementing
@@ -74,7 +108,7 @@ public class DefaultEncodingStrategy extends EncodingStrategy {
     if (adaptiveDataType == DataType.LONG) {
       deltaDataType = DataType.LONG;
     } else {
-      deltaDataType = fitDataType((long) stats.getMax() - (long) stats.getMin());
+      deltaDataType = fitDelta(stats.getDataType(), stats.getMax(), stats.getMin());
     }
     if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) ==
         srcDataType.getSizeInBytes()) {
@@ -83,27 +117,24 @@ public class DefaultEncodingStrategy extends EncodingStrategy {
     }
     if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
       // choose adaptive encoding
-      return AdaptiveIntegerCodec.newInstance(
+      return AdaptiveIntegralCodec.newInstance(
           stats.getDataType(), adaptiveDataType, stats, compressor);
     } else {
       // choose delta adaptive encoding
-      return DeltaIntegerCodec.newInstance(stats.getDataType(), deltaDataType, stats, compressor);
+      return DeltaIntegralCodec.newInstance(stats.getDataType(), deltaDataType, stats, compressor);
     }
   }
 
-  @Override
-  ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats) {
+  @Override ColumnPageCodec newCodecForFloatingType(SimpleStatsResult stats) {
     return DirectCompressCodec.newInstance(stats, compressor);
   }
 
   // for decimal, currently it is a very basic implementation
-  @Override
-  ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats) {
+  @Override ColumnPageCodec newCodecForDecimalType(SimpleStatsResult stats) {
     return DirectCompressCodec.newInstance(stats, compressor);
   }
 
-  @Override
-  ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats) {
+  @Override ColumnPageCodec newCodecForByteArrayType(SimpleStatsResult stats) {
     return DirectCompressCodec.newInstance(stats, compressor);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
deleted file mode 100644
index a45552a..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.LazyColumnPage;
-import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * Codec for integer (byte, short, int, long) data type page.
- * This codec will calculate delta of page max value and page value,
- * and do type casting of the diff to make storage minimum.
- */
-public class DeltaIntegerCodec extends AdaptiveCompressionCodec {
-
-  private ColumnPage encodedPage;
-
-  private long max;
-
-  public static DeltaIntegerCodec newInstance(DataType srcDataType, DataType targetDataType,
-      ColumnPageStatsVO stats, Compressor compressor) {
-    return new DeltaIntegerCodec(srcDataType, targetDataType, stats, compressor);
-  }
-
-  private DeltaIntegerCodec(DataType srcDataType, DataType targetDataType,
-      ColumnPageStatsVO stats, Compressor compressor) {
-    super(srcDataType, targetDataType, stats, compressor);
-    switch (srcDataType) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        max = (long) stats.getMax();
-        break;
-      case FLOAT:
-      case DOUBLE:
-        max = (long)((double) stats.getMax());
-        break;
-    }
-  }
-
-  @Override
-  public String getName() {
-    return "DeltaIntegerCodec";
-  }
-
-  @Override
-  public byte[] encode(ColumnPage input) throws MemoryException, IOException {
-    encodedPage = ColumnPage
-        .newPage(targetDataType, input.getPageSize(), stats.getScale(), stats.getPrecision());
-    input.encode(codec);
-    byte[] result = encodedPage.compress(compressor);
-    encodedPage.freeMemory();
-    return result;
-  }
-
-  @Override
-  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
-    if (srcDataType.equals(targetDataType)) {
-      return ColumnPage
-          .decompress(compressor, targetDataType, input, offset, length, stats.getScale(),
-              stats.getPrecision());
-    } else {
-      ColumnPage page = ColumnPage
-          .decompress(compressor, targetDataType, input, offset, length, stats.getScale(),
-              stats.getPrecision());
-      return LazyColumnPage.newPage(page, codec);
-    }
-  }
-
-  // encoded value = (max value of page) - (page value)
-  private PrimitiveCodec codec = new PrimitiveCodec() {
-    @Override
-    public void encode(int rowId, byte value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte)(max - value));
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public void encode(int rowId, short value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte)(max - value));
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short)(max - value));
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public void encode(int rowId, int value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte)(max - value));
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short)(max - value));
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int)(max - value));
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int)(max - value));
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public void encode(int rowId, long value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte)(max - value));
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short)(max - value));
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int)(max - value));
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int)(max - value));
-          break;
-        case LONG:
-          encodedPage.putLong(rowId, max - value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public void encode(int rowId, float value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte)(max - value));
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short)(max - value));
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int)(max - value));
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int)(max - value));
-          break;
-        case LONG:
-          encodedPage.putLong(rowId, (long)(max - value));
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public void encode(int rowId, double value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId, (byte)(max - value));
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId, (short)(max - value));
-          break;
-        case SHORT_INT:
-          encodedPage.putShortInt(rowId, (int)(max - value));
-          break;
-        case INT:
-          encodedPage.putInt(rowId, (int)(max - value));
-          break;
-        case LONG:
-          encodedPage.putLong(rowId, (long)(max - value));
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public long decodeLong(byte value) {
-      return max - value;
-    }
-
-    @Override
-    public long decodeLong(short value) {
-      return max - value;
-    }
-
-    @Override
-    public long decodeLong(int value) {
-      return max - value;
-    }
-
-    @Override
-    public double decodeDouble(byte value) {
-      return max - value;
-    }
-
-    @Override
-    public double decodeDouble(short value) {
-      return max - value;
-    }
-
-    @Override
-    public double decodeDouble(int value) {
-      return max - value;
-    }
-
-    @Override
-    public double decodeDouble(long value) {
-      return max - value;
-    }
-
-    @Override
-    public double decodeDouble(float value) {
-      // this codec is for integer type only
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-
-    @Override
-    public double decodeDouble(double value) {
-      // this codec is for integer type only
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegralCodec.java
new file mode 100644
index 0000000..53a8295
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegralCodec.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.CodecMetaFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Codec for integer (byte, short, int, long) data type page.
+ * This codec will calculate delta of page max value and page value,
+ * and do type casting of the diff to make storage minimum.
+ */
+public class DeltaIntegralCodec extends AdaptiveCompressionCodec {
+
+  private ColumnPage encodedPage;
+
+  private long max;
+
+  public static DeltaIntegralCodec newInstance(DataType srcDataType, DataType targetDataType,
+      SimpleStatsResult stats, Compressor compressor) {
+    return new DeltaIntegralCodec(srcDataType, targetDataType, stats, compressor);
+  }
+
+  private DeltaIntegralCodec(DataType srcDataType, DataType targetDataType,
+      SimpleStatsResult stats, Compressor compressor) {
+    super(srcDataType, targetDataType, stats, compressor);
+    switch (srcDataType) {
+      case BYTE:
+        max = (byte) stats.getMax();
+        break;
+      case SHORT:
+        max = (short) stats.getMax();
+        break;
+      case INT:
+        max = (int) stats.getMax();
+        break;
+      case LONG:
+        max = (long) stats.getMax();
+        break;
+      case FLOAT:
+      case DOUBLE:
+        max = (long)((double) stats.getMax());
+        break;
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "DeltaIntegralCodec";
+  }
+
+  @Override
+  public EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException {
+    encodedPage = ColumnPage
+        .newPage(targetDataType, input.getPageSize(), stats.getScale(), stats.getPrecision());
+    input.encode(codec);
+    byte[] result = encodedPage.compress(compressor);
+    encodedPage.freeMemory();
+    return new EncodedMeasurePage(input.getPageSize(),
+        result,
+        CodecMetaFactory.createMeta(stats, targetDataType),
+        ((SimpleStatsResult)input.getStatistics()).getNullBits());
+  }
+
+  @Override
+  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
+    ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+    return LazyColumnPage.newPage(page, codec);
+  }
+
+  // encoded value = (max value of page) - (page value)
+  private PrimitiveCodec codec = new PrimitiveCodec() {
+    @Override
+    public void encode(int rowId, byte value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int)(max - value));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int)(max - value));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(max - value));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, max - value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int)(max - value));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(max - value));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte)(max - value));
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short)(max - value));
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int)(max - value));
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int)(max - value));
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long)(max - value));
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public long decodeLong(byte value) {
+      return max - value;
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      return max - value;
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return max - value;
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      // this codec is for integer type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      // this codec is for integer type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
index d608fea..a1d4b61 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
@@ -21,8 +21,11 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.CodecMetaFactory;
 
 /**
  * This codec directly apply compression on the input data
@@ -30,14 +33,14 @@ import org.apache.carbondata.core.memory.MemoryException;
 public class DirectCompressCodec implements ColumnPageCodec {
 
   private Compressor compressor;
-  private ColumnPageStatsVO stats;
+  private SimpleStatsResult stats;
 
-  private DirectCompressCodec(ColumnPageStatsVO stats, Compressor compressor) {
+  private DirectCompressCodec(SimpleStatsResult stats, Compressor compressor) {
     this.compressor = compressor;
     this.stats = stats;
   }
 
-  public static DirectCompressCodec newInstance(ColumnPageStatsVO stats, Compressor compressor) {
+  public static DirectCompressCodec newInstance(SimpleStatsResult stats, Compressor compressor) {
     return new DirectCompressCodec(stats, compressor);
   }
 
@@ -47,14 +50,89 @@ public class DirectCompressCodec implements ColumnPageCodec {
   }
 
   @Override
-  public byte[] encode(ColumnPage input) throws IOException, MemoryException {
-    return input.compress(compressor);
+  public EncodedColumnPage encode(ColumnPage input) throws IOException, MemoryException {
+    byte[] result = input.compress(compressor);
+    return new EncodedMeasurePage(input.getPageSize(), result,
+        CodecMetaFactory.createMeta(stats, stats.getDataType()),
+        ((SimpleStatsResult)input.getStatistics()).getNullBits());
   }
 
   @Override
   public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
-    return ColumnPage
+    ColumnPage page = ColumnPage
         .decompress(compressor, stats.getDataType(), input, offset, length, stats.getScale(),
             stats.getPrecision());
+    return LazyColumnPage.newPage(page, codec);
   }
+
+  private PrimitiveCodec codec = new PrimitiveCodec() {
+    @Override
+    public void encode(int rowId, byte value) {
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+    }
+
+    @Override
+    public long decodeLong(byte value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      return value;
+    }
+  };
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
new file mode 100644
index 0000000..1630e06
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.carbondata.format.DataChunk2;
+
+/**
+ * A column page after encoding and compression.
+ */
+public abstract class EncodedColumnPage {
+
+  // number of row of this page
+  protected int pageSize;
+
+  // encoded and compressed column page data
+  protected byte[] encodedData;
+
+  protected BitSet nullBitSet;
+
+  // metadata of this page
+  protected DataChunk2 dataChunk2;
+
+  EncodedColumnPage(int pageSize, byte[] encodedData) {
+    this.pageSize = pageSize;
+    this.encodedData = encodedData;
+  }
+
+  public abstract DataChunk2 buildDataChunk2() throws IOException;
+
+  /**
+   * return the encoded and compressed data page
+   */
+  public byte[] getEncodedData() {
+    return encodedData;
+  }
+
+  /**
+   * return the serialized size of this encoded page in bytes
+   */
+  public int getSerializedSize() {
+    return encodedData.length;
+  }
+
+  public ByteBuffer serialize() {
+    return ByteBuffer.wrap(encodedData);
+  }
+
+  public DataChunk2 getDataChunk2() {
+    return dataChunk2;
+  }
+
+  public void setNullBitSet(BitSet nullBitSet) {
+    this.nullBitSet = nullBitSet;
+  }
+
+  public BitSet getNullBitSet() {
+    return nullBitSet;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java
deleted file mode 100644
index 0d1b2e4..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-
-// result result of all columns
-public class EncodedData {
-  // dimension data that include rowid (index)
-  public IndexStorage[] indexStorages;
-
-  // encoded and compressed dimension data
-  public byte[][] dimensions;
-
-  // encoded and compressed measure data
-  public byte[][] measures;
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedDimensionPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedDimensionPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedDimensionPage.java
new file mode 100644
index 0000000..30d58cf
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedDimensionPage.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.DimensionType;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.page.statistics.TablePageStatistics;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.format.BlockletMinMaxIndex;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.Encoding;
+import org.apache.carbondata.format.SortState;
+
+/**
+ * Encoded dimension page that include data and inverted index
+ */
+public class EncodedDimensionPage extends EncodedColumnPage {
+  private IndexStorage indexStorage;
+  private DimensionType dimensionType;
+
+  public EncodedDimensionPage(int pageSize, byte[] encodedData, IndexStorage indexStorage,
+      DimensionType dimensionType) {
+    super(pageSize, encodedData);
+    this.indexStorage = indexStorage;
+    this.dimensionType = dimensionType;
+    this.dataChunk2 = buildDataChunk2();
+  }
+
+  private int getTotalRowIdPageLengthInBytes() {
+    return CarbonCommonConstants.INT_SIZE_IN_BYTE +
+        indexStorage.getRowIdPageLengthInBytes() + indexStorage.getRowIdRlePageLengthInBytes();
+  }
+
+  @Override
+  public int getSerializedSize() {
+    int size = encodedData.length;
+    if (indexStorage.getRowIdPageLengthInBytes() > 0) {
+      size += getTotalRowIdPageLengthInBytes();
+    }
+    if (indexStorage.getDataRlePageLengthInBytes() > 0) {
+      size += indexStorage.getDataRlePageLengthInBytes();
+    }
+    return size;
+  }
+
+  @Override
+  public ByteBuffer serialize() {
+    ByteBuffer buffer = ByteBuffer.allocate(getSerializedSize());
+    buffer.put(encodedData);
+    if (indexStorage.getRowIdPageLengthInBytes() > 0) {
+      buffer.putInt(indexStorage.getRowIdPageLengthInBytes());
+      short[] rowIdPage = (short[])indexStorage.getRowIdPage();
+      for (short rowId : rowIdPage) {
+        buffer.putShort(rowId);
+      }
+      if (indexStorage.getRowIdRlePageLengthInBytes() > 0) {
+        short[] rowIdRlePage = (short[])indexStorage.getRowIdRlePage();
+        for (short rowIdRle : rowIdRlePage) {
+          buffer.putShort(rowIdRle);
+        }
+      }
+    }
+    if (indexStorage.getDataRlePageLengthInBytes() > 0) {
+      short[] dataRlePage = (short[])indexStorage.getDataRlePage();
+      for (short dataRle : dataRlePage) {
+        buffer.putShort(dataRle);
+      }
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  @Override
+  public DataChunk2 buildDataChunk2() {
+    DataChunk2 dataChunk = new DataChunk2();
+    dataChunk.min_max = new BlockletMinMaxIndex();
+    dataChunk.setChunk_meta(CarbonMetadataUtil.getSnappyChunkCompressionMeta());
+    dataChunk.setNumberOfRowsInpage(pageSize);
+    List<Encoding> encodings = new ArrayList<Encoding>();
+    dataChunk.setData_page_length(encodedData.length);
+    if (dimensionType == DimensionType.GLOBAL_DICTIONARY ||
+        dimensionType == DimensionType.DIRECT_DICTIONARY ||
+        dimensionType == DimensionType.COMPLEX) {
+      encodings.add(Encoding.DICTIONARY);
+    }
+    if (dimensionType == DimensionType.DIRECT_DICTIONARY) {
+      encodings.add(Encoding.DIRECT_DICTIONARY);
+    }
+    if (indexStorage.getDataRlePageLengthInBytes() > 0 ||
+        dimensionType == DimensionType.GLOBAL_DICTIONARY) {
+      dataChunk.setRle_page_length(indexStorage.getDataRlePageLengthInBytes());
+      encodings.add(Encoding.RLE);
+    }
+    SortState sort = (indexStorage.getRowIdPageLengthInBytes() > 0) ?
+        SortState.SORT_EXPLICIT : SortState.SORT_NATIVE;
+    dataChunk.setSort_state(sort);
+    if (indexStorage.getRowIdPageLengthInBytes() > 0) {
+      dataChunk.setRowid_page_length(getTotalRowIdPageLengthInBytes());
+      encodings.add(Encoding.INVERTED_INDEX);
+    }
+    if (dimensionType == DimensionType.PLAIN_VALUE) {
+      dataChunk.min_max.addToMax_values(ByteBuffer.wrap(
+          TablePageStatistics.updateMinMaxForNoDictionary(indexStorage.getMax())));
+      dataChunk.min_max.addToMin_values(ByteBuffer.wrap(
+          TablePageStatistics.updateMinMaxForNoDictionary(indexStorage.getMin())));
+    } else {
+      dataChunk.min_max.addToMax_values(ByteBuffer.wrap(indexStorage.getMax()));
+      dataChunk.min_max.addToMin_values(ByteBuffer.wrap(indexStorage.getMin()));
+    }
+    dataChunk.setEncoders(encodings);
+    return dataChunk;
+  }
+
+  public IndexStorage getIndexStorage() {
+    return indexStorage;
+  }
+
+  public DimensionType getDimensionType() {
+    return dimensionType;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedMeasurePage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedMeasurePage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedMeasurePage.java
new file mode 100644
index 0000000..0ef48c6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedMeasurePage.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.BlockletMinMaxIndex;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.Encoding;
+import org.apache.carbondata.format.PresenceMeta;
+
+/**
+ * Encoded measure page that include data and statistics
+ */
+public class EncodedMeasurePage extends EncodedColumnPage {
+
+  private ValueEncoderMeta metaData;
+
+  public EncodedMeasurePage(int pageSize, byte[] encodedData, ValueEncoderMeta metaData,
+      BitSet nullBitSet) throws IOException {
+    super(pageSize, encodedData);
+    this.metaData = metaData;
+    this.nullBitSet = nullBitSet;
+    this.dataChunk2 = buildDataChunk2();
+  }
+
+  @Override
+  public DataChunk2 buildDataChunk2() throws IOException {
+    DataChunk2 dataChunk = new DataChunk2();
+    dataChunk.min_max = new BlockletMinMaxIndex();
+    dataChunk.setChunk_meta(CarbonMetadataUtil.getSnappyChunkCompressionMeta());
+    dataChunk.setNumberOfRowsInpage(pageSize);
+    dataChunk.setData_page_length(encodedData.length);
+    dataChunk.setRowMajor(false);
+    // TODO : Change as per this encoders.
+    List<Encoding> encodings = new ArrayList<Encoding>();
+    encodings.add(Encoding.DELTA);
+    dataChunk.setEncoders(encodings);
+    PresenceMeta presenceMeta = new PresenceMeta();
+    presenceMeta.setPresent_bit_streamIsSet(true);
+    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    presenceMeta.setPresent_bit_stream(compressor.compressByte(nullBitSet.toByteArray()));
+    dataChunk.setPresence(presenceMeta);
+    List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
+    if (metaData instanceof ColumnPageCodecMeta) {
+      ColumnPageCodecMeta meta = (ColumnPageCodecMeta) metaData;
+      encoderMetaList.add(ByteBuffer.wrap(meta.serialize()));
+      dataChunk.min_max.addToMax_values(ByteBuffer.wrap(meta.getMaxAsBytes()));
+      dataChunk.min_max.addToMin_values(ByteBuffer.wrap(meta.getMinAsBytes()));
+    } else {
+      encoderMetaList.add(ByteBuffer.wrap(CarbonUtil.serializeEncodeMetaUsingByteBuffer(metaData)));
+      dataChunk.min_max.addToMax_values(ByteBuffer.wrap(CarbonUtil.getMaxValueAsBytes(metaData)));
+      dataChunk.min_max.addToMin_values(ByteBuffer.wrap(CarbonUtil.getMinValueAsBytes(metaData)));
+    }
+    dataChunk.setEncoder_meta(encoderMetaList);
+    return dataChunk;
+  }
+
+  public ValueEncoderMeta getMetaData() {
+    return metaData;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
index 77d3b74..ee13277 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
@@ -17,7 +17,9 @@
 
 package org.apache.carbondata.core.datastore.page.encoding;
 
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 
 /**
@@ -28,13 +30,13 @@ public abstract class EncodingStrategy {
   /**
    * create codec based on the page data type and statistics
    */
-  public ColumnPageCodec createCodec(ColumnPageStatsVO stats) {
+  public ColumnPageCodec createCodec(SimpleStatsResult stats) {
     switch (stats.getDataType()) {
       case BYTE:
       case SHORT:
       case INT:
       case LONG:
-        return newCodecForIntegerType(stats);
+        return newCodecForIntegralType(stats);
       case FLOAT:
       case DOUBLE:
         return newCodecForFloatingType(stats);
@@ -52,20 +54,58 @@ public abstract class EncodingStrategy {
    * create codec based on the page data type and statistics contained by ValueEncoderMeta
    */
   public ColumnPageCodec createCodec(ValueEncoderMeta meta, int scale, int precision) {
-    ColumnPageStatsVO stats = ColumnPageStatsVO.copyFrom(meta, scale, precision);
-    return createCodec(stats);
+    if (meta instanceof ColumnPageCodecMeta) {
+      ColumnPageCodecMeta codecMeta = (ColumnPageCodecMeta) meta;
+      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(codecMeta);
+      switch (codecMeta.getSrcDataType()) {
+        case BYTE:
+        case SHORT:
+        case INT:
+        case LONG:
+          return newCodecForIntegralType(stats);
+        case FLOAT:
+        case DOUBLE:
+          return newCodecForFloatingType(stats);
+        case DECIMAL:
+          return newCodecForDecimalType(stats);
+        case BYTE_ARRAY:
+          // no dictionary dimension
+          return newCodecForByteArrayType(stats);
+        default:
+          throw new RuntimeException("unsupported data type: " + stats.getDataType());
+      }
+    } else {
+      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(meta, scale, precision);
+      switch (meta.getType()) {
+        case BYTE:
+        case SHORT:
+        case INT:
+        case LONG:
+          return newCodecForIntegralType(stats);
+        case FLOAT:
+        case DOUBLE:
+          return newCodecForFloatingType(stats);
+        case DECIMAL:
+          return newCodecForDecimalType(stats);
+        case BYTE_ARRAY:
+          // no dictionary dimension
+          return newCodecForByteArrayType(stats);
+        default:
+          throw new RuntimeException("unsupported data type: " + stats.getDataType());
+      }
+    }
   }
 
   // for byte, short, int, long
-  abstract ColumnPageCodec newCodecForIntegerType(ColumnPageStatsVO stats);
+  abstract ColumnPageCodec newCodecForIntegralType(SimpleStatsResult stats);
 
   // for float, double
-  abstract ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats);
+  abstract ColumnPageCodec newCodecForFloatingType(SimpleStatsResult stats);
 
   // for decimal
-  abstract ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats);
+  abstract ColumnPageCodec newCodecForDecimalType(SimpleStatsResult stats);
 
   // for byte array
-  abstract ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats);
+  abstract ColumnPageCodec newCodecForByteArrayType(SimpleStatsResult stats);
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java
new file mode 100644
index 0000000..ef8307e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.key;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+
+public class TablePageKey {
+  private int pageSize;
+
+  private byte[][] currentNoDictionaryKey;
+
+  // MDK start key
+  private byte[] startKey;
+
+  // MDK end key
+  private byte[] endKey;
+
+  // startkey for no dictionary columns
+  private byte[][] noDictStartKey;
+
+  // endkey for no diciotn
+  private byte[][] noDictEndKey;
+
+  // startkey for no dictionary columns after packing into one column
+  private byte[] packedNoDictStartKey;
+
+  // endkey for no dictionary columns after packing into one column
+  private byte[] packedNoDictEndKey;
+
+  private KeyGenerator mdkGenerator;
+  private SegmentProperties segmentProperties;
+  private boolean hasNoDictionary;
+
+  public TablePageKey(int pageSize, KeyGenerator mdkGenerator, SegmentProperties segmentProperties,
+      boolean hasNoDictionary) {
+    this.pageSize = pageSize;
+    this.mdkGenerator = mdkGenerator;
+    this.segmentProperties = segmentProperties;
+    this.hasNoDictionary = hasNoDictionary;
+  }
+
+  /** update all keys based on the input row */
+  public void update(int rowId, CarbonRow row, byte[] mdk) throws KeyGenException {
+    if (hasNoDictionary) {
+      currentNoDictionaryKey = WriteStepRowUtil.getNoDictAndComplexDimension(row);
+    }
+    if (rowId == 0) {
+      startKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
+      noDictStartKey = currentNoDictionaryKey;
+    }
+    noDictEndKey = currentNoDictionaryKey;
+    if (rowId == pageSize - 1) {
+      endKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
+      finalizeKeys();
+    }
+  }
+
+  public Object getKey() {
+    return this;
+  }
+
+  /** update all keys if SORT_COLUMNS option is used when creating table */
+  private void finalizeKeys() {
+    // If SORT_COLUMNS is used, may need to update start/end keys since the they may
+    // contains dictionary columns that are not in SORT_COLUMNS, which need to be removed from
+    // start/end key
+    int numberOfDictSortColumns = segmentProperties.getNumberOfDictSortColumns();
+    if (numberOfDictSortColumns > 0) {
+      // if SORT_COLUMNS contain dictionary columns
+      int[] keySize = segmentProperties.getFixedLengthKeySplitter().getBlockKeySize();
+      if (keySize.length > numberOfDictSortColumns) {
+        // if there are some dictionary columns that are not in SORT_COLUMNS, it will come to here
+        int newMdkLength = 0;
+        for (int i = 0; i < numberOfDictSortColumns; i++) {
+          newMdkLength += keySize[i];
+        }
+        byte[] newStartKeyOfSortKey = new byte[newMdkLength];
+        byte[] newEndKeyOfSortKey = new byte[newMdkLength];
+        System.arraycopy(startKey, 0, newStartKeyOfSortKey, 0, newMdkLength);
+        System.arraycopy(endKey, 0, newEndKeyOfSortKey, 0, newMdkLength);
+        startKey = newStartKeyOfSortKey;
+        endKey = newEndKeyOfSortKey;
+      }
+    } else {
+      startKey = new byte[0];
+      endKey = new byte[0];
+    }
+
+    // Do the same update for noDictionary start/end Key
+    int numberOfNoDictSortColumns = segmentProperties.getNumberOfNoDictSortColumns();
+    if (numberOfNoDictSortColumns > 0) {
+      // if sort_columns contain no-dictionary columns
+      if (noDictStartKey.length > numberOfNoDictSortColumns) {
+        byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][];
+        byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
+        System.arraycopy(
+            noDictStartKey, 0, newNoDictionaryStartKey, 0, numberOfNoDictSortColumns);
+        System.arraycopy(
+            noDictEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns);
+        noDictStartKey = newNoDictionaryStartKey;
+        noDictEndKey = newNoDictionaryEndKey;
+      }
+      packedNoDictStartKey =
+          NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictStartKey);
+      packedNoDictEndKey =
+          NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictEndKey);
+    } else {
+      noDictStartKey = new byte[0][];
+      noDictEndKey = new byte[0][];
+      packedNoDictStartKey = new byte[0];
+      packedNoDictEndKey = new byte[0];
+    }
+  }
+
+  public byte[] getStartKey() {
+    return startKey;
+  }
+
+  public byte[] getEndKey() {
+    return endKey;
+  }
+
+  public byte[] getNoDictStartKey() {
+    return packedNoDictStartKey;
+  }
+
+  public byte[] getNoDictEndKey() {
+    return packedNoDictEndKey;
+  }
+
+  public int getPageSize() {
+    return pageSize;
+  }
+
+  public byte[] serializeStartKey() {
+    byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(getNoDictStartKey());
+    ByteBuffer buffer = ByteBuffer.allocate(
+        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+            + startKey.length + updatedNoDictionaryStartKey.length);
+    buffer.putInt(startKey.length);
+    buffer.putInt(updatedNoDictionaryStartKey.length);
+    buffer.put(startKey);
+    buffer.put(updatedNoDictionaryStartKey);
+    buffer.rewind();
+    return buffer.array();
+  }
+
+  public byte[] serializeEndKey() {
+    byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(getNoDictEndKey());
+    ByteBuffer buffer = ByteBuffer.allocate(
+        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+            + endKey.length + updatedNoDictionaryEndKey.length);
+    buffer.putInt(endKey.length);
+    buffer.putInt(updatedNoDictionaryEndKey.length);
+    buffer.put(endKey);
+    buffer.put(updatedNoDictionaryEndKey);
+    buffer.rewind();
+    return buffer.array();
+  }
+
+  /**
+   * Below method will be used to update the no dictionary start and end key
+   *
+   * @param key key to be updated
+   * @return return no dictionary key
+   */
+  public byte[] updateNoDictionaryStartAndEndKey(byte[] key) {
+    if (key.length == 0) {
+      return key;
+    }
+    // add key to byte buffer remove the length part of the data
+    ByteBuffer buffer = ByteBuffer.wrap(key, 2, key.length - 2);
+    // create a output buffer without length
+    ByteBuffer output = ByteBuffer.allocate(key.length - 2);
+    short numberOfByteToStorLength = 2;
+    // as length part is removed, so each no dictionary value index
+    // needs to be reshuffled by 2 bytes
+    int NumberOfNoDictSortColumns = segmentProperties.getNumberOfNoDictSortColumns();
+    for (int i = 0; i < NumberOfNoDictSortColumns; i++) {
+      output.putShort((short) (buffer.getShort() - numberOfByteToStorLength));
+    }
+    // copy the data part
+    while (buffer.hasRemaining()) {
+      output.put(buffer.get());
+    }
+    output.rewind();
+    return output.array();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
new file mode 100644
index 0000000..5439a29
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
/**
 * Collects per-column-page statistics value by value while a page is filled.
 * Implementations receive one call per row: {@link #updateNull(int)} for null
 * rows and a typed {@code update} overload for non-null values.
 */
public interface ColumnPageStatsCollector {
  /** record that the value at {@code rowId} is null */
  void updateNull(int rowId);
  /** include a non-null value in the statistics (one overload per page data type) */
  void update(byte value);
  void update(short value);
  void update(int value);
  void update(long value);
  void update(double value);
  void update(byte[] value);

  /**
   * return the collected statistics
   * (concrete type depends on the implementation — TODO confirm against callers)
   */
  Object getPageStats();
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
deleted file mode 100644
index 7ce0be2..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.statistics;
-
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.core.metadata.ValueEncoderMeta;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-/** statics for one column page */
-public class ColumnPageStatsVO {
-  private DataType dataType;
-
-  /** min and max value of the measures */
-  private Object min, max;
-
-  private int scale;
-
-  private int precision;
-
-  public ColumnPageStatsVO(DataType dataType) {
-    this.dataType = dataType;
-    switch (dataType) {
-      case SHORT:
-      case INT:
-      case LONG:
-        max = Long.MIN_VALUE;
-        min = Long.MAX_VALUE;
-        break;
-      case DOUBLE:
-        max = Double.MIN_VALUE;
-        min = Double.MAX_VALUE;
-        break;
-      case DECIMAL:
-        max = new BigDecimal(Double.MIN_VALUE);
-        min = new BigDecimal(Double.MAX_VALUE);
-        break;
-    }
-  }
-
-  public static ColumnPageStatsVO copyFrom(ValueEncoderMeta meta, int scale, int precision) {
-    ColumnPageStatsVO instance = new ColumnPageStatsVO(meta.getType());
-    instance.min = meta.getMinValue();
-    instance.max = meta.getMaxValue();
-    instance.scale = scale;
-    instance.precision = precision;
-    return instance;
-  }
-
-  /**
-   * update the statistics for the input row
-   */
-  public void update(Object value) {
-    switch (dataType) {
-      case SHORT:
-        max = ((long) max > ((Short) value).longValue()) ? max : ((Short) value).longValue();
-        min = ((long) min < ((Short) value).longValue()) ? min : ((Short) value).longValue();
-        break;
-      case INT:
-        max = ((long) max > ((Integer) value).longValue()) ? max : ((Integer) value).longValue();
-        min = ((long) min  < ((Integer) value).longValue()) ? min : ((Integer) value).longValue();
-        break;
-      case LONG:
-        max = ((long) max > (long) value) ? max : value;
-        min = ((long) min < (long) value) ? min : value;
-        break;
-      case DOUBLE:
-        max = ((double) max > (double) value) ? max : value;
-        min = ((double) min < (double) value) ? min : value;
-        break;
-      case DECIMAL:
-        break;
-      case ARRAY:
-      case STRUCT:
-        // for complex type column, writer is not going to use stats, so, do nothing
-    }
-  }
-
-  public void updateNull() {
-    switch (dataType) {
-      case SHORT:
-        max = ((long) max > 0) ? max : 0L;
-        min = ((long) min < 0) ? min : 0L;
-        break;
-      case INT:
-        max = ((long) max > 0) ? max : 0L;
-        min = ((long) min  < 0) ? min : 0L;
-        break;
-      case LONG:
-        max = ((long) max > 0) ? max : 0L;
-        min = ((long) min < 0) ? min : 0L;
-        break;
-      case DOUBLE:
-        max = ((double) max > 0d) ? max : 0d;
-        min = ((double) min < 0d) ? min : 0d;
-        break;
-      case DECIMAL:
-        break;
-      case ARRAY:
-      case STRUCT:
-        // for complex type column, writer is not going to use stats, so, do nothing
-    }
-  }
-
-  /**
-   * return min value as byte array
-   */
-  public byte[] minBytes() {
-    return getValueAsBytes(getMin());
-  }
-
-  /**
-   * return max value as byte array
-   */
-  public byte[] maxBytes() {
-    return getValueAsBytes(getMax());
-  }
-
-  /**
-   * convert value to byte array
-   */
-  private byte[] getValueAsBytes(Object value) {
-    ByteBuffer b;
-    switch (dataType) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        b = ByteBuffer.allocate(8);
-        b.putLong((Long) value);
-        b.flip();
-        return b.array();
-      case DOUBLE:
-        b = ByteBuffer.allocate(8);
-        b.putDouble((Double) value);
-        b.flip();
-        return b.array();
-      case DECIMAL:
-        return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
-      default:
-        throw new IllegalArgumentException("Invalid data type: " + dataType);
-    }
-  }
-
-  public Object getMin() {
-    return min;
-  }
-
-  public Object getMax() {
-    return max;
-  }
-
-  public DataType getDataType() {
-    return dataType;
-  }
-
-  public int getScale() {
-    return scale;
-  }
-
-  public int getPrecision() {
-    return precision;
-  }
-
-  @Override
-  public String toString() {
-    return String.format("min: %s, max: %s", min, max);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
deleted file mode 100644
index 600867a..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.statistics;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.ValueEncoderMeta;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-public class MeasurePageStatsVO {
-  // statistics of each measure column
-  private Object[] min, max;
-
-  private DataType[] dataType;
-  private byte[] selectedDataType;
-
-  private MeasurePageStatsVO() {
-  }
-
-  public MeasurePageStatsVO(ColumnPage[] measurePages) {
-    min = new Object[measurePages.length];
-    max = new Object[measurePages.length];
-    dataType = new DataType[measurePages.length];
-    selectedDataType = new byte[measurePages.length];
-    for (int i = 0; i < measurePages.length; i++) {
-      ColumnPageStatsVO stats = measurePages[i].getStatistics();
-      min[i] = stats.getMin();
-      max[i] = stats.getMax();
-      dataType[i] = measurePages[i].getDataType();
-    }
-  }
-
-  public static MeasurePageStatsVO build(ValueEncoderMeta[] encoderMetas) {
-    Object[] max = new Object[encoderMetas.length];
-    Object[] min = new Object[encoderMetas.length];
-    DataType[] dataType = new DataType[encoderMetas.length];
-    byte[] selectedDataType = new byte[encoderMetas.length];
-    for (int i = 0; i < encoderMetas.length; i++) {
-      max[i] = encoderMetas[i].getMaxValue();
-      min[i] = encoderMetas[i].getMinValue();
-      dataType[i] = encoderMetas[i].getType();
-      selectedDataType[i] = encoderMetas[i].getDataTypeSelected();
-    }
-
-    MeasurePageStatsVO stats = new MeasurePageStatsVO();
-    stats.dataType = dataType;
-    stats.selectedDataType = selectedDataType;
-    stats.min = min;
-    stats.max = max;
-    return stats;
-  }
-
-  public DataType getDataType(int measureIndex) {
-    return dataType[measureIndex];
-  }
-
-  public Object getMin(int measureIndex) {
-    return min[measureIndex];
-  }
-
-  public Object getMax(int measureIndex) {
-    return max[measureIndex];
-  }
-
-  public byte getDataTypeSelected(int measureIndex) {
-    return selectedDataType[measureIndex];
-  }
-}


Mime
View raw message