parquet-commits mailing list archives

From jul...@apache.org
Subject incubator-parquet-mr git commit: PARQUET-52: refactor fallback mechanism
Date Tue, 25 Nov 2014 18:49:11 GMT
Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 3aa6f1178 -> ad06e6114


PARQUET-52: refactor fallback mechanism

See: https://issues.apache.org/jira/browse/PARQUET-52
Context:
The ValuesWriter API has a mechanism to return the Encoding actually used, which allows falling back to a different encoding.
For example, dictionary encoding may fail when there are too many distinct values and the dictionary grows too big. In such cases the DictionaryValuesWriter was falling back to the Plain encoding.
Fallback can also happen when the space savings are not satisfying while writing the first page and we prefer to fall back to a more lightweight encoding.
With Parquet 2.0 we are adding new encodings, so the fallback is not necessarily Plain anymore.
This pull request decouples the fallback mechanism from the Dictionary and Plain encodings and allows the fallback logic to be reused with other encodings.
One could imagine more than one level of fallback in the future by chaining FallbackValuesWriter instances.
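
For illustration, a minimal sketch (not part of this commit) of how the new pieces fit together, assuming the constructor and factory signatures introduced by this diff; the threshold and size values below are arbitrary:

  import parquet.column.Encoding;
  import parquet.column.values.ValuesWriter;
  import parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
  import parquet.column.values.fallback.FallbackValuesWriter;
  import parquet.column.values.plain.PlainValuesWriter;
  import parquet.io.api.Binary;

  public class FallbackSketch {
    public static void main(String[] args) {
      int dictionaryPageSizeThreshold = 1024; // illustrative value
      int initialSizePerCol = 64 * 1024;      // illustrative value

      // Initial writer: dictionary encoding, labelled PLAIN_DICTIONARY as in the v1 writer.
      PlainBinaryDictionaryValuesWriter dict = new PlainBinaryDictionaryValuesWriter(
          dictionaryPageSizeThreshold, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN_DICTIONARY);

      // Writer to fall back to; with PARQUET_2_0 this could be another encoder instead of Plain.
      PlainValuesWriter plain = new PlainValuesWriter(initialSizePerCol);

      // FallbackValuesWriter forwards writes to the dictionary writer and switches to the
      // fallback when shouldFallBack() returns true or the first page compresses poorly.
      ValuesWriter writer = FallbackValuesWriter.of(dict, plain);

      writer.writeBytes(Binary.fromString("some value"));
      System.out.println(writer.getEncoding()); // PLAIN_DICTIONARY until a fallback occurs
    }
  }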

Author: julien <julien@twitter.com>

Closes #74 from julienledem/fallback and squashes the following commits:

b74a4ca [julien] Merge branch 'master' into fallback
d9abd62 [julien] better naming
aa90caf [julien] exclude values encoding from SemVer
10f295e [julien] better test setup
c516bd9 [julien] improve test
780c4c3 [julien] license header
f16311a [julien] javadoc
aeb8084 [julien] add more test; fix dic decoding
0793399 [julien] Merge branch 'master' into fallback
2638ec9 [julien] fix dictionary encoding labelling
2fd9372 [julien] consistent naming
cf7a734 [julien] rewrite ParquetProperties to enable proper fallback
bf1474a [julien] refactor fallback mechanism


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/ad06e611
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/ad06e611
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/ad06e611

Branch: refs/heads/master
Commit: ad06e61143d6ad3d883907e75100014b9554c357
Parents: 3aa6f11
Author: julien <julien@twitter.com>
Authored: Tue Nov 25 10:48:54 2014 -0800
Committer: julien <julien@twitter.com>
Committed: Tue Nov 25 10:48:54 2014 -0800

----------------------------------------------------------------------
 .../src/main/java/parquet/column/Encoding.java  |  87 +++--
 .../java/parquet/column/ParquetProperties.java  | 174 ++++++----
 .../parquet/column/values/RequiresFallback.java |  51 +++
 .../dictionary/DictionaryValuesWriter.java      | 328 +++++++------------
 .../dictionary/PlainValuesDictionary.java       |   4 +-
 .../values/fallback/FallbackValuesWriter.java   | 187 +++++++++++
 .../values/dictionary/TestDictionary.java       | 125 ++++---
 .../hadoop/example/GroupWriteSupport.java       |   6 +-
 .../parquet/hadoop/TestParquetFileWriter.java   |   4 +-
 .../java/parquet/hadoop/TestParquetWriter.java  | 111 +++++++
 .../src/test/java/parquet/hadoop/TestUtils.java |  22 ++
 pom.xml                                         |   1 +
 12 files changed, 749 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad06e611/parquet-column/src/main/java/parquet/column/Encoding.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/Encoding.java b/parquet-column/src/main/java/parquet/column/Encoding.java
index 57af085..adc2100 100644
--- a/parquet-column/src/main/java/parquet/column/Encoding.java
+++ b/parquet-column/src/main/java/parquet/column/Encoding.java
@@ -78,6 +78,29 @@ public enum Encoding {
         throw new ParquetDecodingException("no plain reader for type " + descriptor.getType());
       }
     }
+
+    @Override
+    public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
+      switch (descriptor.getType()) {
+      case BINARY:
+        return new PlainBinaryDictionary(dictionaryPage);
+      case FIXED_LEN_BYTE_ARRAY:
+        return new PlainBinaryDictionary(dictionaryPage, descriptor.getTypeLength());
+      case INT96:
+        return new PlainBinaryDictionary(dictionaryPage, 12);
+      case INT64:
+        return new PlainLongDictionary(dictionaryPage);
+      case DOUBLE:
+        return new PlainDoubleDictionary(dictionaryPage);
+      case INT32:
+        return new PlainIntegerDictionary(dictionaryPage);
+      case FLOAT:
+        return new PlainFloatDictionary(dictionaryPage);
+      default:
+        throw new ParquetDecodingException("Dictionary encoding not supported for type: " + descriptor.getType());
+      }
+
+    }
   },
 
   /**
@@ -96,7 +119,7 @@ public enum Encoding {
   },
 
   /**
-   * This is no longer used, and has been replaced by {@link #RLE}
+   * @deprecated This is no longer used, and has been replaced by {@link #RLE}
    * which is combination of bit packing and rle
    */
   @Deprecated
@@ -107,44 +130,19 @@ public enum Encoding {
     }
   },
 
+  /**
+   * @deprecated now replaced by RLE_DICTIONARY for the data page encoding and PLAIN for the dictionary page encoding
+   */
+  @Deprecated
   PLAIN_DICTIONARY {
     @Override
     public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
-      switch (descriptor.getType()) {
-      case BINARY:
-      case FIXED_LEN_BYTE_ARRAY:
-      case INT96:
-      case INT64:
-      case DOUBLE:
-      case INT32:
-      case FLOAT:
-        return new DictionaryValuesReader(dictionary);
-      default:
-        throw new ParquetDecodingException("Dictionary encoding not supported for type: " + descriptor.getType());
-      }
+      return RLE_DICTIONARY.getDictionaryBasedValuesReader(descriptor, valuesType, dictionary);
     }
 
     @Override
     public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
-      switch (descriptor.getType()) {
-      case BINARY:
-        return new PlainBinaryDictionary(dictionaryPage);
-      case FIXED_LEN_BYTE_ARRAY:
-        return new PlainBinaryDictionary(dictionaryPage, descriptor.getTypeLength());
-      case INT96:
-        return new PlainBinaryDictionary(dictionaryPage, 12);
-      case INT64:
-        return new PlainLongDictionary(dictionaryPage);
-      case DOUBLE:
-        return new PlainDoubleDictionary(dictionaryPage);
-      case INT32:
-        return new PlainIntegerDictionary(dictionaryPage);
-      case FLOAT:
-        return new PlainFloatDictionary(dictionaryPage);
-      default:
-        throw new ParquetDecodingException("Dictionary encoding not supported for type: " + descriptor.getType());
-      }
-
+      return PLAIN.initDictionary(descriptor, dictionaryPage);
     }
 
     @Override
@@ -201,7 +199,30 @@ public enum Encoding {
   /**
    * Dictionary encoding: the ids are encoded using the RLE encoding
    */
-  RLE_DICTIONARY;
+  RLE_DICTIONARY {
+
+    @Override
+    public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
+      switch (descriptor.getType()) {
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+      case INT96:
+      case INT64:
+      case DOUBLE:
+      case INT32:
+      case FLOAT:
+        return new DictionaryValuesReader(dictionary);
+      default:
+        throw new ParquetDecodingException("Dictionary encoding not supported for type: " + descriptor.getType());
+      }
+    }
+
+    @Override
+    public boolean usesDictionary() {
+      return true;
+    }
+
+  };
 
   int getMaxLevel(ColumnDescriptor descriptor, ValuesType valuesType) {
     int maxLevel;

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad06e611/parquet-column/src/main/java/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/parquet/column/ParquetProperties.java
index 26b900d..aea02ad 100644
--- a/parquet-column/src/main/java/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/parquet/column/ParquetProperties.java
@@ -1,16 +1,21 @@
 package parquet.column;
 
-import parquet.bytes.BytesUtils;
+import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
+import static parquet.column.Encoding.PLAIN;
+import static parquet.column.Encoding.PLAIN_DICTIONARY;
+import static parquet.column.Encoding.RLE_DICTIONARY;
 import parquet.column.values.ValuesWriter;
 import parquet.column.values.boundedint.DevNullValuesWriter;
 import parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
 import parquet.column.values.deltastrings.DeltaByteArrayWriter;
+import parquet.column.values.dictionary.DictionaryValuesWriter;
 import parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
 import parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter;
+import parquet.column.values.dictionary.DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter;
 import parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter;
 import parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
 import parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter;
+import parquet.column.values.fallback.FallbackValuesWriter;
 import parquet.column.values.plain.BooleanPlainValuesWriter;
 import parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
 import parquet.column.values.plain.PlainValuesWriter;
@@ -18,24 +23,24 @@ import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 
 /**
  * This class represents all the configurable Parquet properties.
- * 
+ *
  * @author amokashi
  *
  */
 public class ParquetProperties {
-  
+
   public enum WriterVersion {
     PARQUET_1_0 ("v1"),
     PARQUET_2_0 ("v2");
-    
+
     private final String shortName;
-    
+
     WriterVersion(String shortname) {
       this.shortName = shortname;
     }
-    
+
     public static WriterVersion fromString(String name) {
-      for(WriterVersion v : WriterVersion.values()) {
+      for (WriterVersion v : WriterVersion.values()) {
         if (v.shortName.equals(name)) {
           return v;
         }
@@ -53,81 +58,130 @@ public class ParquetProperties {
     this.writerVersion = writerVersion;
     this.enableDictionary = enableDict;
   }
-  
+
   public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel,  int initialSizePerCol) {
     if (maxLevel == 0) {
       return new DevNullValuesWriter();
     } else {
       return new RunLengthBitPackingHybridValuesWriter(
-          BytesUtils.getWidthFromMaxInt(maxLevel), initialSizePerCol);
+          getWidthFromMaxInt(maxLevel), initialSizePerCol);
     }
   }
 
-  public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol) {
+  private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol) {
     switch (path.getType()) {
     case BOOLEAN:
-      if(writerVersion == WriterVersion.PARQUET_1_0) {
-        return new BooleanPlainValuesWriter();
-      } else if (writerVersion == WriterVersion.PARQUET_2_0) {
-        return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol);
-      }
-      break;
+      return new BooleanPlainValuesWriter();
+    case INT96:
+      return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol);
+    case FIXED_LEN_BYTE_ARRAY:
+      return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol);
     case BINARY:
-      if(enableDictionary) {
-        return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol);
-      } else {
-        if (writerVersion == WriterVersion.PARQUET_1_0) {
-          return new PlainValuesWriter(initialSizePerCol);
-        } else if (writerVersion == WriterVersion.PARQUET_2_0) {
-          return new DeltaByteArrayWriter(initialSizePerCol);
-        } 
-      }
-      break;
     case INT32:
-      if(enableDictionary) {
-        return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol);
-      } else {
-        if(writerVersion == WriterVersion.PARQUET_1_0) {
-          return new PlainValuesWriter(initialSizePerCol);
-        } else if(writerVersion == WriterVersion.PARQUET_2_0) {
-          return new DeltaBinaryPackingValuesWriter(initialSizePerCol);
-        }
-      }
+    case INT64:
+    case DOUBLE:
+    case FLOAT:
+      return new PlainValuesWriter(initialSizePerCol);
+    default:
+      throw new IllegalArgumentException("Unknown type " + path.getType());
+    }
+  }
+
+  private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initialSizePerCol) {
+    Encoding encodingForDataPage;
+    Encoding encodingForDictionaryPage;
+    switch(writerVersion) {
+    case PARQUET_1_0:
+      encodingForDataPage = PLAIN_DICTIONARY;
+      encodingForDictionaryPage = PLAIN_DICTIONARY;
+      break;
+    case PARQUET_2_0:
+      encodingForDataPage = RLE_DICTIONARY;
+      encodingForDictionaryPage = PLAIN;
       break;
+    default:
+      throw new IllegalArgumentException("Unknown version: " + writerVersion);
+    }
+    switch (path.getType()) {
+    case BOOLEAN:
+      throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
+    case BINARY:
+      return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+    case INT32:
+      return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
     case INT64:
-      if(enableDictionary) {
-        return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol);
-      } else {
-        return new PlainValuesWriter(initialSizePerCol);
-      }
+      return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
     case INT96:
-      if (enableDictionary) {
-        return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol, 12);
-      } else {
-        return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol);
-      }
+      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, 12, encodingForDataPage, encodingForDictionaryPage);
     case DOUBLE:
-      if(enableDictionary) {
-        return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol);
-      } else {
-        return new PlainValuesWriter(initialSizePerCol);
-      }
+      return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
     case FLOAT:
-      if(enableDictionary) {
-        return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol);
-      } else {
-        return new PlainValuesWriter(initialSizePerCol);
+      return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+    case FIXED_LEN_BYTE_ARRAY:
+      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, path.getTypeLength(), encodingForDataPage, encodingForDictionaryPage);
+    default:
+      throw new IllegalArgumentException("Unknown type " + path.getType());
+    }
+  }
+
+  private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol) {
+    switch(writerVersion) {
+    case PARQUET_1_0:
+      return plainWriter(path, initialSizePerCol);
+    case PARQUET_2_0:
+      switch (path.getType()) {
+      case BOOLEAN:
+        return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol);
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+        return new DeltaByteArrayWriter(initialSizePerCol);
+      case INT32:
+        return new DeltaBinaryPackingValuesWriter(initialSizePerCol);
+      case INT96:
+      case INT64:
+      case DOUBLE:
+      case FLOAT:
+        return plainWriter(path, initialSizePerCol);
+      default:
+        throw new IllegalArgumentException("Unknown type " + path.getType());
       }
+    default:
+      throw new IllegalArgumentException("Unknown version: " + writerVersion);
+    }
+  }
+
+  private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol) {
+    ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol);
+    if (enableDictionary) {
+      return FallbackValuesWriter.of(
+          dictionaryWriter(path, initialSizePerCol),
+          writerToFallBackTo);
+    } else {
+     return writerToFallBackTo;
+    }
+  }
+
+  public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol) {
+    switch (path.getType()) {
+    case BOOLEAN: // no dictionary encoding for boolean
+      return writerToFallbackTo(path, initialSizePerCol);
     case FIXED_LEN_BYTE_ARRAY:
-      if (enableDictionary && (writerVersion == WriterVersion.PARQUET_2_0)) {
-        return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, initialSizePerCol, path.getTypeLength());
+      // dictionary encoding for that type was not enabled in PARQUET 1.0
+      if (writerVersion == WriterVersion.PARQUET_2_0) {
+        return dictWriterWithFallBack(path, initialSizePerCol);
       } else {
-        return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol);
+       return writerToFallbackTo(path, initialSizePerCol);
       }
+    case BINARY:
+    case INT32:
+    case INT64:
+    case INT96:
+    case DOUBLE:
+    case FLOAT:
+      return dictWriterWithFallBack(path, initialSizePerCol);
     default:
-      return new PlainValuesWriter(initialSizePerCol);
+      throw new IllegalArgumentException("Unknown type " + path.getType());
     }
-    return null;
   }
 
   public int getDictionaryPageSizeThreshold() {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad06e611/parquet-column/src/main/java/parquet/column/values/RequiresFallback.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/values/RequiresFallback.java b/parquet-column/src/main/java/parquet/column/values/RequiresFallback.java
new file mode 100644
index 0000000..f9e2d5d
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/values/RequiresFallback.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.values;
+
+import parquet.column.values.fallback.FallbackValuesWriter;
+
+/**
+ *
+ * Used to add extra behavior to a ValuesWriter that requires fallback
+ * @See {@link FallbackValuesWriter}
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface RequiresFallback {
+
+  /**
+   * In the case of a dictionary based encoding we will fallback if the dictionary becomes too big
+   * @return true to notify the parent that we should fallback to another encoding
+   */
+  boolean shouldFallBack();
+
+  /**
+   * Before writing the first page we will verify if the encoding is worth it.
+   * and fall back if a simpler encoding would be better in that case
+   * @param rawSize the size if encoded with plain
+   * @param encodedSize the size as encoded by the current encoding
+   * @return true if we keep this encoding
+   */
+  boolean isCompressionSatisfying(long rawSize, long encodedSize);
+
+  /**
+   * When falling back to a different encoding we must re-encode all the values seen so far
+   * @param writer the new encoder to write the current values to
+   */
+  void fallBackAllValuesTo(ValuesWriter writer);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad06e611/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
index 4379360..624aa30 100644
--- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
+++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesWriter.java
@@ -17,7 +17,6 @@ package parquet.column.values.dictionary;
 
 import static parquet.Log.DEBUG;
 import static parquet.bytes.BytesInput.concat;
-import static parquet.column.Encoding.PLAIN_DICTIONARY;
 import it.unimi.dsi.fastutil.doubles.Double2IntLinkedOpenHashMap;
 import it.unimi.dsi.fastutil.doubles.Double2IntMap;
 import it.unimi.dsi.fastutil.doubles.DoubleIterator;
@@ -42,6 +41,7 @@ import parquet.bytes.BytesInput;
 import parquet.bytes.BytesUtils;
 import parquet.column.Encoding;
 import parquet.column.page.DictionaryPage;
+import parquet.column.values.RequiresFallback;
 import parquet.column.values.ValuesWriter;
 import parquet.column.values.dictionary.IntList.IntIterator;
 import parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
@@ -57,18 +57,21 @@ import parquet.io.api.Binary;
  * @author Julien Le Dem
  *
  */
-public abstract class DictionaryValuesWriter extends ValuesWriter {
+public abstract class DictionaryValuesWriter extends ValuesWriter implements RequiresFallback {
   private static final Log LOG = Log.getLog(DictionaryValuesWriter.class);
 
   /* max entries allowed for the dictionary will fail over to plain encoding if reached */
   private static final int MAX_DICTIONARY_ENTRIES = Integer.MAX_VALUE - 1;
 
+  /* encoding to label the data page */
+  private final Encoding encodingForDataPage;
+
+  /* encoding to label the dictionary page */
+  protected final Encoding encodingForDictionaryPage;
+
   /* maximum size in bytes allowed for the dictionary will fail over to plain encoding if reached */
   protected final int maxDictionaryByteSize;
 
-  /* contains the values encoded in plain if the dictionary grows too big */
-  protected final ValuesWriter plainValuesWriter;
-
   /* will become true if the dictionary becomes too big */
   protected boolean dictionaryTooBig;
 
@@ -84,51 +87,34 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
   /* dictionary encoded values */
   protected IntList encodedValues = new IntList();
 
-  /* size of raw data, even if dictionary is used, it will not have effect on raw data size, it is used to decide
-   * if fall back to plain encoding is better by comparing rawDataByteSize with Encoded data size
-   * It's also used in getBufferedSize, so the page will be written based on raw data size
-   */
-  protected long rawDataByteSize = 0;
-
-  /** indicates if this is the first page being processed */
-  protected boolean firstPage = true;
-
   /**
    * @param maxDictionaryByteSize
-   * @param initialSize
    */
-  protected DictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
+  protected DictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
     this.maxDictionaryByteSize = maxDictionaryByteSize;
-    this.plainValuesWriter = new PlainValuesWriter(initialSize);
+    this.encodingForDataPage = encodingForDataPage;
+    this.encodingForDictionaryPage = encodingForDictionaryPage;
   }
 
-  /**
-   * Construct a DictionaryValuesWriter for fixed-length byte array values.
-   *
-   * @param maxDictionaryByteSize
-   * @param initialSize
-   * @param fixedLength Fixed length for byte arrays
-   */
-  protected DictionaryValuesWriter(int maxDictionaryByteSize, int initialSize, int fixedLength) {
-    this.maxDictionaryByteSize = maxDictionaryByteSize;
-    this.plainValuesWriter = new FixedLenByteArrayPlainValuesWriter(fixedLength, initialSize);
+  protected DictionaryPage dictPage(ValuesWriter dictionaryEncoder) {
+    return new DictionaryPage(dictionaryEncoder.getBytes(), lastUsedDictionarySize, encodingForDictionaryPage);
   }
 
-  /**
-   * check the size constraints of the dictionary and fail over to plain values encoding if threshold reached
-   */
-  protected void checkAndFallbackIfNeeded() {
-    if (dictionaryByteSize > maxDictionaryByteSize || getDictionarySize() > MAX_DICTIONARY_ENTRIES) {
-      // if the dictionary reaches the max byte size or the values can not be encoded on 4 bytes anymore.
-      fallBackToPlainEncoding();
-    }
+  @Override
+  public boolean shouldFallBack() {
+    // if the dictionary reaches the max byte size or the values can not be encoded on 4 bytes anymore.
+    return dictionaryByteSize > maxDictionaryByteSize
+        || getDictionarySize() > MAX_DICTIONARY_ENTRIES;
   }
 
-  private void fallBackToPlainEncoding() {
-    if (DEBUG)
-      LOG.debug("dictionary is now too big, falling back to plain: " + dictionaryByteSize + "B and " + getDictionarySize() + " entries");
-    dictionaryTooBig = true;
-    fallBackDictionaryEncodedData();
+  @Override
+  public boolean isCompressionSatisfying(long rawSize, long encodedSize) {
+    return (encodedSize + dictionaryByteSize) < rawSize;
+  }
+
+  @Override
+  public void fallBackAllValuesTo(ValuesWriter writer) {
+    fallBackDictionaryEncodedData(writer);
     if (lastUsedDictionarySize == 0) {
       // if we never used the dictionary
       // we free dictionary encoded data
@@ -138,70 +124,53 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
     }
   }
 
-  protected abstract void fallBackDictionaryEncodedData();
+  abstract protected void fallBackDictionaryEncodedData(ValuesWriter writer);
 
   @Override
   public long getBufferedSize() {
-    // use raw data size to decide if we want to flush the page
-    // so the acutual size of the page written could be much more smaller
-    // due to dictionary encoding. This prevents page being to big when fallback happens.
-    return rawDataByteSize;
+    return encodedValues.size() * 4;
   }
 
   @Override
   public long getAllocatedSize() {
     // size used in memory
-    return encodedValues.size() * 4 + dictionaryByteSize + plainValuesWriter.getAllocatedSize();
+    return encodedValues.size() * 4 + dictionaryByteSize;
   }
 
   @Override
   public BytesInput getBytes() {
-    if (!dictionaryTooBig && getDictionarySize() > 0) {
-      int maxDicId = getDictionarySize() - 1;
-      if (DEBUG) LOG.debug("max dic id " + maxDicId);
-      int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
-
-      // TODO: what is a good initialCapacity?
-      RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024);
-      IntIterator iterator = encodedValues.iterator();
-      try {
-        while (iterator.hasNext()) {
-          encoder.writeInt(iterator.next());
-        }
-        // encodes the bit width
-        byte[] bytesHeader = new byte[] { (byte) bitWidth };
-        BytesInput rleEncodedBytes = encoder.toBytes();
-        if (DEBUG) LOG.debug("rle encoded bytes " + rleEncodedBytes.size());
-        BytesInput bytes = concat(BytesInput.from(bytesHeader), rleEncodedBytes);
-        if (firstPage && ((bytes.size() + dictionaryByteSize) > rawDataByteSize)) {
-          fallBackToPlainEncoding();
-        } else {
-          // remember size of dictionary when we last wrote a page
-          lastUsedDictionarySize = getDictionarySize();
-          lastUsedDictionaryByteSize = dictionaryByteSize;
-          return bytes;
-        }
-      } catch (IOException e) {
-        throw new ParquetEncodingException("could not encode the values", e);
+    int maxDicId = getDictionarySize() - 1;
+    if (DEBUG) LOG.debug("max dic id " + maxDicId);
+    int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
+    // TODO: what is a good initialCapacity?
+    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 64 * 1024);
+    IntIterator iterator = encodedValues.iterator();
+    try {
+      while (iterator.hasNext()) {
+        encoder.writeInt(iterator.next());
       }
+      // encodes the bit width
+      byte[] bytesHeader = new byte[] { (byte) bitWidth };
+      BytesInput rleEncodedBytes = encoder.toBytes();
+      if (DEBUG) LOG.debug("rle encoded bytes " + rleEncodedBytes.size());
+      BytesInput bytes = concat(BytesInput.from(bytesHeader), rleEncodedBytes);
+      // remember size of dictionary when we last wrote a page
+      lastUsedDictionarySize = getDictionarySize();
+      lastUsedDictionaryByteSize = dictionaryByteSize;
+      return bytes;
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not encode the values", e);
     }
-    return plainValuesWriter.getBytes();
   }
 
   @Override
   public Encoding getEncoding() {
-    firstPage = false;
-    if (!dictionaryTooBig && getDictionarySize() > 0) {
-      return PLAIN_DICTIONARY;
-    }
-    return plainValuesWriter.getEncoding();
+    return encodingForDataPage;
   }
 
   @Override
   public void reset() {
     encodedValues = new IntList();
-    plainValuesWriter.reset();
-    rawDataByteSize = 0;
   }
 
   @Override
@@ -225,10 +194,11 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
   @Override
   public String memUsageString(String prefix) {
     return String.format(
-        "%s DictionaryValuesWriter{\n%s\n%s\n%s\n%s}\n",
+        "%s DictionaryValuesWriter{\n"
+          + "%s\n"
+          + "%s\n"
+        + "%s}\n",
         prefix,
-        plainValuesWriter.
-        memUsageString(prefix + " plain:"),
         prefix + " dict:" + dictionaryByteSize,
         prefix + " values:" + String.valueOf(encodedValues.size() * 4),
         prefix
@@ -245,38 +215,22 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
 
     /**
      * @param maxDictionaryByteSize
-     * @param initialSize
      */
-    public PlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
-      super(maxDictionaryByteSize, initialSize);
-      binaryDictionaryContent.defaultReturnValue(-1);
-    }
-
-    /**
-     * Constructor only used by subclasses for fixed-length byte arrays.
-     */
-    protected PlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize, int length) {
-      super(maxDictionaryByteSize, initialSize, length);
+    public PlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
       binaryDictionaryContent.defaultReturnValue(-1);
     }
 
     @Override
     public void writeBytes(Binary v) {
-      if (!dictionaryTooBig) {
-        int id = binaryDictionaryContent.getInt(v);
-        if (id == -1) {
-          id = binaryDictionaryContent.size();
-          binaryDictionaryContent.put(copy(v), id);
-          // length as int (4 bytes) + actual bytes
-          dictionaryByteSize += 4 + v.length();
-        }
-        encodedValues.add(id);
-        checkAndFallbackIfNeeded();
-      } else {
-        plainValuesWriter.writeBytes(v);
+      int id = binaryDictionaryContent.getInt(v);
+      if (id == -1) {
+        id = binaryDictionaryContent.size();
+        binaryDictionaryContent.put(copy(v), id);
+        // length as int (4 bytes) + actual bytes
+        dictionaryByteSize += 4 + v.length();
       }
-      //for rawdata, length(4 bytes int) is stored, followed by the binary content itself
-      rawDataByteSize += v.length() + 4;
+      encodedValues.add(id);
     }
 
     @Override
@@ -290,9 +244,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
           Binary entry = binaryIterator.next();
           dictionaryEncoder.writeBytes(entry);
         }
-        return new DictionaryPage(dictionaryEncoder.getBytes(), lastUsedDictionarySize, PLAIN_DICTIONARY);
+        return dictPage(dictionaryEncoder);
       }
-      return plainValuesWriter.createDictionaryPage();
+      return null;
     }
 
     @Override
@@ -306,7 +260,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
     }
 
     @Override
-    protected void fallBackDictionaryEncodedData() {
+    public void fallBackDictionaryEncodedData(ValuesWriter writer) {
       //build reverse dictionary
       Binary[] reverseDictionary = new Binary[getDictionarySize()];
       for (Object2IntMap.Entry<Binary> entry : binaryDictionaryContent.object2IntEntrySet()) {
@@ -317,7 +271,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
       IntIterator iterator = encodedValues.iterator();
       while (iterator.hasNext()) {
         int id = iterator.next();
-        plainValuesWriter.writeBytes(reverseDictionary[id]);
+        writer.writeBytes(reverseDictionary[id]);
       }
     }
 
@@ -338,26 +292,20 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
      * @param maxDictionaryByteSize
      * @param initialSize
      */
-    public PlainFixedLenArrayDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize, int length) {
-      super(maxDictionaryByteSize, initialSize, length);
+    public PlainFixedLenArrayDictionaryValuesWriter(int maxDictionaryByteSize, int length, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
       this.length = length;
     }
 
     @Override
     public void writeBytes(Binary value) {
-      if (!dictionaryTooBig) {
-        int id = binaryDictionaryContent.getInt(value);
-        if (id == -1) {
-          id = binaryDictionaryContent.size();
-          binaryDictionaryContent.put(copy(value), id);
-          dictionaryByteSize += length;
-        }
-        encodedValues.add(id);
-        checkAndFallbackIfNeeded();
-      } else {
-        plainValuesWriter.writeBytes(value);
+      int id = binaryDictionaryContent.getInt(value);
+      if (id == -1) {
+        id = binaryDictionaryContent.size();
+        binaryDictionaryContent.put(copy(value), id);
+        dictionaryByteSize += length;
       }
-      rawDataByteSize += length;
+      encodedValues.add(id);
     }
 
     @Override
@@ -371,9 +319,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
           Binary entry = binaryIterator.next();
           dictionaryEncoder.writeBytes(entry);
         }
-        return new DictionaryPage(dictionaryEncoder.getBytes(), lastUsedDictionarySize, PLAIN_DICTIONARY);
+        return dictPage(dictionaryEncoder);
       }
-      return plainValuesWriter.createDictionaryPage();
+      return null;
     }
   }
 
@@ -389,26 +337,20 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
      * @param maxDictionaryByteSize
      * @param initialSize
      */
-    public PlainLongDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
-      super(maxDictionaryByteSize, initialSize);
+    public PlainLongDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
       longDictionaryContent.defaultReturnValue(-1);
     }
 
     @Override
     public void writeLong(long v) {
-      if (!dictionaryTooBig) {
-        int id = longDictionaryContent.get(v);
-        if (id == -1) {
-          id = longDictionaryContent.size();
-          longDictionaryContent.put(v, id);
-          dictionaryByteSize += 8;
-        }
-        encodedValues.add(id);
-        checkAndFallbackIfNeeded();
-      } else {
-        plainValuesWriter.writeLong(v);
+      int id = longDictionaryContent.get(v);
+      if (id == -1) {
+        id = longDictionaryContent.size();
+        longDictionaryContent.put(v, id);
+        dictionaryByteSize += 8;
       }
-      rawDataByteSize += 8;
+      encodedValues.add(id);
     }
 
     @Override
@@ -421,9 +363,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
         for (int i = 0; i < lastUsedDictionarySize; i++) {
           dictionaryEncoder.writeLong(longIterator.nextLong());
         }
-        return new DictionaryPage(dictionaryEncoder.getBytes(), lastUsedDictionarySize, PLAIN_DICTIONARY);
+        return dictPage(dictionaryEncoder);
       }
-      return plainValuesWriter.createDictionaryPage();
+      return null;
     }
 
     @Override
@@ -437,7 +379,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
     }
 
     @Override
-    protected void fallBackDictionaryEncodedData() {
+    public void fallBackDictionaryEncodedData(ValuesWriter writer) {
       //build reverse dictionary
       long[] reverseDictionary = new long[getDictionarySize()];
       ObjectIterator<Long2IntMap.Entry> entryIterator = longDictionaryContent.long2IntEntrySet().iterator();
@@ -450,7 +392,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
       IntIterator iterator = encodedValues.iterator();
       while (iterator.hasNext()) {
         int id = iterator.next();
-        plainValuesWriter.writeLong(reverseDictionary[id]);
+        writer.writeLong(reverseDictionary[id]);
       }
     }
   }
@@ -467,26 +409,20 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
      * @param maxDictionaryByteSize
      * @param initialSize
      */
-    public PlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
-      super(maxDictionaryByteSize, initialSize);
+    public PlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
       doubleDictionaryContent.defaultReturnValue(-1);
     }
 
     @Override
     public void writeDouble(double v) {
-      if (!dictionaryTooBig) {
-        int id = doubleDictionaryContent.get(v);
-        if (id == -1) {
-          id = doubleDictionaryContent.size();
-          doubleDictionaryContent.put(v, id);
-          dictionaryByteSize += 8;
-        }
-        encodedValues.add(id);
-        checkAndFallbackIfNeeded();
-      } else {
-        plainValuesWriter.writeDouble(v);
+      int id = doubleDictionaryContent.get(v);
+      if (id == -1) {
+        id = doubleDictionaryContent.size();
+        doubleDictionaryContent.put(v, id);
+        dictionaryByteSize += 8;
       }
-      rawDataByteSize += 8;
+      encodedValues.add(id);
     }
 
     @Override
@@ -499,9 +435,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
         for (int i = 0; i < lastUsedDictionarySize; i++) {
           dictionaryEncoder.writeDouble(doubleIterator.nextDouble());
         }
-        return new DictionaryPage(dictionaryEncoder.getBytes(), lastUsedDictionarySize, PLAIN_DICTIONARY);
+        return dictPage(dictionaryEncoder);
       }
-      return plainValuesWriter.createDictionaryPage();
+      return null;
     }
 
     @Override
@@ -515,7 +451,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
     }
 
     @Override
-    protected void fallBackDictionaryEncodedData() {
+    public void fallBackDictionaryEncodedData(ValuesWriter writer) {
       //build reverse dictionary
       double[] reverseDictionary = new double[getDictionarySize()];
       ObjectIterator<Double2IntMap.Entry> entryIterator = doubleDictionaryContent.double2IntEntrySet().iterator();
@@ -528,7 +464,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
       IntIterator iterator = encodedValues.iterator();
       while (iterator.hasNext()) {
         int id = iterator.next();
-        plainValuesWriter.writeDouble(reverseDictionary[id]);
+        writer.writeDouble(reverseDictionary[id]);
       }
     }
   }
@@ -545,28 +481,20 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
      * @param maxDictionaryByteSize
      * @param initialSize
      */
-    public PlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
-      super(maxDictionaryByteSize, initialSize);
+    public PlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
       intDictionaryContent.defaultReturnValue(-1);
     }
 
     @Override
     public void writeInteger(int v) {
-      if (!dictionaryTooBig) {
-        int id = intDictionaryContent.get(v);
-        if (id == -1) {
-          id = intDictionaryContent.size();
-          intDictionaryContent.put(v, id);
-          dictionaryByteSize += 4;
-        }
-        encodedValues.add(id);
-        checkAndFallbackIfNeeded();
-      } else {
-        plainValuesWriter.writeInteger(v);
+      int id = intDictionaryContent.get(v);
+      if (id == -1) {
+        id = intDictionaryContent.size();
+        intDictionaryContent.put(v, id);
+        dictionaryByteSize += 4;
       }
-
-      //Each integer takes 4 bytes as raw data(plain encoding)
-      rawDataByteSize += 4;
+      encodedValues.add(id);
     }
 
     @Override
@@ -579,9 +507,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
         for (int i = 0; i < lastUsedDictionarySize; i++) {
           dictionaryEncoder.writeInteger(intIterator.nextInt());
         }
-        return new DictionaryPage(dictionaryEncoder.getBytes(), lastUsedDictionarySize, PLAIN_DICTIONARY);
+        return dictPage(dictionaryEncoder);
       }
-      return plainValuesWriter.createDictionaryPage();
+      return null;
     }
 
     @Override
@@ -595,7 +523,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
     }
 
     @Override
-    protected void fallBackDictionaryEncodedData() {
+    public void fallBackDictionaryEncodedData(ValuesWriter writer) {
       //build reverse dictionary
       int[] reverseDictionary = new int[getDictionarySize()];
       ObjectIterator<Int2IntMap.Entry> entryIterator = intDictionaryContent.int2IntEntrySet().iterator();
@@ -608,7 +536,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
       IntIterator iterator = encodedValues.iterator();
       while (iterator.hasNext()) {
         int id = iterator.next();
-        plainValuesWriter.writeInteger(reverseDictionary[id]);
+        writer.writeInteger(reverseDictionary[id]);
       }
     }
   }
@@ -625,26 +553,20 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
      * @param maxDictionaryByteSize
      * @param initialSize
      */
-    public PlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
-      super(maxDictionaryByteSize, initialSize);
+    public PlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
       floatDictionaryContent.defaultReturnValue(-1);
     }
 
     @Override
     public void writeFloat(float v) {
-      if (!dictionaryTooBig) {
-        int id = floatDictionaryContent.get(v);
-        if (id == -1) {
-          id = floatDictionaryContent.size();
-          floatDictionaryContent.put(v, id);
-          dictionaryByteSize += 4;
-        }
-        encodedValues.add(id);
-        checkAndFallbackIfNeeded();
-      } else {
-        plainValuesWriter.writeFloat(v);
+      int id = floatDictionaryContent.get(v);
+      if (id == -1) {
+        id = floatDictionaryContent.size();
+        floatDictionaryContent.put(v, id);
+        dictionaryByteSize += 4;
       }
-      rawDataByteSize += 4;
+      encodedValues.add(id);
     }
 
     @Override
@@ -657,9 +579,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
         for (int i = 0; i < lastUsedDictionarySize; i++) {
           dictionaryEncoder.writeFloat(floatIterator.nextFloat());
         }
-        return new DictionaryPage(dictionaryEncoder.getBytes(), lastUsedDictionarySize, PLAIN_DICTIONARY);
+        return dictPage(dictionaryEncoder);
       }
-      return plainValuesWriter.createDictionaryPage();
+      return null;
     }
 
     @Override
@@ -673,7 +595,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
     }
 
     @Override
-    protected void fallBackDictionaryEncodedData() {
+    public void fallBackDictionaryEncodedData(ValuesWriter writer) {
       //build reverse dictionary
       float[] reverseDictionary = new float[getDictionarySize()];
       ObjectIterator<Float2IntMap.Entry> entryIterator = floatDictionaryContent.float2IntEntrySet().iterator();
@@ -686,7 +608,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter {
       IntIterator iterator = encodedValues.iterator();
       while (iterator.hasNext()) {
         int id = iterator.next();
-        plainValuesWriter.writeFloat(reverseDictionary[id]);
+        writer.writeFloat(reverseDictionary[id]);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad06e611/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java
index 33deab8..23a51d4 100644
--- a/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java
+++ b/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java
@@ -17,6 +17,7 @@ package parquet.column.values.dictionary;
 
 import static parquet.bytes.BytesUtils.readIntLittleEndian;
 import static parquet.column.Encoding.PLAIN_DICTIONARY;
+import static parquet.column.Encoding.PLAIN;
 
 import java.io.IOException;
 
@@ -42,7 +43,8 @@ public abstract class PlainValuesDictionary extends Dictionary {
    */
   protected PlainValuesDictionary(DictionaryPage dictionaryPage) throws IOException {
     super(dictionaryPage.getEncoding());
-    if (dictionaryPage.getEncoding() != PLAIN_DICTIONARY) {
+    if (dictionaryPage.getEncoding() != PLAIN_DICTIONARY
+        && dictionaryPage.getEncoding() != PLAIN) {
       throw new ParquetDecodingException("Dictionary data encoding type not supported: " + dictionaryPage.getEncoding());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad06e611/parquet-column/src/main/java/parquet/column/values/fallback/FallbackValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/values/fallback/FallbackValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/fallback/FallbackValuesWriter.java
new file mode 100644
index 0000000..a4d1c7c
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/values/fallback/FallbackValuesWriter.java
@@ -0,0 +1,187 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.values.fallback;
+
+import parquet.bytes.BytesInput;
+import parquet.column.Encoding;
+import parquet.column.page.DictionaryPage;
+import parquet.column.values.RequiresFallback;
+import parquet.column.values.ValuesWriter;
+import parquet.io.api.Binary;
+
+public class FallbackValuesWriter<I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> extends ValuesWriter {
+
+  public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> FallbackValuesWriter<I, F> of(I initialWriter, F fallBackWriter) {
+    return new FallbackValuesWriter<I, F>(initialWriter, fallBackWriter);
+  }
+
+  /** writer to start with */
+  public final I initialWriter;
+  /** fallback */
+  public final F fallBackWriter;
+
+  private boolean fellBackAlready = false;
+
+  /** writer currently written to */
+  private ValuesWriter currentWriter;
+
+  private boolean initialUsedAndHadDictionary = false;
+
+  /* size of raw data, even if dictionary is used, it will not have effect on raw data size, it is used to decide
+   * if fall back to plain encoding is better by comparing rawDataByteSize with Encoded data size
+   * It's also used in getBufferedSize, so the page will be written based on raw data size
+   */
+  private long rawDataByteSize = 0;
+
+  /** indicates if this is the first page being processed */
+  private boolean firstPage = true;
+
+  public FallbackValuesWriter(I initialWriter, F fallBackWriter) {
+    super();
+    this.initialWriter = initialWriter;
+    this.fallBackWriter = fallBackWriter;
+    this.currentWriter = initialWriter;
+  }
+
+  @Override
+  public long getBufferedSize() {
+    // use raw data size to decide if we want to flush the page
+    // so the actual size of the page written could be much more smaller
+    // due to dictionary encoding. This prevents page being too big when fallback happens.
+    return rawDataByteSize;
+  }
+
+  @Override
+  public BytesInput getBytes() {
+    if (!fellBackAlready && firstPage) {
+      // we use the first page to decide if we're going to use this encoding
+      BytesInput bytes = initialWriter.getBytes();
+      if (!initialWriter.isCompressionSatisfying(rawDataByteSize, bytes.size())) {
+        fallBack();
+      } else {
+        return bytes;
+      }
+    }
+    return currentWriter.getBytes();
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    Encoding encoding = currentWriter.getEncoding();
+    if (!fellBackAlready && !initialUsedAndHadDictionary) {
+      initialUsedAndHadDictionary = encoding.usesDictionary();
+    }
+    return encoding;
+  }
+
+  @Override
+  public void reset() {
+    rawDataByteSize = 0;
+    firstPage = false;
+    currentWriter.reset();
+  }
+
+  public DictionaryPage createDictionaryPage() {
+    if (initialUsedAndHadDictionary) {
+      return initialWriter.createDictionaryPage();
+    } else {
+      return currentWriter.createDictionaryPage();
+    }
+  }
+
+  public void resetDictionary() {
+    if (initialUsedAndHadDictionary) {
+      initialWriter.resetDictionary();
+    } else {
+      currentWriter.resetDictionary();
+    }
+    currentWriter = initialWriter;
+    fellBackAlready = false;
+    initialUsedAndHadDictionary = false;
+    firstPage = true;
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    return currentWriter.getAllocatedSize();
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return String.format(
+        "%s FallbackValuesWriter{\n"
+          + "%s\n"
+          + "%s\n"
+        + "%s}\n",
+        prefix,
+        initialWriter.memUsageString(prefix + " initial:"),
+        fallBackWriter.memUsageString(prefix + " fallback:"),
+        prefix
+        );
+  }
+
+  private void checkFallback() {
+    if (!fellBackAlready && initialWriter.shouldFallBack()) {
+      fallBack();
+    }
+  }
+
+  private void fallBack() {
+    fellBackAlready = true;
+    initialWriter.fallBackAllValuesTo(fallBackWriter);
+    currentWriter = fallBackWriter;
+  }
+
+  // passthrough writing the value
+
+  public void writeByte(int value) {
+    rawDataByteSize += 1;
+    currentWriter.writeByte(value);
+    checkFallback();
+  }
+
+  public void writeBytes(Binary v) {
+    //for rawdata, length(4 bytes int) is stored, followed by the binary content itself
+    rawDataByteSize += v.length() + 4;
+    currentWriter.writeBytes(v);
+    checkFallback();
+  }
+
+  public void writeInteger(int v) {
+    rawDataByteSize += 4;
+    currentWriter.writeInteger(v);
+    checkFallback();
+  }
+
+  public void writeLong(long v) {
+    rawDataByteSize += 8;
+    currentWriter.writeLong(v);
+    checkFallback();
+  }
+
+  public void writeFloat(float v) {
+    rawDataByteSize += 4;
+    currentWriter.writeFloat(v);
+    checkFallback();
+  }
+
+  public void writeDouble(double v) {
+    rawDataByteSize += 8;
+    currentWriter.writeDouble(v);
+    checkFallback();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad06e611/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java
index a5d6e1f..5f2894e 100644
--- a/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java
+++ b/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java
@@ -20,7 +20,6 @@ import static parquet.column.Encoding.PLAIN;
 import static parquet.column.Encoding.PLAIN_DICTIONARY;
 import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
 import static parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
 import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 
@@ -41,17 +40,43 @@ import parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictio
 import parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter;
 import parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
 import parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter;
+import parquet.column.values.fallback.FallbackValuesWriter;
 import parquet.column.values.plain.BinaryPlainValuesReader;
 import parquet.column.values.plain.PlainValuesReader;
+import parquet.column.values.plain.PlainValuesWriter;
 import parquet.io.api.Binary;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 public class TestDictionary {
 
+  private <I extends DictionaryValuesWriter> FallbackValuesWriter<I, PlainValuesWriter> plainFallBack(I dvw, int initialSize) {
+    return FallbackValuesWriter.of(dvw, new PlainValuesWriter(initialSize));
+  }
+
+  private FallbackValuesWriter<PlainBinaryDictionaryValuesWriter, PlainValuesWriter> newPlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
+    return plainFallBack(new PlainBinaryDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+  }
+
+  private FallbackValuesWriter<PlainLongDictionaryValuesWriter, PlainValuesWriter> newPlainLongDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
+    return plainFallBack(new PlainLongDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+  }
+
+  private FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> newPlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
+    return plainFallBack(new PlainIntegerDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+  }
+
+  private FallbackValuesWriter<PlainDoubleDictionaryValuesWriter, PlainValuesWriter> newPlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
+    return plainFallBack(new PlainDoubleDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+  }
+
+  private FallbackValuesWriter<PlainFloatDictionaryValuesWriter, PlainValuesWriter> newPlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
+    return plainFallBack(new PlainFloatDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+  }
+
   @Test
   public void testBinaryDictionary() throws IOException {
     int COUNT = 100;
-    ValuesWriter cw = new PlainBinaryDictionaryValuesWriter(200, 10000);
+    ValuesWriter cw = newPlainBinaryDictionaryValuesWriter(200, 10000);
     writeRepeated(COUNT, cw, "a");
     BytesInput bytes1 = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY);
     writeRepeated(COUNT, cw, "b");
@@ -71,17 +96,17 @@ public class TestDictionary {
   public void testBinaryDictionaryFallBack() throws IOException {
     int slabSize = 100;
     int maxDictionaryByteSize = 50;
-    final DictionaryValuesWriter cw = new PlainBinaryDictionaryValuesWriter(maxDictionaryByteSize, slabSize);
+    final ValuesWriter cw = newPlainBinaryDictionaryValuesWriter(maxDictionaryByteSize, slabSize);
     int fallBackThreshold = maxDictionaryByteSize;
     int dataSize=0;
     for (long i = 0; i < 100; i++) {
       Binary binary = Binary.fromString("str" + i);
       cw.writeBytes(binary);
-      dataSize+=(binary.length()+4);
+      dataSize += (binary.length() + 4);
       if (dataSize < fallBackThreshold) {
-        assertEquals( PLAIN_DICTIONARY,cw.getEncoding());
+        assertEquals(PLAIN_DICTIONARY, cw.getEncoding());
       } else {
-        assertEquals(PLAIN,cw.getEncoding());
+        assertEquals(PLAIN, cw.getEncoding());
       }
     }
 
@@ -101,7 +126,7 @@ public class TestDictionary {
   @Test
   public void testBinaryDictionaryChangedValues() throws IOException {
     int COUNT = 100;
-    ValuesWriter cw = new PlainBinaryDictionaryValuesWriter(200, 10000);
+    ValuesWriter cw = newPlainBinaryDictionaryValuesWriter(200, 10000);
     writeRepeatedWithReuse(COUNT, cw, "a");
     BytesInput bytes1 = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY);
     writeRepeatedWithReuse(COUNT, cw, "b");
@@ -120,7 +145,7 @@ public class TestDictionary {
   @Test
   public void testFirstPageFallBack() throws IOException {
     int COUNT = 1000;
-    ValuesWriter cw = new PlainBinaryDictionaryValuesWriter(10000, 10000);
+    ValuesWriter cw = newPlainBinaryDictionaryValuesWriter(10000, 10000);
     writeDistinct(COUNT, cw, "a");
     // not efficient so falls back
     BytesInput bytes1 = getBytesAndCheckEncoding(cw, PLAIN);
@@ -136,9 +161,8 @@ public class TestDictionary {
 
   @Test
   public void testSecondPageFallBack() throws IOException {
-
     int COUNT = 1000;
-    ValuesWriter cw = new PlainBinaryDictionaryValuesWriter(1000, 10000);
+    ValuesWriter cw = newPlainBinaryDictionaryValuesWriter(1000, 10000);
     writeRepeated(COUNT, cw, "a");
     BytesInput bytes1 = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY);
     writeDistinct(COUNT, cw, "b");
@@ -157,21 +181,20 @@ public class TestDictionary {
 
   @Test
   public void testLongDictionary() throws IOException {
-
     int COUNT = 1000;
     int COUNT2 = 2000;
-    final DictionaryValuesWriter cw = new PlainLongDictionaryValuesWriter(10000, 10000);
+    final FallbackValuesWriter<PlainLongDictionaryValuesWriter, PlainValuesWriter> cw = newPlainLongDictionaryValuesWriter(10000, 10000);
     for (long i = 0; i < COUNT; i++) {
       cw.writeLong(i % 50);
     }
     BytesInput bytes1 = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY);
-    assertEquals(50, cw.getDictionarySize());
+    assertEquals(50, cw.initialWriter.getDictionarySize());
 
     for (long i = COUNT2; i > 0; i--) {
       cw.writeLong(i % 50);
     }
     BytesInput bytes2 = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY);
-    assertEquals(50, cw.getDictionarySize());
+    assertEquals(50, cw.initialWriter.getDictionarySize());
 
     DictionaryValuesReader cr = initDicReader(cw, PrimitiveTypeName.INT64);
 
@@ -187,8 +210,8 @@ public class TestDictionary {
       assertEquals(i % 50, back);
     }
   }
-  
-  private void roundTripLong(DictionaryValuesWriter cw,  ValuesReader reader, int maxDictionaryByteSize) throws IOException {
+
+  private void roundTripLong(FallbackValuesWriter<PlainLongDictionaryValuesWriter, PlainValuesWriter> cw,  ValuesReader reader, int maxDictionaryByteSize) throws IOException {
     int fallBackThreshold = maxDictionaryByteSize / 8;
     for (long i = 0; i < 100; i++) {
       cw.writeLong(i);
@@ -210,16 +233,16 @@ public class TestDictionary {
   public void testLongDictionaryFallBack() throws IOException {
     int slabSize = 100;
     int maxDictionaryByteSize = 50;
-    final DictionaryValuesWriter cw = new PlainLongDictionaryValuesWriter(maxDictionaryByteSize, slabSize);
+    final FallbackValuesWriter<PlainLongDictionaryValuesWriter, PlainValuesWriter> cw = newPlainLongDictionaryValuesWriter(maxDictionaryByteSize, slabSize);
     // Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back
     ValuesReader reader = new PlainValuesReader.LongPlainValuesReader();
-    
+
     roundTripLong(cw, reader, maxDictionaryByteSize);
     //simulate cutting the page
     cw.reset();
     assertEquals(0,cw.getBufferedSize());
     cw.resetDictionary();
-  
+
     roundTripLong(cw, reader, maxDictionaryByteSize);
   }
 
@@ -228,20 +251,20 @@ public class TestDictionary {
 
     int COUNT = 1000;
     int COUNT2 = 2000;
-    final DictionaryValuesWriter cw = new PlainDoubleDictionaryValuesWriter(10000, 10000);
+    final FallbackValuesWriter<PlainDoubleDictionaryValuesWriter, PlainValuesWriter> cw = newPlainDoubleDictionaryValuesWriter(10000, 10000);
 
     for (double i = 0; i < COUNT; i++) {
       cw.writeDouble(i % 50);
     }
 
     BytesInput bytes1 = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY);
-    assertEquals(50, cw.getDictionarySize());
+    assertEquals(50, cw.initialWriter.getDictionarySize());
 
     for (double i = COUNT2; i > 0; i--) {
       cw.writeDouble(i % 50);
     }
     BytesInput bytes2 = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY);
-    assertEquals(50, cw.getDictionarySize());
+    assertEquals(50, cw.initialWriter.getDictionarySize());
 
     final DictionaryValuesReader cr = initDicReader(cw, DOUBLE);
 
@@ -258,8 +281,8 @@ public class TestDictionary {
     }
 
   }
-  
-  private void roundTripDouble(DictionaryValuesWriter cw,  ValuesReader reader, int maxDictionaryByteSize) throws IOException {
+
+  private void roundTripDouble(FallbackValuesWriter<PlainDoubleDictionaryValuesWriter, PlainValuesWriter> cw,  ValuesReader reader, int maxDictionaryByteSize) throws IOException {
     int fallBackThreshold = maxDictionaryByteSize / 8;
     for (double i = 0; i < 100; i++) {
       cw.writeDouble(i);
@@ -276,22 +299,22 @@ public class TestDictionary {
       assertEquals(i, reader.readDouble(), 0.00001);
     }
   }
-  
+
   @Test
   public void testDoubleDictionaryFallBack() throws IOException {
     int slabSize = 100;
     int maxDictionaryByteSize = 50;
-    final DictionaryValuesWriter cw = new PlainDoubleDictionaryValuesWriter(maxDictionaryByteSize, slabSize);
-    
+    final FallbackValuesWriter<PlainDoubleDictionaryValuesWriter, PlainValuesWriter> cw = newPlainDoubleDictionaryValuesWriter(maxDictionaryByteSize, slabSize);
+
     // Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back
     ValuesReader reader = new PlainValuesReader.DoublePlainValuesReader();
-    
+
     roundTripDouble(cw, reader, maxDictionaryByteSize);
     //simulate cutting the page
     cw.reset();
     assertEquals(0,cw.getBufferedSize());
     cw.resetDictionary();
-  
+
     roundTripDouble(cw, reader, maxDictionaryByteSize);
   }
 
@@ -300,19 +323,19 @@ public class TestDictionary {
 
     int COUNT = 2000;
     int COUNT2 = 4000;
-    final DictionaryValuesWriter cw = new PlainIntegerDictionaryValuesWriter(10000, 10000);
+    final FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> cw = newPlainIntegerDictionaryValuesWriter(10000, 10000);
 
     for (int i = 0; i < COUNT; i++) {
       cw.writeInteger(i % 50);
     }
     BytesInput bytes1 = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY);
-    assertEquals(50, cw.getDictionarySize());
+    assertEquals(50, cw.initialWriter.getDictionarySize());
 
     for (int i = COUNT2; i > 0; i--) {
       cw.writeInteger(i % 50);
     }
     BytesInput bytes2 = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY);
-    assertEquals(50, cw.getDictionarySize());
+    assertEquals(50, cw.initialWriter.getDictionarySize());
 
     DictionaryValuesReader cr = initDicReader(cw, INT32);
 
@@ -329,8 +352,8 @@ public class TestDictionary {
     }
 
   }
-  
-  private void roundTripInt(DictionaryValuesWriter cw,  ValuesReader reader, int maxDictionaryByteSize) throws IOException {
+
+  private void roundTripInt(FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> cw,  ValuesReader reader, int maxDictionaryByteSize) throws IOException {
     int fallBackThreshold = maxDictionaryByteSize / 4;
     for (int i = 0; i < 100; i++) {
       cw.writeInteger(i);
@@ -347,22 +370,22 @@ public class TestDictionary {
       assertEquals(i, reader.readInteger());
     }
   }
-  
+
   @Test
   public void testIntDictionaryFallBack() throws IOException {
     int slabSize = 100;
     int maxDictionaryByteSize = 50;
-    final DictionaryValuesWriter cw = new PlainIntegerDictionaryValuesWriter(maxDictionaryByteSize, slabSize);
-    
+    final FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> cw = newPlainIntegerDictionaryValuesWriter(maxDictionaryByteSize, slabSize);
+
     // Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back
     ValuesReader reader = new PlainValuesReader.IntegerPlainValuesReader();
-    
+
     roundTripInt(cw, reader, maxDictionaryByteSize);
     //simulate cutting the page
     cw.reset();
     assertEquals(0,cw.getBufferedSize());
     cw.resetDictionary();
-  
+
     roundTripInt(cw, reader, maxDictionaryByteSize);
   }
 
@@ -371,19 +394,19 @@ public class TestDictionary {
 
     int COUNT = 2000;
     int COUNT2 = 4000;
-    final DictionaryValuesWriter cw = new PlainFloatDictionaryValuesWriter(10000, 10000);
+    final FallbackValuesWriter<PlainFloatDictionaryValuesWriter, PlainValuesWriter> cw = newPlainFloatDictionaryValuesWriter(10000, 10000);
 
     for (float i = 0; i < COUNT; i++) {
       cw.writeFloat(i % 50);
     }
     BytesInput bytes1 = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY);
-    assertEquals(50, cw.getDictionarySize());
+    assertEquals(50, cw.initialWriter.getDictionarySize());
 
     for (float i = COUNT2; i > 0; i--) {
       cw.writeFloat(i % 50);
     }
     BytesInput bytes2 = getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY);
-    assertEquals(50, cw.getDictionarySize());
+    assertEquals(50, cw.initialWriter.getDictionarySize());
 
     DictionaryValuesReader cr = initDicReader(cw, FLOAT);
 
@@ -400,8 +423,8 @@ public class TestDictionary {
     }
 
   }
-  
-  private void roundTripFloat(DictionaryValuesWriter cw,  ValuesReader reader, int maxDictionaryByteSize) throws IOException {
+
+  private void roundTripFloat(FallbackValuesWriter<PlainFloatDictionaryValuesWriter, PlainValuesWriter> cw,  ValuesReader reader, int maxDictionaryByteSize) throws IOException {
     int fallBackThreshold = maxDictionaryByteSize / 4;
     for (float i = 0; i < 100; i++) {
       cw.writeFloat(i);
@@ -418,28 +441,28 @@ public class TestDictionary {
       assertEquals(i, reader.readFloat(), 0.00001);
     }
   }
-  
+
   @Test
   public void testFloatDictionaryFallBack() throws IOException {
     int slabSize = 100;
     int maxDictionaryByteSize = 50;
-    final DictionaryValuesWriter cw = new PlainFloatDictionaryValuesWriter(maxDictionaryByteSize, slabSize);
-    
+    final FallbackValuesWriter<PlainFloatDictionaryValuesWriter, PlainValuesWriter> cw = newPlainFloatDictionaryValuesWriter(maxDictionaryByteSize, slabSize);
+
     // Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back
     ValuesReader reader = new PlainValuesReader.FloatPlainValuesReader();
-    
+
     roundTripFloat(cw, reader, maxDictionaryByteSize);
     //simulate cutting the page
     cw.reset();
     assertEquals(0,cw.getBufferedSize());
     cw.resetDictionary();
-  
+
     roundTripFloat(cw, reader, maxDictionaryByteSize);
   }
 
   @Test
   public void testZeroValues() throws IOException {
-    DictionaryValuesWriter cw = new PlainIntegerDictionaryValuesWriter(100, 100);
+    FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> cw = newPlainIntegerDictionaryValuesWriter(100, 100);
     cw.writeInteger(34);
     cw.writeInteger(34);
     getBytesAndCheckEncoding(cw, PLAIN_DICTIONARY);
@@ -455,7 +478,7 @@ public class TestDictionary {
       throws IOException {
     final DictionaryPage dictionaryPage = cw.createDictionaryPage().copy();
     final ColumnDescriptor descriptor = new ColumnDescriptor(new String[] {"foo"}, type, 0, 0);
-    final Dictionary dictionary = PLAIN_DICTIONARY.initDictionary(descriptor, dictionaryPage);
+    final Dictionary dictionary = PLAIN.initDictionary(descriptor, dictionaryPage);
     final DictionaryValuesReader cr = new DictionaryValuesReader(dictionary);
     return cr;
   }
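
A rough sketch of the read-side decision these tests rely on: the data page encoding reported by the writer decides which reader to use, and the dictionary page itself is now labelled PLAIN (hence the change in initDicReader above). The descriptor and type here are placeholders:

    // If the dictionary survived, decode through it; after a fallback the page is plain-encoded.
    ColumnDescriptor descriptor = new ColumnDescriptor(new String[] {"foo"}, INT64, 0, 0);
    ValuesReader reader;
    if (cw.getEncoding() == PLAIN_DICTIONARY) {
      Dictionary dictionary = PLAIN.initDictionary(descriptor, cw.createDictionaryPage().copy());
      reader = new DictionaryValuesReader(dictionary);
    } else {
      reader = new PlainValuesReader.LongPlainValuesReader();
    }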

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad06e611/parquet-hadoop/src/main/java/parquet/hadoop/example/GroupWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/example/GroupWriteSupport.java b/parquet-hadoop/src/main/java/parquet/hadoop/example/GroupWriteSupport.java
index 3e83860..e660d8e 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/example/GroupWriteSupport.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/example/GroupWriteSupport.java
@@ -15,10 +15,14 @@
  */
 package parquet.hadoop.example;
 
+import static parquet.Preconditions.checkNotNull;
+import static parquet.schema.MessageTypeParser.parseMessageType;
+
 import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
 
+import parquet.Preconditions;
 import parquet.example.data.Group;
 import parquet.example.data.GroupWriter;
 import parquet.hadoop.api.WriteSupport;
@@ -35,7 +39,7 @@ public class GroupWriteSupport extends WriteSupport<Group> {
   }
 
   public static MessageType getSchema(Configuration configuration) {
-    return MessageTypeParser.parseMessageType(configuration.get(PARQUET_EXAMPLE_SCHEMA));
+    return parseMessageType(checkNotNull(configuration.get(PARQUET_EXAMPLE_SCHEMA), PARQUET_EXAMPLE_SCHEMA));
   }
 
   private MessageType schema;
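
The added checkNotNull makes a missing schema fail fast with the property name instead of an NPE deep inside the parser. A rough sketch of the expected call order (the schema string is illustrative):

    Configuration conf = new Configuration();
    MessageType schema = parseMessageType("message example { required binary name; }");
    // Register the schema under the PARQUET_EXAMPLE_SCHEMA key before anything reads it back.
    GroupWriteSupport.setSchema(schema, conf);
    MessageType readBack = GroupWriteSupport.getSchema(conf); // throws if setSchema was skipped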

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad06e611/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
index 908d2b1..c86753e 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static parquet.column.Encoding.BIT_PACKED;
 import static parquet.column.Encoding.PLAIN;
+import static parquet.hadoop.TestUtils.enforceEmptyDir;
 import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static parquet.schema.Type.Repetition.OPTIONAL;
 import static parquet.schema.Type.Repetition.REPEATED;
@@ -269,8 +270,7 @@ public class TestParquetFileWriter {
     Configuration configuration = new Configuration();
 
     final FileSystem fs = testDirPath.getFileSystem(configuration);
-    fs.delete(testDirPath, true);
-    fs.mkdirs(testDirPath);
+    enforceEmptyDir(configuration, testDirPath);
 
     MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;} required group c { required int64 d; }}");
     createFile(configuration, new Path(testDirPath, "part0"), schema);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad06e611/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetWriter.java
new file mode 100644
index 0000000..aa78dbb
--- /dev/null
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetWriter.java
@@ -0,0 +1,111 @@
+package parquet.hadoop;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static parquet.column.Encoding.DELTA_BYTE_ARRAY;
+import static parquet.column.Encoding.PLAIN;
+import static parquet.column.Encoding.PLAIN_DICTIONARY;
+import static parquet.column.Encoding.RLE_DICTIONARY;
+import static parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static parquet.hadoop.ParquetFileReader.readFooter;
+import static parquet.hadoop.TestUtils.enforceEmptyDir;
+import static parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import parquet.column.Encoding;
+import parquet.column.ParquetProperties.WriterVersion;
+import parquet.example.data.Group;
+import parquet.example.data.simple.SimpleGroupFactory;
+import parquet.hadoop.example.GroupReadSupport;
+import parquet.hadoop.example.GroupWriteSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.io.api.Binary;
+import parquet.schema.MessageType;
+
+public class TestParquetWriter {
+
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    Path root = new Path("target/tests/TestParquetWriter/");
+    enforceEmptyDir(conf, root);
+    MessageType schema = parseMessageType(
+        "message test { "
+        + "required binary binary_field; "
+        + "required int32 int32_field; "
+        + "required int64 int64_field; "
+        + "required boolean boolean_field; "
+        + "required float float_field; "
+        + "required double double_field; "
+        + "required fixed_len_byte_array(3) flba_field; "
+        + "required int96 int96_field; "
+        + "} ");
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    Map<String, Encoding> expected = new HashMap<String, Encoding>();
+    expected.put("10-" + PARQUET_1_0, PLAIN_DICTIONARY);
+    expected.put("1000-" + PARQUET_1_0, PLAIN);
+    expected.put("10-" + PARQUET_2_0, RLE_DICTIONARY);
+    expected.put("1000-" + PARQUET_2_0, DELTA_BYTE_ARRAY);
+    for (int modulo : asList(10, 1000)) {
+      for (WriterVersion version : WriterVersion.values()) {
+        Path file = new Path(root, version.name() + "_" + modulo);
+        ParquetWriter<Group> writer = new ParquetWriter<Group>(
+            file,
+            new GroupWriteSupport(),
+            UNCOMPRESSED, 1024, 1024, 512, true, false, version, conf);
+        for (int i = 0; i < 1000; i++) {
+          writer.write(
+              f.newGroup()
+              .append("binary_field", "test" + (i % modulo))
+              .append("int32_field", 32)
+              .append("int64_field", 64l)
+              .append("boolean_field", true)
+              .append("float_field", 1.0f)
+              .append("double_field", 2.0d)
+              .append("flba_field", "foo")
+              .append("int96_field", Binary.fromByteArray(new byte[12])));
+        }
+        writer.close();
+        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).withConf(conf).build();
+        for (int i = 0; i < 1000; i++) {
+          Group group = reader.read();
+          assertEquals("test" + (i % modulo), group.getBinary("binary_field", 0).toStringUsingUTF8());
+          assertEquals(32, group.getInteger("int32_field", 0));
+          assertEquals(64l, group.getLong("int64_field", 0));
+          assertEquals(true, group.getBoolean("boolean_field", 0));
+          assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
+          assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
+          assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8());
+          assertEquals(Binary.fromByteArray(new byte[12]), group.getInt96("int96_field", 0));
+        }
+        reader.close();
+        ParquetMetadata footer = readFooter(conf, file, NO_FILTER);
+        for (BlockMetaData blockMetaData : footer.getBlocks()) {
+          for (ColumnChunkMetaData column : blockMetaData.getColumns()) {
+            if (column.getPath().toDotString().equals("binary_field")) {
+              String key = modulo + "-" + version;
+              Encoding expectedEncoding = expected.get(key);
+              assertTrue(
+                  key + ":" + column.getEncodings() + " should contain " + expectedEncoding,
+                  column.getEncodings().contains(expectedEncoding));
+            }
+          }
+        }
+      }
+    }
+  }
+}
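
For reference, the positional ParquetWriter arguments used in the test above, annotated under the assumption of the usual parameter order for this constructor; the small 512-byte dictionary page size is what forces the fallback in the high-cardinality (modulo 1000) case:

    ParquetWriter<Group> writer = new ParquetWriter<Group>(
        file,                    // output Path
        new GroupWriteSupport(), // converts Group records into Parquet write calls
        UNCOMPRESSED,            // compression codec
        1024,                    // row group (block) size in bytes
        1024,                    // data page size in bytes
        512,                     // dictionary page size in bytes
        true,                    // enable dictionary encoding
        false,                   // disable record validation
        version,                 // PARQUET_1_0 or PARQUET_2_0
        conf);                   // Hadoop Configuration

With only 10 distinct strings the dictionary holds, so binary_field keeps PLAIN_DICTIONARY / RLE_DICTIONARY; with 1000 distinct strings the dictionary overflows and the writer falls back to PLAIN (1.0) or DELTA_BYTE_ARRAY (2.0), which is what the expected map asserts.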

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad06e611/parquet-hadoop/src/test/java/parquet/hadoop/TestUtils.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestUtils.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestUtils.java
new file mode 100644
index 0000000..c20cba0
--- /dev/null
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestUtils.java
@@ -0,0 +1,22 @@
+package parquet.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class TestUtils {
+
+  public static void enforceEmptyDir(Configuration conf, Path path) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    if (fs.exists(path)) {
+      if (!fs.delete(path, true)) {
+        throw new IOException("can not delete path " + path);
+      }
+    }
+    if (!fs.mkdirs(path)) {
+      throw new IOException("can not create path " + path);
+    }
+  }
+}
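
A short usage sketch, matching the two call sites above (the path is illustrative):

    Configuration conf = new Configuration();
    Path root = new Path("target/tests/MyTest/");
    // Delete any previous output and recreate the directory, failing loudly if either step fails.
    TestUtils.enforceEmptyDir(conf, root);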

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad06e611/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 95bacea..5b419fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -215,6 +215,7 @@
                    <previousVersion>${previous.version}</previousVersion>
                    <excludes>
                      <exclude>parquet/org/**</exclude>
+                     <exclude>parquet/column/values/**</exclude>
                      <exclude>parquet/hadoop/ParquetInputSplit</exclude>
                    </excludes>
                  </requireBackwardCompatibility>

