parquet-commits mailing list archives

From alexleven...@apache.org
Subject parquet-mr git commit: PARQUET-601: Add support to configure the encoding used by ValueWriters
Date Thu, 11 Aug 2016 20:30:48 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master b301d1270 -> 30aa91012


PARQUET-601: Add support to configure the encoding used by ValueWriters

### Context:
Parquet currently chooses the value writer for a column based on the column's type and the Parquet version. As of now, the writer(s), and hence the encoding, for each data type are hard-coded in the Parquet source code.

This PR adds support for being able to override the encodings per type via config. That allows users to experiment with various encoding strategies manually as well as enables them to override the hardcoded defaults if they don't suit their use case.

We can override encodings per data type (int32 / int64 / ...).
Something along the lines of:
```
parquet.writer.encoding-override.<type> = "encoding1[,encoding2]"
```

As an example:
```
"parquet.writer.encoding-override.int32" = "plain"
(Chooses Plain encoding and hence the PlainValuesWriter).
```

When a primary + fallback need to be specified, we can do the following:
```
"parquet.writer.encoding-override.binary" = "rle_dictionary,delta_byte_array"
(Chooses RLE_DICTIONARY encoding as the initial encoding and DELTA_BYTE_ARRAY encoding as the fallback, and hence creates a FallbackWriter(PlainBinaryDictionaryValuesWriter, DeltaByteArrayWriter)).
```

In such cases we can require that the first encoding listed supports fallback, i.e. that its writer implements [RequiresFallback](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresFallback.java#L31).
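
To make the proposed `"encoding1[,encoding2]"` format concrete, here is a minimal, self-contained sketch of how such a value could be parsed and validated. It is not the code from this PR: the class name `EncodingOverride`, its fields, and the set of fallback-capable encoding names are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical holder for a parsed "encoding1[,encoding2]" override value. */
public class EncodingOverride {
  // Assumption for this sketch: only the dictionary encodings may be listed
  // first when a fallback is given (their writers would implement RequiresFallback).
  private static final Set<String> FALLBACK_CAPABLE =
      new HashSet<>(Arrays.asList("plain_dictionary", "rle_dictionary"));

  final String primary;
  final String fallback; // null when no fallback encoding was listed

  private EncodingOverride(String primary, String fallback) {
    this.primary = primary;
    this.fallback = fallback;
  }

  static EncodingOverride parse(String value) {
    if (value == null) {
      throw new IllegalArgumentException("override value must not be null");
    }
    // limit -1 keeps trailing empty parts, so "plain," is rejected below
    String[] parts = value.trim().toLowerCase(Locale.ROOT).split("\\s*,\\s*", -1);
    if (parts.length > 2) {
      throw new IllegalArgumentException("expected at most two encodings: " + value);
    }
    for (String part : parts) {
      if (part.isEmpty()) {
        throw new IllegalArgumentException("empty encoding name in: " + value);
      }
    }
    if (parts.length == 1) {
      return new EncodingOverride(parts[0], null);
    }
    // A fallback was listed, so the primary encoding must support falling back.
    if (!FALLBACK_CAPABLE.contains(parts[0])) {
      throw new IllegalArgumentException(parts[0] + " does not support a fallback");
    }
    return new EncodingOverride(parts[0], parts[1]);
  }

  public static void main(String[] args) {
    EncodingOverride override = parse("rle_dictionary,delta_byte_array");
    System.out.println(override.primary + " -> " + override.fallback);
  }
}
```

Rejecting a non-dictionary primary at parse time is one possible design; the check could equally be deferred until the writer objects are created.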

### PR notes:

- Restructured the ValuesWriter creation code. Pulled it out of ParquetProperties into a new class and refactored the flow by type, as it was getting hard to follow and I felt adding the overrides would make it harder. Added a bunch of unit tests to verify the ValuesWriter we create for combinations of type, Parquet version and dictionary on / off.
- Added unit tests to verify parsing of the encoding overrides + creation of ValuesWriters based on these overrides.
- Manually tested some encoding override scenarios on Hadoop (both Parquet v1 and v2).
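
The restructuring described in these notes can be pictured with a small stand-alone model. The sketch below does not use the real Parquet classes: `Props`, `WriterFactory`, `VersionedFactory`, and the string "writers" are simplified stand-ins invented for illustration. It mirrors two things visible in the diff: the factory is handed to the properties object and then initialized with it in a second step, and the writer chosen depends on type, writer version, and whether dictionary encoding is enabled.

```java
public class WriterFactorySketch {
  enum WriterVersion { PARQUET_1_0, PARQUET_2_0 }
  enum Type { BOOLEAN, INT32 }

  /** Stand-in for a values writer factory; returns a description, not a writer. */
  interface WriterFactory {
    void initialize(Props props);
    String newValuesWriter(Type type);
  }

  /** Stand-in for the properties object that owns the factory. */
  static final class Props {
    final WriterVersion version;
    final boolean enableDictionary;
    private final WriterFactory factory;

    private Props(WriterVersion version, boolean enableDictionary, WriterFactory factory) {
      this.version = version;
      this.enableDictionary = enableDictionary;
      this.factory = factory;
    }

    // Two-phase setup: construct the properties first, then hand them to the factory.
    static Props build(WriterVersion version, boolean enableDictionary, WriterFactory factory) {
      Props props = new Props(version, enableDictionary, factory);
      factory.initialize(props);
      return props;
    }

    String newValuesWriter(Type type) {
      return factory.newValuesWriter(type);
    }
  }

  /** Picks a writer description by type, version, and the dictionary flag. */
  static final class VersionedFactory implements WriterFactory {
    private Props props;

    @Override
    public void initialize(Props props) {
      this.props = props;
    }

    @Override
    public String newValuesWriter(Type type) {
      boolean v2 = props.version == WriterVersion.PARQUET_2_0;
      if (type == Type.BOOLEAN) {
        return v2 ? "rle" : "plain"; // no dictionary encoding for boolean
      }
      String fallback = v2 ? "delta_binary_packed" : "plain";
      return props.enableDictionary ? "fallback(dictionary, " + fallback + ")" : fallback;
    }
  }

  public static void main(String[] args) {
    Props props = Props.build(WriterVersion.PARQUET_2_0, true, new VersionedFactory());
    System.out.println(props.newValuesWriter(Type.INT32));
  }
}
```

A unit test over such a factory can enumerate every (type, version, dictionary flag) combination, which is the kind of coverage the notes above describe.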

Author: Piyush Narang <pnarang@twitter.com>

Closes #342 from piyushnarang/master and squashes the following commits:

3ebab28 [Piyush Narang] Remove Configurable
149bb98 [Piyush Narang] Switch to getValuesWriterFactory call to non-static
0b78e04 [Piyush Narang] Address Ryan's feedback
1da6ca3 [Piyush Narang] Merge branch 'master' into piyush/dynamic-encoding-overrides
f021ed2 [Piyush Narang] Tweak comment in ValuesWriterFactory
cb02ea0 [Piyush Narang] Fix review comments
bf4bc6d [Piyush Narang] Add support for Config setting in ValuesWriter factory
8a852a3 [Piyush Narang] Log values writer factory chosen
e4b61a4 [Piyush Narang] Tweak factory instantiation a bit
b46cccd [Piyush Narang] Add class based factory override
6a5428f [Piyush Narang] Clean up some stuff in ValuesWriterFactory
0f8cd09 [Piyush Narang] Refactor mockito version
9ead61d [Piyush Narang] Add guava test dep
5c636c7 [Piyush Narang] Add encoding-overrides config to ParquetOutputFormat config
b9d6c13 [Piyush Narang] Refactor code in ValuesWriterFactory a bit
ff4c90d [Piyush Narang] Pull out value writer creation to ValuesWriterFactory and add unit tests


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/30aa9101
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/30aa9101
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/30aa9101

Branch: refs/heads/master
Commit: 30aa91012cf6019bb9720609c1d03b5386a87ffb
Parents: b301d12
Author: Piyush Narang <pnarang@twitter.com>
Authored: Thu Aug 11 13:30:43 2016 -0700
Committer: Alex Levenson <alexlevenson@twitter.com>
Committed: Thu Aug 11 13:30:43 2016 -0700

----------------------------------------------------------------------
 parquet-avro/pom.xml                            |   2 +-
 parquet-cascading/pom.xml                       |   2 +-
 parquet-cascading3/pom.xml                      |   2 +-
 parquet-column/pom.xml                          |  12 +
 .../parquet/column/ParquetProperties.java       | 176 ++--------
 .../factory/DefaultV1ValuesWriterFactory.java   | 111 ++++++
 .../factory/DefaultV2ValuesWriterFactory.java   | 115 ++++++
 .../factory/DefaultValuesWriterFactory.java     |  87 +++++
 .../values/factory/ValuesWriterFactory.java     |  47 +++
 .../factory/DefaultValuesWriterFactoryTest.java | 350 +++++++++++++++++++
 parquet-hadoop/pom.xml                          |   4 +-
 .../parquet/hadoop/ParquetOutputFormat.java     |  23 +-
 parquet-pig/pom.xml                             |   2 +-
 parquet-protobuf/pom.xml                        |   2 +-
 parquet-tools/pom.xml                           |   2 +-
 pom.xml                                         |   2 +
 16 files changed, 782 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-avro/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index 50c37db..109cc38 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -67,7 +67,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>11.0</version>
+      <version>${guava.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-cascading/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-cascading/pom.xml b/parquet-cascading/pom.xml
index 0e1e1e1..0573aba 100644
--- a/parquet-cascading/pom.xml
+++ b/parquet-cascading/pom.xml
@@ -77,7 +77,7 @@
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
-      <version>1.9.5</version>
+      <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-cascading3/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-cascading3/pom.xml b/parquet-cascading3/pom.xml
index 67b5e09..9aa8991 100644
--- a/parquet-cascading3/pom.xml
+++ b/parquet-cascading3/pom.xml
@@ -88,7 +88,7 @@
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
-      <version>1.9.5</version>
+      <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index ccceafa..014921c 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -83,6 +83,18 @@
       <version>${slf4j.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index e3881f8..e746811 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -24,30 +24,15 @@ import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 
 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
-import static org.apache.parquet.column.Encoding.PLAIN;
-import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
-import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.impl.ColumnWriteStoreV2;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
-import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
-import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
-import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter;
-import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
-import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
-import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
-import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import org.apache.parquet.column.values.factory.ValuesWriterFactory;
 import org.apache.parquet.schema.MessageType;
 
 /**
@@ -66,6 +51,8 @@ public class ParquetProperties {
   public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
 
+  public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
+
   private static final int MIN_SLAB_SIZE = 64;
 
   public enum WriterVersion {
@@ -89,6 +76,7 @@ public class ParquetProperties {
     }
   }
 
+  private final int initialSlabSize;
   private final int pageSizeThreshold;
   private final int dictionaryPageSizeThreshold;
   private final WriterVersion writerVersion;
@@ -97,14 +85,14 @@ public class ParquetProperties {
   private final int maxRowCountForPageSizeCheck;
   private final boolean estimateNextSizeCheck;
   private final ByteBufferAllocator allocator;
-
-  private final int initialSlabSize;
+  private final ValuesWriterFactory valuesWriterFactory;
 
   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
-                            int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator) {
+                            int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
+                            ValuesWriterFactory writerFactory) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
-        .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
+      .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
     this.dictionaryPageSizeThreshold = dictPageSize;
     this.writerVersion = writerVersion;
     this.enableDictionary = enableDict;
@@ -112,6 +100,8 @@ public class ParquetProperties {
     this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck;
     this.estimateNextSizeCheck = estimateNextSizeCheck;
     this.allocator = allocator;
+
+    this.valuesWriterFactory = writerFactory;
   }
 
   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -144,128 +134,18 @@ public class ParquetProperties {
         getWidthFromMaxInt(maxLevel), MIN_SLAB_SIZE, pageSizeThreshold, allocator);
   }
 
-  private ValuesWriter plainWriter(ColumnDescriptor path) {
-    switch (path.getType()) {
-    case BOOLEAN:
-      return new BooleanPlainValuesWriter();
-    case INT96:
-      return new FixedLenByteArrayPlainValuesWriter(12, initialSlabSize, pageSizeThreshold, allocator);
-    case FIXED_LEN_BYTE_ARRAY:
-      return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSlabSize, pageSizeThreshold, allocator);
-    case BINARY:
-    case INT32:
-    case INT64:
-    case DOUBLE:
-    case FLOAT:
-      return new PlainValuesWriter(initialSlabSize, pageSizeThreshold, allocator);
-    default:
-      throw new IllegalArgumentException("Unknown type " + path.getType());
-    }
-  }
-
-  @SuppressWarnings("deprecation")
-  private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path) {
-    Encoding encodingForDataPage;
-    Encoding encodingForDictionaryPage;
-    switch(writerVersion) {
-    case PARQUET_1_0:
-      encodingForDataPage = PLAIN_DICTIONARY;
-      encodingForDictionaryPage = PLAIN_DICTIONARY;
-      break;
-    case PARQUET_2_0:
-      encodingForDataPage = RLE_DICTIONARY;
-      encodingForDictionaryPage = PLAIN;
-      break;
-    default:
-      throw new IllegalArgumentException("Unknown version: " + writerVersion);
-    }
-    switch (path.getType()) {
-    case BOOLEAN:
-      throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
-    case BINARY:
-      return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    case INT32:
-      return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    case INT64:
-      return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    case INT96:
-      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, 12, encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    case DOUBLE:
-      return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    case FLOAT:
-      return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    case FIXED_LEN_BYTE_ARRAY:
-      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, path.getTypeLength(), encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    default:
-      throw new IllegalArgumentException("Unknown type " + path.getType());
-    }
-  }
-
-  private ValuesWriter writerToFallbackTo(ColumnDescriptor path) {
-    switch(writerVersion) {
-    case PARQUET_1_0:
-      return plainWriter(path);
-    case PARQUET_2_0:
-      switch (path.getType()) {
-      case BOOLEAN:
-        return new RunLengthBitPackingHybridValuesWriter(1, initialSlabSize, pageSizeThreshold, allocator);
-      case BINARY:
-      case FIXED_LEN_BYTE_ARRAY:
-        return new DeltaByteArrayWriter(initialSlabSize, pageSizeThreshold, allocator);
-      case INT32:
-        return new DeltaBinaryPackingValuesWriterForInteger(initialSlabSize, pageSizeThreshold, allocator);
-      case INT64:
-        return new DeltaBinaryPackingValuesWriterForLong(initialSlabSize, pageSizeThreshold, allocator);
-      case INT96:
-      case DOUBLE:
-      case FLOAT:
-        return plainWriter(path);
-      default:
-        throw new IllegalArgumentException("Unknown type " + path.getType());
-      }
-    default:
-      throw new IllegalArgumentException("Unknown version: " + writerVersion);
-    }
-  }
-
-  private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path) {
-    ValuesWriter writerToFallBackTo = writerToFallbackTo(path);
-    if (enableDictionary) {
-      return FallbackValuesWriter.of(
-          dictionaryWriter(path),
-          writerToFallBackTo);
-    } else {
-     return writerToFallBackTo;
-    }
-  }
-
   public ValuesWriter newValuesWriter(ColumnDescriptor path) {
-    switch (path.getType()) {
-    case BOOLEAN: // no dictionary encoding for boolean
-      return writerToFallbackTo(path);
-    case FIXED_LEN_BYTE_ARRAY:
-      // dictionary encoding for that type was not enabled in PARQUET 1.0
-      if (writerVersion == WriterVersion.PARQUET_2_0) {
-        return dictWriterWithFallBack(path);
-      } else {
-       return writerToFallbackTo(path);
-      }
-    case BINARY:
-    case INT32:
-    case INT64:
-    case INT96:
-    case DOUBLE:
-    case FLOAT:
-      return dictWriterWithFallBack(path);
-    default:
-      throw new IllegalArgumentException("Unknown type " + path.getType());
-    }
+    return valuesWriterFactory.newValuesWriter(path);
   }
 
   public int getPageSizeThreshold() {
     return pageSizeThreshold;
   }
 
+  public int getInitialSlabSize() {
+    return initialSlabSize;
+  }
+
   public int getDictionaryPageSizeThreshold() {
     return dictionaryPageSizeThreshold;
   }
@@ -302,6 +182,10 @@ public class ParquetProperties {
     return maxRowCountForPageSizeCheck;
   }
 
+  public ValuesWriterFactory getValuesWriterFactory() {
+    return valuesWriterFactory;
+  }
+
   public boolean estimateNextSizeCheck() {
     return estimateNextSizeCheck;
   }
@@ -323,6 +207,7 @@ public class ParquetProperties {
     private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
     private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+    private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
 
     private Builder() {
     }
@@ -411,10 +296,25 @@ public class ParquetProperties {
       return this;
     }
 
+    public Builder withValuesWriterFactory(ValuesWriterFactory factory) {
+      Preconditions.checkNotNull(factory, "ValuesWriterFactory");
+      this.valuesWriterFactory = factory;
+      return this;
+    }
+
     public ParquetProperties build() {
-      return new ParquetProperties(writerVersion, pageSize, dictPageSize,
+      ParquetProperties properties =
+        new ParquetProperties(writerVersion, pageSize, dictPageSize,
           enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
-          estimateNextSizeCheck, allocator);
+          estimateNextSizeCheck, allocator, valuesWriterFactory);
+      // we pass a constructed but uninitialized factory to ParquetProperties above as currently
+      // creation of ValuesWriters is invoked from within ParquetProperties. In the future
+      // we'd like to decouple that and won't need to pass an object to properties and then pass the
+      // properties to the object.
+      valuesWriterFactory.initialize(properties);
+
+      return properties;
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
new file mode 100644
index 0000000..ffbd950
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.factory;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
+import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+
+public class DefaultV1ValuesWriterFactory implements ValuesWriterFactory {
+
+  private ParquetProperties parquetProperties;
+
+  @Override
+  public void initialize(ParquetProperties properties) {
+    this.parquetProperties = properties;
+  }
+
+  private Encoding getEncodingForDataPage() {
+    return PLAIN_DICTIONARY;
+  }
+
+  private Encoding getEncodingForDictionaryPage() {
+    return PLAIN_DICTIONARY;
+  }
+
+  @Override
+  public ValuesWriter newValuesWriter(ColumnDescriptor descriptor) {
+    switch (descriptor.getType()) {
+      case BOOLEAN:
+        return getBooleanValuesWriter();
+      case FIXED_LEN_BYTE_ARRAY:
+        return getFixedLenByteArrayValuesWriter(descriptor);
+      case BINARY:
+        return getBinaryValuesWriter(descriptor);
+      case INT32:
+        return getInt32ValuesWriter(descriptor);
+      case INT64:
+        return getInt64ValuesWriter(descriptor);
+      case INT96:
+        return getInt96ValuesWriter(descriptor);
+      case DOUBLE:
+        return getDoubleValuesWriter(descriptor);
+      case FLOAT:
+        return getFloatValuesWriter(descriptor);
+      default:
+        throw new IllegalArgumentException("Unknown type " + descriptor.getType());
+    }
+  }
+
+  private ValuesWriter getBooleanValuesWriter() {
+    // no dictionary encoding for boolean
+    return new BooleanPlainValuesWriter();
+  }
+
+  private ValuesWriter getFixedLenByteArrayValuesWriter(ColumnDescriptor path) {
+    // dictionary encoding was not enabled in PARQUET 1.0
+    return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+  }
+
+  private ValuesWriter getBinaryValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getInt32ValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getInt64ValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getInt96ValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new FixedLenByteArrayPlainValuesWriter(12, parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getDoubleValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getFloatValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
new file mode 100644
index 0000000..8348a02
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.factory;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
+import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
+
+public class DefaultV2ValuesWriterFactory implements ValuesWriterFactory {
+
+  private ParquetProperties parquetProperties;
+
+  @Override
+  public void initialize(ParquetProperties properties) {
+    this.parquetProperties = properties;
+  }
+
+  private Encoding getEncodingForDataPage() {
+    return RLE_DICTIONARY;
+  }
+
+  private Encoding getEncodingForDictionaryPage() {
+    return PLAIN;
+  }
+
+  @Override
+  public ValuesWriter newValuesWriter(ColumnDescriptor descriptor) {
+    switch (descriptor.getType()) {
+      case BOOLEAN:
+        return getBooleanValuesWriter();
+      case FIXED_LEN_BYTE_ARRAY:
+        return getFixedLenByteArrayValuesWriter(descriptor);
+      case BINARY:
+        return getBinaryValuesWriter(descriptor);
+      case INT32:
+        return getInt32ValuesWriter(descriptor);
+      case INT64:
+        return getInt64ValuesWriter(descriptor);
+      case INT96:
+        return getInt96ValuesWriter(descriptor);
+      case DOUBLE:
+        return getDoubleValuesWriter(descriptor);
+      case FLOAT:
+        return getFloatValuesWriter(descriptor);
+      default:
+        throw new IllegalArgumentException("Unknown type " + descriptor.getType());
+    }
+  }
+
+  private ValuesWriter getBooleanValuesWriter() {
+    // no dictionary encoding for boolean
+    return new RunLengthBitPackingHybridValuesWriter(1, parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+  }
+
+  private ValuesWriter getFixedLenByteArrayValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new DeltaByteArrayWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getBinaryValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new DeltaByteArrayWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getInt32ValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new DeltaBinaryPackingValuesWriterForInteger(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getInt64ValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new DeltaBinaryPackingValuesWriterForLong(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getInt96ValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new FixedLenByteArrayPlainValuesWriter(12, parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getDoubleValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getFloatValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java
new file mode 100644
index 0000000..6584894
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.factory;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
+import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
+
+/**
+ * Handles ValuesWriter creation statically based on the types of the columns and the writer version.
+ */
+public class DefaultValuesWriterFactory implements ValuesWriterFactory {
+
+  private ValuesWriterFactory delegateFactory;
+
+  private static final ValuesWriterFactory DEFAULT_V1_WRITER_FACTORY = new DefaultV1ValuesWriterFactory();
+  private static final ValuesWriterFactory DEFAULT_V2_WRITER_FACTORY = new DefaultV2ValuesWriterFactory();
+
+  @Override
+  public void initialize(ParquetProperties properties) {
+    if (properties.getWriterVersion() == WriterVersion.PARQUET_1_0) {
+      delegateFactory = DEFAULT_V1_WRITER_FACTORY;
+    } else {
+      delegateFactory = DEFAULT_V2_WRITER_FACTORY;
+    }
+
+    delegateFactory.initialize(properties);
+  }
+
+  @Override
+  public ValuesWriter newValuesWriter(ColumnDescriptor descriptor) {
+    return delegateFactory.newValuesWriter(descriptor);
+  }
+
+  static DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, ParquetProperties properties, Encoding dictPageEncoding, Encoding dataPageEncoding) {
+    switch (path.getType()) {
+      case BOOLEAN:
+        throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
+      case BINARY:
+        return new DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      case INT32:
+        return new DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      case INT64:
+        return new DictionaryValuesWriter.PlainLongDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      case INT96:
+        return new DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), 12, dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      case DOUBLE:
+        return new DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      case FLOAT:
+        return new DictionaryValuesWriter.PlainFloatDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      case FIXED_LEN_BYTE_ARRAY:
+        return new DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), path.getTypeLength(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      default:
+        throw new IllegalArgumentException("Unknown type " + path.getType());
+    }
+  }
+
+  static ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, ParquetProperties parquetProperties, Encoding dictPageEncoding, Encoding dataPageEncoding, ValuesWriter writerToFallBackTo) {
+    if (parquetProperties.isEnableDictionary()) {
+      return FallbackValuesWriter.of(
+        dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding),
+        writerToFallBackTo);
+    } else {
+      return writerToFallBackTo;
+    }
+  }
+}
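
The selection logic in `DefaultValuesWriterFactory` comes down to two decisions: pick a fallback writer per writer version, then wrap it with a dictionary writer only when dictionary encoding is enabled. The block below is a minimal, self-contained sketch of that shape for INT32. Every type in it (`Version`, `Writer`, `NamedWriter`, `FallbackWriter`, `WriterSelection`) is a hypothetical stand-in invented for illustration, not a parquet-mr class, and the string labels only approximate the real writers named in the tests.

```java
// Hypothetical stand-ins for illustration only; none of these are parquet-mr classes.
enum Version { V1, V2 }

interface Writer {
  String describe();
}

// A leaf writer identified only by a label.
final class NamedWriter implements Writer {
  private final String name;

  NamedWriter(String name) {
    this.name = name;
  }

  @Override
  public String describe() {
    return name;
  }
}

// Pairs an initial writer with the writer to fall back to.
final class FallbackWriter implements Writer {
  final Writer initial;
  final Writer fallback;

  FallbackWriter(Writer initial, Writer fallback) {
    this.initial = initial;
    this.fallback = fallback;
  }

  @Override
  public String describe() {
    return "fallback(" + initial.describe() + " -> " + fallback.describe() + ")";
  }
}

final class WriterSelection {
  // Same shape as dictWriterWithFallBack: wrap only when dictionary encoding is on,
  // otherwise hand back the fallback writer unchanged.
  static Writer dictWithFallback(boolean dictionaryEnabled, Writer dictionary, Writer fallback) {
    return dictionaryEnabled ? new FallbackWriter(dictionary, fallback) : fallback;
  }

  // Assumed INT32 defaults, taken from the expectations in the unit tests:
  // plain for V1, delta binary packing for V2.
  static Writer int32Writer(Version version, boolean dictionaryEnabled) {
    Writer fallback = version == Version.V1
        ? new NamedWriter("plain")
        : new NamedWriter("delta_binary_packed");
    return dictWithFallback(dictionaryEnabled, new NamedWriter("dictionary"), fallback);
  }
}
```

With dictionary encoding disabled, the caller gets the fallback writer directly rather than a wrapper, which is why the `_NoDict` tests assert on a single class.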

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/src/main/java/org/apache/parquet/column/values/factory/ValuesWriterFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/ValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/ValuesWriterFactory.java
new file mode 100644
index 0000000..8f06e7b
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/ValuesWriterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.factory;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.values.ValuesWriter;
+
+/**
+ * Implement this interface to manually test different strategies for creating ValuesWriters.
+ * The ValuesWriterFactory to be used must be passed to the {@link org.apache.parquet.column.ParquetProperties.Builder}.
+ * The lifecycle of a ValuesWriterFactory is:
+ * <ul>
+ * <li> It is initialized while creating a {@link org.apache.parquet.column.ParquetProperties} using the Builder.</li>
+ * <li> If the factory must read Hadoop config, it needs to implement the Configurable interface. In addition,
+ * ParquetOutputFormat needs to be updated to pass in the Hadoop config via the setConf() method on that interface.</li>
+ * <li> newValuesWriter is called once per column for every block of data.</li>
+ * </ul>
+ */
+public interface ValuesWriterFactory {
+
+  /**
+   * Used to initialize the factory. This method is called before newValuesWriter().
+   */
+  void initialize(ParquetProperties parquetProperties);
+
+  /**
+   * Creates a ValuesWriter to write values for the given column.
+   */
+  ValuesWriter newValuesWriter(ColumnDescriptor descriptor);
+}
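
The `ValuesWriterFactory` contract is small: `initialize` receives the properties first, and `newValuesWriter` is then called per column. The block below is a rough sketch of how a custom factory with per-type overrides might follow that lifecycle. It is deliberately written against hypothetical stand-ins (`Props`, `WriterFactory`, `OverridingFactory`, and string labels in place of writers) so that it runs without parquet-mr on the classpath. The explicit guard against use before initialization is an assumption of this sketch, not something the interface enforces.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ParquetProperties; only carries the dictionary flag.
final class Props {
  final boolean dictionaryEnabled;

  Props(boolean dictionaryEnabled) {
    this.dictionaryEnabled = dictionaryEnabled;
  }
}

// Hypothetical stand-in for ValuesWriterFactory; writers are represented by labels.
interface WriterFactory {
  void initialize(Props props);

  String newWriter(String columnType);
}

// A factory that applies per-type overrides and otherwise uses a simple default.
final class OverridingFactory implements WriterFactory {
  private final Map<String, String> overrides = new HashMap<>();
  private Props props;

  // Registers an override for one column type and returns this factory for chaining.
  OverridingFactory override(String columnType, String encoding) {
    overrides.put(columnType, encoding);
    return this;
  }

  @Override
  public void initialize(Props props) {
    this.props = props;
  }

  @Override
  public String newWriter(String columnType) {
    // Assumption of this sketch: fail fast if the lifecycle order is violated.
    if (props == null) {
      throw new IllegalStateException("initialize() must be called before newWriter()");
    }
    String chosen = overrides.get(columnType);
    if (chosen != null) {
      return chosen;
    }
    return props.dictionaryEnabled ? "dictionary" : "plain";
  }
}
```

A caller would construct the factory, register overrides, let the properties builder call `initialize`, and only then request writers, for example `new OverridingFactory().override("int32", "plain")`.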

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
new file mode 100644
index 0000000..d6865e2
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.factory;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.*;
+import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
+import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
+import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import org.junit.Test;
+
+import static junit.framework.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class DefaultValuesWriterFactoryTest {
+
+  @Test
+  public void testBoolean() {
+    doTestValueWriter(
+      PrimitiveTypeName.BOOLEAN,
+      WriterVersion.PARQUET_1_0,
+      true,
+      BooleanPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testBoolean_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.BOOLEAN,
+      WriterVersion.PARQUET_2_0,
+      true,
+      RunLengthBitPackingHybridValuesWriter.class);
+  }
+
+  @Test
+  public void testFixedLenByteArray() {
+    doTestValueWriter(
+      PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
+      WriterVersion.PARQUET_1_0,
+      true,
+      FixedLenByteArrayPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testFixedLenByteArray_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
+      WriterVersion.PARQUET_2_0,
+      true,
+      DictionaryValuesWriter.class, DeltaByteArrayWriter.class);
+  }
+
+  @Test
+  public void testFixedLenByteArray_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
+      WriterVersion.PARQUET_2_0,
+      false,
+      DeltaByteArrayWriter.class);
+  }
+
+  @Test
+  public void testBinary() {
+    doTestValueWriter(
+      PrimitiveTypeName.BINARY,
+      WriterVersion.PARQUET_1_0,
+      true,
+      PlainBinaryDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testBinary_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.BINARY,
+      WriterVersion.PARQUET_1_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testBinary_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.BINARY,
+      WriterVersion.PARQUET_2_0,
+      true,
+      PlainBinaryDictionaryValuesWriter.class, DeltaByteArrayWriter.class);
+  }
+
+  @Test
+  public void testBinary_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.BINARY,
+      WriterVersion.PARQUET_2_0,
+      false,
+      DeltaByteArrayWriter.class);
+  }
+
+  @Test
+  public void testInt32() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT32,
+      WriterVersion.PARQUET_1_0,
+      true,
+      PlainIntegerDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt32_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT32,
+      WriterVersion.PARQUET_1_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt32_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT32,
+      WriterVersion.PARQUET_2_0,
+      true,
+      PlainIntegerDictionaryValuesWriter.class, DeltaBinaryPackingValuesWriter.class);
+  }
+
+  @Test
+  public void testInt32_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT32,
+      WriterVersion.PARQUET_2_0,
+      false,
+      DeltaBinaryPackingValuesWriter.class);
+  }
+
+  @Test
+  public void testInt64() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT64,
+      WriterVersion.PARQUET_1_0,
+      true,
+      PlainLongDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt64_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT64,
+      WriterVersion.PARQUET_1_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt64_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT64,
+      WriterVersion.PARQUET_2_0,
+      true,
+      PlainLongDictionaryValuesWriter.class, DeltaBinaryPackingValuesWriterForLong.class);
+  }
+
+  @Test
+  public void testInt64_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT64,
+      WriterVersion.PARQUET_2_0,
+      false,
+      DeltaBinaryPackingValuesWriterForLong.class);
+  }
+
+  @Test
+  public void testInt96() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT96,
+      WriterVersion.PARQUET_1_0,
+      true,
+      PlainFixedLenArrayDictionaryValuesWriter.class, FixedLenByteArrayPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt96_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT96,
+      WriterVersion.PARQUET_1_0,
+      false,
+      FixedLenByteArrayPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt96_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT96,
+      WriterVersion.PARQUET_2_0,
+      true,
+      PlainFixedLenArrayDictionaryValuesWriter.class, FixedLenByteArrayPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt96_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT96,
+      WriterVersion.PARQUET_2_0,
+      false,
+      FixedLenByteArrayPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testDouble() {
+    doTestValueWriter(
+      PrimitiveTypeName.DOUBLE,
+      WriterVersion.PARQUET_1_0,
+      true,
+      PlainDoubleDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testDouble_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.DOUBLE,
+      WriterVersion.PARQUET_1_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testDouble_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.DOUBLE,
+      WriterVersion.PARQUET_2_0,
+      true,
+      PlainDoubleDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testDouble_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.DOUBLE,
+      WriterVersion.PARQUET_2_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testFloat() {
+    doTestValueWriter(
+      PrimitiveTypeName.FLOAT,
+      WriterVersion.PARQUET_1_0,
+      true,
+      PlainFloatDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testFloat_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.FLOAT,
+      WriterVersion.PARQUET_1_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testFloat_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.FLOAT,
+      WriterVersion.PARQUET_2_0,
+      true,
+      PlainFloatDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testFloat_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.FLOAT,
+      WriterVersion.PARQUET_2_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  private void doTestValueWriter(PrimitiveTypeName typeName, WriterVersion version, boolean enableDictionary, Class<? extends ValuesWriter> expectedValueWriterClass) {
+    ColumnDescriptor mockPath = getMockColumn(typeName);
+    ValuesWriterFactory factory = getDefaultFactory(version, enableDictionary);
+    ValuesWriter writer = factory.newValuesWriter(mockPath);
+
+    validateWriterType(writer, expectedValueWriterClass);
+  }
+
+  private void doTestValueWriter(PrimitiveTypeName typeName, WriterVersion version, boolean enableDictionary, Class<? extends ValuesWriter> initialValueWriterClass, Class<? extends ValuesWriter> fallbackValueWriterClass) {
+    ColumnDescriptor mockPath = getMockColumn(typeName);
+    ValuesWriterFactory factory = getDefaultFactory(version, enableDictionary);
+    ValuesWriter writer = factory.newValuesWriter(mockPath);
+
+    validateFallbackWriter(writer, initialValueWriterClass, fallbackValueWriterClass);
+  }
+
+  private ColumnDescriptor getMockColumn(PrimitiveTypeName typeName) {
+    ColumnDescriptor mockPath = mock(ColumnDescriptor.class);
+    when(mockPath.getType()).thenReturn(typeName);
+    return mockPath;
+  }
+
+  private ValuesWriterFactory getDefaultFactory(WriterVersion writerVersion, boolean enableDictionary) {
+    ValuesWriterFactory factory = new DefaultValuesWriterFactory();
+    ParquetProperties.builder()
+      .withDictionaryEncoding(enableDictionary)
+      .withWriterVersion(writerVersion)
+      .withValuesWriterFactory(factory)
+      .build();
+
+    return factory;
+  }
+
+  private void validateWriterType(ValuesWriter writer, Class<? extends ValuesWriter> valuesWriterClass) {
+    assertTrue("Not instance of: " + valuesWriterClass.getName(), valuesWriterClass.isInstance(writer));
+  }
+
+  private void validateFallbackWriter(ValuesWriter writer, Class<? extends ValuesWriter> initialWriterClass, Class<? extends ValuesWriter> fallbackWriterClass) {
+    validateWriterType(writer, FallbackValuesWriter.class);
+
+    FallbackValuesWriter wr = (FallbackValuesWriter) writer;
+    validateWriterType(wr.initialWriter, initialWriterClass);
+    validateWriterType(wr.fallBackWriter, fallbackWriterClass);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index 4c43182..79ca7f4 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -83,13 +83,13 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>11.0</version>
+      <version>${guava.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
-      <version>1.9.5</version>
+      <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 6cfa8e9..d05d41f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -317,7 +317,6 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     return conf.getInt(MAX_PADDING_BYTES, DEFAULT_MAX_PADDING_SIZE);
   }
 
-
   private WriteSupport<T> writeSupport;
   private ParquetOutputCommitter committer;
 
@@ -374,16 +373,18 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     int maxPaddingSize = getMaxPaddingSize(conf);
     boolean validating = getValidation(conf);
 
-    if (INFO) LOG.info("Parquet block size to " + blockSize);
-    if (INFO) LOG.info("Parquet page size to " + props.getPageSizeThreshold());
-    if (INFO) LOG.info("Parquet dictionary page size to " + props.getDictionaryPageSizeThreshold());
-    if (INFO) LOG.info("Dictionary is " + (props.isEnableDictionary() ? "on" : "off"));
-    if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
-    if (INFO) LOG.info("Writer version is: " + props.getWriterVersion());
-    if (INFO) LOG.info("Maximum row group padding size is " + maxPaddingSize + " bytes");
-    if (INFO) LOG.info("Page size checking is: " + (props.estimateNextSizeCheck() ? "estimated" : "constant"));
-    if (INFO) LOG.info("Min row count for page size check is: " + props.getMinRowCountForPageSizeCheck());
-    if (INFO) LOG.info("Max row count for page size check is: " + props.getMaxRowCountForPageSizeCheck());
+    if (INFO) {
+      LOG.info("Parquet block size to " + blockSize);
+      LOG.info("Parquet page size to " + props.getPageSizeThreshold());
+      LOG.info("Parquet dictionary page size to " + props.getDictionaryPageSizeThreshold());
+      LOG.info("Dictionary is " + (props.isEnableDictionary() ? "on" : "off"));
+      LOG.info("Validation is " + (validating ? "on" : "off"));
+      LOG.info("Writer version is: " + props.getWriterVersion());
+      LOG.info("Maximum row group padding size is " + maxPaddingSize + " bytes");
+      LOG.info("Page size checking is: " + (props.estimateNextSizeCheck() ? "estimated" : "constant"));
+      LOG.info("Min row count for page size check is: " + props.getMinRowCountForPageSizeCheck());
+      LOG.info("Max row count for page size check is: " + props.getMaxRowCountForPageSizeCheck());
+    }
 
     WriteContext init = writeSupport.init(conf);
     ParquetFileWriter w = new ParquetFileWriter(

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-pig/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-pig/pom.xml b/parquet-pig/pom.xml
index 9b6371e..4142c3b 100644
--- a/parquet-pig/pom.xml
+++ b/parquet-pig/pom.xml
@@ -101,7 +101,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>11.0</version>
+      <version>${guava.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml
index b3e4e50..0c9cae4 100644
--- a/parquet-protobuf/pom.xml
+++ b/parquet-protobuf/pom.xml
@@ -42,7 +42,7 @@
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
-      <version>1.9.5</version>
+      <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-tools/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-tools/pom.xml b/parquet-tools/pom.xml
index 5d3f7c9..66abaa9 100644
--- a/parquet-tools/pom.xml
+++ b/parquet-tools/pom.xml
@@ -70,7 +70,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>11.0</version>
+      <version>${guava.version}</version>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 510c329..ccc4675 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,8 @@
     <semver.api.version>0.9.33</semver.api.version>
     <slf4j.version>1.7.5</slf4j.version>
     <avro.version>1.8.0</avro.version>
+    <guava.version>11.0</guava.version>
+    <mockito.version>1.9.5</mockito.version>
   </properties>
 
   <modules>

