carbondata-commits mailing list archives

From: ravipes...@apache.org
Subject: [4/7] carbondata git commit: [CARBONDATA-1371] Support creating decoder based on encoding metadata
Date: Wed, 23 Aug 2017 07:24:06 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
new file mode 100644
index 0000000..d722c38
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.format.Encoding;
+
+public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
+
+  public HighCardDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
+      Compressor compressor) {
+    super(isSort, isInvertedIndex, compressor);
+  }
+
+  @Override
+  public String getName() {
+    return "HighCardDictDimensionIndexCodec";
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+    return new IndexStorageEncoder() {
+
+      @Override
+      protected void encodeIndexStorage(ColumnPage input) {
+        IndexStorage indexStorage;
+        byte[][] data = input.getByteArrayPage();
+        if (isInvertedIndex) {
+          indexStorage = new BlockIndexerStorageForShort(data, false, true, isSort);
+        } else {
+          indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
+        }
+        byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+        super.compressedDataPage = compressor.compressByte(flattened);
+        super.indexStorage = indexStorage;
+      }
+
+      @Override
+      protected List<Encoding> getEncodingList() {
+        List<Encoding> encodings = new ArrayList<>();
+        if (indexStorage.getRowIdPageLengthInBytes() > 0) {
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
+        return encodings;
+      }
+
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
new file mode 100644
index 0000000..d228f6e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+public abstract class IndexStorageCodec implements ColumnPageCodec {
+  protected ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+  protected Compressor compressor;
+  protected boolean isSort;
+  protected boolean isInvertedIndex;
+
+  IndexStorageCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
+    this.isSort = isSort;
+    this.isInvertedIndex = isInvertedIndex;
+    this.compressor = compressor;
+  }
+
+  @Override
+  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
+    throw new UnsupportedOperationException("internal error");
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java
new file mode 100644
index 0000000..edc3b64
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageEncoder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.SortState;
+
+public abstract class IndexStorageEncoder extends ColumnPageEncoder {
+  IndexStorage indexStorage;
+  byte[] compressedDataPage;
+
+  abstract void encodeIndexStorage(ColumnPage inputPage);
+
+  @Override
+  protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+    assert (indexStorage == null);
+    assert (compressedDataPage == null);
+    encodeIndexStorage(input);
+    assert (indexStorage != null);
+    assert (compressedDataPage != null);
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(stream);
+    out.write(compressedDataPage);
+    if (indexStorage.getRowIdPageLengthInBytes() > 0) {
+      out.writeInt(indexStorage.getRowIdPageLengthInBytes());
+      short[] rowIdPage = (short[])indexStorage.getRowIdPage();
+      for (short rowId : rowIdPage) {
+        out.writeShort(rowId);
+      }
+      if (indexStorage.getRowIdRlePageLengthInBytes() > 0) {
+        short[] rowIdRlePage = (short[])indexStorage.getRowIdRlePage();
+        for (short rowIdRle : rowIdRlePage) {
+          out.writeShort(rowIdRle);
+        }
+      }
+    }
+    if (indexStorage.getDataRlePageLengthInBytes() > 0) {
+      short[] dataRlePage = (short[])indexStorage.getDataRlePage();
+      for (short dataRle : dataRlePage) {
+        out.writeShort(dataRle);
+      }
+    }
+    return stream.toByteArray();
+  }
+
+  @Override
+  protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+    return null;
+  }
+
+  @Override
+  protected void fillLegacyFields(DataChunk2 dataChunk)
+      throws IOException {
+    SortState sort = (indexStorage.getRowIdPageLengthInBytes() > 0) ?
+        SortState.SORT_EXPLICIT : SortState.SORT_NATIVE;
+    dataChunk.setSort_state(sort);
+    if (indexStorage.getRowIdPageLengthInBytes() > 0) {
+      int rowIdPageLength = CarbonCommonConstants.INT_SIZE_IN_BYTE +
+          indexStorage.getRowIdPageLengthInBytes() +
+          indexStorage.getRowIdRlePageLengthInBytes();
+      dataChunk.setRowid_page_length(rowIdPageLength);
+    }
+    if (indexStorage.getDataRlePageLengthInBytes() > 0) {
+      dataChunk.setRle_page_length(indexStorage.getDataRlePageLengthInBytes());
+    }
+    dataChunk.setData_page_length(compressedDataPage.length);
+  }
+}
\ No newline at end of file

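For context, encodeData() above writes the compressed data page first, then (when a row-id page is present) an int length prefix, the row-id page, the row-id RLE page, and finally the data RLE page. The standalone sketch below reproduces that field order with hand-picked sample values; the class name and the sample pages are illustrative assumptions, not part of the commit.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Standalone sketch (not part of the commit): reproduces the byte layout written by
// IndexStorageEncoder.encodeData() above, to make the field order explicit.
public class IndexStorageLayoutSketch {
  public static void main(String[] args) throws IOException {
    byte[] compressedDataPage = {10, 11, 12};   // compressor output, always written first
    short[] rowIdPage = {2, 0, 1};              // inverted index row ids (optional)
    short[] rowIdRlePage = {};                  // RLE over the row ids (optional)
    short[] dataRlePage = {};                   // RLE over the data page (optional)

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.write(compressedDataPage);
    int rowIdPageLengthInBytes = rowIdPage.length * 2;
    if (rowIdPageLengthInBytes > 0) {
      out.writeInt(rowIdPageLengthInBytes);     // int length prefix for the row-id page
      for (short rowId : rowIdPage) {
        out.writeShort(rowId);
      }
      for (short rle : rowIdRlePage) {
        out.writeShort(rle);
      }
    }
    for (short rle : dataRlePage) {
      out.writeShort(rle);
    }
    // fillLegacyFields() would report rowid_page_length = 4 (int prefix) + 6 (row ids) + 0 (RLE) = 10
    System.out.println("total encoded length = " + bytes.size());  // 3 + 4 + 6 = 13
  }
}
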
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
new file mode 100644
index 0000000..a2837b8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.rle;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * RLE encoding implementation for integral column pages.
+ * This encoding keeps track of repeated runs and non-repeated runs, and makes use
+ * of the highest bit of the length field to indicate the type of run.
+ * The length field is encoded as a 16-bit value. (Page size must be less than 65535 rows)
+ *
+ * For example: input data {5, 5, 1, 2, 3, 3, 3, 3, 3} will be encoded to
+ * {0x00, 0x02, 0x05,             (repeated-run, 2 values of 5)
+ *  0x80, 0x03, 0x01, 0x02, 0x03, (non-repeated-run, 3 values: 1, 2, 3)
+ *  0x00, 0x04, 0x03}             (repeated-run, 4 values of 3)
+ */
+public class RLECodec implements ColumnPageCodec {
+
+  enum RUN_STATE { INIT, START, REPEATED_RUN, NONREPEATED_RUN }
+
+  @Override
+  public String getName() {
+    return "RLECodec";
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+    return new RLEEncoder();
+  }
+
+  @Override
+  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
+    RLEEncoderMeta codecMeta = (RLEEncoderMeta) meta;
+    return new RLEDecoder(codecMeta.getDataType(), codecMeta.getPageSize());
+  }
+
+  // This codec supports integral type only
+  private void validateDataType(DataType dataType) {
+    switch (dataType) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        break;
+      default:
+        throw new UnsupportedOperationException(dataType + " is not supported for RLE");
+    }
+  }
+
+  private class RLEEncoder extends ColumnPageEncoder {
+    // While encoding RLE, this class internally works as a state machine
+    // INIT state is the initial state before any value comes
+    // START state is the start for each run
+    // REPEATED_RUN state means it is collecting repeated values (`lastValue`)
+    // NONREPEATED_RUN state means it is collecting non-repeated values (`nonRepeatValues`)
+    private RUN_STATE runState;
+
+    // count for each run, either REPEATED_RUN or NONREPEATED_RUN
+    private short valueCount;
+
+    // collected value for REPEATED_RUN
+    private Object lastValue;
+
+    // collected value for NONREPEATED_RUN
+    private List<Object> nonRepeatValues;
+
+    // data type of input page
+    private DataType dataType;
+
+    // output stream for encoded data
+    private ByteArrayOutputStream bao;
+    private DataOutputStream stream;
+
+    private RLEEncoder() {
+      this.runState = RUN_STATE.INIT;
+      this.valueCount = 0;
+      this.nonRepeatValues = new ArrayList<>();
+      this.bao = new ByteArrayOutputStream();
+      this.stream = new DataOutputStream(bao);
+    }
+
+    @Override
+    protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+      validateDataType(input.getDataType());
+      this.dataType = input.getDataType();
+      switch (dataType) {
+        case BYTE:
+          byte[] bytePage = input.getBytePage();
+          for (int i = 0; i < bytePage.length; i++) {
+            putValue(bytePage[i]);
+          }
+          break;
+        case SHORT:
+          short[] shortPage = input.getShortPage();
+          for (int i = 0; i < shortPage.length; i++) {
+            putValue(shortPage[i]);
+          }
+          break;
+        case INT:
+          int[] intPage = input.getIntPage();
+          for (int i = 0; i < intPage.length; i++) {
+            putValue(intPage[i]);
+          }
+          break;
+        case LONG:
+          long[] longPage = input.getLongPage();
+          for (int i = 0; i < longPage.length; i++) {
+            putValue(longPage[i]);
+          }
+          break;
+        default:
+          throw new UnsupportedOperationException(input.getDataType() +
+              " does not support RLE encoding");
+      }
+      return collectResult();
+    }
+
+    @Override
+    protected List<Encoding> getEncodingList() {
+      List<Encoding> encodings = new ArrayList<>();
+      encodings.add(Encoding.RLE_INTEGRAL);
+      return encodings;
+    }
+
+    @Override
+    protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+      return new RLEEncoderMeta(
+          inputPage.getDataType(), inputPage.getPageSize(), inputPage.getStatistics());
+    }
+
+    private void putValue(Object value) throws IOException {
+      if (runState == RUN_STATE.INIT) {
+        startNewRun(value);
+      } else {
+        if (lastValue.equals(value)) {
+          putRepeatValue(value);
+        } else {
+          putNonRepeatValue(value);
+        }
+      }
+    }
+
+    // when last row is reached, write out all collected data
+    private byte[] collectResult() throws IOException {
+      switch (runState) {
+        case REPEATED_RUN:
+          writeRunLength(valueCount);
+          writeRunValue(lastValue);
+          break;
+        case NONREPEATED_RUN:
+          writeRunLength(valueCount | 0x8000);
+          for (int i = 0; i < valueCount; i++) {
+            writeRunValue(nonRepeatValues.get(i));
+          }
+          break;
+        default:
+          assert (runState == RUN_STATE.START);
+          writeRunLength(1);
+          writeRunValue(lastValue);
+      }
+      return bao.toByteArray();
+    }
+
+    private void writeRunLength(int length) throws IOException {
+      stream.writeShort(length);
+    }
+
+    private void writeRunValue(Object value) throws IOException {
+      switch (dataType) {
+        case BYTE:
+          stream.writeByte((byte) value);
+          break;
+        case SHORT:
+          stream.writeShort((short) value);
+          break;
+        case INT:
+          stream.writeInt((int) value);
+          break;
+        case LONG:
+          stream.writeLong((long) value);
+          break;
+        default:
+          throw new RuntimeException("internal error");
+      }
+    }
+
+    // for each run, call this to initialize the state and clear the collected data
+    private void startNewRun(Object value) {
+      runState = RUN_STATE.START;
+      valueCount = 1;
+      lastValue = value;
+      nonRepeatValues.clear();
+      nonRepeatValues.add(value);
+    }
+
+    // non-repeated run ends, write the collected values to the result page
+    private void encodeNonRepeatedRun() throws IOException {
+      // put the value count (highest bit is 1) and all collected values
+      writeRunLength(valueCount | 0x8000);
+      for (int i = 0; i < valueCount; i++) {
+        writeRunValue(nonRepeatValues.get(i));
+      }
+    }
+
+    // repeated run ends, write the repeated value to the result page
+    private void encodeRepeatedRun() throws IOException {
+      // put the value count (highest bit is 0) and repeated value
+      writeRunLength(valueCount);
+      writeRunValue(lastValue);
+    }
+
+    private void putRepeatValue(Object value) throws IOException {
+      switch (runState) {
+        case REPEATED_RUN:
+          valueCount++;
+          break;
+        case NONREPEATED_RUN:
+          // non-repeated run ends, encode this run
+          encodeNonRepeatedRun();
+          startNewRun(value);
+          break;
+        default:
+          assert (runState == RUN_STATE.START);
+          // enter repeated run
+          runState = RUN_STATE.REPEATED_RUN;
+          valueCount++;
+          break;
+      }
+    }
+
+    private void putNonRepeatValue(Object value) throws IOException {
+      switch (runState) {
+        case NONREPEATED_RUN:
+          // collect the non-repeated value
+          nonRepeatValues.add(value);
+          lastValue = value;
+          valueCount++;
+          break;
+        case REPEATED_RUN:
+          // repeated-run ends, encode this run
+          encodeRepeatedRun();
+          startNewRun(value);
+          break;
+        default:
+          assert (runState == RUN_STATE.START);
+          // enter non-repeated run
+          runState = RUN_STATE.NONREPEATED_RUN;
+          nonRepeatValues.add(value);
+          lastValue = value;
+          valueCount++;
+          break;
+      }
+    }
+
+  }
+
+  // It decodes data in one shot. It is suitable for scan queries.
+  // TODO: add an on-the-fly decoder for filter queries with high selectivity
+  private class RLEDecoder implements ColumnPageDecoder {
+
+    // src data type
+    private DataType dataType;
+    private int pageSize;
+
+    private RLEDecoder(DataType dataType, int pageSize) {
+      validateDataType(dataType);
+      this.dataType = dataType;
+      this.pageSize = pageSize;
+    }
+
+    @Override
+    public ColumnPage decode(byte[] input, int offset, int length)
+        throws MemoryException, IOException {
+      DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length));
+      ColumnPage resultPage = ColumnPage.newPage(dataType, pageSize);
+      switch (dataType) {
+        case BYTE:
+          decodeBytePage(in, resultPage);
+          break;
+        case SHORT:
+          decodeShortPage(in, resultPage);
+          break;
+        case INT:
+          decodeIntPage(in, resultPage);
+          break;
+        case LONG:
+          decodeLongPage(in, resultPage);
+          break;
+        default:
+          throw new RuntimeException("unsupported datatype:" + dataType);
+      }
+      return resultPage;
+    }
+
+    private void decodeBytePage(DataInputStream in, ColumnPage decodedPage)
+        throws IOException {
+      int rowId = 0;
+      do {
+        int runLength = in.readShort();
+        int count = runLength & 0x7FFF;
+        if (runLength < 0) {
+          // non-repeated run
+          for (int i = 0; i < count; i++) {
+            decodedPage.putByte(rowId++, in.readByte());
+          }
+        } else {
+          // repeated run
+          byte value = in.readByte();
+          for (int i = 0; i < count; i++) {
+            decodedPage.putByte(rowId++, value);
+          }
+        }
+      } while (in.available() > 0);
+    }
+
+    private void decodeShortPage(DataInputStream in, ColumnPage decodedPage)
+        throws IOException {
+      int rowId = 0;
+      do {
+        int runLength = in.readShort();
+        int count = runLength & 0x7FFF;
+        if (runLength < 0) {
+          // non-repeated run
+          for (int i = 0; i < count; i++) {
+            decodedPage.putShort(rowId++, in.readShort());
+          }
+        } else {
+          // repeated run
+          short value = in.readShort();
+          for (int i = 0; i < count; i++) {
+            decodedPage.putShort(rowId++, value);
+          }
+        }
+      } while (in.available() > 0);
+    }
+
+    private void decodeIntPage(DataInputStream in, ColumnPage decodedPage)
+        throws IOException {
+      int rowId = 0;
+      do {
+        int runLength = in.readShort();
+        int count = runLength & 0x7FFF;
+        if (runLength < 0) {
+          // non-repeated run
+          for (int i = 0; i < count; i++) {
+            decodedPage.putInt(rowId++, in.readInt());
+          }
+        } else {
+          // repeated run
+          int value = in.readInt();
+          for (int i = 0; i < count; i++) {
+            decodedPage.putInt(rowId++, value);
+          }
+        }
+      } while (in.available() > 0);
+    }
+
+    private void decodeLongPage(DataInputStream in, ColumnPage decodedPage)
+        throws IOException {
+      int rowId = 0;
+      do {
+        int runLength = in.readShort();
+        int count = runLength & 0x7FFF;
+        if (runLength < 0) {
+          // non-repeated run
+          for (int i = 0; i < count; i++) {
+            decodedPage.putLong(rowId++, in.readLong());
+          }
+        } else {
+          // repeated run
+          long value = in.readLong();
+          for (int i = 0; i < count; i++) {
+            decodedPage.putLong(rowId++, value);
+          }
+        }
+      } while (in.available() > 0);
+    }
+  }
+
+}

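As a quick sanity check of the run layout documented in the RLECodec javadoc above, the following standalone sketch hand-encodes the javadoc's example input and decodes it back with the same loop shape as RLEDecoder.decodeBytePage(). It uses only java.io and is illustrative only, not part of the commit.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Standalone sketch (not part of the commit): encodes {5, 5, 1, 2, 3, 3, 3, 3, 3} as BYTE
// values using the documented run layout (16-bit run length, highest bit set for a
// non-repeated run) and decodes it back.
public class RleRoundTripSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeShort(2);           // repeated run: 2 values of 5
    out.writeByte(5);
    out.writeShort(3 | 0x8000);  // non-repeated run: values 1, 2, 3
    out.writeByte(1);
    out.writeByte(2);
    out.writeByte(3);
    out.writeShort(4);           // repeated run: 4 values of 3
    out.writeByte(3);
    // encoded bytes: 00 02 05 | 80 03 01 02 03 | 00 04 03, as in the javadoc example
    System.out.println(Arrays.toString(bytes.toByteArray()));

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    byte[] decoded = new byte[9];
    int rowId = 0;
    while (in.available() > 0) {
      int runLength = in.readShort();   // negative => highest bit set => non-repeated run
      int count = runLength & 0x7FFF;
      if (runLength < 0) {
        for (int i = 0; i < count; i++) {
          decoded[rowId++] = in.readByte();
        }
      } else {
        byte value = in.readByte();
        for (int i = 0; i < count; i++) {
          decoded[rowId++] = value;
        }
      }
    }
    System.out.println(Arrays.toString(decoded));  // [5, 5, 1, 2, 3, 3, 3, 3, 3]
  }
}
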
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
new file mode 100644
index 0000000..5d68872
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.rle;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * Metadata class for RLECodec
+ */
+public class RLEEncoderMeta extends ColumnPageEncoderMeta implements Writable {
+
+  private int pageSize;
+
+  public RLEEncoderMeta() {
+
+  }
+
+  public RLEEncoderMeta(DataType dataType, int pageSize, SimpleStatsResult stats) {
+    super(dataType, stats);
+    this.pageSize = pageSize;
+  }
+
+  public int getPageSize() {
+    return pageSize;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(pageSize);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    pageSize = in.readInt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
index 2440e33..a749587 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
@@ -32,5 +32,5 @@ public interface ColumnPageStatsCollector {
   /**
    * return the collected statistics
    */
-  Object getPageStats();
+  SimpleStatsResult getPageStats();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java
new file mode 100644
index 0000000..a13351b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+
+public class KeyPageStatsCollector implements ColumnPageStatsCollector {
+
+  private DataType dataType;
+
+  private byte[] min, max;
+
+  public static KeyPageStatsCollector newInstance(DataType dataType) {
+    return new KeyPageStatsCollector(dataType);
+  }
+
+  private KeyPageStatsCollector(DataType dataType) {
+    this.dataType = dataType;
+  }
+
+  @Override
+  public void updateNull(int rowId) {
+
+  }
+
+  @Override
+  public void update(byte value) {
+
+  }
+
+  @Override
+  public void update(short value) {
+
+  }
+
+  @Override
+  public void update(int value) {
+
+  }
+
+  @Override
+  public void update(long value) {
+
+  }
+
+  @Override
+  public void update(double value) {
+
+  }
+
+  @Override
+  public void update(BigDecimal value) {
+
+  }
+
+  @Override
+  public void update(byte[] value) {
+    if (min == null && max == null) {
+      min = value;
+      max = value;
+    } else {
+      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(min, value) > 0) {
+        min = value;
+      }
+      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(max, value) < 0) {
+        max = value;
+      }
+    }
+  }
+
+  @Override
+  public SimpleStatsResult getPageStats() {
+    return new SimpleStatsResult() {
+
+      @Override public Object getMin() {
+        return min;
+      }
+
+      @Override public Object getMax() {
+        return max;
+      }
+
+      @Override public int getDecimalPoint() {
+        return 0;
+      }
+
+      @Override public DataType getDataType() {
+        return dataType;
+      }
+
+      @Override public int getScale() {
+        return 0;
+      }
+
+      @Override public int getPrecision() {
+        return 0;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
new file mode 100644
index 0000000..62b18c6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+
+public class LVStringStatsCollector implements ColumnPageStatsCollector {
+
+  private byte[] min, max;
+
+  public static LVStringStatsCollector newInstance() {
+    return new LVStringStatsCollector();
+  }
+
+  private LVStringStatsCollector() {
+
+  }
+
+  @Override
+  public void updateNull(int rowId) {
+
+  }
+
+  @Override
+  public void update(byte value) {
+
+  }
+
+  @Override
+  public void update(short value) {
+
+  }
+
+  @Override
+  public void update(int value) {
+
+  }
+
+  @Override
+  public void update(long value) {
+
+  }
+
+  @Override
+  public void update(double value) {
+
+  }
+
+  @Override
+  public void update(BigDecimal value) {
+
+  }
+
+  @Override
+  public void update(byte[] value) {
+    // input value is LV encoded
+    assert (value.length >= 2);
+    if (value.length == 2) {
+      assert (value[0] == 0 && value[1] == 0);
+      if (min == null && max == null) {
+        min = new byte[0];
+        max = new byte[0];
+      }
+      return;
+    }
+    int length = (value[0] << 8) + value[1];
+    assert (length > 0);
+    byte[] v = new byte[value.length - 2];
+    System.arraycopy(value, 2, v, 0, v.length);
+    if (min == null && max == null) {
+      min = v;
+      max = v;
+    } else {
+      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(min, v) > 0) {
+        min = v;
+      }
+      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(max, v) < 0) {
+        max = v;
+      }
+    }
+  }
+
+  @Override
+  public SimpleStatsResult getPageStats() {
+    return new SimpleStatsResult() {
+
+      @Override public Object getMin() {
+        return min;
+      }
+
+      @Override public Object getMax() {
+        return max;
+      }
+
+      @Override public int getDecimalPoint() {
+        return 0;
+      }
+
+      @Override public DataType getDataType() {
+        return DataType.STRING;
+      }
+
+      @Override public int getScale() {
+        return 0;
+      }
+
+      @Override public int getPrecision() {
+        return 0;
+      }
+    };
+  }
+}

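The update(byte[]) implementation above expects length-value (LV) encoded input: a 2-byte big-endian length prefix followed by the string bytes, with an empty string encoded as {0, 0}; the prefix is stripped before the min/max comparison. Below is a minimal sketch of that layout, assuming UTF-8 string bytes; the helper and class names are hypothetical and not part of the commit.

import java.nio.charset.StandardCharsets;

// Standalone sketch (not part of the commit): shows the LV layout consumed by
// LVStringStatsCollector.update(byte[]) and how the 2-byte length prefix is stripped.
public class LvLayoutSketch {
  static byte[] lvEncode(String s) {
    byte[] value = s.getBytes(StandardCharsets.UTF_8);
    byte[] lv = new byte[value.length + 2];
    lv[0] = (byte) (value.length >>> 8);   // high byte of the length
    lv[1] = (byte) value.length;           // low byte of the length
    System.arraycopy(value, 0, lv, 2, value.length);
    return lv;
  }

  public static void main(String[] args) {
    byte[] lv = lvEncode("abc");           // {0x00, 0x03, 'a', 'b', 'c'}
    int length = (lv[0] << 8) + lv[1];     // same expression as in update(byte[])
    byte[] v = new byte[lv.length - 2];
    System.arraycopy(lv, 2, v, 0, v.length);
    System.out.println(length + " -> " + new String(v, StandardCharsets.UTF_8));
    // an empty string is encoded as just {0x00, 0x00}; the collector keeps empty min/max for it
  }
}
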
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
index 927ab5f..6c25d4e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
@@ -18,9 +18,8 @@
 package org.apache.carbondata.core.datastore.page.statistics;
 
 import java.math.BigDecimal;
-import java.util.BitSet;
 
-import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
@@ -38,28 +37,24 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
   // scale of the double value
   private int decimal;
 
-  // The index of the rowId whose value is null, will be set to 1
-  private BitSet nullBitSet;
-
   private boolean isFirst = true;
   private BigDecimal zeroDecimal;
 
   // this is for encode flow
-  public static PrimitivePageStatsCollector newInstance(DataType dataType, int pageSize, int
-      scale, int precision) {
+  public static PrimitivePageStatsCollector newInstance(DataType dataType,
+      int scale, int precision) {
     switch (dataType) {
       default:
-        return new PrimitivePageStatsCollector(dataType, pageSize, scale, precision);
+        return new PrimitivePageStatsCollector(dataType, scale, precision);
     }
   }
 
-  // this is for decode flow, we do not need to create nullBits, so passing 0 as pageSize
-  public static PrimitivePageStatsCollector newInstance(ColumnPageCodecMeta meta) {
+  // this is for decode flow, create stats from encoder meta in carbondata file
+  public static PrimitivePageStatsCollector newInstance(ColumnPageEncoderMeta meta) {
     PrimitivePageStatsCollector instance =
-        new PrimitivePageStatsCollector(meta.getSrcDataType(), 0, meta.getScale(),
-            meta.getPrecision());
+        new PrimitivePageStatsCollector(meta.getDataType(), meta.getScale(), meta.getPrecision());
     // set min max from meta
-    switch (meta.getSrcDataType()) {
+    switch (meta.getDataType()) {
       case BYTE:
         instance.minByte = (byte) meta.getMinValue();
         instance.maxByte = (byte) meta.getMaxValue();
@@ -90,14 +85,14 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
         break;
       default:
         throw new UnsupportedOperationException(
-            "unsupported data type for stats collection: " + meta.getSrcDataType());
+            "unsupported data type for stats collection: " + meta.getDataType());
     }
     return instance;
   }
 
   public static PrimitivePageStatsCollector newInstance(ValueEncoderMeta meta) {
     PrimitivePageStatsCollector instance =
-        new PrimitivePageStatsCollector(meta.getType(), 0, -1, -1);
+        new PrimitivePageStatsCollector(meta.getType(), -1, -1);
     // set min max from meta
     switch (meta.getType()) {
       case BYTE:
@@ -135,9 +130,8 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     return instance;
   }
 
-  private PrimitivePageStatsCollector(DataType dataType, int pageSize, int scale, int precision) {
+  private PrimitivePageStatsCollector(DataType dataType, int scale, int precision) {
     this.dataType = dataType;
-    this.nullBitSet = new BitSet(pageSize);
     switch (dataType) {
       case BYTE:
         minByte = Byte.MAX_VALUE;
@@ -174,7 +168,6 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
 
   @Override
   public void updateNull(int rowId) {
-    nullBitSet.set(rowId);
     long value = 0;
     switch (dataType) {
       case BYTE:
@@ -256,12 +249,6 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     if (maxDouble < value) {
       maxDouble = value;
     }
-    int scale = BigDecimal.valueOf(value).scale();
-    if (scale < 0) {
-      decimal = scale;
-    } else {
-      decimal = Math.max(decimal, scale);
-    }
   }
 
   @Override
@@ -281,7 +268,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
   }
 
   @Override
-  public Object getPageStats() {
+  public SimpleStatsResult getPageStats() {
     return this;
   }
 
@@ -341,11 +328,6 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
   }
 
   @Override
-  public BitSet getNullBits() {
-    return nullBitSet;
-  }
-
-  @Override
   public int getDecimalPoint() {
     return decimal;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java
index b40d023..60516fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.core.datastore.page.statistics;
 
-import java.util.BitSet;
-
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public interface SimpleStatsResult {
@@ -27,8 +25,6 @@ public interface SimpleStatsResult {
 
   Object getMax();
 
-  BitSet getNullBits();
-
   int getDecimalPoint();
 
   DataType getDataType();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java
index 07de9c0..46ad09c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java
@@ -17,22 +17,12 @@
 
 package org.apache.carbondata.core.datastore.page.statistics;
 
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedDimensionPage;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage;
-import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
-import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 // Statistics of dimension and measure column in a TablePage
 public class TablePageStatistics {
 
-  // number of dimension after complex column expanded
-  private int numDimensionsExpanded;
-
   // min of each dimension column
   private byte[][] dimensionMinValue;
 
@@ -45,69 +35,34 @@ public class TablePageStatistics {
   // max of each measure column
   private byte[][] measureMaxValue;
 
-  // null bit set for each measure column
-  private BitSet[] nullBitSet;
-
-  public TablePageStatistics(EncodedDimensionPage[] dimensions,
-      EncodedMeasurePage[] measures) {
-    this.numDimensionsExpanded = dimensions.length;
+  public TablePageStatistics(EncodedColumnPage[] dimensions,
+      EncodedColumnPage[] measures) {
+    int numDimensionsExpanded = dimensions.length;
     int numMeasures = measures.length;
     this.dimensionMinValue = new byte[numDimensionsExpanded][];
     this.dimensionMaxValue = new byte[numDimensionsExpanded][];
     this.measureMinValue = new byte[numMeasures][];
     this.measureMaxValue = new byte[numMeasures][];
-    this.nullBitSet = new BitSet[numMeasures];
     updateDimensionMinMax(dimensions);
     updateMeasureMinMax(measures);
   }
 
-  private void updateDimensionMinMax(EncodedDimensionPage[] dimensions) {
+  private void updateDimensionMinMax(EncodedColumnPage[] dimensions) {
     for (int i = 0; i < dimensions.length; i++) {
-      IndexStorage keyStorageArray = dimensions[i].getIndexStorage();
-      switch (dimensions[i].getDimensionType()) {
-        case GLOBAL_DICTIONARY:
-        case DIRECT_DICTIONARY:
-        case COLUMN_GROUP:
-        case COMPLEX:
-          dimensionMinValue[i] = keyStorageArray.getMin();
-          dimensionMaxValue[i] = keyStorageArray.getMax();
-          break;
-        case PLAIN_VALUE:
-          dimensionMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray.getMin());
-          dimensionMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray.getMax());
-          break;
-      }
+      SimpleStatsResult stats = dimensions[i].getStats();
+      dimensionMaxValue[i] = CarbonUtil.getValueAsBytes(stats.getDataType(), stats.getMax());
+      dimensionMinValue[i] = CarbonUtil.getValueAsBytes(stats.getDataType(), stats.getMin());
     }
   }
 
-  private void updateMeasureMinMax(EncodedMeasurePage[] measures) {
+  private void updateMeasureMinMax(EncodedColumnPage[] measures) {
     for (int i = 0; i < measures.length; i++) {
-      ValueEncoderMeta meta = measures[i].getMetaData();
-      if (meta instanceof ColumnPageCodecMeta) {
-        ColumnPageCodecMeta metadata = (ColumnPageCodecMeta) meta;
-        measureMaxValue[i] = metadata.getMaxAsBytes();
-        measureMinValue[i] = metadata.getMinAsBytes();
-      } else {
-        measureMaxValue[i] = CarbonUtil.getMaxValueAsBytes(meta);
-        measureMinValue[i] = CarbonUtil.getMinValueAsBytes(meta);
-      }
-      nullBitSet[i] = measures[i].getNullBitSet();
+      SimpleStatsResult stats = measures[i].getStats();
+      measureMaxValue[i] = CarbonUtil.getValueAsBytes(stats.getDataType(), stats.getMax());
+      measureMinValue[i] = CarbonUtil.getValueAsBytes(stats.getDataType(), stats.getMin());
     }
   }
 
-  /**
-   * Below method will be used to update the min or max value
-   * by removing the length from it
-   *
-   * @return min max value without length
-   */
-  public static byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
-    ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
-    byte[] actualValue = new byte[buffer.getShort()];
-    buffer.get(actualValue);
-    return actualValue;
-  }
-
   public byte[][] getDimensionMinValue() {
     return dimensionMinValue;
   }
@@ -124,7 +79,4 @@ public class TablePageStatistics {
     return measureMaxValue;
   }
 
-  public BitSet[] getNullBitSet() {
-    return nullBitSet;
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java
deleted file mode 100644
index dffd9ea..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.statistics;
-
-import java.math.BigDecimal;
-import java.util.BitSet;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.ByteUtil;
-
-public class VarLengthPageStatsCollector implements ColumnPageStatsCollector {
-
-  private byte[] min, max;
-
-  public static VarLengthPageStatsCollector newInstance() {
-    return new VarLengthPageStatsCollector();
-  }
-
-  private VarLengthPageStatsCollector() {
-  }
-
-  @Override
-  public void updateNull(int rowId) {
-
-  }
-
-  @Override
-  public void update(byte value) {
-
-  }
-
-  @Override
-  public void update(short value) {
-
-  }
-
-  @Override
-  public void update(int value) {
-
-  }
-
-  @Override
-  public void update(long value) {
-
-  }
-
-  @Override
-  public void update(double value) {
-
-  }
-
-  @Override
-  public void update(BigDecimal value) {
-
-  }
-
-  @Override
-  public void update(byte[] value) {
-    if (min == null && max == null) {
-      min = value;
-      max = value;
-    } else {
-      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(min, value) > 0) {
-        min = value;
-      }
-      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(max, value) < 0) {
-        max = value;
-      }
-    }
-  }
-
-  @Override
-  public Object getPageStats() {
-    // for binary type, we do not collect its stats
-    return new SimpleStatsResult() {
-
-      @Override public Object getMin() {
-        return min;
-      }
-
-      @Override public Object getMax() {
-        return max;
-      }
-
-      @Override public BitSet getNullBits() {
-        return null;
-      }
-
-      @Override public int getDecimalPoint() {
-        return 0;
-      }
-
-      @Override public DataType getDataType() {
-        return null;
-      }
-
-      @Override public int getScale() {
-        return 0;
-      }
-
-      @Override public int getPrecision() {
-        return 0;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java b/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
deleted file mode 100644
index da8a33d..0000000
--- a/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.metadata;
-
-import java.util.BitSet;
-
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
-
-// It is used for V1 and V2 format only
-public class BlockletInfoColumnar {
-
-  private EncodedTablePage encodedTablePage;
-
-  /**
-   * measureOffset.
-   */
-  private long[] measureOffset;
-
-  /**
-   * measureLength.
-   */
-  private int[] measureLength;
-
-  /**
-   * numberOfKeys.
-   */
-  private int numberOfKeys;
-
-  /**
-   * startKey.
-   */
-  private byte[] startKey;
-
-  /**
-   * endKey.
-   */
-  private byte[] endKey;
-
-  /**
-   * keyOffSets
-   */
-  private long[] keyOffSets;
-
-  /**
-   * keyLengths
-   */
-  private int[] keyLengths;
-
-  /**
-   * isSortedKeyColumn
-   */
-  private boolean[] isSortedKeyColumn;
-
-  /**
-   * keyBlockIndexOffSets
-   */
-  private long[] keyBlockIndexOffSets;
-
-  /**
-   * keyBlockIndexLength
-   */
-  private int[] keyBlockIndexLength;
-
-  /**
-   * dataIndexMap
-   */
-  private int[] dataIndexMapLength;
-
-  /**
-   * dataIndexMap
-   */
-  private long[] dataIndexMapOffsets;
-
-  private boolean[] aggKeyBlock;
-
-  /**
-   * column min array
-   */
-  private byte[][] columnMaxData;
-
-  /**
-   * column max array
-   */
-  private byte[][] columnMinData;
-
-  /**
-   * bit set which will holds the measure
-   * indexes which are null
-   */
-  private BitSet[] measureNullValueIndex;
-
-  /**
-   * getMeasureLength
-   *
-   * @return int[].
-   */
-  public int[] getMeasureLength() {
-    return measureLength;
-  }
-
-  /**
-   * setMeasureLength.
-   *
-   * @param measureLength
-   */
-  public void setMeasureLength(int[] measureLength) {
-    this.measureLength = measureLength;
-  }
-
-  /**
-   * getMeasureOffset.
-   *
-   * @return long[].
-   */
-  public long[] getMeasureOffset() {
-    return measureOffset;
-  }
-
-  /**
-   * setMeasureOffset.
-   *
-   * @param measureOffset
-   */
-  public void setMeasureOffset(long[] measureOffset) {
-    this.measureOffset = measureOffset;
-  }
-
-  /**
-   * getStartKey().
-   *
-   * @return byte[].
-   */
-  public byte[] getStartKey() {
-    return startKey;
-  }
-
-  /**
-   * setStartKey.
-   *
-   * @param startKey
-   */
-  public void setStartKey(byte[] startKey) {
-    this.startKey = startKey;
-  }
-
-  /**
-   * getEndKey().
-   *
-   * @return byte[].
-   */
-  public byte[] getEndKey() {
-    return endKey;
-  }
-
-  /**
-   * setEndKey.
-   *
-   * @param endKey
-   */
-  public void setEndKey(byte[] endKey) {
-    this.endKey = endKey;
-  }
-
-  /**
-   * @return the keyOffSets
-   */
-  public long[] getKeyOffSets() {
-    return keyOffSets;
-  }
-
-  /**
-   * @param keyOffSets the keyOffSets to set
-   */
-  public void setKeyOffSets(long[] keyOffSets) {
-    this.keyOffSets = keyOffSets;
-  }
-
-  /**
-   * @return the keyLengths
-   */
-  public int[] getKeyLengths() {
-    return keyLengths;
-  }
-
-  //TODO SIMIAN
-
-  /**
-   * @param keyLengths the keyLengths to set
-   */
-  public void setKeyLengths(int[] keyLengths) {
-    this.keyLengths = keyLengths;
-  }
-
-  /**
-   * getNumberOfKeys()
-   *
-   * @return int.
-   */
-  public int getNumberOfKeys() {
-    return numberOfKeys;
-  }
-
-  /**
-   * setNumberOfKeys.
-   *
-   * @param numberOfKeys
-   */
-  public void setNumberOfKeys(int numberOfKeys) {
-    this.numberOfKeys = numberOfKeys;
-  }
-
-  /**
-   * @return the isSortedKeyColumn
-   */
-  public boolean[] getIsSortedKeyColumn() {
-    return isSortedKeyColumn;
-  }
-
-  /**
-   * @param isSortedKeyColumn the isSortedKeyColumn to set
-   */
-  public void setIsSortedKeyColumn(boolean[] isSortedKeyColumn) {
-    this.isSortedKeyColumn = isSortedKeyColumn;
-  }
-
-  /**
-   * @return the keyBlockIndexOffSets
-   */
-  public long[] getKeyBlockIndexOffSets() {
-    return keyBlockIndexOffSets;
-  }
-
-  /**
-   * @param keyBlockIndexOffSets the keyBlockIndexOffSets to set
-   */
-  public void setKeyBlockIndexOffSets(long[] keyBlockIndexOffSets) {
-    this.keyBlockIndexOffSets = keyBlockIndexOffSets;
-  }
-
-  /**
-   * @return the keyBlockIndexLength
-   */
-  public int[] getKeyBlockIndexLength() {
-    return keyBlockIndexLength;
-  }
-
-  /**
-   * @param keyBlockIndexLength the keyBlockIndexLength to set
-   */
-  public void setKeyBlockIndexLength(int[] keyBlockIndexLength) {
-    this.keyBlockIndexLength = keyBlockIndexLength;
-  }
-
-  /**
-   * @return the dataIndexMapLenght
-   */
-  public int[] getDataIndexMapLength() {
-    return dataIndexMapLength;
-  }
-
-  public void setDataIndexMapLength(int[] dataIndexMapLength) {
-    this.dataIndexMapLength = dataIndexMapLength;
-  }
-
-  /**
-   * @return the dataIndexMapOffsets
-   */
-  public long[] getDataIndexMapOffsets() {
-    return dataIndexMapOffsets;
-  }
-
-  public void setDataIndexMapOffsets(long[] dataIndexMapOffsets) {
-    this.dataIndexMapOffsets = dataIndexMapOffsets;
-  }
-
-  public boolean[] getAggKeyBlock() {
-    return aggKeyBlock;
-  }
-
-  public void setAggKeyBlock(boolean[] aggKeyBlock) {
-    this.aggKeyBlock = aggKeyBlock;
-  }
-
-  public byte[][] getColumnMaxData() {
-    return this.columnMaxData;
-  }
-
-  public void setColumnMaxData(byte[][] columnMaxData) {
-    this.columnMaxData = columnMaxData;
-  }
-
-  public byte[][] getColumnMinData() {
-    return this.columnMinData;
-  }
-
-  public void setColumnMinData(byte[][] columnMinData) {
-    this.columnMinData = columnMinData;
-  }
-
-  /**
-   * @return the measureNullValueIndex
-   */
-  public BitSet[] getMeasureNullValueIndex() {
-    return measureNullValueIndex;
-  }
-
-  /**
-   * @param measureNullValueIndex the measureNullValueIndex to set
-   */
-  public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) {
-    this.measureNullValueIndex = measureNullValueIndex;
-  }
-
-  public void setEncodedTablePage(EncodedTablePage encodedData) {
-    this.encodedTablePage = encodedData;
-  }
-
-  public EncodedTablePage getEncodedTablePage() {
-    return encodedTablePage;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/metadata/CodecMetaFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CodecMetaFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/CodecMetaFactory.java
deleted file mode 100644
index ac83333..0000000
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CodecMetaFactory.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.metadata;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-import static org.apache.carbondata.core.metadata.datatype.DataType.*;
-import static org.apache.carbondata.core.metadata.datatype.DataType.LONG;
-
-public class CodecMetaFactory {
-
-  private static final ColumnarFormatVersion version =
-      CarbonProperties.getInstance().getFormatVersion();
-
-  public static ValueEncoderMeta createMeta() {
-    switch (version) {
-      case V1:
-      case V2:
-        return new ValueEncoderMeta();
-      case V3:
-        return ColumnPageCodecMeta.newInstance();
-      default:
-        throw new UnsupportedOperationException("unsupported version: " + version);
-    }
-  }
-
-  public static ValueEncoderMeta createMeta(SimpleStatsResult stats, DataType targetDataType) {
-    switch (version) {
-      case V1:
-      case V2:
-        ValueEncoderMeta meta = new ValueEncoderMeta();
-        switch (stats.getDataType()) {
-          case SHORT:
-            meta.setMaxValue((long)(short) stats.getMax());
-            meta.setMinValue((long)(short) stats.getMin());
-            break;
-          case INT:
-            meta.setMaxValue((long)(int) stats.getMax());
-            meta.setMinValue((long)(int) stats.getMin());
-            break;
-          default:
-            meta.setMaxValue(stats.getMax());
-            meta.setMinValue(stats.getMin());
-            break;
-        }
-        meta.setDecimal(stats.getDecimalPoint());
-        meta.setType(converType(stats.getDataType()));
-        return meta;
-      case V3:
-        return ColumnPageCodecMeta.newInstance(stats, targetDataType);
-      default:
-        throw new UnsupportedOperationException("unsupported version: " + version);
-    }
-  }
-
-  public static char converType(DataType type) {
-    switch (type) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        return CarbonCommonConstants.BIG_INT_MEASURE;
-      case DOUBLE:
-        return CarbonCommonConstants.DOUBLE_MEASURE;
-      case DECIMAL:
-        return CarbonCommonConstants.BIG_DECIMAL_MEASURE;
-      default:
-        throw new RuntimeException("Unexpected type: " + type);
-    }
-  }
-
-}

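The factory removed above chose the encoder metadata implementation from the columnar format version and, for the legacy V1/V2 layout, widened SHORT and INT statistics to long before storing min/max. A minimal standalone sketch of that version switch (hypothetical names, not the CarbonData classes):

// Hypothetical sketch of a format-version keyed metadata factory.
enum FormatVersion { V1, V2, V3 }

class LegacyMeta {
  long max;
  long min;
}

class VersionedMetaFactory {
  private final FormatVersion version;

  VersionedMetaFactory(FormatVersion version) {
    this.version = version;
  }

  Object createMeta(Number max, Number min) {
    switch (version) {
      case V1:
      case V2:
        // legacy layout keeps one widened long pair for integral measures
        LegacyMeta meta = new LegacyMeta();
        meta.max = max.longValue();
        meta.min = min.longValue();
        return meta;
      case V3:
        return new Object(); // stand-in for the richer V3 page metadata
      default:
        throw new UnsupportedOperationException("unsupported version: " + version);
    }
  }
}

The switch keeps the V1/V2 and V3 code paths apart behind a single call site, which is the same shape as the removed createMeta overloads.
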
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java
deleted file mode 100644
index 54bc2ce..0000000
--- a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.metadata;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-/**
- * It holds metadata for one column page
- */
-public class ColumnPageCodecMeta extends ValueEncoderMeta implements Serializable {
-
-  private BitSet nullBitSet;
-
-  private DataType srcDataType;
-
-  private DataType targetDataType;
-
-  private int scale;
-
-  private int precision;
-
-  public static final char BYTE_VALUE_MEASURE = 'c';
-  public static final char SHORT_VALUE_MEASURE = 'j';
-  public static final char INT_VALUE_MEASURE = 'k';
-  public static final char BIG_INT_MEASURE = 'd';
-  public static final char DOUBLE_MEASURE = 'n';
-  public static final char BIG_DECIMAL_MEASURE = 'b';
-
-  static ColumnPageCodecMeta newInstance() {
-    return new ColumnPageCodecMeta();
-  }
-
-  static ColumnPageCodecMeta newInstance(
-      SimpleStatsResult stats, DataType targetDataType) {
-    ColumnPageCodecMeta meta = new ColumnPageCodecMeta();
-    meta.srcDataType = stats.getDataType();
-    meta.targetDataType = targetDataType;
-    meta.nullBitSet = stats.getNullBits();
-    meta.setType(CodecMetaFactory.converType(stats.getDataType()));
-    meta.setMaxValue(stats.getMax());
-    meta.setMinValue(stats.getMin());
-    meta.setDecimal(stats.getDecimalPoint());
-    meta.setScale(stats.getScale());
-    meta.setPrecision(stats.getPrecision());
-    return meta;
-  }
-
-  public DataType getTargetDataType() {
-    return targetDataType;
-  }
-
-  public void setSrcDataType(char type) {
-    switch (type) {
-      case BYTE_VALUE_MEASURE:
-        srcDataType = DataType.BYTE;
-        break;
-      case SHORT_VALUE_MEASURE:
-        srcDataType = DataType.SHORT;
-        break;
-      case INT_VALUE_MEASURE:
-        srcDataType = DataType.INT;
-        break;
-      case BIG_INT_MEASURE:
-        srcDataType = DataType.LONG;
-        break;
-      case DOUBLE_MEASURE:
-        srcDataType = DataType.DOUBLE;
-        break;
-      case BIG_DECIMAL_MEASURE:
-        srcDataType = DataType.DECIMAL;
-        break;
-      default:
-        throw new RuntimeException("Unexpected type: " + type);
-    }
-  }
-
-  private char getSrcDataTypeInChar() {
-    switch (srcDataType) {
-      case BYTE:
-        return BYTE_VALUE_MEASURE;
-      case SHORT:
-        return SHORT_VALUE_MEASURE;
-      case INT:
-        return INT_VALUE_MEASURE;
-      case LONG:
-        return BIG_INT_MEASURE;
-      case DOUBLE:
-        return DOUBLE_MEASURE;
-      case DECIMAL:
-        return BIG_DECIMAL_MEASURE;
-      default:
-        throw new RuntimeException("Unexpected type: " + targetDataType);
-    }
-  }
-
-  public BitSet getNullBitSet() {
-    return nullBitSet;
-  }
-
-  public void setNullBitSet(BitSet nullBitSet) {
-    this.nullBitSet = nullBitSet;
-  }
-
-  public DataType getSrcDataType() {
-    return srcDataType;
-  }
-
-  public byte[] serialize() {
-    ByteBuffer buffer = null;
-    switch (srcDataType) {
-      case BYTE:
-        buffer = ByteBuffer.allocate(
-            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
-                + 3);
-        buffer.putChar(getSrcDataTypeInChar());
-        buffer.put((byte) getMaxValue());
-        buffer.put((byte) getMinValue());
-        buffer.putLong((Long) 0L); // unique value is obsoleted, maintain for compatibility
-        break;
-      case SHORT:
-        buffer = ByteBuffer.allocate(
-            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
-                + 3);
-        buffer.putChar(getSrcDataTypeInChar());
-        buffer.putShort((short) getMaxValue());
-        buffer.putShort((short) getMinValue());
-        buffer.putLong((Long) 0L); // unique value is obsoleted, maintain for compatibility
-        break;
-      case INT:
-        buffer = ByteBuffer.allocate(
-            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
-                + 3);
-        buffer.putChar(getSrcDataTypeInChar());
-        buffer.putInt((int) getMaxValue());
-        buffer.putInt((int) getMinValue());
-        buffer.putLong((Long) 0L); // unique value is obsoleted, maintain for compatibility
-        break;
-      case LONG:
-        buffer = ByteBuffer.allocate(
-            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
-                + 3);
-        buffer.putChar(getSrcDataTypeInChar());
-        buffer.putLong((Long) getMaxValue());
-        buffer.putLong((Long) getMinValue());
-        buffer.putLong((Long) 0L); // unique value is obsoleted, maintain for compatibility
-        break;
-      case DOUBLE:
-        buffer = ByteBuffer.allocate(
-            (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
-                + 3);
-        buffer.putChar(getSrcDataTypeInChar());
-        buffer.putDouble((Double) getMaxValue());
-        buffer.putDouble((Double) getMinValue());
-        buffer.putDouble((Double) 0d); // unique value is obsoleted, maintain for compatibility
-        break;
-      case DECIMAL:
-        byte[] maxAsBytes = getMaxAsBytes();
-        byte[] minAsBytes = getMinAsBytes();
-        byte[] unique = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
-        buffer = ByteBuffer.allocate(maxAsBytes.length + minAsBytes.length + unique.length
-            + 3 * CarbonCommonConstants.SHORT_SIZE_IN_BYTE
-            + CarbonCommonConstants.INT_SIZE_IN_BYTE * 3 + 3);
-        buffer.putChar(getSrcDataTypeInChar());
-        buffer.putShort((short) maxAsBytes.length);
-        buffer.put(maxAsBytes);
-        buffer.putShort((short)minAsBytes.length);
-        buffer.put(minAsBytes);
-        // unique value is obsoleted, maintain for compatibility
-        buffer.putShort((short) unique.length);
-        buffer.put(unique);
-        buffer.putInt(getScale());
-        buffer.putInt(getPrecision());
-        break;
-      default:
-        throw new RuntimeException("Unexpected type: " + srcDataType);
-    }
-    buffer.putInt(getDecimal());
-    buffer.put(getDataTypeSelected());
-    buffer.flip();
-    return buffer.array();
-  }
-
-  public void deserialize(byte[] encodeMeta) {
-    ByteBuffer buffer = ByteBuffer.wrap(encodeMeta);
-    char srcDataType = buffer.getChar();
-    this.setSrcDataType(srcDataType);
-    switch (srcDataType) {
-      case DOUBLE_MEASURE:
-        this.setMaxValue(buffer.getDouble());
-        this.setMinValue(buffer.getDouble());
-        buffer.getDouble(); // for non exist value which is obsoleted, it is backward compatibility;
-        break;
-      case BIG_DECIMAL_MEASURE:
-        byte[] max = new byte[buffer.getShort()];
-        buffer.get(max);
-        this.setMaxValue(DataTypeUtil.byteToBigDecimal(max));
-        byte[] min = new byte[buffer.getShort()];
-        buffer.get(min);
-        this.setMinValue(DataTypeUtil.byteToBigDecimal(min));
-        buffer.get(new byte[buffer.getShort()]);
-        this.setScale(buffer.getInt());
-        this.setPrecision(buffer.getInt());
-        break;
-      case BYTE_VALUE_MEASURE:
-        this.setMaxValue(buffer.get());
-        this.setMinValue(buffer.get());
-        buffer.getLong();  // for non exist value which is obsoleted, it is backward compatibility;
-        break;
-      case SHORT_VALUE_MEASURE:
-        this.setMaxValue(buffer.getShort());
-        this.setMinValue(buffer.getShort());
-        buffer.getLong();  // for non exist value which is obsoleted, it is backward compatibility;
-        break;
-      case INT_VALUE_MEASURE:
-        this.setMaxValue(buffer.getInt());
-        this.setMinValue(buffer.getInt());
-        buffer.getLong();  // for non exist value which is obsoleted, it is backward compatibility;
-        break;
-      case BIG_INT_MEASURE:
-        this.setMaxValue(buffer.getLong());
-        this.setMinValue(buffer.getLong());
-        buffer.getLong();  // for non exist value which is obsoleted, it is backward compatibility;
-        break;
-      default:
-        throw new IllegalArgumentException("invalid measure type");
-    }
-    this.setDecimal(buffer.getInt());
-    buffer.get(); // for selectedDataType, obsoleted
-  }
-
-  public byte[] getMaxAsBytes() {
-    return getValueAsBytes(getMaxValue());
-  }
-
-  public byte[] getMinAsBytes() {
-    return getValueAsBytes(getMinValue());
-  }
-
-  /**
-   * convert value to byte array
-   */
-  private byte[] getValueAsBytes(Object value) {
-    ByteBuffer b;
-    switch (srcDataType) {
-      case BYTE:
-        b = ByteBuffer.allocate(8);
-        b.putLong((byte) value);
-        b.flip();
-        return b.array();
-      case SHORT:
-        b = ByteBuffer.allocate(8);
-        b.putLong((short) value);
-        b.flip();
-        return b.array();
-      case INT:
-        b = ByteBuffer.allocate(8);
-        b.putLong((int) value);
-        b.flip();
-        return b.array();
-      case LONG:
-        b = ByteBuffer.allocate(8);
-        b.putLong((long) value);
-        b.flip();
-        return b.array();
-      case DOUBLE:
-        b = ByteBuffer.allocate(8);
-        b.putDouble((double) value);
-        b.flip();
-        return b.array();
-      case DECIMAL:
-        return DataTypeUtil.bigDecimalToByte((BigDecimal)value);
-      case BYTE_ARRAY:
-        return new byte[8];
-      default:
-        throw new IllegalArgumentException("Invalid data type: " + targetDataType);
-    }
-  }
-
-  public int getScale() {
-    return scale;
-  }
-
-  public void setScale(int scale) {
-    this.scale = scale;
-  }
-
-  public int getPrecision() {
-    return precision;
-  }
-
-  public void setPrecision(int precision) {
-    this.precision = precision;
-  }
-}

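The removed page metadata wrote a type code, the min/max pair, an obsolete "unique value" slot, the decimal count and a selected-type byte into a ByteBuffer. A round-trip sketch of just the LONG layout (illustrative sizes and type code, not the deleted class):

import java.nio.ByteBuffer;

public class MinMaxMetaSketch {
  // Layout sketch for a long-encoded measure: type char, max, min,
  // obsolete placeholder, decimal count, obsolete selected-type byte.
  static byte[] serialize(long max, long min, int decimal) {
    ByteBuffer buffer = ByteBuffer.allocate(2 + 8 * 3 + 4 + 1);
    buffer.putChar('d');      // type code for a long ("big int") measure
    buffer.putLong(max);
    buffer.putLong(min);
    buffer.putLong(0L);       // obsolete unique value, kept for compatibility
    buffer.putInt(decimal);
    buffer.put((byte) 0);     // obsolete selected data type
    buffer.flip();
    return buffer.array();
  }

  static long[] deserialize(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    buffer.getChar();         // type code
    long max = buffer.getLong();
    long min = buffer.getLong();
    buffer.getLong();         // skip the obsolete slot
    return new long[] { max, min, buffer.getInt() };
  }

  public static void main(String[] args) {
    long[] roundTrip = deserialize(serialize(42L, -7L, 0));
    System.out.println(roundTrip[0] + " " + roundTrip[1]); // 42 -7
  }
}
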
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/DataChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/DataChunk.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/DataChunk.java
index d77c8a6..a85706b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/DataChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/DataChunk.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.core.metadata.blocklet.datachunk;
 
 import java.io.Serializable;
+import java.util.BitSet;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
@@ -50,7 +51,7 @@ public class DataChunk implements Serializable {
   /**
    * information about presence of values in each row of this column chunk
    */
-  private transient PresenceMeta nullValueIndexForColumn;
+  private transient BitSet nullValueIndexForColumn;
 
   /**
    * offset of row id page, only if encoded using inverted index
@@ -128,14 +129,14 @@ public class DataChunk implements Serializable {
   /**
    * @return the nullValueIndexForColumn
    */
-  public PresenceMeta getNullValueIndexForColumn() {
+  public BitSet getNullValueIndexForColumn() {
     return nullValueIndexForColumn;
   }
 
   /**
    * @param nullValueIndexForColumn the nullValueIndexForColumn to set
    */
-  public void setNullValueIndexForColumn(PresenceMeta nullValueIndexForColumn) {
+  public void setNullValueIndexForColumn(BitSet nullValueIndexForColumn) {
     this.nullValueIndexForColumn = nullValueIndexForColumn;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/PresenceMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/PresenceMeta.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/PresenceMeta.java
deleted file mode 100644
index 11b8a2e..0000000
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/datachunk/PresenceMeta.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.metadata.blocklet.datachunk;
-
-import java.util.BitSet;
-
-/**
- * information about presence of values in each row of the column chunk
- */
-public class PresenceMeta {
-
-  /**
-   * if true, ones in the bit stream reprents presence. otherwise represents absence
-   */
-  private boolean representNullValues;
-
-  /**
-   * Compressed bit stream representing the presence of null values
-   */
-  private BitSet bitSet;
-
-  /**
-   * @return the representNullValues
-   */
-  public boolean isRepresentNullValues() {
-    return representNullValues;
-  }
-
-  /**
-   * @param representNullValues the representNullValues to set
-   */
-  public void setRepresentNullValues(boolean representNullValues) {
-    this.representNullValues = representNullValues;
-  }
-
-  /**
-   * @return the bitSet
-   */
-  public BitSet getBitSet() {
-    return bitSet;
-  }
-
-  /**
-   * @param bitSet the bitSet to set
-   */
-  public void setBitSet(BitSet bitSet) {
-    this.bitSet = bitSet;
-  }
-}

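With PresenceMeta removed, the null-row information travels as a plain java.util.BitSet (see the DataChunk change above): a set bit marks a null row in the page. A short illustrative sketch of building and consulting such a bitset (not the CarbonData reader path itself):

import java.util.BitSet;

public class NullBitsSketch {
  public static void main(String[] args) {
    Long[] values = { 5L, null, 7L, null, 9L };

    // Build the null bitset while writing the page: a set bit marks a null row.
    BitSet nullBits = new BitSet(values.length);
    for (int i = 0; i < values.length; i++) {
      if (values[i] == null) {
        nullBits.set(i);
      }
    }

    // Consult it while reading: skip rows whose bit is set.
    long sum = 0;
    for (int i = 0; i < values.length; i++) {
      if (!nullBits.get(i)) {
        sum += values[i];
      }
    }
    System.out.println(sum); // 21
  }
}
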
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
index e3d7a9a..06d09f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
@@ -26,7 +26,12 @@ public enum Encoding {
   INVERTED_INDEX,
   BIT_PACKED,
   DIRECT_DICTIONARY,
-  IMPLICIT;
+  IMPLICIT,
+
+  DIRECT_COMPRESS,
+  ADAPTIVE_INTEGRAL,
+  ADAPTIVE_DELTA_INTEGRAL,
+  RLE_INTEGRAL;
 
   public static Encoding valueOf(int ordinal) {
     if (ordinal == DICTIONARY.ordinal()) {
@@ -43,6 +48,14 @@ public enum Encoding {
       return DIRECT_DICTIONARY;
     } else if (ordinal == IMPLICIT.ordinal()) {
       return IMPLICIT;
+    } else if (ordinal == DIRECT_COMPRESS.ordinal()) {
+      return DIRECT_COMPRESS;
+    } else if (ordinal == ADAPTIVE_INTEGRAL.ordinal()) {
+      return ADAPTIVE_INTEGRAL;
+    } else if (ordinal == ADAPTIVE_DELTA_INTEGRAL.ordinal()) {
+      return ADAPTIVE_DELTA_INTEGRAL;
+    } else if (ordinal == RLE_INTEGRAL.ordinal()) {
+      return RLE_INTEGRAL;
     } else {
       throw new RuntimeException("create Encoding with invalid ordinal: " + ordinal);
     }

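The enum gains four page-level encodings (DIRECT_COMPRESS, ADAPTIVE_INTEGRAL, ADAPTIVE_DELTA_INTEGRAL, RLE_INTEGRAL) and extends the explicit ordinal-to-constant mapping, since the ordinal persisted in the file footer must resolve back to a constant and unknown ordinals must fail with a clear error. A standalone sketch of the same lookup pattern (hypothetical enum, not org.apache.carbondata.core.metadata.encoder.Encoding):

// Hypothetical sketch of ordinal-based lookup for persisted encoding ids.
enum PageEncoding {
  DIRECT_COMPRESS, ADAPTIVE_INTEGRAL, ADAPTIVE_DELTA_INTEGRAL, RLE_INTEGRAL;

  static PageEncoding fromOrdinal(int ordinal) {
    for (PageEncoding encoding : values()) {
      if (encoding.ordinal() == ordinal) {
        return encoding;
      }
    }
    throw new IllegalArgumentException("invalid encoding ordinal: " + ordinal);
  }
}

public class EncodingLookupSketch {
  public static void main(String[] args) {
    // Ordinals are persisted in the file footer and resolved back on read.
    int persisted = PageEncoding.ADAPTIVE_INTEGRAL.ordinal();
    System.out.println(PageEncoding.fromOrdinal(persisted)); // ADAPTIVE_INTEGRAL
  }
}

Looping over values() is one possible alternative to the one-branch-per-constant chain in the patch; both reject out-of-range ordinals instead of silently indexing into the array.
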
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
index ad9b773..84995b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
@@ -21,7 +21,7 @@ import java.math.RoundingMode;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.collector.ScannedResultCollector;
@@ -86,19 +86,19 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
     }
   }
 
-  protected Object getMeasureData(MeasureColumnDataChunk dataChunk, int index,
+  protected Object getMeasureData(ColumnPage dataChunk, int index,
       CarbonMeasure carbonMeasure) {
-    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+    if (!dataChunk.getNullBits().get(index)) {
       switch (carbonMeasure.getDataType()) {
         case SHORT:
-          return (short)dataChunk.getColumnPage().getLong(index);
+          return (short)dataChunk.getLong(index);
         case INT:
-          return (int)dataChunk.getColumnPage().getLong(index);
+          return (int)dataChunk.getLong(index);
         case LONG:
-          return dataChunk.getColumnPage().getLong(index);
+          return dataChunk.getLong(index);
         case DECIMAL:
           BigDecimal bigDecimalMsrValue =
-              dataChunk.getColumnPage().getDecimal(index);
+              dataChunk.getDecimal(index);
           if (null != bigDecimalMsrValue && carbonMeasure.getScale() > bigDecimalMsrValue.scale()) {
             bigDecimalMsrValue =
                 bigDecimalMsrValue.setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
@@ -106,7 +106,7 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
           // convert data type as per the computing engine
           return DataTypeUtil.getDataTypeConverter().convertToDecimal(bigDecimalMsrValue);
         default:
-          return dataChunk.getColumnPage().getDouble(index);
+          return dataChunk.getDouble(index);
       }
     }
     return null;

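After this change the collector reads measures directly from the decoded ColumnPage, first consulting its null bitset and then narrowing the stored value to the declared data type. A minimal sketch of that read path with a stand-in page class (illustrative only; the real ColumnPage API may differ):

import java.util.BitSet;

public class MeasureReadSketch {
  // Stand-in for a decoded measure page: long-encoded values plus a null bitset.
  static class SimplePage {
    final long[] data;
    final BitSet nullBits;

    SimplePage(long[] data, BitSet nullBits) {
      this.data = data;
      this.nullBits = nullBits;
    }
  }

  // Null rows return null; non-null rows are narrowed to the requested type.
  static Object readMeasure(SimplePage page, int row, String dataType) {
    if (page.nullBits.get(row)) {
      return null;
    }
    switch (dataType) {
      case "SHORT": return (short) page.data[row];
      case "INT":   return (int) page.data[row];
      case "LONG":  return page.data[row];
      default:      throw new IllegalArgumentException("unsupported type: " + dataType);
    }
  }

  public static void main(String[] args) {
    BitSet nulls = new BitSet();
    nulls.set(1);
    SimplePage page = new SimplePage(new long[] { 10, 0, 30 }, nulls);
    System.out.println(readMeasure(page, 0, "INT"));  // 10
    System.out.println(readMeasure(page, 1, "LONG")); // null
  }
}
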
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 20217b7..4471ced 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -21,9 +21,9 @@ import java.util.BitSet;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
@@ -103,13 +103,13 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
       }
       MeasureRawColumnChunk measureRawColumnChunk =
           blockChunkHolder.getMeasureRawDataChunk()[blockIndex];
-      MeasureColumnDataChunk[] measureColumnDataChunks =
-          measureRawColumnChunk.convertToMeasureColDataChunks();
+      ColumnPage[] columnPages =
+          measureRawColumnChunk.convertToColumnPage();
+      ColumnPage[] columnPages =
       BitSetGroup bitSetGroup = new BitSetGroup(measureRawColumnChunk.getPagesCount());
       DataType msrType = getMeasureDataType(msrColumnEvaluatorInfo);
-      for (int i = 0; i < measureColumnDataChunks.length; i++) {
+      for (int i = 0; i < columnPages.length; i++) {
         BitSet bitSet =
-            getFilteredIndexes(measureColumnDataChunks[i], measureRawColumnChunk.getRowCount()[i],
+            getFilteredIndexes(columnPages[i], measureRawColumnChunk.getRowCount()[i],
                 msrType);
         bitSetGroup.setBitSet(bitSet, i);
       }
@@ -133,7 +133,7 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
     }
   }
 
-  protected BitSet getFilteredIndexes(MeasureColumnDataChunk measureColumnDataChunk,
+  protected BitSet getFilteredIndexes(ColumnPage columnPage,
       int numerOfRows, DataType msrType) {
     // Here the algorithm is
     // Get the measure values from the chunk. compare sequentially with the
@@ -143,7 +143,7 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
     Object[] filterValues = msrColumnExecutorInfo.getFilterKeys();
     SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
     for (int i = 0; i < filterValues.length; i++) {
-      BitSet nullBitSet = measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
+      BitSet nullBitSet = columnPage.getNullBits();
       if (filterValues[i] == null) {
         for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
           bitSet.flip(j);
@@ -154,7 +154,7 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
         if (!nullBitSet.get(startIndex)) {
           // Check if filterValue[i] matches with measure Values.
           Object msrValue = DataTypeUtil
-              .getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), startIndex,
+              .getMeasureObjectBasedOnDataType(columnPage, startIndex,
                   msrType, msrColumnEvaluatorInfo.getMeasure());
 
           if (comparator.compare(msrValue, filterValues[i]) == 0) {

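The exclude filter above starts with every row selected and then flips off the rows whose measure value equals one of the filter values, treating a null filter value as matching the null rows recorded in the bitset. A self-contained sketch of that algorithm on boxed longs (illustrative only, not the executor class):

import java.util.BitSet;

public class ExcludeFilterSketch {
  // Start with every row included, then clear the rows whose value matches
  // one of the exclude-filter values (a null filter value matches null rows).
  static BitSet excludeFilter(Long[] values, BitSet nullBits, Long[] filterValues) {
    BitSet bitSet = new BitSet(values.length);
    bitSet.set(0, values.length);
    for (Long filterValue : filterValues) {
      for (int row = 0; row < values.length; row++) {
        boolean isNull = nullBits.get(row);
        if (filterValue == null ? isNull : !isNull && values[row].equals(filterValue)) {
          bitSet.clear(row);
        }
      }
    }
    return bitSet;
  }

  public static void main(String[] args) {
    BitSet nulls = new BitSet();
    nulls.set(2);
    Long[] values = { 1L, 2L, null, 2L };
    System.out.println(excludeFilter(values, nulls, new Long[] { 2L })); // {0, 2}
  }
}
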
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 9a5e754..2195c0a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -21,9 +21,9 @@ import java.util.BitSet;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
@@ -124,13 +124,13 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
               measureRawColumnChunk.getMinValues()[i], msrColumnExecutorInfo.getFilterKeys(),
               msrColumnEvaluatorInfo.getType())) {
             BitSet bitSet =
-                getFilteredIndexesForMeasures(measureRawColumnChunk.convertToMeasureColDataChunk(i),
+                getFilteredIndexesForMeasures(measureRawColumnChunk.convertToColumnPage(i),
                     measureRawColumnChunk.getRowCount()[i], msrType);
             bitSetGroup.setBitSet(bitSet, i);
           }
         } else {
           BitSet bitSet =
-              getFilteredIndexesForMeasures(measureRawColumnChunk.convertToMeasureColDataChunk(i),
+              getFilteredIndexesForMeasures(measureRawColumnChunk.convertToColumnPage(i),
                   measureRawColumnChunk.getRowCount()[i], msrType);
           bitSetGroup.setBitSet(bitSet, i);
         }
@@ -155,7 +155,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     }
   }
 
-  private BitSet getFilteredIndexesForMeasures(MeasureColumnDataChunk measureColumnDataChunk,
+  private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
       int rowsInPage, DataType msrType) {
     // Here the algorithm is
     // Get the measure values from the chunk. compare sequentially with the
@@ -164,7 +164,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     Object[] filterValues = msrColumnExecutorInfo.getFilterKeys();
 
     SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
-    BitSet nullBitSet = measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
+    BitSet nullBitSet = columnPage.getNullBits();
     for (int i = 0; i < filterValues.length; i++) {
       if (filterValues[i] == null) {
         for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
@@ -176,7 +176,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
         if (!nullBitSet.get(startIndex)) {
           // Check if filterValue[i] matches with measure Values.
           Object msrValue = DataTypeUtil
-              .getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), startIndex,
+              .getMeasureObjectBasedOnDataType(columnPage, startIndex,
                   msrType, msrColumnEvaluatorInfo.getMeasure());
 
           if (comparator.compare(msrValue, filterValues[i]) == 0) {

