carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [32/57] [abbrv] incubator-carbondata git commit: move org.apache.carbon.common in core
Date Sat, 14 Jan 2017 07:57:41 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinByte.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
new file mode 100644
index 0000000..74d67d1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.compression.nondecimal;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.compression.Compressor;
+import org.apache.carbondata.core.datastorage.compression.CompressorFactory;
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionNonDecimalMaxMinByte extends ValueCompressionHolder<byte[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalMaxMinByte.class.getName());
+
+  /**
+   * compressor.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  /**
+   * value.
+   */
+  private byte[] value;
+
+  private MeasureDataChunkStore<byte[]> measureChunkStore;
+
+  private BigDecimal maxValue;
+
+  private double divisionFactor;
+
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, decimalPlaces, maxValueObject );
+  }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    this.value = value;
+  }
+
+  @Override public void setValue(byte[] value) {
+    this.value = value;
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDecimalMaxMinByte");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    byte byteValue = measureChunkStore.getByte(index);
+    BigDecimal diff = BigDecimal.valueOf(byteValue / this.divisionFactor);
+    return maxValue.subtract(diff).doubleValue();
+
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionNonDecimalMaxMinByte");
+  }
+
+  private void setUncompressedValues(byte[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore =
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_BYTE, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+
+  @Override public byte[] getValue() { return this.value; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
new file mode 100644
index 0000000..e298c37
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.compression.Compressor;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastorage.compression.CompressorFactory;
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionNonDecimalMaxMinDefault extends ValueCompressionHolder<double[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalMaxMinDefault.class.getName());
+
+  /**
+   * doubleCompressor.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  /**
+   * value.
+   */
+  private double[] value;
+
+  private MeasureDataChunkStore<double[]> measureChunkStore;
+
+  private BigDecimal maxValue;
+
+  private double divisionFactor;
+
+  @Override public void setValue(double[] value) {
+    this.value = value;
+  }
+
+  @Override public double[] getValue() { return this.value; }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToDoubleArray(buffer, value.length);
+  }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor,dataType,compressedData, offset, length);
+    setUncompressedValues(value, decimalPlaces, maxValueObject);
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDecimalMaxMinDefault");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    double doubleValue = measureChunkStore.getDouble(index);
+    BigDecimal diff = BigDecimal.valueOf(doubleValue / this.divisionFactor);
+    return maxValue.subtract(diff).doubleValue();
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionNonDecimalMaxMinDefault");
+  }
+
+  private void setUncompressedValues(double[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinInt.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
new file mode 100644
index 0000000..fc0f815
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.compression.Compressor;
+import org.apache.carbondata.core.datastorage.compression.CompressorFactory;
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionNonDecimalMaxMinInt extends ValueCompressionHolder<int[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalMaxMinInt.class.getName());
+  /**
+   * intCompressor.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * value.
+   */
+  private int[] value;
+
+  private MeasureDataChunkStore<int[]> measureChunkStore;
+
+  private double divisionFactor;
+
+  private BigDecimal maxValue;
+
+  @Override public void setValue(int[] value) {
+    this.value = value;
+  }
+
+  @Override public int[] getValue() {return this.value; }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToIntArray(buffer, value.length);
+  }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, decimalPlaces, maxValueObject);
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDecimalMaxMinInt");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    int intValue = measureChunkStore.getInt(index);
+    BigDecimal diff = BigDecimal.valueOf(intValue / this.divisionFactor);
+    return maxValue.subtract(diff).doubleValue();
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionNonDecimalMaxMinInt");
+  }
+
+  private void setUncompressedValues(int[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore =
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinLong.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
new file mode 100644
index 0000000..8524c20
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastorage.compression.Compressor;
+import org.apache.carbondata.core.datastorage.compression.CompressorFactory;
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionNonDecimalMaxMinLong extends ValueCompressionHolder<long[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalMaxMinLong.class.getName());
+
+  /**
+   * longCompressor.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * value.
+   */
+  private long[] value;
+
+  private MeasureDataChunkStore<long[]> measureChunkStore;
+
+  private BigDecimal maxValue;
+
+  private double divisionFactor;
+
+  @Override public void setValue(long[] value) {
+    this.value = value;
+
+  }
+
+  @Override public long[] getValue() { return this.value; }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, decimalPlaces, maxValueObject);
+  }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buff = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToLongArray(buff, value.length);
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDecimalMaxMinLong");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    long longValue = measureChunkStore.getLong(index);
+    BigDecimal diff = BigDecimal.valueOf(longValue / this.divisionFactor);
+    return maxValue.subtract(diff).doubleValue();
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Big decimal value is not supported");
+  }
+
+  private void setUncompressedValues(long[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore =
+      MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_LONG, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinShort.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
new file mode 100644
index 0000000..dac557c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.compression.Compressor;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastorage.compression.CompressorFactory;
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionNonDecimalMaxMinShort extends ValueCompressionHolder<short[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalMaxMinShort.class.getName());
+  /**
+   * compressor.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * value.
+   */
+  private short[] value;
+
+  private MeasureDataChunkStore<short[]> measureChunkStore;
+
+  private BigDecimal maxValue;
+
+  private double divisionFactor;
+
+  @Override public void setValue(short[] value) {
+    this.value = value;
+  }
+
+  @Override public short[] getValue() { return this.value; }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, decimalPlaces, maxValueObject);
+  }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToShortArray(buffer, value.length);
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDecimalMaxMinShort");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    short shortValue = measureChunkStore.getShort(index);
+    BigDecimal diff = BigDecimal.valueOf(shortValue / this.divisionFactor);
+    return maxValue.subtract(diff).doubleValue();
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal value is not supported");
+  }
+
+  private void setUncompressedValues(short[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalShort.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalShort.java
new file mode 100644
index 0000000..334d7e8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/nondecimal/CompressionNonDecimalShort.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.compression.Compressor;
+import org.apache.carbondata.core.datastorage.compression.CompressorFactory;
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionNonDecimalShort extends ValueCompressionHolder<short[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalShort.class.getName());
+  /**
+   * shortCompressor.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * value.
+   */
+  private short[] value;
+
+  private MeasureDataChunkStore<short[]> measureChunkStore;
+
+  private double divisionFactory;
+
+  @Override public void setValue(short[] value) {
+    this.value = value;
+  }
+
+  @Override public short[] getValue() { return this.value; }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
+  }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor,dataType,compressedData, offset, length);
+    setUncompressedValues(value, decimalPlaces);
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToShortArray(buffer, value.length);
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDecimalShort");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    return (measureChunkStore.getShort(index) / this.divisionFactory);
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionNonDecimalShort");
+  }
+
+  private void setUncompressedValues(short[] data, int decimalPlaces) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+    this.measureChunkStore.putData(data);
+    this.divisionFactory = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneByte.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneByte.java
new file mode 100644
index 0000000..308ec8b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneByte.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.compression.none;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.compression.Compressor;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastorage.compression.CompressorFactory;
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionNoneByte extends ValueCompressionHolder<byte[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNoneByte.class.getName());
+
+  /**
+   * byteCompressor.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  /**
+   * value.
+   */
+  private byte[] value;
+
+  /**
+   * actual data type
+   */
+  private DataType actualDataType;
+
+  private MeasureDataChunkStore<byte[]> measureChunkStore;
+
+  public CompressionNoneByte(DataType actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  @Override public void setValue(byte[] value) {
+    this.value = value;
+  }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] data, int offset, int length,
+      int mantissa, Object maxValueObject) {
+    super.unCompress(compressor, dataType, data, offset, length);
+    setUncompressedValues(value);
+  }
+
+  @Override public byte[] getValue() { return this.value; }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    this.value = value;
+  }
+
+  @Override public long getLongValue(int index) {
+    return measureChunkStore.getByte(index);
+  }
+
+  @Override public double getDoubleValue(int index) {
+    return measureChunkStore.getByte(index);
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal is not defined for CompressionNoneByte");
+  }
+
+  private void setUncompressedValues(byte[] data) {
+    this.measureChunkStore =
+      MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_BYTE, data.length);
+    this.measureChunkStore.putData(data);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneDefault.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneDefault.java
new file mode 100644
index 0000000..65fa3e5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneDefault.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.compression.none;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.compression.Compressor;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastorage.compression.CompressorFactory;
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionNoneDefault extends ValueCompressionHolder<double[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNoneDefault.class.getName());
+  /**
+   * doubleCompressor.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * value.
+   */
+  private double[] value;
+
+  private DataType actualDataType;
+
+  private MeasureDataChunkStore<double[]> measureChunkStore;
+
+  public CompressionNoneDefault(DataType actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  @Override public void setValue(double[] value) {this.value = value; }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] data, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, data, offset, length);
+    setUncompressedValues(value);
+  }
+
+  @Override public double[] getValue() { return this.value; }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToDoubleArray(buffer, value.length);
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDefault");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    return measureChunkStore.getDouble(index);
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal is not defined for CompressionNoneDefault");
+  }
+
+  private void setUncompressedValues(double[] data) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+    this.measureChunkStore.putData(data);
+
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneInt.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneInt.java
new file mode 100644
index 0000000..29a60cf
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneInt.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.compression.none;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.compression.Compressor;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastorage.compression.CompressorFactory;
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionNoneInt extends ValueCompressionHolder<int[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNoneInt.class.getName());
+  /**
+   * intCompressor.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * value.
+   */
+  private int[] value;
+
+  private DataType actualDataType;
+
+  private MeasureDataChunkStore<int[]> measureChunkStore;
+
+  public CompressionNoneInt(DataType actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  @Override public void setValue(int[] value) {
+    this.value = value;
+  }
+
+  @Override public int[] getValue() { return this.value; }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+  }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] data, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, data, offset, length);
+    setUncompressedValues(value);
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToIntArray(buffer, value.length);
+  }
+
+  @Override public long getLongValue(int index) {
+    return measureChunkStore.getInt(index);
+  }
+
+  @Override public double getDoubleValue(int index) {
+    return measureChunkStore.getInt(index);
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal is not defined for CompressionNoneLong");
+  }
+
+  private void setUncompressedValues(int[] data) {
+    this.measureChunkStore =
+      MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, data.length);
+    this.measureChunkStore.putData(data);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneLong.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneLong.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneLong.java
new file mode 100644
index 0000000..834467d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneLong.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.compression.none;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.compression.Compressor;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastorage.compression.CompressorFactory;
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionNoneLong extends ValueCompressionHolder<long[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNoneLong.class.getName());
+  /**
+   * longCompressor.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * value.
+   */
+  protected long[] value;
+
+  private DataType actualDataType;
+
+  private MeasureDataChunkStore<long[]> measureChunkStore;
+
+  public CompressionNoneLong(DataType actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  @Override public void setValue(long[] value) { this.value = value;  }
+
+  @Override public long[] getValue() { return this.value; }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+  }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] data, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, data, offset, length);
+    setUncompressedValues(value);
+  }
+
+  @Override public void setValueInBytes(byte[] byteValue) {
+    ByteBuffer buffer = ByteBuffer.wrap(byteValue);
+    this.value = ValueCompressionUtil.convertToLongArray(buffer, byteValue.length);
+  }
+
+  @Override public long getLongValue(int index) {
+    return measureChunkStore.getLong(index);
+  }
+
+  @Override public double getDoubleValue(int index) {
+    return measureChunkStore.getLong(index);
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal is not supported");
+  }
+
+  private void setUncompressedValues(long[] data) {
+    this.measureChunkStore =
+      MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_LONG, data.length);
+    this.measureChunkStore.putData(data);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneShort.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneShort.java
new file mode 100644
index 0000000..f0b1c99
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneShort.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.compression.none;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.compression.Compressor;
+import org.apache.carbondata.core.datastorage.compression.CompressorFactory;
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionNoneShort extends ValueCompressionHolder<short[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNoneShort.class.getName());
+
+  /**
+   * shortCompressor.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  /**
+   * value.
+   */
+  private short[] shortValue;
+
+  private MeasureDataChunkStore<short[]> measureChunkStore;
+
+  private DataType actualDataType;
+
+  public CompressionNoneShort(DataType actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  @Override public void setValue(short[] shortValue) {
+    this.shortValue = shortValue;
+  }
+
+  @Override public short[] getValue() { return this.shortValue; }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] data, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, data, offset, length);
+    setUncompressedValues(shortValue);
+  }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_SHORT, shortValue);
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    shortValue = ValueCompressionUtil.convertToShortArray(buffer, value.length);
+  }
+
+  @Override public long getLongValue(int index) {
+    return measureChunkStore.getShort(index);
+  }
+
+  @Override public double getDoubleValue(int index) {
+    return measureChunkStore.getShort(index);
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal is not defined for CompressionNonShort");
+  }
+
+  private void setUncompressedValues(short[] data) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+    this.measureChunkStore.putData(data);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/compression/type/CompressionBigDecimal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/type/CompressionBigDecimal.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/type/CompressionBigDecimal.java
new file mode 100644
index 0000000..8233e90
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/type/CompressionBigDecimal.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastorage.compression.type;
+
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.BigDecimalCompressionFinder;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Big decimal compression
+ */
+public class CompressionBigDecimal<T> extends ValueCompressionHolder<T> {
+
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(CompressionBigDecimal.class.getName());
+
+  private BigDecimalCompressionFinder compressionFinder;
+
+  /**
+   * leftPart before decimal
+   */
+  private ValueCompressionHolder leftPart;
+
+  /**
+   * rightPart after decimal
+   */
+  private ValueCompressionHolder rightPart;
+
+  private double divisionFactor;
+
+  private boolean isDecimalPlacesNotZero;
+
+  public CompressionBigDecimal(BigDecimalCompressionFinder compressionFinder,
+      ValueCompressionHolder leftPart, ValueCompressionHolder rightPart) {
+    this.compressionFinder = compressionFinder;
+    this.leftPart = leftPart;
+    this.rightPart = rightPart;
+  }
+
+  @Override public void setValue(T value) {
+    Object[] values = (Object[]) value;
+    leftPart.setValue(values[0]);
+    rightPart.setValue(values[1]);
+  }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] data, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    if (decimalPlaces > 0) {
+      this.isDecimalPlacesNotZero = true;
+    }
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+
+    ByteBuffer buffer = ByteBuffer.wrap(data, offset, length);
+    int leftPathLength = buffer.getInt();
+    int rightPartLength = length - leftPathLength - CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    Long[] maxValue = (Long[]) maxValueObject;
+    leftPart.uncompress(compressionFinder.getLeftConvertedDataType(), data,
+        offset + CarbonCommonConstants.INT_SIZE_IN_BYTE, leftPathLength, decimalPlaces,
+        maxValue[0]);
+    rightPart.uncompress(compressionFinder.getRightConvertedDataType(), data,
+        offset + CarbonCommonConstants.INT_SIZE_IN_BYTE + leftPathLength, rightPartLength,
+        decimalPlaces, maxValue[1]);
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long is not defined for CompressionBigDecimal");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    throw new UnsupportedOperationException(
+      "Double is not defined for CompressionBigDecimal");
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    long leftValue = leftPart.getLongValue(index);
+    long rightValue = 0;
+    if (isDecimalPlacesNotZero) {
+      rightValue = rightPart.getLongValue(index);
+    }
+    String decimalPart = Double.toString(rightValue / this.divisionFactor);
+    String bigdStr = Long.toString(leftValue) + CarbonCommonConstants.POINT + decimalPart
+            .substring(decimalPart.indexOf(".") + 1, decimalPart.length());
+    return new BigDecimal(bigdStr);
+  }
+
+  @Override public T getValue() {
+    Object[] values = new Object[2];
+    values[0] = leftPart;
+    values[1] = rightPart;
+    return (T) values;
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    LOGGER.error("setValueInBytes() is not defined for CompressionBigDecimal");
+  }
+
+  @Override public void compress() {
+    leftPart.compress();
+    rightPart.compress();
+  }
+
+  @Override
+  public byte[] getCompressedData() {
+    byte[] leftdata = leftPart.getCompressedData();
+    byte[] rightdata = rightPart.getCompressedData();
+    ByteBuffer byteBuffer = ByteBuffer
+        .allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + leftdata.length
+            + rightdata.length);
+    byteBuffer.putInt(leftdata.length);
+    byteBuffer.put(leftdata);
+    byteBuffer.put(rightdata);
+    byteBuffer.flip();
+    return byteBuffer.array();
+  }
+
+  @Override public void freeMemory() {
+    leftPart.freeMemory();
+    rightPart.freeMemory();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonReadDataHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonReadDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonReadDataHolder.java
new file mode 100644
index 0000000..29ff254
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonReadDataHolder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.dataholder;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+
+// This class is used with Uncompressor to hold the decompressed column chunk in memory
+public class CarbonReadDataHolder {
+
+  private ValueCompressionHolder unCompressValue;
+
+  public CarbonReadDataHolder(ValueCompressionHolder unCompressValue) {
+    this.unCompressValue = unCompressValue;
+  }
+
+  public long getReadableLongValueByIndex(int index) {
+    return this.unCompressValue.getLongValue(index);
+  }
+
+  public BigDecimal getReadableBigDecimalValueByIndex(int index) {
+    return this.unCompressValue.getBigDecimalValue(index);
+  }
+
+  public double getReadableDoubleValueByIndex(int index) {
+    return this.unCompressValue.getDoubleValue(index);
+  }
+
+  public void freeMemory() {
+    unCompressValue.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonWriteDataHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonWriteDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonWriteDataHolder.java
new file mode 100644
index 0000000..e6d9123
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonWriteDataHolder.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.dataholder;
+
+public class CarbonWriteDataHolder {
+  /**
+   * doubleValues
+   */
+  private double[] doubleValues;
+
+  /**
+   * longValues
+   */
+  private long[] longValues;
+
+  /**
+   * bigDecimal left part
+   */
+  private long[] bigDecimalLeftValues;
+
+  /**
+   * bigDecimal right part
+   */
+  private long[] bigDecimalRightValues;
+  /**
+   * byteValues
+   */
+  private byte[][] byteValues;
+
+  /**
+   * byteValues for no dictionary and non kettle flow.
+   */
+  private byte[][][] byteValuesForNonDictionary;
+
+  /**
+   * byteValues
+   */
+  private byte[][][] columnByteValues;
+
+  /**
+   * size
+   */
+  private int size;
+
+  /**
+   * totalSize
+   */
+  private int totalSize;
+
+  /**
+   * Method to initialise double array
+   *
+   * @param size
+   */
+  public void initialiseDoubleValues(int size) {
+    if (size < 1) {
+      throw new IllegalArgumentException("Invalid array size");
+    }
+    doubleValues = new double[size];
+  }
+
+  public void reset() {
+    size = 0;
+    totalSize = 0;
+  }
+
+  /**
+   * Method to initialise double array
+   * TODO Remove after kettle flow got removed.
+   *
+   * @param size
+   */
+  public void initialiseByteArrayValues(int size) {
+    if (size < 1) {
+      throw new IllegalArgumentException("Invalid array size");
+    }
+
+    byteValues = new byte[size][];
+    columnByteValues = new byte[size][][];
+  }
+
+  /**
+   * Method to initialise byte array
+   *
+   * @param size
+   */
+  public void initialiseByteArrayValuesWithOutKettle(int size) {
+    if (size < 1) {
+      throw new IllegalArgumentException("Invalid array size");
+    }
+
+    byteValues = new byte[size][];
+  }
+
+  public void initialiseByteArrayValuesForNonDictionary(int size) {
+    if (size < 1) {
+      throw new IllegalArgumentException("Invalid array size");
+    }
+
+    byteValuesForNonDictionary = new byte[size][][];
+  }
+
+  /**
+   * Method to initialise long array
+   *
+   * @param size
+   */
+  public void initialiseLongValues(int size) {
+    if (size < 1) {
+      throw new IllegalArgumentException("Invalid array size");
+    }
+    longValues = new long[size];
+  }
+
+  public void initialiseBigDecimalValues(int size) {
+    if (size < 1) {
+      throw new IllegalArgumentException("Invalid array size");
+    }
+    bigDecimalLeftValues = new long[size];
+    bigDecimalRightValues = new long[size];
+  }
+
+  /**
+   * set double value by index
+   *
+   * @param index
+   * @param value
+   */
+  public void setWritableDoubleValueByIndex(int index, Object value) {
+    doubleValues[index] = (Double) value;
+    size++;
+  }
+
+  /**
+   * set double value by index
+   *
+   * @param index
+   * @param value
+   */
+  public void setWritableLongValueByIndex(int index, Object value) {
+    longValues[index] = (Long) value;
+    size++;
+  }
+
+  /**
+   * set bigdecimal value by index
+   *
+   * @param index
+   * @param value
+   */
+  public void setWritableBigDecimalValueByIndex(int index, long[] value) {
+    bigDecimalLeftValues[index] = value[0];
+    bigDecimalRightValues[index] = value[1];
+    size++;
+  }
+  /**
+   * set byte array value by index
+   *
+   * @param index
+   * @param value
+   */
+  public void setWritableByteArrayValueByIndex(int index, byte[] value) {
+    byteValues[index] = value;
+    size++;
+    if (null != value) totalSize += value.length;
+  }
+
+  public void setWritableNonDictByteArrayValueByIndex(int index, byte[][] value) {
+    byteValuesForNonDictionary[index] = value;
+    size++;
+    if (null != value) totalSize += value.length;
+  }
+
+  /**
+   * set byte array value by index
+   */
+  public void setWritableByteArrayValueByIndex(int index, int mdKeyIndex, Object[] columnData) {
+    int l = 0;
+    columnByteValues[index] = new byte[columnData.length - (mdKeyIndex + 1)][];
+    for (int i = mdKeyIndex + 1; i < columnData.length; i++) {
+      columnByteValues[index][l++] = (byte[]) columnData[i];
+    }
+  }
+
+  /**
+   * Get Writable Double Values
+   */
+  public double[] getWritableDoubleValues() {
+    if (size < doubleValues.length) {
+      double[] temp = new double[size];
+      System.arraycopy(doubleValues, 0, temp, 0, size);
+      doubleValues = temp;
+    }
+    return doubleValues;
+  }
+
+  /**
+   * Get writable byte array values
+   */
+  public byte[] getWritableByteArrayValues() {
+    byte[] temp = new byte[totalSize];
+    int startIndexToCopy = 0;
+    for (int i = 0; i < size; i++) {
+      System.arraycopy(byteValues[i], 0, temp, startIndexToCopy, byteValues[i].length);
+      startIndexToCopy += byteValues[i].length;
+    }
+    return temp;
+  }
+
+  public byte[][] getByteArrayValues() {
+    if (size < byteValues.length) {
+      byte[][] temp = new byte[size][];
+      System.arraycopy(byteValues, 0, temp, 0, size);
+      byteValues = temp;
+    }
+    return byteValues;
+  }
+
+  public byte[][][] getNonDictByteArrayValues() {
+    if (size < byteValuesForNonDictionary.length) {
+      byte[][][] temp = new byte[size][][];
+      System.arraycopy(byteValuesForNonDictionary, 0, temp, 0, size);
+      byteValuesForNonDictionary = temp;
+    }
+    return byteValuesForNonDictionary;
+  }
+
+  /**
+   * Get Writable Double Values
+   *
+   * @return
+   */
+  public long[] getWritableLongValues() {
+    if (size < longValues.length) {
+      long[] temp = new long[size];
+      System.arraycopy(longValues, 0, temp, 0, size);
+      longValues = temp;
+    }
+    return longValues;
+  }
+
+  /**
+   * Get Writable bigdecimal Values
+   *
+   * @return
+   */
+  public long[][] getWritableBigDecimalValues() {
+    long[][] bigDecimalValues = new long[2][];
+    if (size < bigDecimalLeftValues.length) {
+      long[] temp = new long[size];
+      System.arraycopy(bigDecimalLeftValues, 0, temp, 0, size);
+      bigDecimalLeftValues = temp;
+    }
+    if (size < bigDecimalRightValues.length) {
+      long[] temp = new long[size];
+      System.arraycopy(bigDecimalRightValues, 0, temp, 0, size);
+      bigDecimalRightValues = temp;
+    }
+    bigDecimalValues[0]= bigDecimalLeftValues;
+    bigDecimalValues[1] = bigDecimalRightValues;
+    return bigDecimalValues;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AbstractDFSCarbonFile.java
new file mode 100644
index 0000000..8963702
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AbstractDFSCarbonFile.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.filesystem;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public abstract  class AbstractDFSCarbonFile implements CarbonFile {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AbstractDFSCarbonFile.class.getName());
+  protected FileStatus fileStatus;
+  protected FileSystem fs;
+
+  public AbstractDFSCarbonFile(String filePath) {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    try {
+      fs = path.getFileSystem(FileFactory.getConfiguration());
+      fileStatus = fs.getFileStatus(path);
+    } catch (IOException e) {
+      LOGGER.error("Exception occured:" + e.getMessage());
+    }
+  }
+
+  public AbstractDFSCarbonFile(Path path) {
+    try {
+      fs = path.getFileSystem(FileFactory.getConfiguration());
+      fileStatus = fs.getFileStatus(path);
+    } catch (IOException e) {
+      LOGGER.error("Exception occured:" + e.getMessage());
+    }
+  }
+
+  public AbstractDFSCarbonFile(FileStatus fileStatus) {
+    this.fileStatus = fileStatus;
+  }
+
+  @Override public boolean createNewFile() {
+    Path path = fileStatus.getPath();
+    try {
+      return fs.createNewFile(path);
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  @Override public String getAbsolutePath() {
+    return fileStatus.getPath().toString();
+  }
+
+  @Override public String getName() {
+    return fileStatus.getPath().getName();
+  }
+
+  @Override public boolean isDirectory() {
+    return fileStatus.isDirectory();
+  }
+
+  @Override public boolean exists() {
+    try {
+      if (null != fileStatus) {
+        fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+        return fs.exists(fileStatus.getPath());
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception occured:" + e.getMessage());
+    }
+    return false;
+  }
+
+  @Override public String getCanonicalPath() {
+    return getAbsolutePath();
+  }
+
+  @Override public String getPath() {
+    return getAbsolutePath();
+  }
+
+  @Override public long getSize() {
+    return fileStatus.getLen();
+  }
+
+  public boolean renameTo(String changetoName) {
+    FileSystem fs;
+    try {
+      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+      return fs.rename(fileStatus.getPath(), new Path(changetoName));
+    } catch (IOException e) {
+      LOGGER.error("Exception occured:" + e.getMessage());
+      return false;
+    }
+  }
+
+  public boolean delete() {
+    FileSystem fs;
+    try {
+      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+      return fs.delete(fileStatus.getPath(), true);
+    } catch (IOException e) {
+      LOGGER.error("Exception occured:" + e.getMessage());
+      return false;
+    }
+  }
+
+  @Override public long getLastModifiedTime() {
+    return fileStatus.getModificationTime();
+  }
+
+  @Override public boolean setLastModifiedTime(long timestamp) {
+    try {
+      fs.setTimes(fileStatus.getPath(), timestamp, timestamp);
+    } catch (IOException e) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * This method will delete the data in file data from a given offset
+   */
+  @Override public boolean truncate(String fileName, long validDataEndOffset) {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    boolean fileTruncatedSuccessfully = false;
+    // if bytes to read less than 1024 then buffer size should be equal to the given offset
+    int bufferSize = validDataEndOffset > CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR ?
+        CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR :
+        (int) validDataEndOffset;
+    // temporary file name
+    String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
+    FileFactory.FileType fileType = FileFactory.getFileType(fileName);
+    try {
+      CarbonFile tempFile = null;
+      // delete temporary file if it already exists at a given path
+      if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
+        tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+        tempFile.delete();
+      }
+      // create new temporary file
+      FileFactory.createNewFile(tempWriteFilePath, fileType);
+      tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+      byte[] buff = new byte[bufferSize];
+      dataInputStream = FileFactory.getDataInputStream(fileName, fileType);
+      // read the data
+      int read = dataInputStream.read(buff, 0, buff.length);
+      dataOutputStream = FileFactory.getDataOutputStream(tempWriteFilePath, fileType);
+      dataOutputStream.write(buff, 0, read);
+      long remaining = validDataEndOffset - read;
+      // anytime we should not cross the offset to be read
+      while (remaining > 0) {
+        if (remaining > bufferSize) {
+          buff = new byte[bufferSize];
+        } else {
+          buff = new byte[(int) remaining];
+        }
+        read = dataInputStream.read(buff, 0, buff.length);
+        dataOutputStream.write(buff, 0, read);
+        remaining = remaining - read;
+      }
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+      // rename the temp file to original file
+      tempFile.renameForce(fileName);
+      fileTruncatedSuccessfully = true;
+    } catch (IOException e) {
+      LOGGER.error("Exception occured while truncating the file " + e.getMessage());
+    } finally {
+      CarbonUtil.closeStreams(dataOutputStream, dataInputStream);
+    }
+    return fileTruncatedSuccessfully;
+  }
+
+  /**
+   * This method will be used to check whether a file has been modified or not
+   *
+   * @param fileTimeStamp time to be compared with latest timestamp of file
+   * @param endOffset     file length to be compared with current length of file
+   * @return
+   */
+  @Override public boolean isFileModified(long fileTimeStamp, long endOffset) {
+    boolean isFileModified = false;
+    if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
+      isFileModified = true;
+    }
+    return isFileModified;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AlluxioCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AlluxioCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AlluxioCarbonFile.java
new file mode 100644
index 0000000..e4ce2b6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AlluxioCarbonFile.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.filesystem;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+
+
+public class AlluxioCarbonFile extends AbstractDFSCarbonFile {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AlluxioCarbonFile.class.getName());
+
+  public AlluxioCarbonFile(String filePath) {
+    super(filePath);
+  }
+
+  public AlluxioCarbonFile(Path path) {
+    super(path);
+  }
+
+  public AlluxioCarbonFile(FileStatus fileStatus) {
+    super(fileStatus);
+  }
+
+  /**
+   * @param listStatus
+   * @return
+   */
+  private CarbonFile[] getFiles(FileStatus[] listStatus) {
+    if (listStatus == null) {
+      return new CarbonFile[0];
+    }
+    CarbonFile[] files = new CarbonFile[listStatus.length];
+    for (int i = 0; i < files.length; i++) {
+      files[i] = new AlluxioCarbonFile(listStatus[i]);
+    }
+    return files;
+  }
+
+  @Override
+  public CarbonFile[] listFiles() {
+    FileStatus[] listStatus = null;
+    try {
+      if (null != fileStatus && fileStatus.isDirectory()) {
+        Path path = fileStatus.getPath();
+        listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
+      } else {
+        return null;
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception occured: " + e.getMessage());
+      return new CarbonFile[0];
+    }
+    return getFiles(listStatus);
+  }
+
+  @Override
+  public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
+    CarbonFile[] files = listFiles();
+    if (files != null && files.length >= 1) {
+      List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
+      for (int i = 0; i < files.length; i++) {
+        if (fileFilter.accept(files[i])) {
+          fileList.add(files[i]);
+        }
+      }
+      if (fileList.size() >= 1) {
+        return fileList.toArray(new CarbonFile[fileList.size()]);
+      } else {
+        return new CarbonFile[0];
+      }
+    }
+    return files;
+  }
+
+  @Override
+  public CarbonFile getParentFile() {
+    Path parent = fileStatus.getPath().getParent();
+    return null == parent ? null : new AlluxioCarbonFile(parent);
+  }
+
+  @Override
+  public boolean renameForce(String changetoName) {
+    FileSystem fs;
+    try {
+      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+      if (fs instanceof DistributedFileSystem) {
+        ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName),
+            org.apache.hadoop.fs.Options.Rename.OVERWRITE);
+        return true;
+      } else {
+        return false;
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception occured: " + e.getMessage());
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFile.java
new file mode 100644
index 0000000..0ac4d52
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFile.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.filesystem;
+
+public interface CarbonFile {
+
+  String getAbsolutePath();
+
+  CarbonFile[] listFiles(CarbonFileFilter fileFilter);
+
+  CarbonFile[] listFiles();
+
+  String getName();
+
+  boolean isDirectory();
+
+  boolean exists();
+
+  String getCanonicalPath();
+
+  CarbonFile getParentFile();
+
+  String getPath();
+
+  long getSize();
+
+  boolean renameTo(String changetoName);
+
+  boolean renameForce(String changetoName);
+
+  boolean delete();
+
+  boolean createNewFile();
+
+  long getLastModifiedTime();
+
+  boolean setLastModifiedTime(long timestamp);
+
+  boolean truncate(String fileName, long validDataEndOffset);
+
+  /**
+   * This method will be used to check whether a file has been modified or not
+   *
+   * @param fileTimeStamp time to be compared with latest timestamp of file
+   * @param endOffset     file length to be compared with current length of file
+   * @return
+   */
+  boolean isFileModified(long fileTimeStamp, long endOffset);
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFileFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFileFilter.java b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFileFilter.java
new file mode 100644
index 0000000..e382f92
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFileFilter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.filesystem;
+
+public interface CarbonFileFilter {
+  boolean accept(CarbonFile file);
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/HDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/HDFSCarbonFile.java
new file mode 100644
index 0000000..a2aaca1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/HDFSCarbonFile.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.filesystem;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+public class HDFSCarbonFile extends AbstractDFSCarbonFile {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(HDFSCarbonFile.class.getName());
+
+  public HDFSCarbonFile(String filePath) {
+    super(filePath);
+  }
+
+  public HDFSCarbonFile(Path path) {
+    super(path);
+  }
+
+  public HDFSCarbonFile(FileStatus fileStatus) {
+    super(fileStatus);
+  }
+
+  /**
+   * @param listStatus
+   * @return
+   */
+  private CarbonFile[] getFiles(FileStatus[] listStatus) {
+    if (listStatus == null) {
+      return new CarbonFile[0];
+    }
+    CarbonFile[] files = new CarbonFile[listStatus.length];
+    for (int i = 0; i < files.length; i++) {
+      files[i] = new HDFSCarbonFile(listStatus[i]);
+    }
+    return files;
+  }
+
+  @Override
+  public CarbonFile[] listFiles() {
+    FileStatus[] listStatus = null;
+    try {
+      if (null != fileStatus && fileStatus.isDirectory()) {
+        Path path = fileStatus.getPath();
+        listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
+      } else {
+        return null;
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception occured: " + e.getMessage());
+      return new CarbonFile[0];
+    }
+    return getFiles(listStatus);
+  }
+
+  @Override
+  public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
+    CarbonFile[] files = listFiles();
+    if (files != null && files.length >= 1) {
+      List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
+      for (int i = 0; i < files.length; i++) {
+        if (fileFilter.accept(files[i])) {
+          fileList.add(files[i]);
+        }
+      }
+      if (fileList.size() >= 1) {
+        return fileList.toArray(new CarbonFile[fileList.size()]);
+      } else {
+        return new CarbonFile[0];
+      }
+    }
+    return files;
+  }
+
+  @Override
+  public CarbonFile getParentFile() {
+    Path parent = fileStatus.getPath().getParent();
+    return null == parent ? null : new HDFSCarbonFile(parent);
+  }
+
+  @Override
+  public boolean renameForce(String changetoName) {
+    FileSystem fs;
+    try {
+      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+      if (fs instanceof DistributedFileSystem) {
+        ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName),
+            org.apache.hadoop.fs.Options.Rename.OVERWRITE);
+        return true;
+      } else {
+        return false;
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception occured: " + e.getMessage());
+      return false;
+    }
+  }
+}
\ No newline at end of file


Mime
View raw message