parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [26/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:23 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
new file mode 100644
index 0000000..2f311ec
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
@@ -0,0 +1,208 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+
+/**
+ * Round-trip tests for the bit-packing values writers and readers. Each case writes a
+ * few ints at a given bit width, compares the produced bytes (as rendered by
+ * {@code TestBitPacking.toString}) with a hand-written bit string, and then reads the
+ * values back. Every case runs once per {@link PACKING_TYPE}.
+ */
+public class TestBitPackingColumn {
+  private static final Log LOG = Log.getLog(TestBitPackingColumn.class);
+
+  @Test
+  public void testZero() throws IOException {
+    // zero-bit width: the expected rendering of the output is the empty string
+    validateEncodeDecode(0, new int[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, "");
+  }
+
+  @Test
+  public void testOne_0() throws IOException {
+    validateEncodeDecode(1, new int[] {0}, "00000000");
+  }
+
+  @Test
+  public void testOne_1() throws IOException {
+    validateEncodeDecode(1, new int[] {1}, "10000000");
+  }
+
+  @Test
+  public void testOne_0_0() throws IOException {
+    validateEncodeDecode(1, new int[] {0, 0}, "00000000");
+  }
+
+  @Test
+  public void testOne_1_1() throws IOException {
+    validateEncodeDecode(1, new int[] {1, 1}, "11000000");
+  }
+
+  @Test
+  public void testOne_9_1s() throws IOException {
+    // nine values at one bit each spill into a second byte
+    validateEncodeDecode(1, new int[] {1, 1, 1, 1, 1, 1, 1, 1, 1}, "11111111 10000000");
+  }
+
+  @Test
+  public void testOne_9_0s() throws IOException {
+    validateEncodeDecode(1, new int[] {0, 0, 0, 0, 0, 0, 0, 0, 0}, "00000000 00000000");
+  }
+
+  @Test
+  public void testOne_7_0s_1_1() throws IOException {
+    validateEncodeDecode(1, new int[] {0, 0, 0, 0, 0, 0, 0, 1}, "00000001");
+  }
+
+  @Test
+  public void testOne_9_0s_1_1() throws IOException {
+    validateEncodeDecode(1, new int[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, "00000000 01000000");
+  }
+
+  @Test
+  public void testOne() throws IOException {
+    validateEncodeDecode(1, new int[] {0, 1, 0, 0, 1, 1, 1, 0, 0, 1}, "01001110 01000000");
+  }
+
+  @Test
+  public void testTwo() throws IOException {
+    final int[] input = {0, 1, 2, 3, 3, 3, 2, 1, 1, 0, 0, 0, 1};
+    validateEncodeDecode(2, input, "00011011 11111001 01000000 01000000");
+  }
+
+  @Test
+  public void testThree() throws IOException {
+    final int[] input = {0, 1, 2, 3, 4, 5, 6, 7, 1};
+    final String packed =
+        "00000101 00111001 01110111 " +
+        "00100000";
+    validateEncodeDecode(3, input, packed);
+  }
+
+  @Test
+  public void testFour() throws IOException {
+    final int[] input = {
+        0, 1, 2, 3, 4, 5, 6, 7,
+        8, 9, 10, 11, 12, 13, 14, 15,
+        1};
+    final String packed = "00000001 00100011 01000101 01100111 10001001 10101011 11001101 11101111 00010000";
+    validateEncodeDecode(4, input, packed);
+  }
+
+  @Test
+  public void testFive() throws IOException {
+    final int[] input = {
+        0, 1, 2, 3, 4, 5, 6, 7,
+        8, 9, 10, 11, 12, 13, 14, 15,
+        16, 17, 18, 19, 20, 21, 22, 23,
+        24, 25, 26, 27, 28, 29, 30, 31,
+        1};
+    final String packed =
+        "00000000 01000100 00110010 00010100 11000111 " +
+        "01000010 01010100 10110110 00110101 11001111 " +
+        "10000100 01100101 00111010 01010110 11010111 " +
+        "11000110 01110101 10111110 01110111 11011111 " +
+        "00001000";
+    validateEncodeDecode(5, input, packed);
+  }
+
+  @Test
+  public void testSix() throws IOException {
+    // bit patterns: 000000, 011100, 100010, 100011, 111111, 000001
+    final int[] input = {0, 28, 34, 35, 63, 1};
+    final String packed =
+        "00000001 11001000 10100011 " +
+        "11111100 00010000";
+    validateEncodeDecode(6, input, packed);
+  }
+
+  @Test
+  public void testSeven() throws IOException {
+    // bit patterns: 0000000, 0011100, 0100010, 0100011, 0111111, 0000001, 1111101, 0000001, 0000001
+    final int[] input = {0, 28, 34, 35, 63, 1, 125, 1, 1};
+    final String packed =
+        "00000000 01110001 00010010 00110111 11100000 01111110 10000001 " +
+        "00000010";
+    validateEncodeDecode(7, input, packed);
+  }
+
+  /**
+   * Writes {@code vals} with every {@link PACKING_TYPE}, checks the rendered bytes against
+   * {@code expected}, then reads the same number of values back and checks they match.
+   *
+   * @param bitLength bit width per value; the writer/reader bound is {@code 2^bitLength - 1}
+   * @param vals values to encode
+   * @param expected expected output of {@code TestBitPacking.toString} for the written bytes
+   * @throws IOException if the writer or reader fails
+   */
+  private void validateEncodeDecode(int bitLength, int[] vals, String expected) throws IOException {
+    // largest value representable in bitLength bits; the same for every packing type
+    final int bound = (int)Math.pow(2, bitLength) - 1;
+    for (PACKING_TYPE packing : PACKING_TYPE.values()) {
+      LOG.debug(packing);
+
+      // encode
+      ValuesWriter writer = packing.getWriter(bound);
+      for (int k = 0; k < vals.length; k++) {
+        writer.writeInteger(vals[k]);
+      }
+      byte[] encoded = writer.getBytes().toByteArray();
+      LOG.debug("vals ("+bitLength+"): " + TestBitPacking.toString(vals));
+      String rendered = TestBitPacking.toString(encoded);
+      LOG.debug("bytes: " + rendered);
+      assertEquals(packing.toString(), expected, rendered);
+
+      // decode
+      ValuesReader reader = packing.getReader(bound);
+      reader.initFromPage(vals.length, encoded, 0);
+      int[] decoded = new int[vals.length];
+      int k = 0;
+      while (k < decoded.length) {
+        decoded[k++] = reader.readInteger();
+      }
+      String decodedText = TestBitPacking.toString(decoded);
+      LOG.debug("result: " + decodedText);
+      assertArrayEquals(packing + " result: " + decodedText, vals, decoded);
+    }
+  }
+
+  /** The writer/reader implementation pairs that every test case is run against. */
+  private enum PACKING_TYPE {
+    BYTE_BASED_MANUAL {
+      @Override
+      public ValuesReader getReader(final int bound) {
+        return new BitPackingValuesReader(bound);
+      }
+
+      @Override
+      public ValuesWriter getWriter(final int bound) {
+        return new BitPackingValuesWriter(bound, 32*1024, 64*1024);
+      }
+    },
+    BYTE_BASED_GENERATED {
+      @Override
+      public ValuesReader getReader(final int bound) {
+        return new ByteBitPackingValuesReader(bound, BIG_ENDIAN);
+      }
+
+      @Override
+      public ValuesWriter getWriter(final int bound) {
+        return new ByteBitPackingValuesWriter(bound, BIG_ENDIAN);
+      }
+    };
+
+    /** Creates a reader for values no larger than {@code bound}. */
+    public abstract ValuesReader getReader(final int bound);
+
+    /** Creates a writer for values no larger than {@code bound}. */
+    public abstract ValuesWriter getWriter(final int bound);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java b/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java
new file mode 100644
index 0000000..ba979b7
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java
@@ -0,0 +1,172 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@code BoundedIntValuesWriter} / {@code BoundedIntValuesReader}: two cases
+ * compare the written bytes with a hand-built bit string, and one case round-trips
+ * pseudo-random runs of values for bounds 1 through 7.
+ */
+public class TestBoundedColumns {
+  // fixed seed so testSerDe generates the same data on every run
+  private final Random rng = new Random(42L);
+
+  @Test
+  public void testWriterRepeatNoRepeatAndRepeatUnderThreshold() throws IOException {
+    final int[] input = {
+        1, 1, 1, 1,
+        0,
+        0,
+        2, 2, 2, 2, 2, 2, 2, 2,
+        2, 2, 2, 2, 2, 2, 2, 2, // 16 2s
+        1,
+        5, 5, 5, 5, 5, 5, 5, 5,
+        5, 5, 5, 5, 5, 5, 5, 5,
+        5, 5, 5, 5, 5, 5, 5, 5 // 24 5s
+        };
+    final String[] pieces = {
+        "1", b(1,3), b(4),
+        "0", b(0,3),
+        "0", b(0,3),
+        "1", b(2,3), b(16),
+        "0", b(1,3),
+        "1", b(5,3), b(24)};
+    compareOutput(7, input, pieces);
+  }
+
+  @Test
+  public void testWriterNoRepeat() throws IOException {
+    final int[] input = { 0, 1, 2, 3, 4, 5, 6, 7};
+    final String[] pieces = {
+        "0", b(0,3), "0", b(1,3), "0", b(2,3), "0", b(3,3),
+        "0", b(4,3), "0", b(5,3), "0", b(6,3), "0", b(7,3)};
+    compareOutput(7, input, pieces);
+  }
+
+  /**
+   * Writes {@code ints} with the given bound, checks the binary rendering of the output
+   * (from byte offset 4 onwards) against the concatenated {@code result} pieces, then
+   * reads the values back and compares them with the input.
+   */
+  private void compareOutput(int bound, int[] ints, String[] result) throws IOException {
+    BoundedIntValuesWriter writer = new BoundedIntValuesWriter(bound, 64*1024, 64*1024);
+    for (int k = 0; k < ints.length; k++) {
+      writer.writeInteger(ints[k]);
+    }
+    System.out.println(Arrays.toString(ints));
+    System.out.println(Arrays.toString(result));
+
+    byte[] written = writer.getBytes().toByteArray();
+    assertEquals(concat(result), toBinaryString(written, 4));
+
+    BoundedIntValuesReader reader = new BoundedIntValuesReader(bound);
+    reader.initFromPage(1, written, 0);
+    StringBuilder expected = new StringBuilder();
+    StringBuilder got = new StringBuilder();
+    for (int value : ints) {
+      expected.append(" ").append(value);
+      got.append(" ").append(reader.readInteger());
+    }
+    assertEquals(expected.toString(), got.toString());
+  }
+
+  /** Joins the pieces in reverse order: the last piece ends up first in the returned string. */
+  private String concat(String[] result) {
+    StringBuilder joined = new StringBuilder();
+    for (int k = result.length - 1; k >= 0; k--) {
+      joined.append(result[k]);
+    }
+    return joined.toString();
+  }
+
+  /** Binary representation of {@code i}, left-padded with zeros to 8 characters. */
+  private String b(int i) {
+    return b(i,8);
+  }
+
+  /** Binary representation of {@code i}, left-padded with zeros to at least {@code size} characters. */
+  private String b(int i, int size) {
+    String digits = Integer.toBinaryString(i);
+    StringBuilder padded = new StringBuilder();
+    for (int len = digits.length(); len < size; len++) {
+      padded.append('0');
+    }
+    return padded.append(digits).toString();
+  }
+
+  /** Renders all of {@code bytes}; see {@link #toBinaryString(byte[], int)}. */
+  public static String toBinaryString(byte[] bytes) {
+    return toBinaryString(bytes, 0);
+  }
+
+  /**
+   * Renders {@code bytes[offset..]} as 8-character binary groups, with the last byte first
+   * and the byte at {@code offset} last.
+   */
+  private static String toBinaryString(byte[] bytes, int offset) {
+    StringBuilder rendered = new StringBuilder();
+    for (int k = bytes.length - 1; k >= offset; k--) {
+      // mask to the unsigned value 0..255 before formatting
+      String digits = Integer.toBinaryString(bytes[k] & 0xFF);
+      for (int len = digits.length(); len < 8; len++) {
+        rendered.append('0');
+      }
+      rendered.append(digits);
+    }
+    return rendered.toString();
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    final int[] valuesPerStripe = { 50, 100, 700, 1, 200 };
+    // every generated run is recorded as two ints: the value and its repeat count
+    int totalValuesInStream = 0;
+    for (int stripeSize : valuesPerStripe) {
+      totalValuesInStream += stripeSize * 2;
+    }
+
+    for (int bound = 1; bound < 8; bound++) {
+      System.out.println("bound: "+ bound);
+      ByteArrayOutputStream sink = new ByteArrayOutputStream();
+      int[] stream = new int[totalValuesInStream];
+      BoundedIntValuesWriter writer = new BoundedIntValuesWriter(bound, 64 * 1024, 64*1024);
+
+      // write phase: one page per stripe, appended to the same buffer
+      int cursor = 0;
+      for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) {
+        int previous = 0;
+        for (int i = 0; i < valuesPerStripe[stripeNum]; i++) {
+          // draw until the value differs from the previous run's value
+          int candidate;
+          do {
+            candidate = rng.nextInt(bound + 1);
+          } while (candidate == previous);
+          previous = candidate;
+          stream[cursor++] = previous;
+
+          // half of the runs repeat 1..1000 times, the other half hold a single value
+          int ct = rng.nextBoolean() ? rng.nextInt(1000) + 1 : 1;
+          stream[cursor++] = ct;
+          for (int j = 0; j < ct; j++) {
+            writer.writeInteger(previous);
+          }
+        }
+        writer.getBytes().writeAllTo(sink);
+        writer.reset();
+      }
+      sink.close();
+
+      // read phase: each stripe starts where the previous one ended
+      byte[] input = sink.toByteArray();
+      BoundedIntValuesReader reader = new BoundedIntValuesReader(bound);
+      cursor = 0;
+      int offset = 0;
+      for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) {
+        reader.initFromPage(1, input, offset);
+        offset = reader.getNextOffset();
+        for (int i = 0; i < valuesPerStripe[stripeNum]; i++) {
+          int number = stream[cursor++];
+          int ct = stream[cursor++];
+          assertTrue(number <= bound);
+          assertTrue(ct > 0);
+          for (int j = 0; j < ct; j++) {
+            assertEquals("Failed on bound ["+bound+"], stripe ["+stripeNum+"], iteration ["+i+"], on count ["+ct+"]", number, reader.readInteger());
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
new file mode 100644
index 0000000..d428fbf
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
@@ -0,0 +1,262 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Round-trip tests for {@link DeltaBinaryPackingValuesWriter} and
+ * {@link DeltaBinaryPackingValuesReader}. Most cases build an int array, write it, check
+ * that the page is no larger than a worst-case size estimate, and read the values back.
+ */
+public class DeltaBinaryPackingValuesWriterTest {
+  DeltaBinaryPackingValuesReader reader;
+  private int blockSize;
+  private int miniBlockNum;
+  private ValuesWriter writer;
+  private Random random;
+
+  /** Creates a fresh writer (block size 128, 4 mini blocks) and random source per test. */
+  @Before
+  public void setUp() {
+    blockSize = 128;
+    miniBlockNum = 4;
+    writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 200);
+    random = new Random();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void miniBlockSizeShouldBeMultipleOf8() {
+    new DeltaBinaryPackingValuesWriter(1281, 4, 100, 100);
+  }
+
+  /* When the data size is a multiple of the block size */
+  @Test
+  public void shouldWriteWhenDataIsAlignedWithBlock() throws IOException {
+    int[] data = new int[5 * blockSize];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextInt();
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteAndReadWhenBlockIsNotFullyWritten() throws IOException {
+    int[] data = new int[blockSize - 3];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextInt();
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteAndReadWhenAMiniBlockIsNotFullyWritten() throws IOException {
+    int miniBlockSize = blockSize / miniBlockNum;
+    int[] data = new int[miniBlockSize - 3];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextInt();
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteNegativeDeltas() throws IOException {
+    int[] data = new int[blockSize];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = 10 - (i * 32 - random.nextInt(6));
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteAndReadWhenDeltasAreSame() throws IOException {
+    int[] data = new int[2 * blockSize];
+    // fill the whole array: stopping at blockSize left the second half at 0, so the
+    // deltas were not actually constant
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i * 32;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteAndReadWhenValuesAreSame() throws IOException {
+    int[] data = new int[2 * blockSize];
+    // fill the whole array: stopping at blockSize left the second half at 0, so the
+    // values were not actually all equal
+    for (int i = 0; i < data.length; i++) {
+      data[i] = 3;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteWhenDeltaIs0ForEachBlock() throws IOException {
+    int[] data = new int[5 * blockSize + 1];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (i - 1) / blockSize;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldReadWriteWhenDataIsNotAlignedWithBlock() throws IOException {
+    int[] data = new int[5 * blockSize + 3];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextInt(20) - 10;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldReadMaxMinValue() throws IOException {
+    // alternate between the two int extremes
+    int[] data = new int[10];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (i % 2 == 0) ? Integer.MIN_VALUE : Integer.MAX_VALUE;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldReturnCorrectOffsetAfterInitialization() throws IOException {
+    int[] data = new int[2 * blockSize + 3];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i * 32;
+    }
+    writeData(data);
+
+    // embed the encoded values at a non-zero offset inside a larger page buffer
+    reader = new DeltaBinaryPackingValuesReader();
+    BytesInput bytes = writer.getBytes();
+    byte[] valueContent = bytes.toByteArray();
+    byte[] pageContent = new byte[valueContent.length * 10];
+    int contentOffsetInPage = 33;
+    System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
+
+    // the offset should point just past the encoded values
+    reader.initFromPage(100, pageContent, contentOffsetInPage);
+    int offset = reader.getNextOffset();
+    assertEquals(valueContent.length + contentOffsetInPage, offset);
+
+    // should be able to read the data correctly
+    for (int i : data) {
+      assertEquals(i, reader.readInteger());
+    }
+  }
+
+  @Test
+  public void shouldThrowExceptionWhenReadMoreThanWritten() throws IOException {
+    int[] data = new int[5 * blockSize + 1];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i * 32;
+    }
+    shouldWriteAndRead(data);
+
+    // every written value has been consumed above; one more read must fail
+    boolean thrown = false;
+    try {
+      reader.readInteger();
+    } catch (ParquetDecodingException e) {
+      thrown = true;
+      assertEquals("no more value to read, total value count is " + data.length, e.getMessage());
+    }
+    // without this check the test passed silently when no exception was thrown
+    assertTrue("expected ParquetDecodingException when reading past the last value", thrown);
+  }
+
+  @Test
+  public void shouldSkip() throws IOException {
+    int[] data = new int[5 * blockSize + 1];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i * 32;
+    }
+    writeData(data);
+    reader = new DeltaBinaryPackingValuesReader();
+    reader.initFromPage(100, writer.getBytes().toByteArray(), 0);
+    // skip every third value and verify the ones in between
+    for (int i = 0; i < data.length; i++) {
+      if (i % 3 == 0) {
+        reader.skip();
+      } else {
+        assertEquals(i * 32, reader.readInteger());
+      }
+    }
+  }
+
+  @Test
+  public void shouldReset() throws IOException {
+    // use the writer once, then reset it and check a second round trip
+    shouldReadWriteWhenDataIsNotAlignedWithBlock();
+    int[] data = new int[5 * blockSize];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i * 2;
+    }
+    writer.reset();
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void randomDataTest() throws IOException {
+    int maxSize = 1000;
+    int[] data = new int[maxSize];
+
+    for (int round = 0; round < 100000; round++) {
+      // only the first `size` slots are refreshed and written in each round
+      int size = random.nextInt(maxSize);
+
+      for (int i = 0; i < size; i++) {
+        data[i] = random.nextInt();
+      }
+      shouldReadAndWrite(data, size);
+      writer.reset();
+    }
+  }
+
+  /** Round-trips the whole array; see {@link #shouldReadAndWrite(int[], int)}. */
+  private void shouldWriteAndRead(int[] data) throws IOException {
+    shouldReadAndWrite(data, data.length);
+  }
+
+  /**
+   * Writes the first {@code length} values of {@code data}, asserts the page is no larger
+   * than the size estimate computed below, and reads the values back through a new
+   * {@link #reader}.
+   *
+   * @param data source values
+   * @param length number of leading values of {@code data} to write and verify
+   * @throws IOException if serializing the page fails
+   */
+  private void shouldReadAndWrite(int[] data, int length) throws IOException {
+    writeData(data, length);
+    reader = new DeltaBinaryPackingValuesReader();
+    byte[] page = writer.getBytes().toByteArray();
+    int miniBlockSize = blockSize / miniBlockNum;
+
+    // length - 1: the estimate counts one value fewer than was written
+    double miniBlockFlushed = Math.ceil(((double) length - 1) / miniBlockSize);
+    double blockFlushed = Math.ceil(((double) length - 1) / blockSize);
+    double estimatedSize = 4 * 5 //blockHeader
+        + 4 * miniBlockFlushed * miniBlockSize //data(aligned to miniBlock)
+        + blockFlushed * miniBlockNum //bitWidth of mini blocks
+        + (5.0 * blockFlushed);//min delta for each block
+    assertTrue(estimatedSize >= page.length);
+    reader.initFromPage(100, page, 0);
+
+    for (int i = 0; i < length; i++) {
+      assertEquals(data[i], reader.readInteger());
+    }
+  }
+
+  /** Writes every value of {@code data} to {@link #writer}. */
+  private void writeData(int[] data) {
+    writeData(data, data.length);
+  }
+
+  /** Writes the first {@code length} values of {@code data} to {@link #writer}. */
+  private void writeData(int[] data, int length) {
+    for (int i = 0; i < length; i++) {
+      writer.writeInteger(data[i]);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchMarkTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchMarkTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchMarkTest.java
new file mode 100644
index 0000000..6b02b9a
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchMarkTest.java
@@ -0,0 +1,43 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta.benchmark;
+
+import org.apache.parquet.column.values.ValuesWriter;
+
+/**
+ * Base class for the write benchmarks in this package: holds the shared input data and a
+ * helper that pushes it through a {@link ValuesWriter} several times.
+ */
+public abstract class BenchMarkTest {
+  /** Values to write; must be assigned before {@link #runWriteTest(ValuesWriter)} is called. */
+  public static int[] data;
+
+  /**
+   * Resets {@code writer} and writes all of {@link #data} to it, ten times over, then
+   * prints the size of the writer's content after the last pass.
+   *
+   * <p>The previous version also accumulated a per-pass average duration that was never
+   * read or reported; that dead timing code has been removed. Printed output is unchanged.
+   *
+   * @param writer the writer under test
+   */
+  protected void runWriteTest(ValuesWriter writer){
+    int pageCount = 10;
+    for (int i = 0; i < pageCount ; i++) {
+      // start every pass from an empty writer so only the last pass is left at the end
+      writer.reset();
+      for(int item:data){
+        writer.writeInteger(item);
+      }
+    }
+
+    System.out.println("size is "+writer.getBytes().size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
new file mode 100644
index 0000000..dc69fcc
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
@@ -0,0 +1,102 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta.benchmark;
+
+import org.junit.Test;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import java.util.Random;
+
+/**
+ * Prints the encoded size of the same integer stream written with
+ * {@link DeltaBinaryPackingValuesWriter} and {@link RunLengthBitPackingHybridValuesWriter},
+ * for several value distributions, next to a computed size estimate.
+ */
+public class BenchmarkIntegerOutputSize {
+  public static int blockSize=128;
+  public static int miniBlockNum=4;
+  public static int dataSize=10000 * blockSize;
+
+  /** Source of the integers fed to both writers. */
+  private interface IntFunc {
+    public int getIntValue();
+  }
+
+  @Test
+  public void testBigNumbers() {
+    final Random r=new Random();
+    // any int value; the RLE writer is given the full 32-bit width
+    testRandomIntegers(new IntFunc() {
+      @Override
+      public int getIntValue() {
+        return r.nextInt();
+      }
+    },32);
+  }
+
+  @Test
+  public void testRangedNumbersWithSmallVariations() {
+    final Random r=new Random();
+    // values in 1000..1019
+    testRandomIntegers(new IntFunc() {
+      @Override
+      public int getIntValue() {
+        return 1000+r.nextInt(20);
+      }
+    },10);
+  }
+
+  @Test
+  public void testSmallNumbersWithSmallVariations() {
+    final Random r=new Random();
+    // values in 40..59
+    testRandomIntegers(new IntFunc() {
+      @Override
+      public int getIntValue() {
+        return 40+r.nextInt(20);
+      }
+    },6);
+  }
+
+  @Test
+  public void testSmallNumberVariation() {
+    final Random r=new Random();
+    // values in -10..9
+    testRandomIntegers(new IntFunc() {
+      @Override
+      public int getIntValue() {
+        return r.nextInt(20)-10;
+      }
+    },4);
+  }
+
+  /**
+   * Writes {@link #dataSize} values from {@code func} to both a delta writer and an RLE
+   * writer, then prints both encoded sizes and the size estimate.
+   *
+   * @param func supplier of the values to write
+   * @param bitWidth bit width passed to the RLE writer
+   */
+  public void testRandomIntegers(IntFunc func,int bitWidth) {
+    DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum, 100, 20000);
+    RunLengthBitPackingHybridValuesWriter rle= new RunLengthBitPackingHybridValuesWriter(bitWidth, 100, 20000);
+    for (int i = 0; i < dataSize; i++) {
+      int v = func.getIntValue();
+      delta.writeInteger(v);
+      rle.writeInteger(v);
+    }
+    System.out.println("delta size: "+delta.getBytes().size());
+    // the label previously lacked the ": " separator, gluing the number to the text
+    System.out.println("estimated size: "+estimatedSize());
+    System.out.println("rle size: "+rle.getBytes().size());
+  }
+
+  /**
+   * Size estimate in bytes for {@link #dataSize} values: a fixed header, four bytes per
+   * value rounded up to whole mini blocks, one byte per mini block for its bit width, and
+   * five bytes per block for its minimum delta.
+   */
+  private double estimatedSize(){
+    int miniBlockSize = blockSize / miniBlockNum;
+    double miniBlockFlushed = Math.ceil(((double) dataSize - 1) / miniBlockSize);
+    double blockFlushed = Math.ceil(((double) dataSize - 1) / blockSize);
+    return 4 * 5 //blockHeader
+            + 4 * miniBlockFlushed * miniBlockSize //data(aligned to miniBlock)
+            + blockFlushed * miniBlockNum //bitWidth of mini blocks
+            + (5.0 * blockFlushed);//min delta for each block
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
new file mode 100644
index 0000000..24b007f
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
@@ -0,0 +1,96 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta.benchmark;
+
+import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
+import com.carrotsearch.junitbenchmarks.BenchmarkRule;
+import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
+import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+
+import java.io.IOException;
+import java.util.Random;
+
+@AxisRange(min = 0, max = 1)
+@BenchmarkMethodChart(filePrefix = "benchmark-encoding-reading-random")
+/**
+ * Read benchmark comparing {@link DeltaBinaryPackingValuesReader} with
+ * {@link RunLengthBitPackingHybridValuesReader} on the same random integers.
+ * The integers are generated and encoded once in {@link #prepare()}; each
+ * benchmark method then decodes the whole page ten times.
+ */
+public class BenchmarkReadingRandomIntegers {
+  public static int blockSize = 128;
+  public static int miniBlockNum = 4;
+  /** Page produced by the delta binary packing writer in {@link #prepare()}. */
+  public static byte[] deltaBytes;
+  /** Page produced by the run-length/bit-packing hybrid writer in {@link #prepare()}. */
+  public static byte[] rleBytes;
+  /** The raw integers that were encoded into both pages. */
+  public static int[] data;
+  @Rule
+  public org.junit.rules.TestRule benchmarkRun = new BenchmarkRule();
+
+  /**
+   * Generates the random input and encodes it with both writers.
+   *
+   * @throws IOException declared for the encoding calls
+   */
+  @BeforeClass
+  public static void prepare() throws IOException {
+    Random random = new Random();
+    data = new int[100000 * blockSize];
+    for (int i = 0; i < data.length; i++) {
+      // nextInt(100) is in [0, 99], so every value lies in [-200, -101].
+      data[i] = random.nextInt(100) - 200;
+    }
+
+    ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000);
+    ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000);
+
+    for (int i = 0; i < data.length; i++) {
+      delta.writeInteger(data[i]);
+      rle.writeInteger(data[i]);
+    }
+    deltaBytes = delta.getBytes().toByteArray();
+    rleBytes = rle.getBytes().toByteArray();
+  }
+
+  /** Decodes the delta-encoded page ten times with a fresh reader each time. */
+  @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 10)
+  @Test
+  public void readingDelta() throws IOException {
+    for (int j = 0; j < 10; j++) {
+      DeltaBinaryPackingValuesReader reader = new DeltaBinaryPackingValuesReader();
+      readData(reader, deltaBytes);
+    }
+  }
+
+  /** Decodes the RLE-encoded page ten times with a fresh reader each time. */
+  @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 10)
+  @Test
+  public void readingRLE() throws IOException {
+    for (int j = 0; j < 10; j++) {
+      ValuesReader reader = new RunLengthBitPackingHybridValuesReader(32);
+      readData(reader, rleBytes);
+    }
+  }
+
+  /**
+   * Initialises {@code reader} on {@code page} and reads {@code data.length}
+   * integers from it, discarding the values.
+   *
+   * @param reader reader matching the encoding of {@code page}
+   * @param page encoded bytes; may be either the delta or the RLE page, so the
+   *             parameter is deliberately not named after one of the static fields
+   * @throws IOException if the reader fails to initialise from the page
+   */
+  private void readData(ValuesReader reader, byte[] page) throws IOException {
+    reader.initFromPage(data.length, page, 0);
+    for (int i = 0; i < data.length; i++) {
+      reader.readInteger();
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
new file mode 100644
index 0000000..50c97cf
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
@@ -0,0 +1,70 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta.benchmark;
+
+import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
+import com.carrotsearch.junitbenchmarks.BenchmarkRule;
+import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
+import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import java.util.Random;
+
+@AxisRange(min = 0, max = 1)
+@BenchmarkMethodChart(filePrefix = "benchmark-encoding-writing-random")
+public class RandomWritingBenchmarkTest extends BenchMarkTest{
+  public static int blockSize=128;
+  public static int miniBlockNum=4;
+  @Rule
+  public org.junit.rules.TestRule benchmarkRun = new BenchmarkRule();
+
+  @BeforeClass
+  public static void prepare() {
+    Random random=new Random();
+    data = new int[10000 * blockSize];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextInt(100) - 200;
+    }
+  }
+
+  @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
+  @Test
+  public void writeDeltaPackingTest(){
+    DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000);
+    runWriteTest(writer);
+  }
+
+  @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
+  @Test
+  public void writeRLETest(){
+    ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000);
+    runWriteTest(writer);
+  }
+
+  @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
+  @Test
+  public void writeDeltaPackingTest2(){
+    DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000);
+    runWriteTest(writer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java
new file mode 100644
index 0000000..3141fd7
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java
@@ -0,0 +1,48 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta.benchmark;
+
+import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
+import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
+import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import java.util.Random;
+
+@AxisRange(min = 0, max = 2)
+@BenchmarkMethodChart(filePrefix = "benchmark-encoding-writing-random-small")
+public class SmallRangeWritingBenchmarkTest extends RandomWritingBenchmarkTest {
+  @BeforeClass
+  public static void prepare() {
+    Random random=new Random();
+    data = new int[100000 * blockSize];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextInt(2) - 1;
+    }
+  }
+
+  @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
+  @Test
+  public void writeRLEWithSmallBitWidthTest(){
+    ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(2, 100, 20000);
+    runWriteTest(writer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
new file mode 100644
index 0000000..aaae064
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
@@ -0,0 +1,74 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.deltalengthbytearray;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+import org.apache.parquet.column.values.Utils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Round-trip tests for {@link DeltaLengthByteArrayValuesWriter} and
+ * {@link DeltaLengthByteArrayValuesReader}, plus a check of the lengths the
+ * writer emits.
+ */
+public class TestDeltaLengthByteArray {
+
+  /** Fixed sample strings used by the serialization and length tests. */
+  String[] values = { "parquet", "hadoop", "mapreduce"};
+
+  /** Writes the fixed samples and expects to read back the same binaries. */
+  @Test
+  public void testSerialization () throws IOException {
+    assertRoundTrip(values);
+  }
+
+  /** Writes 1000 random strings and expects to read back the same binaries. */
+  @Test
+  public void testRandomStrings() throws IOException {
+    // Distinct name so the local does not shadow the values field.
+    String[] randomValues = Utils.getRandomStringSamples(1000, 32);
+    assertRoundTrip(randomValues);
+  }
+
+  /**
+   * Decodes the writer's output with a plain {@link DeltaBinaryPackingValuesReader}
+   * and checks that the i-th decoded integer equals the length of the i-th string.
+   */
+  @Test
+  public void testLengths() throws IOException {
+    DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024);
+    ValuesReader reader = new DeltaBinaryPackingValuesReader();
+
+    Utils.writeData(writer, values);
+    int[] bin = Utils.readInts(reader, writer.getBytes().toByteArray(), values.length);
+
+    for(int i =0; i< bin.length ; i++) {
+      Assert.assertEquals(values[i].length(), bin[i]);
+    }
+  }
+
+  /**
+   * Writes {@code expected} with a fresh writer, reads it back with a fresh
+   * reader and asserts element-wise equality of the decoded binaries.
+   *
+   * @param expected strings to encode and compare against
+   * @throws IOException if writing or reading the page fails
+   */
+  private static void assertRoundTrip(String[] expected) throws IOException {
+    DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024);
+    DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
+
+    Utils.writeData(writer, expected);
+    Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), expected.length);
+
+    for(int i =0; i< bin.length ; i++) {
+      Assert.assertEquals(Binary.fromString(expected[i]), bin[i]);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
new file mode 100644
index 0000000..f5f9d76
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
@@ -0,0 +1,71 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.deltalengthbytearray.benchmark;
+
+import java.io.IOException;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.parquet.column.values.Utils;
+import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
+import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter;
+import org.apache.parquet.column.values.plain.BinaryPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.io.api.Binary;
+
+import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
+import com.carrotsearch.junitbenchmarks.BenchmarkRule;
+import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
+import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
+
+@AxisRange(min = 0, max = 1)
+@BenchmarkMethodChart(filePrefix = "benchmark-encoding-writing-random")
+/**
+ * Benchmark that writes and reads back one million random strings with
+ * {@link PlainValuesWriter} and with {@link DeltaLengthByteArrayValuesWriter},
+ * printing the encoded size of each.
+ */
+public class BenchmarkDeltaLengthByteArray {
+
+  @Rule
+  public org.junit.rules.TestRule benchmarkRun = new BenchmarkRule();
+
+  /** Random input strings shared by both benchmarks of one test instance. */
+  String[] values = Utils.getRandomStringSamples(1000000, 32);
+
+  /** Writes and reads {@code values} using the plain encoding. */
+  @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
+  @Test
+  public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException {
+    PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024);
+    BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
+
+    Utils.writeData(writer, values);
+    byte [] data = writer.getBytes().toByteArray();
+    // The decoded values are not inspected; the read is performed only to be timed.
+    Utils.readData(reader, data, values.length);
+    System.out.println("size " + data.length);
+  }
+
+  /** Writes and reads {@code values} using the delta-length byte array encoding. */
+  @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
+  @Test
+  public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws IOException {
+    DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024);
+    DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
+
+    Utils.writeData(writer, values);
+    byte [] data = writer.getBytes().toByteArray();
+    // The decoded values are not inspected; the read is performed only to be timed.
+    Utils.readData(reader, data, values.length);
+    System.out.println("size " + data.length);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
new file mode 100644
index 0000000..693557a
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
@@ -0,0 +1,84 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.deltastrings;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+import org.apache.parquet.column.values.Utils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Round-trip tests for {@link DeltaByteArrayWriter} and
+ * {@link DeltaByteArrayReader}, plus a check of the prefix and suffix lengths
+ * the writer emits.
+ */
+public class TestDeltaByteArray {
+
+  /** Fixed samples that all start with "parquet". */
+  static String[] values = {"parquet-mr", "parquet", "parquet-format"};
+  /** Random samples used by the random round-trip test. */
+  static String[] randvalues = Utils.getRandomStringSamples(10000, 32);
+
+  /** Writes the fixed samples and expects to read back the same binaries. */
+  @Test
+  public void testSerialization () throws IOException {
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+    DeltaByteArrayReader reader = new DeltaByteArrayReader();
+
+    Utils.writeData(writer, values);
+    Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length);
+
+    for(int i =0; i< bin.length ; i++) {
+      Assert.assertEquals(Binary.fromString(values[i]), bin[i]);
+    }
+  }
+
+  /** Writes the random samples and expects to read back the same binaries. */
+  @Test
+  public void testRandomStrings() throws IOException {
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+    DeltaByteArrayReader reader = new DeltaByteArrayReader();
+
+    Utils.writeData(writer, randvalues);
+    Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), randvalues.length);
+
+    for(int i =0; i< bin.length ; i++) {
+      Assert.assertEquals(Binary.fromString(randvalues[i]), bin[i]);
+    }
+  }
+
+  /**
+   * Decodes the writer's output with two consecutive
+   * {@link DeltaBinaryPackingValuesReader}s: the first run is checked against
+   * the expected prefix lengths, the second (started where the first one
+   * stopped) against the expected suffix lengths.
+   */
+  @Test
+  public void testLengths() throws IOException {
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+    ValuesReader reader = new DeltaBinaryPackingValuesReader();
+
+    Utils.writeData(writer, values);
+    byte[] data = writer.getBytes().toByteArray();
+    int[] bin = Utils.readInts(reader, data, values.length);
+
+    // test prefix lengths
+    Assert.assertEquals(0, bin[0]);
+    Assert.assertEquals(7, bin[1]);
+    Assert.assertEquals(7, bin[2]);
+
+    // The offset was computed against data, so the second read must use that
+    // same array rather than a freshly serialized copy.
+    int offset = reader.getNextOffset();
+    reader = new DeltaBinaryPackingValuesReader();
+    bin = Utils.readInts(reader, data, offset, values.length);
+    // test suffix lengths
+    Assert.assertEquals(10, bin[0]);
+    Assert.assertEquals(0, bin[1]);
+    Assert.assertEquals(7, bin[2]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
new file mode 100644
index 0000000..c61ef30
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
@@ -0,0 +1,101 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.deltastrings.benchmark;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.parquet.column.values.Utils;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
+import org.apache.parquet.column.values.plain.BinaryPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.io.api.Binary;
+
+import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
+import com.carrotsearch.junitbenchmarks.BenchmarkRule;
+import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
+import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
+
+@AxisRange(min = 0, max = 1)
+@BenchmarkMethodChart(filePrefix = "benchmark-encoding-writing-random")
+/**
+ * Benchmark that writes and reads back one million strings, in random and in
+ * sorted order, with {@link PlainValuesWriter} and with
+ * {@link DeltaByteArrayWriter}, printing the encoded size of each.
+ *
+ * <p>NOTE(review): the two {@code ...WithDeltaLengthByteArrayValuesWriter}
+ * methods actually exercise {@link DeltaByteArrayWriter}; their names are kept
+ * unchanged so existing benchmark reports stay comparable.
+ */
+public class BenchmarkDeltaByteArray {
+
+  @Rule
+  public org.junit.rules.TestRule benchmarkRun = new BenchmarkRule();
+
+  /** Random input strings. */
+  static String[] values = Utils.getRandomStringSamples(1000000, 32);
+  /** Sorted copy of {@link #values}. */
+  static String[] sortedVals;
+  static
+  {
+   sortedVals = Arrays.copyOf(values, values.length);
+   Arrays.sort(sortedVals);
+  }
+
+  /** Random-order strings, plain encoding. */
+  @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
+  @Test
+  public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException {
+    PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024);
+    BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
+
+    Utils.writeData(writer, values);
+    byte [] data = writer.getBytes().toByteArray();
+    // The decoded values are not inspected; the read is performed only to be timed.
+    Utils.readData(reader, data, values.length);
+    System.out.println("size " + data.length);
+  }
+
+  /** Random-order strings, delta byte array encoding. */
+  @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
+  @Test
+  public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws IOException {
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+    DeltaByteArrayReader reader = new DeltaByteArrayReader();
+
+    Utils.writeData(writer, values);
+    byte [] data = writer.getBytes().toByteArray();
+    // The decoded values are not inspected; the read is performed only to be timed.
+    Utils.readData(reader, data, values.length);
+    System.out.println("size " + data.length);
+  }
+
+  /** Sorted strings, plain encoding. */
+  @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
+  @Test
+  public void benchmarkSortedStringsWithPlainValuesWriter() throws IOException {
+    PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024);
+    BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
+
+    Utils.writeData(writer, sortedVals);
+    byte [] data = writer.getBytes().toByteArray();
+    // Count taken from the array that was written (same length as values).
+    Utils.readData(reader, data, sortedVals.length);
+    System.out.println("size " + data.length);
+  }
+
+  /** Sorted strings, delta byte array encoding. */
+  @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
+  @Test
+  public void benchmarkSortedStringsWithDeltaLengthByteArrayValuesWriter() throws IOException {
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+    DeltaByteArrayReader reader = new DeltaByteArrayReader();
+
+    Utils.writeData(writer, sortedVals);
+    byte [] data = writer.getBytes().toByteArray();
+    // Count taken from the array that was written (same length as values).
+    Utils.readData(reader, data, sortedVals.length);
+    System.out.println("size " + data.length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
new file mode 100644
index 0000000..e60b3ec
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
@@ -0,0 +1,531 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.dictionary;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter;
+import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
+import org.apache.parquet.column.values.plain.BinaryPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+public class TestDictionary {
+
+  private <I extends DictionaryValuesWriter> FallbackValuesWriter<I, PlainValuesWriter> plainFallBack(I dvw, int initialSize) {
+    return FallbackValuesWriter.of(dvw, new PlainValuesWriter(initialSize, initialSize * 5));
+  }
+
+  private FallbackValuesWriter<PlainBinaryDictionaryValuesWriter, PlainValuesWriter> newPlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
+    return plainFallBack(new PlainBinaryDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+  }
+
+  private FallbackValuesWriter<PlainLongDictionaryValuesWriter, PlainValuesWriter> newPlainLongDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
+    return plainFallBack(new PlainLongDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+  }
+
+  private FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> newPlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
+    return plainFallBack(new PlainIntegerDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+  }
+
+  private FallbackValuesWriter<PlainDoubleDictionaryValuesWriter, PlainValuesWriter> newPlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
+    return plainFallBack(new PlainDoubleDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+  }
+
+  private FallbackValuesWriter<PlainFloatDictionaryValuesWriter, PlainValuesWriter> newPlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
+    return plainFallBack(new PlainFloatDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+  }
+
+  /**
+   * Writes two pages of repeated strings (each expected to be dictionary
+   * encoded) followed by a page of distinct strings (expected to be PLAIN),
+   * then reads all three pages back and checks their content.
+   */
+  @Test
+  public void testBinaryDictionary() throws IOException {
+    final int valueCount = 100;
+    final ValuesWriter writer = newPlainBinaryDictionaryValuesWriter(200, 10000);
+
+    writeRepeated(valueCount, writer, "a");
+    final BytesInput firstPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+
+    writeRepeated(valueCount, writer, "b");
+    final BytesInput secondPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+
+    // the distinct values of the third page are expected to trigger the fallback
+    writeDistinct(valueCount, writer, "c");
+    final BytesInput thirdPage = getBytesAndCheckEncoding(writer, PLAIN);
+
+    final DictionaryValuesReader dictionaryReader = initDicReader(writer, BINARY);
+    checkRepeated(valueCount, firstPage, dictionaryReader, "a");
+    checkRepeated(valueCount, secondPage, dictionaryReader, "b");
+    checkDistinct(valueCount, thirdPage, new BinaryPlainValuesReader(), "c");
+  }
+
+  /**
+   * Writes 100 distinct strings and checks after every write that the reported
+   * encoding is PLAIN_DICTIONARY while the running size estimate stays below the
+   * dictionary byte limit, and PLAIN afterwards. The page is then read back with
+   * a plain reader and the writer is reset.
+   */
+  @Test
+  public void testBinaryDictionaryFallBack() throws IOException {
+    final int initialSlabSize = 100;
+    final int dictionaryByteLimit = 50;
+    final ValuesWriter writer = newPlainBinaryDictionaryValuesWriter(dictionaryByteLimit, initialSlabSize);
+
+    // running estimate: value length plus 4 bytes per entry
+    // (presumably the length prefix -- confirm against the dictionary writer)
+    int estimatedBytes = 0;
+    for (long i = 0; i < 100; i++) {
+      final Binary value = Binary.fromString("str" + i);
+      writer.writeBytes(value);
+      estimatedBytes += value.length() + 4;
+      final Encoding expected = estimatedBytes < dictionaryByteLimit ? PLAIN_DICTIONARY : PLAIN;
+      assertEquals(expected, writer.getEncoding());
+    }
+
+    // the writer fell back to PLAIN, so a plain reader is used to decode the page
+    final ValuesReader plainReader = new BinaryPlainValuesReader();
+    plainReader.initFromPage(100, writer.getBytes().toByteArray(), 0);
+    for (long i = 0; i < 100; i++) {
+      final Binary expected = Binary.fromString("str" + i);
+      assertEquals(expected, plainReader.readBytes());
+    }
+
+    // simulate cutting the page
+    writer.reset();
+    assertEquals(0, writer.getBufferedSize());
+  }
+
+  /**
+   * Same scenario as the plain binary dictionary test, except that the two
+   * dictionary pages are written through {@code writeRepeatedWithReuse}, which
+   * hands the same Binary instance to the writer on every call.
+   */
+  @Test
+  public void testBinaryDictionaryChangedValues() throws IOException {
+    final int valueCount = 100;
+    final ValuesWriter writer = newPlainBinaryDictionaryValuesWriter(200, 10000);
+
+    writeRepeatedWithReuse(valueCount, writer, "a");
+    final BytesInput firstPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+
+    writeRepeatedWithReuse(valueCount, writer, "b");
+    final BytesInput secondPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+
+    // the distinct values of the third page are expected to trigger the fallback
+    writeDistinct(valueCount, writer, "c");
+    final BytesInput thirdPage = getBytesAndCheckEncoding(writer, PLAIN);
+
+    final DictionaryValuesReader dictionaryReader = initDicReader(writer, BINARY);
+    checkRepeated(valueCount, firstPage, dictionaryReader, "a");
+    checkRepeated(valueCount, secondPage, dictionaryReader, "b");
+    checkDistinct(valueCount, thirdPage, new BinaryPlainValuesReader(), "c");
+  }
+
+  /**
+   * Checks that when the very first page is written as PLAIN, the following
+   * page of repeated values is reported as PLAIN too, and that both pages
+   * decode with a plain reader.
+   */
+  @Test
+  public void testFirstPageFallBack() throws IOException {
+    final int valueCount = 1000;
+    final ValuesWriter writer = newPlainBinaryDictionaryValuesWriter(10000, 10000);
+
+    // distinct values: dictionary encoding is not efficient, so the writer falls back
+    writeDistinct(valueCount, writer, "a");
+    final BytesInput firstPage = getBytesAndCheckEncoding(writer, PLAIN);
+
+    // repeated values, yet still plain because the fallback happened on the first page
+    writeRepeated(valueCount, writer, "b");
+    final BytesInput secondPage = getBytesAndCheckEncoding(writer, PLAIN);
+
+    final ValuesReader plainReader = new BinaryPlainValuesReader();
+    checkDistinct(valueCount, firstPage, plainReader, "a");
+    checkRepeated(valueCount, secondPage, plainReader, "b");
+  }
+
+  /**
+   * Checks a fallback on the second page: the first page is dictionary
+   * encoded, the second (distinct values) is PLAIN, and a third page of
+   * repeated values is still PLAIN. The first page is read through the
+   * dictionary reader, the other two through a plain reader.
+   */
+  @Test
+  public void testSecondPageFallBack() throws IOException {
+    final int valueCount = 1000;
+    final ValuesWriter writer = newPlainBinaryDictionaryValuesWriter(1000, 10000);
+
+    writeRepeated(valueCount, writer, "a");
+    final BytesInput firstPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+
+    // distinct values: dictionary encoding is not efficient, so the writer falls back
+    writeDistinct(valueCount, writer, "b");
+    final BytesInput secondPage = getBytesAndCheckEncoding(writer, PLAIN);
+
+    // still plain because the fallback happened on the previous page
+    writeRepeated(valueCount, writer, "a");
+    final BytesInput thirdPage = getBytesAndCheckEncoding(writer, PLAIN);
+
+    final ValuesReader dictionaryReader = initDicReader(writer, BINARY);
+    checkRepeated(valueCount, firstPage, dictionaryReader, "a");
+
+    final ValuesReader plainReader = new BinaryPlainValuesReader();
+    checkDistinct(valueCount, secondPage, plainReader, "b");
+    checkRepeated(valueCount, thirdPage, plainReader, "a");
+  }
+
+  /**
+   * Writes two dictionary-encoded pages of longs drawn from 50 distinct values
+   * (ascending, then descending counter modulo 50), checks the dictionary size
+   * after each page, and reads both pages back through a dictionary reader.
+   */
+  @Test
+  public void testLongDictionary() throws IOException {
+    final int firstPageCount = 1000;
+    final int secondPageCount = 2000;
+    final FallbackValuesWriter<PlainLongDictionaryValuesWriter, PlainValuesWriter> writer =
+        newPlainLongDictionaryValuesWriter(10000, 10000);
+
+    for (long v = 0; v < firstPageCount; v++) {
+      writer.writeLong(v % 50);
+    }
+    final BytesInput firstPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+    assertEquals(50, writer.initialWriter.getDictionarySize());
+
+    for (long v = secondPageCount; v > 0; v--) {
+      writer.writeLong(v % 50);
+    }
+    final BytesInput secondPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+    assertEquals(50, writer.initialWriter.getDictionarySize());
+
+    final DictionaryValuesReader dictionaryReader = initDicReader(writer, PrimitiveTypeName.INT64);
+
+    dictionaryReader.initFromPage(firstPageCount, firstPage.toByteArray(), 0);
+    for (long v = 0; v < firstPageCount; v++) {
+      assertEquals(v % 50, dictionaryReader.readLong());
+    }
+
+    dictionaryReader.initFromPage(secondPageCount, secondPage.toByteArray(), 0);
+    for (long v = secondPageCount; v > 0; v--) {
+      assertEquals(v % 50, dictionaryReader.readLong());
+    }
+  }
+
+  /**
+   * Writes the longs 0..99 to {@code cw}, checking the reported encoding after
+   * every write, then reads the page back with {@code reader}.
+   *
+   * @param cw writer under test
+   * @param reader reader used to decode the 100 written values
+   * @param maxDictionaryByteSize dictionary byte limit the writer was created
+   *        with; divided by 8 to get the number of writes after which the
+   *        test expects PLAIN instead of PLAIN_DICTIONARY
+   * @throws IOException propagated from the writer or the reader
+   */
+  private void roundTripLong(FallbackValuesWriter<PlainLongDictionaryValuesWriter, PlainValuesWriter> cw,  ValuesReader reader, int maxDictionaryByteSize) throws IOException {
+    int fallBackThreshold = maxDictionaryByteSize / 8;
+    for (long i = 0; i < 100; i++) {
+      cw.writeLong(i);
+      // JUnit convention: expected value first, so failure messages are not inverted
+      Encoding expected = i < fallBackThreshold ? PLAIN_DICTIONARY : PLAIN;
+      assertEquals(expected, cw.getEncoding());
+    }
+
+    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+
+    for (long i = 0; i < 100; i++) {
+      assertEquals(i, reader.readLong());
+    }
+  }
+
+  /**
+   * Runs the long round trip against a writer with a small dictionary limit,
+   * resets the page and the dictionary, and runs the same round trip again.
+   */
+  @Test
+  public void testLongDictionaryFallBack() throws IOException {
+    final int initialSlabSize = 100;
+    final int dictionaryByteLimit = 50;
+    final FallbackValuesWriter<PlainLongDictionaryValuesWriter, PlainValuesWriter> writer =
+        newPlainLongDictionaryValuesWriter(dictionaryByteLimit, initialSlabSize);
+    // the page ends up PLAIN encoded after the fallback, so a plain reader decodes it
+    final ValuesReader plainReader = new PlainValuesReader.LongPlainValuesReader();
+
+    roundTripLong(writer, plainReader, dictionaryByteLimit);
+
+    // simulate cutting the page, then clear the dictionary before the second pass
+    writer.reset();
+    assertEquals(0, writer.getBufferedSize());
+    writer.resetDictionary();
+
+    roundTripLong(writer, plainReader, dictionaryByteLimit);
+  }
+
+  /**
+   * Writes two dictionary-encoded pages of doubles drawn from 50 distinct
+   * values (ascending, then descending counter modulo 50), checks the
+   * dictionary size after each page, and reads both pages back through a
+   * dictionary reader.
+   */
+  @Test
+  public void testDoubleDictionary() throws IOException {
+    final int firstPageCount = 1000;
+    final int secondPageCount = 2000;
+    final FallbackValuesWriter<PlainDoubleDictionaryValuesWriter, PlainValuesWriter> writer =
+        newPlainDoubleDictionaryValuesWriter(10000, 10000);
+
+    for (double v = 0; v < firstPageCount; v++) {
+      writer.writeDouble(v % 50);
+    }
+    final BytesInput firstPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+    assertEquals(50, writer.initialWriter.getDictionarySize());
+
+    for (double v = secondPageCount; v > 0; v--) {
+      writer.writeDouble(v % 50);
+    }
+    final BytesInput secondPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+    assertEquals(50, writer.initialWriter.getDictionarySize());
+
+    final DictionaryValuesReader dictionaryReader = initDicReader(writer, DOUBLE);
+
+    dictionaryReader.initFromPage(firstPageCount, firstPage.toByteArray(), 0);
+    for (double v = 0; v < firstPageCount; v++) {
+      assertEquals(v % 50, dictionaryReader.readDouble(), 0.0);
+    }
+
+    dictionaryReader.initFromPage(secondPageCount, secondPage.toByteArray(), 0);
+    for (double v = secondPageCount; v > 0; v--) {
+      assertEquals(v % 50, dictionaryReader.readDouble(), 0.0);
+    }
+  }
+
+  /**
+   * Writes the doubles 0..99 to {@code cw}, checking the reported encoding
+   * after every write, then reads the page back with {@code reader}.
+   *
+   * @param cw writer under test
+   * @param reader reader used to decode the 100 written values
+   * @param maxDictionaryByteSize dictionary byte limit the writer was created
+   *        with; divided by 8 to get the number of writes after which the
+   *        test expects PLAIN instead of PLAIN_DICTIONARY
+   * @throws IOException propagated from the writer or the reader
+   */
+  private void roundTripDouble(FallbackValuesWriter<PlainDoubleDictionaryValuesWriter, PlainValuesWriter> cw,  ValuesReader reader, int maxDictionaryByteSize) throws IOException {
+    int fallBackThreshold = maxDictionaryByteSize / 8;
+    for (double i = 0; i < 100; i++) {
+      cw.writeDouble(i);
+      // JUnit convention: expected value first, so failure messages are not inverted
+      Encoding expected = i < fallBackThreshold ? PLAIN_DICTIONARY : PLAIN;
+      assertEquals(expected, cw.getEncoding());
+    }
+
+    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+
+    for (double i = 0; i < 100; i++) {
+      assertEquals(i, reader.readDouble(), 0.00001);
+    }
+  }
+
+  /**
+   * Runs the double round trip against a writer with a small dictionary limit,
+   * resets the page and the dictionary, and runs the same round trip again.
+   */
+  @Test
+  public void testDoubleDictionaryFallBack() throws IOException {
+    final int initialSlabSize = 100;
+    final int dictionaryByteLimit = 50;
+    final FallbackValuesWriter<PlainDoubleDictionaryValuesWriter, PlainValuesWriter> writer =
+        newPlainDoubleDictionaryValuesWriter(dictionaryByteLimit, initialSlabSize);
+    // the page ends up PLAIN encoded after the fallback, so a plain reader decodes it
+    final ValuesReader plainReader = new PlainValuesReader.DoublePlainValuesReader();
+
+    roundTripDouble(writer, plainReader, dictionaryByteLimit);
+
+    // simulate cutting the page, then clear the dictionary before the second pass
+    writer.reset();
+    assertEquals(0, writer.getBufferedSize());
+    writer.resetDictionary();
+
+    roundTripDouble(writer, plainReader, dictionaryByteLimit);
+  }
+
+  /**
+   * Writes two dictionary-encoded pages of ints drawn from 50 distinct values
+   * (ascending, then descending counter modulo 50), checks the dictionary size
+   * after each page, and reads both pages back through a dictionary reader.
+   */
+  @Test
+  public void testIntDictionary() throws IOException {
+    final int firstPageCount = 2000;
+    final int secondPageCount = 4000;
+    final FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> writer =
+        newPlainIntegerDictionaryValuesWriter(10000, 10000);
+
+    for (int v = 0; v < firstPageCount; v++) {
+      writer.writeInteger(v % 50);
+    }
+    final BytesInput firstPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+    assertEquals(50, writer.initialWriter.getDictionarySize());
+
+    for (int v = secondPageCount; v > 0; v--) {
+      writer.writeInteger(v % 50);
+    }
+    final BytesInput secondPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+    assertEquals(50, writer.initialWriter.getDictionarySize());
+
+    final DictionaryValuesReader dictionaryReader = initDicReader(writer, INT32);
+
+    dictionaryReader.initFromPage(firstPageCount, firstPage.toByteArray(), 0);
+    for (int v = 0; v < firstPageCount; v++) {
+      assertEquals(v % 50, dictionaryReader.readInteger());
+    }
+
+    dictionaryReader.initFromPage(secondPageCount, secondPage.toByteArray(), 0);
+    for (int v = secondPageCount; v > 0; v--) {
+      assertEquals(v % 50, dictionaryReader.readInteger());
+    }
+  }
+
+  /**
+   * Writes the ints 0..99 to {@code cw}, checking the reported encoding after
+   * every write, then reads the page back with {@code reader}.
+   *
+   * @param cw writer under test
+   * @param reader reader used to decode the 100 written values
+   * @param maxDictionaryByteSize dictionary byte limit the writer was created
+   *        with; divided by 4 to get the number of writes after which the
+   *        test expects PLAIN instead of PLAIN_DICTIONARY
+   * @throws IOException propagated from the writer or the reader
+   */
+  private void roundTripInt(FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> cw,  ValuesReader reader, int maxDictionaryByteSize) throws IOException {
+    int fallBackThreshold = maxDictionaryByteSize / 4;
+    for (int i = 0; i < 100; i++) {
+      cw.writeInteger(i);
+      // JUnit convention: expected value first, so failure messages are not inverted
+      Encoding expected = i < fallBackThreshold ? PLAIN_DICTIONARY : PLAIN;
+      assertEquals(expected, cw.getEncoding());
+    }
+
+    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+
+    for (int i = 0; i < 100; i++) {
+      assertEquals(i, reader.readInteger());
+    }
+  }
+
+  /**
+   * Runs the int round trip against a writer with a small dictionary limit,
+   * resets the page and the dictionary, and runs the same round trip again.
+   */
+  @Test
+  public void testIntDictionaryFallBack() throws IOException {
+    final int initialSlabSize = 100;
+    final int dictionaryByteLimit = 50;
+    final FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> writer =
+        newPlainIntegerDictionaryValuesWriter(dictionaryByteLimit, initialSlabSize);
+    // the page ends up PLAIN encoded after the fallback, so a plain reader decodes it
+    final ValuesReader plainReader = new PlainValuesReader.IntegerPlainValuesReader();
+
+    roundTripInt(writer, plainReader, dictionaryByteLimit);
+
+    // simulate cutting the page, then clear the dictionary before the second pass
+    writer.reset();
+    assertEquals(0, writer.getBufferedSize());
+    writer.resetDictionary();
+
+    roundTripInt(writer, plainReader, dictionaryByteLimit);
+  }
+
+  /**
+   * Writes two dictionary-encoded pages of floats drawn from 50 distinct
+   * values (ascending, then descending counter modulo 50), checks the
+   * dictionary size after each page, and reads both pages back through a
+   * dictionary reader.
+   */
+  @Test
+  public void testFloatDictionary() throws IOException {
+    final int firstPageCount = 2000;
+    final int secondPageCount = 4000;
+    final FallbackValuesWriter<PlainFloatDictionaryValuesWriter, PlainValuesWriter> writer =
+        newPlainFloatDictionaryValuesWriter(10000, 10000);
+
+    for (float v = 0; v < firstPageCount; v++) {
+      writer.writeFloat(v % 50);
+    }
+    final BytesInput firstPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+    assertEquals(50, writer.initialWriter.getDictionarySize());
+
+    for (float v = secondPageCount; v > 0; v--) {
+      writer.writeFloat(v % 50);
+    }
+    final BytesInput secondPage = getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+    assertEquals(50, writer.initialWriter.getDictionarySize());
+
+    final DictionaryValuesReader dictionaryReader = initDicReader(writer, FLOAT);
+
+    dictionaryReader.initFromPage(firstPageCount, firstPage.toByteArray(), 0);
+    for (float v = 0; v < firstPageCount; v++) {
+      assertEquals(v % 50, dictionaryReader.readFloat(), 0.0f);
+    }
+
+    dictionaryReader.initFromPage(secondPageCount, secondPage.toByteArray(), 0);
+    for (float v = secondPageCount; v > 0; v--) {
+      assertEquals(v % 50, dictionaryReader.readFloat(), 0.0f);
+    }
+  }
+
+  /**
+   * Writes the floats 0..99 to {@code cw}, checking the reported encoding
+   * after every write, then reads the page back with {@code reader}.
+   *
+   * @param cw writer under test
+   * @param reader reader used to decode the 100 written values
+   * @param maxDictionaryByteSize dictionary byte limit the writer was created
+   *        with; divided by 4 to get the number of writes after which the
+   *        test expects PLAIN instead of PLAIN_DICTIONARY
+   * @throws IOException propagated from the writer or the reader
+   */
+  private void roundTripFloat(FallbackValuesWriter<PlainFloatDictionaryValuesWriter, PlainValuesWriter> cw,  ValuesReader reader, int maxDictionaryByteSize) throws IOException {
+    int fallBackThreshold = maxDictionaryByteSize / 4;
+    for (float i = 0; i < 100; i++) {
+      cw.writeFloat(i);
+      // JUnit convention: expected value first, so failure messages are not inverted
+      Encoding expected = i < fallBackThreshold ? PLAIN_DICTIONARY : PLAIN;
+      assertEquals(expected, cw.getEncoding());
+    }
+
+    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+
+    for (float i = 0; i < 100; i++) {
+      assertEquals(i, reader.readFloat(), 0.00001);
+    }
+  }
+
+  /**
+   * Runs the float round trip against a writer with a small dictionary limit,
+   * resets the page and the dictionary, and runs the same round trip again.
+   */
+  @Test
+  public void testFloatDictionaryFallBack() throws IOException {
+    final int initialSlabSize = 100;
+    final int dictionaryByteLimit = 50;
+    final FallbackValuesWriter<PlainFloatDictionaryValuesWriter, PlainValuesWriter> writer =
+        newPlainFloatDictionaryValuesWriter(dictionaryByteLimit, initialSlabSize);
+    // the page ends up PLAIN encoded after the fallback, so a plain reader decodes it
+    final ValuesReader plainReader = new PlainValuesReader.FloatPlainValuesReader();
+
+    roundTripFloat(writer, plainReader, dictionaryByteLimit);
+
+    // simulate cutting the page, then clear the dictionary before the second pass
+    writer.reset();
+    assertEquals(0, writer.getBufferedSize());
+    writer.resetDictionary();
+
+    roundTripFloat(writer, plainReader, dictionaryByteLimit);
+  }
+
+  /**
+   * Initializes a dictionary reader on a page whose start offset equals the
+   * buffer length. There is no assertion after the call: the test passes as
+   * long as {@code initFromPage} does not throw.
+   */
+  @Test
+  public void testZeroValues() throws IOException {
+    final FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> writer =
+        newPlainIntegerDictionaryValuesWriter(100, 100);
+    writer.writeInteger(34);
+    writer.writeInteger(34);
+    getBytesAndCheckEncoding(writer, PLAIN_DICTIONARY);
+    final DictionaryValuesReader dictionaryReader = initDicReader(writer, INT32);
+
+    // pretend there are 100 nulls; the content is irrelevant, what matters is
+    // that the offset handed to the reader equals the buffer length
+    final byte[] page = {0x00, 0x01, 0x02, 0x03};
+    dictionaryReader.initFromPage(100, page, page.length);
+  }
+
+  /**
+   * Creates a {@link DictionaryValuesReader} backed by a copy of the
+   * dictionary page produced by {@code cw}, for a column "foo" of the given
+   * primitive type.
+   */
+  private DictionaryValuesReader initDicReader(ValuesWriter cw, PrimitiveTypeName type)
+      throws IOException {
+    final DictionaryPage pageCopy = cw.createDictionaryPage().copy();
+    final ColumnDescriptor column = new ColumnDescriptor(new String[] {"foo"}, type, 0, 0);
+    final Dictionary dictionary = PLAIN.initDictionary(column, pageCopy);
+    return new DictionaryValuesReader(dictionary);
+  }
+
+  private void checkDistinct(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException {
+    cr.initFromPage(COUNT, bytes.toByteArray(), 0);
+    for (int i = 0; i < COUNT; i++) {
+      Assert.assertEquals(prefix + i, cr.readBytes().toStringUsingUTF8());
+    }
+  }
+
+  private void checkRepeated(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException {
+    cr.initFromPage(COUNT, bytes.toByteArray(), 0);
+    for (int i = 0; i < COUNT; i++) {
+      Assert.assertEquals(prefix + i % 10, cr.readBytes().toStringUsingUTF8());
+    }
+  }
+
+  private void writeDistinct(int COUNT, ValuesWriter cw, String prefix) {
+    for (int i = 0; i < COUNT; i++) {
+      cw.writeBytes(Binary.fromString(prefix + i));
+    }
+  }
+
+  private void writeRepeated(int COUNT, ValuesWriter cw, String prefix) {
+    for (int i = 0; i < COUNT; i++) {
+      cw.writeBytes(Binary.fromString(prefix + i % 10));
+    }
+  }
+
+  /**
+   * Writes {@code COUNT} values to {@code cw}, always passing the same
+   * {@link Binary} instance after copying the bytes of
+   * {@code prefix + (i % 10)} into the array returned by its
+   * {@code getBytes()}.
+   *
+   * NOTE(review): this only changes what is written if {@code getBytes()}
+   * returns the array backing {@code reused} rather than a copy -- confirm
+   * against {@code Binary}.
+   */
+  private void writeRepeatedWithReuse(int COUNT, ValuesWriter cw, String prefix) {
+    // created once; the same instance is handed to the writer on every iteration
+    Binary reused = Binary.fromString(prefix + "0");
+    for (int i = 0; i < COUNT; i++) {
+      Binary content = Binary.fromString(prefix + i % 10);
+      // overwrite the first reused.length() bytes with the content of the next value
+      System.arraycopy(content.getBytes(), 0, reused.getBytes(), 0, reused.length());
+      cw.writeBytes(reused);
+    }
+  }
+
+  private BytesInput getBytesAndCheckEncoding(ValuesWriter cw, Encoding encoding)
+      throws IOException {
+    BytesInput bytes = BytesInput.copy(cw.getBytes());
+    assertEquals(encoding, cw.getEncoding());
+    cw.reset();
+    return bytes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
new file mode 100644
index 0000000..707a507
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
@@ -0,0 +1,99 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.rle;
+
+import java.io.ByteArrayInputStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Round-trips a mix of incrementing and repeated values through
+ * {@link RunLengthBitPackingHybridEncoder} and
+ * {@link RunLengthBitPackingHybridDecoder} for every bit width from 0 to 32.
+ *
+ * @author Alex Levenson
+ */
+public class RunLengthBitPackingHybridIntegrationTest {
+
+  /** Runs the round trip once per bit width, 0 through 32 inclusive. */
+  @Test
+  public void integrationTest() throws Exception {
+    for (int i = 0; i <= 32; i++) {
+      doIntegrationTest(i);
+    }
+  }
+
+  /**
+   * Encodes five groups of values with the given bit width and checks that
+   * the decoder returns them in the same order.
+   *
+   * @param bitWidth bit width handed to both the encoder and the decoder
+   * @throws Exception propagated from the encoder or the decoder
+   */
+  private void doIntegrationTest(int bitWidth) throws Exception {
+    // every value is reduced modulo 2^bitWidth; computed as a long so that
+    // bitWidth == 32 does not overflow
+    long modValue = 1L << bitWidth;
+
+    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 1000, 64000);
+
+    // 100 incrementing values
+    for (int i = 0; i < 100; i++) {
+      encoder.writeInt((int) (i % modValue));
+    }
+
+    // 100 copies of one value
+    for (int i = 0; i < 100; i++) {
+      encoder.writeInt((int) (77 % modValue));
+    }
+
+    // 100 copies of another value
+    for (int i = 0; i < 100; i++) {
+      encoder.writeInt((int) (88 % modValue));
+    }
+
+    // 1000 incrementing values, each written three times in a row
+    for (int i = 0; i < 1000; i++) {
+      encoder.writeInt((int) (i % modValue));
+      encoder.writeInt((int) (i % modValue));
+      encoder.writeInt((int) (i % modValue));
+    }
+
+    // 1000 copies of one value
+    for (int i = 0; i < 1000; i++) {
+      encoder.writeInt((int) (17 % modValue));
+    }
+
+    byte[] encodedBytes = encoder.toBytes().toByteArray();
+    ByteArrayInputStream in = new ByteArrayInputStream(encodedBytes);
+
+    RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
+
+    // read everything back in the order it was written
+    for (int i = 0; i < 100; i++) {
+      assertEquals(i % modValue, decoder.readInt());
+    }
+
+    for (int i = 0; i < 100; i++) {
+      assertEquals(77 % modValue, decoder.readInt());
+    }
+
+    for (int i = 0; i < 100; i++) {
+      assertEquals(88 % modValue, decoder.readInt());
+    }
+
+    for (int i = 0; i < 1000; i++) {
+      assertEquals(i % modValue, decoder.readInt());
+      assertEquals(i % modValue, decoder.readInt());
+      assertEquals(i % modValue, decoder.readInt());
+    }
+
+    for (int i = 0; i < 1000; i++) {
+      assertEquals(17 % modValue, decoder.readInt());
+    }
+  }
+}


Mime
View raw message