parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alexleven...@apache.org
Subject [2/2] parquet-mr git commit: PARQUET-251: Binary column statistics error when reuse byte[] among rows
Date Wed, 01 Jul 2015 01:34:28 GMT
PARQUET-251: Binary column statistics error when reuse byte[] among rows

Author: asingh <asingh@cloudera.com>
Author: Alex Levenson <alexlevenson@twitter.com>
Author: Ashish Singh <asingh@cloudera.com>

Closes #197 from SinghAsDev/PARQUET-251 and squashes the following commits:

68e0eae [asingh] Remove deprecated constructors from private classes
67e4e5f [asingh] Add removed public methods in Binary and deprecate them
0e71728 [asingh] Add comment for BinaryStatistics.setMinMaxFromBytes
fbe873f [Ashish Singh] Merge pull request #4 from isnotinvain/PR-197-3
9826ee6 [Alex Levenson] Some minor cleanup
7570035 [asingh] Remove test for stats getting ingnored for version 160 when type is int64
af43d28 [Alex Levenson] Address PR feedback
89ab4ee [Alex Levenson] put the headers in the right location
2838cc9 [Alex Levenson] Split out version checks to separate files, add some tests
5af9142 [Alex Levenson] Generalize tests, make Binary.fromString reused=false
e00d9b7 [asingh] Rename isReused => isBackingBytesReused
d2ad939 [asingh] Rebase over latest trunk
857141a [asingh] Remove redundant junit dependency
32b88ed [asingh] Remove semver from hadoop-common
7a0e99e [asingh] Revert to fromConstantByteArray for ByteString
c820ec9 [asingh] Add unit tests for Binary and to check if stats are ignored for version 160
9bbd1e5 [asingh] Improve version parsing
84a1d8b [asingh] Remove ignoring stats on write side and ignore it on read side
903f8e3 [asingh] Address some review comments. * Ignore stats for writer's version < 1.8.0 * Refactor shoudlIgnoreStatistics method a bit * Assume implementations other than parquet-mr were writing binary   statistics correctly * Add toParquetStatistics method's original method signature to maintain   backwards compatibility and mark it as deprecated
64c2617 [asingh] Revert changes for ignoring stats at RowGroupFilter level
e861b18 [asingh] Ignore max min stats while reading
3a8cb8d [asingh] Fix typo
8e12618 [asingh] Fix usage of fromConstant versions of Binary constructors
860adf7 [asingh] Rename unmodified to constant and isReused instead of isUnmodifiable
0d127a7 [asingh] Add unmodfied and Reused versions for creating a Binary. Add copy() to Binary.
b4e2950 [asingh] Skip filtering based on stats when file was written with version older than 1.6.1
6fcee8c [asingh] Add getBytesUnsafe() to Binary that returns backing byte[] if possible, else returns result of getBytes()
30b07dd [asingh] PARQUET-251: Binary column statistics error when reuse byte[] among rows


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/e3b95020
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/e3b95020
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/e3b95020

Branch: refs/heads/master
Commit: e3b95020f777eb5e0651977f654c1662e3ea1f29
Parents: e6ee42e
Author: asingh <asingh@cloudera.com>
Authored: Tue Jun 30 18:34:48 2015 -0700
Committer: Alex Levenson <alexlevenson@twitter.com>
Committed: Tue Jun 30 18:34:48 2015 -0700

----------------------------------------------------------------------
 .../apache/parquet/avro/AvroWriteSupport.java   |   8 +-
 .../org/apache/parquet/avro/TestReadWrite.java  |   5 +-
 .../avro/TestReadWriteOldListBehavior.java      |   4 +-
 .../parquet/benchmarks/DataGenerator.java       |   2 +-
 .../org/apache/parquet/CorruptStatistics.java   | 104 +++++++++
 .../column/statistics/BinaryStatistics.java     |  18 +-
 .../DeltaLengthByteArrayValuesReader.java       |   2 +-
 .../DeltaLengthByteArrayValuesWriter.java       |   2 +-
 .../deltastrings/DeltaByteArrayReader.java      |   8 +-
 .../deltastrings/DeltaByteArrayWriter.java      |   2 +-
 .../dictionary/DictionaryValuesWriter.java      |   9 +-
 .../dictionary/PlainValuesDictionary.java       |   4 +-
 .../values/plain/BinaryPlainValuesReader.java   |   2 +-
 .../FixedLenByteArrayPlainValuesReader.java     |   2 +-
 .../parquet/example/data/simple/NanoTime.java   |   2 +-
 .../io/RecordConsumerLoggingWrapper.java        |   2 +-
 .../java/org/apache/parquet/io/api/Binary.java  | 169 +++++++++++++--
 .../apache/parquet/schema/PrimitiveType.java    |   2 +-
 .../apache/parquet/CorruptStatisticsTest.java   |  78 +++++++
 .../column/statistics/TestStatistics.java       |  18 ++
 .../values/dictionary/TestDictionary.java       |   8 +-
 .../org/apache/parquet/io/api/TestBinary.java   | 215 +++++++++++++++++++
 .../org/apache/parquet/SemanticVersion.java     | 133 ++++++++++++
 .../java/org/apache/parquet/VersionParser.java  | 105 +++++++++
 .../org/apache/parquet/SemanticVersionTest.java |  53 +++++
 .../java/org/apache/parquet/VersionTest.java    |  44 +++-
 .../converter/ParquetMetadataConverter.java     |  41 +++-
 .../hadoop/InternalParquetRecordReader.java     |   6 +-
 .../parquet/hadoop/ParquetFileReader.java       |  37 +++-
 .../apache/parquet/hadoop/ParquetReader.java    |   2 +-
 .../parquet/hadoop/ParquetRecordReader.java     |   3 +-
 .../hadoop/TestColumnChunkPageWriteStore.java   |   3 +-
 .../parquet/hadoop/TestParquetFileWriter.java   |  27 ++-
 .../parquet/hadoop/TestParquetWriter.java       |   6 +-
 .../hadoop/TestParquetWriterNewPage.java        |   5 +-
 .../ql/io/parquet/writable/BinaryWritable.java  |   2 +-
 .../primitive/ParquetStringInspector.java       |   3 +-
 .../apache/parquet/pig/TupleWriteSupport.java   |   2 +-
 .../apache/parquet/proto/ProtoWriteSupport.java |   2 +-
 .../parquet/proto/ProtoWriteSupportTest.java    |  12 +-
 .../parquet/thrift/ParquetWriteProtocol.java    |   3 +-
 .../parquet/tools/command/DumpCommand.java      |  10 +-
 42 files changed, 1044 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index e86c579..35e3924 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -252,9 +252,9 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
       recordConsumer.addDouble(((Number) value).doubleValue());
     } else if (avroType.equals(Schema.Type.BYTES)) {
       if (value instanceof byte[]) {
-        recordConsumer.addBinary(Binary.fromByteArray((byte[]) value));
+        recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value));
       } else {
-        recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) value));
+        recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
       }
     } else if (avroType.equals(Schema.Type.STRING)) {
       recordConsumer.addBinary(fromAvroString(value));
@@ -269,14 +269,14 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
     } else if (avroType.equals(Schema.Type.UNION)) {
       writeUnion(type.asGroupType(), nonNullAvroSchema, value);
     } else if (avroType.equals(Schema.Type.FIXED)) {
-      recordConsumer.addBinary(Binary.fromByteArray(((GenericFixed) value).bytes()));
+      recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes()));
     }
   }
 
   private Binary fromAvroString(Object value) {
     if (value instanceof Utf8) {
       Utf8 utf8 = (Utf8) value;
-      return Binary.fromByteArray(utf8.getBytes(), 0, utf8.getByteLength());
+      return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength());
     }
     return Binary.fromString(value.toString());
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 4d37f40..855a5b1 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -362,7 +362,8 @@ public class TestReadWrite {
         recordConsumer.endField("mydouble", index++);
 
         recordConsumer.startField("mybytes", index);
-        recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) record.get("mybytes")));
+        recordConsumer.addBinary(
+            Binary.fromReusedByteBuffer((ByteBuffer) record.get("mybytes")));
         recordConsumer.endField("mybytes", index++);
 
         recordConsumer.startField("mystring", index);
@@ -457,7 +458,7 @@ public class TestReadWrite {
         recordConsumer.endField("mymap", index++);
 
         recordConsumer.startField("myfixed", index);
-        recordConsumer.addBinary(Binary.fromByteArray((byte[]) record.get("myfixed")));
+        recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) record.get("myfixed")));
         recordConsumer.endField("myfixed", index++);
 
         recordConsumer.endMessage();

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
index 34a160a..7c2bc27 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
@@ -404,7 +404,7 @@ public class TestReadWriteOldListBehavior {
         recordConsumer.endField("mydouble", index++);
 
         recordConsumer.startField("mybytes", index);
-        recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) record.get("mybytes")));
+        recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) record.get("mybytes")));
         recordConsumer.endField("mybytes", index++);
 
         recordConsumer.startField("mystring", index);
@@ -499,7 +499,7 @@ public class TestReadWriteOldListBehavior {
         recordConsumer.endField("mymap", index++);
 
         recordConsumer.startField("myfixed", index);
-        recordConsumer.addBinary(Binary.fromByteArray((byte[]) record.get("myfixed")));
+        recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) record.get("myfixed")));
         recordConsumer.endField("myfixed", index++);
 
         recordConsumer.endMessage();

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java
----------------------------------------------------------------------
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java
index 05c35bd..42d9953 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java
@@ -107,7 +107,7 @@ public class DataGenerator {
           .append("float_field", 1.0f)
           .append("double_field", 2.0d)
           .append("flba_field", new String(chars))
-          .append("int96_field", Binary.fromByteArray(new byte[12]))
+          .append("int96_field", Binary.fromConstantByteArray(new byte[12]))
       );
     }
     writer.close();

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java b/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
new file mode 100644
index 0000000..8e15d01
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import org.apache.parquet.SemanticVersion.SemanticVersionParseException;
+import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.VersionParser.VersionParseException;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * There was a bug (PARQUET-251) that caused the statistics metadata
+ * for binary columns to be corrupted in the write path.
+ *
+ * This class is used to detect whether a file was written with this bug,
+ * and thus it's statistics should be ignored / not trusted.
+ */
+public class CorruptStatistics {
+  private static final Log LOG = Log.getLog(CorruptStatistics.class);
+
+  // the version in which the bug described by jira: PARQUET-251 was fixed
+  // the bug involved writing invalid binary statistics, so stats written prior to this
+  // fix must be ignored / assumed invalid
+  private static final SemanticVersion PARQUET_251_FIXED_VERSION = new SemanticVersion(1, 8, 0);
+
+  /**
+   * Decides if the statistics from a file created by createdBy (the created_by field from parquet format)
+   * should be ignored because they are potentially corrupt.
+   */
+  public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName columnType) {
+
+    if (columnType != PrimitiveTypeName.BINARY && columnType != PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+      // the bug only applies to binary columns
+      return false;
+    }
+
+    if (Strings.isNullOrEmpty(createdBy)) {
+      // created_by is not populated, which could have been caused by
+      // parquet-mr during the same time as PARQUET-251, see PARQUET-297
+      LOG.info("Ignoring statistics because created_by is null or empty! See PARQUET-251 and PARQUET-297");
+      return true;
+    }
+
+    try {
+      ParsedVersion version = VersionParser.parse(createdBy);
+
+      if (!"parquet-mr".equals(version.application)) {
+        // assume other applications don't have this bug
+        return false;
+      }
+
+      if (Strings.isNullOrEmpty(version.semver)) {
+        LOG.warn("Ignoring statistics because created_by did not contain a semver (see PARQUET-251): " + createdBy);
+        return true;
+      }
+
+      SemanticVersion semver = SemanticVersion.parse(version.semver);
+
+      if (semver.compareTo(PARQUET_251_FIXED_VERSION) < 0) {
+        LOG.info("Ignoring statistics because this file was created prior to "
+            + PARQUET_251_FIXED_VERSION
+            + ", see PARQUET-251" );
+        return true;
+      }
+
+      // this file was created after the fix
+      return false;
+    } catch (RuntimeException e) {
+      // couldn't parse the created_by field, log what went wrong, don't trust the stats,
+      // but don't make this fatal.
+      warnParseError(createdBy, e);
+      return true;
+    } catch (SemanticVersionParseException e) {
+      // couldn't parse the created_by field, log what went wrong, don't trust the stats,
+      // but don't make this fatal.
+      warnParseError(createdBy, e);
+      return true;
+    } catch (VersionParseException e) {
+      // couldn't parse the created_by field, log what went wrong, don't trust the stats,
+      // but don't make this fatal.
+      warnParseError(createdBy, e);
+      return true;
+    }
+  }
+
+  private static void warnParseError(String createdBy, Throwable e) {
+    LOG.warn("Ignoring statistics because created_by could not be parsed (see PARQUET-251): " + createdBy, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
index 6341f96..e8439f0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
@@ -44,10 +44,16 @@ public class BinaryStatistics extends Statistics<Binary> {
     }
   }
 
+  /**
+   * Sets min and max values, re-uses the byte[] passed in.
+   * Any changes made to byte[] will be reflected in min and max values as well.
+   * @param minBytes byte array to set the min value to
+   * @param maxBytes byte array to set the max value to
+   */
   @Override
   public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
-    max = Binary.fromByteArray(maxBytes);
-    min = Binary.fromByteArray(minBytes);
+    max = Binary.fromReusedByteArray(maxBytes);
+    min = Binary.fromReusedByteArray(minBytes);
     this.markAsNotEmpty();
   }
 
@@ -72,13 +78,13 @@ public class BinaryStatistics extends Statistics<Binary> {
   }
 
   public void updateStats(Binary min_value, Binary max_value) {
-    if (min.compareTo(min_value) > 0) { min = min_value; }
-    if (max.compareTo(max_value) < 0) { max = max_value; }
+    if (min.compareTo(min_value) > 0) { min = min_value.copy(); }
+    if (max.compareTo(max_value) < 0) { max = max_value.copy(); }
   }
 
   public void initializeStats(Binary min_value, Binary max_value) {
-      min = min_value;
-      max = max_value;
+      min = min_value.copy();
+      max = max_value.copy();
       this.markAsNotEmpty();
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
index 2db1336..fb9bdc5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
@@ -59,7 +59,7 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader {
     int length = lengthReader.readInteger();
     int start = offset;
     offset = start + length;
-    return Binary.fromByteArray(in, start, length);
+    return Binary.fromConstantByteArray(in, start, length);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
index 0a498b1..3f686cc 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
@@ -61,7 +61,7 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
   public void writeBytes(Binary v) {
     try {
       lengthWriter.writeInteger(v.length());
-      out.write(v.getBytes());
+      v.writeTo(out);
     } catch (IOException e) {
       throw new ParquetEncodingException("could not write bytes", e);
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
index de3df02..fd55035 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
@@ -40,7 +40,7 @@ public class DeltaByteArrayReader extends ValuesReader {
   public DeltaByteArrayReader() {
     this.prefixLengthReader = new DeltaBinaryPackingValuesReader();
     this.suffixReader = new DeltaLengthByteArrayValuesReader();
-    this.previous = Binary.fromByteArray(new byte[0]);
+    this.previous = Binary.fromConstantByteArray(new byte[0]);
   }
 
   @Override
@@ -67,9 +67,9 @@ public class DeltaByteArrayReader extends ValuesReader {
     // We have to do this to materialize the output
     if(prefixLength != 0) {
       byte[] out = new byte[length];
-      System.arraycopy(previous.getBytes(), 0, out, 0, prefixLength);
-      System.arraycopy(suffix.getBytes(), 0, out, prefixLength, suffix.length());
-      previous =  Binary.fromByteArray(out);
+      System.arraycopy(previous.getBytesUnsafe(), 0, out, 0, prefixLength);
+      System.arraycopy(suffix.getBytesUnsafe(), 0, out, prefixLength, suffix.length());
+      previous =  Binary.fromConstantByteArray(out);
     } else {
       previous = suffix;
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
index eb33d30..54234db 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
@@ -87,7 +87,7 @@ public class DeltaByteArrayWriter extends ValuesWriter{
     int length = previous.length < vb.length ? previous.length : vb.length;
     for(i = 0; (i < length) && (previous[i] == vb[i]); i++);
     prefixLengthWriter.writeInteger(i);
-    suffixWriter.writeBytes(Binary.fromByteArray(vb, i, vb.length - i));
+    suffixWriter.writeBytes(v.slice(i, vb.length - i));
     previous = vb;
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
index 928c125..eb9fdd9 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
@@ -235,7 +235,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
       int id = binaryDictionaryContent.getInt(v);
       if (id == -1) {
         id = binaryDictionaryContent.size();
-        binaryDictionaryContent.put(copy(v), id);
+        binaryDictionaryContent.put(v.copy(), id);
         // length as int (4 bytes) + actual bytes
         dictionaryByteSize += 4 + v.length();
       }
@@ -283,11 +283,6 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
         writer.writeBytes(reverseDictionary[id]);
       }
     }
-
-    protected static Binary copy(Binary binary) {
-      return Binary.fromByteArray(
-          Arrays.copyOf(binary.getBytes(), binary.length()));
-    }
   }
 
   /**
@@ -311,7 +306,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
       int id = binaryDictionaryContent.getInt(value);
       if (id == -1) {
         id = binaryDictionaryContent.size();
-        binaryDictionaryContent.put(copy(value), id);
+        binaryDictionaryContent.put(value.copy(), id);
         dictionaryByteSize += length;
       }
       encodedValues.add(id);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
index 055dd73..e671310 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
@@ -96,7 +96,7 @@ public abstract class PlainValuesDictionary extends Dictionary {
           // read the length
           offset += 4;
           // wrap the content in a binary
-          binaryDictionaryContent[i] = Binary.fromByteArray(dictionaryBytes, offset, len);
+          binaryDictionaryContent[i] = Binary.fromConstantByteArray(dictionaryBytes, offset, len);
           // increment to the next value
           offset += len;
         }
@@ -106,7 +106,7 @@ public abstract class PlainValuesDictionary extends Dictionary {
             "Invalid byte array length: " + length);
         for (int i = 0; i < binaryDictionaryContent.length; i++) {
           // wrap the content in a Binary
-          binaryDictionaryContent[i] = Binary.fromByteArray(
+          binaryDictionaryContent[i] = Binary.fromConstantByteArray(
               dictionaryBytes, offset, length);
           // increment to the next value
           offset += length;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
index f567803..4346e02 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
@@ -39,7 +39,7 @@ public class BinaryPlainValuesReader extends ValuesReader {
       int length = BytesUtils.readIntLittleEndian(in, offset);
       int start = offset + 4;
       offset = start + length;
-      return Binary.fromByteArray(in, start, length);
+      return Binary.fromConstantByteArray(in, start, length);
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
     } catch (RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
index 3a7d245..098a486 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -46,7 +46,7 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
     try {
       int start = offset;
       offset = start + length;
-      return Binary.fromByteArray(in, start, length);
+      return Binary.fromConstantByteArray(in, start, length);
     } catch (RuntimeException e) {
       throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/example/data/simple/NanoTime.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/example/data/simple/NanoTime.java b/parquet-column/src/main/java/org/apache/parquet/example/data/simple/NanoTime.java
index 61eff42..f8f3979 100644
--- a/parquet-column/src/main/java/org/apache/parquet/example/data/simple/NanoTime.java
+++ b/parquet-column/src/main/java/org/apache/parquet/example/data/simple/NanoTime.java
@@ -61,7 +61,7 @@ public class NanoTime extends Primitive {
     buf.putLong(timeOfDayNanos);
     buf.putInt(julianDay);
     buf.flip();
-    return Binary.fromByteBuffer(buf);
+    return Binary.fromConstantByteBuffer(buf);
   }
 
   public Int96Value toInt96() {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java b/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
index f753b2a..642c1f4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
@@ -112,7 +112,7 @@ public class RecordConsumerLoggingWrapper extends RecordConsumer {
      */
     @Override
     public void addBinary(Binary value) {
-      if (DEBUG) log(Arrays.toString(value.getBytes()));
+      if (DEBUG) log(Arrays.toString(value.getBytesUnsafe()));
       delegate.addBinary(value);
     }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
index d3abec5..f88d740 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -33,10 +33,12 @@ import static org.apache.parquet.bytes.BytesUtils.UTF8;
 
 abstract public class Binary implements Comparable<Binary>, Serializable {
 
+  protected boolean isBackingBytesReused;
+
   // this isn't really something others should extend
   private Binary() { }
 
-  public static final Binary EMPTY = fromByteArray(new byte[0]);
+  public static final Binary EMPTY = fromConstantByteArray(new byte[0]);
 
   abstract public String toStringUsingUTF8();
 
@@ -48,6 +50,16 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
   abstract public byte[] getBytes();
 
+  /**
+   * Variant of getBytes() that avoids copying backing data structure by returning
+   * backing byte[] of the Binary. Do not modify backing byte[] unless you know what
+   * you are doing.
+   * @return backing byte[] of correct size, with an offset of 0, if possible, else returns result of getBytes()
+   */
+  abstract public byte[] getBytesUnsafe();
+
+  abstract public Binary slice(int start, int length);
+
   abstract boolean equals(byte[] bytes, int offset, int length);
 
   abstract boolean equals(Binary other);
@@ -71,7 +83,28 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
   @Override
   public String toString() {
-    return "Binary{" + length() + " bytes, " + Arrays.toString(getBytes()) + "}";
+    return "Binary{" +
+        length() +
+        (isBackingBytesReused ? " reused": " constant") +
+        " bytes, " +
+        Arrays.toString(getBytesUnsafe())
+        + "}";
+  }
+
+  public Binary copy() {
+    if (isBackingBytesReused) {
+      return Binary.fromConstantByteArray(getBytes());
+    } else {
+      return this;
+    }
+  }
+
+  /**
+   * Signals if backing bytes are owned, and can be modified, by producer of the Binary
+   * @return if backing bytes are held on by producer of the Binary
+   */
+  public boolean isBackingBytesReused() {
+    return isBackingBytesReused;
   }
 
   private static class ByteArraySliceBackedBinary extends Binary {
@@ -79,10 +112,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     private final int offset;
     private final int length;
 
-    public ByteArraySliceBackedBinary(byte[] value, int offset, int length) {
+    public ByteArraySliceBackedBinary(byte[] value, int offset, int length, boolean isBackingBytesReused) {
       this.value = value;
       this.offset = offset;
       this.length = length;
+      this.isBackingBytesReused = isBackingBytesReused;
     }
 
     @Override
@@ -110,6 +144,21 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     }
 
     @Override
+    public byte[] getBytesUnsafe() {
+      // Backing array is larger than the slice used for this Binary.
+      return getBytes();
+    }
+
+    @Override
+    public Binary slice(int start, int length) {
+      if (isBackingBytesReused) {
+        return Binary.fromReusedByteArray(value, offset + start, length);
+      } else {
+        return Binary.fromConstantByteArray(value, offset + start, length);
+      }
+    }
+
+    @Override
     public int hashCode() {
       return Binary.hashCode(value, offset, length);
     }
@@ -147,8 +196,19 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
   }
 
   private static class FromStringBinary extends ByteArrayBackedBinary {
-    public FromStringBinary(byte[] value) {
-      super(value);
+    public FromStringBinary(String value) {
+      // reused is false, because we do not
+      // hold on to the underlying bytes,
+      // and nobody else has a handle to them
+      super(encodeUTF8(value), false);
+    }
+
+    private static byte[] encodeUTF8(String value) {
+      try {
+        return value.getBytes("UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        throw new ParquetEncodingException("UTF-8 not supported.", e);
+      }
     }
 
     @Override
@@ -157,15 +217,29 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     }
   }
 
+  public static Binary fromReusedByteArray(final byte[] value, final int offset, final int length) {
+    return new ByteArraySliceBackedBinary(value, offset, length, true);
+  }
+
+  public static Binary fromConstantByteArray(final byte[] value, final int offset,
+                                             final int length) {
+    return new ByteArraySliceBackedBinary(value, offset, length, false);
+  }
+
+  @Deprecated
+  /**
+   * @deprecated Use @link{fromReusedByteArray} or @link{fromConstantByteArray} instead
+   */
   public static Binary fromByteArray(final byte[] value, final int offset, final int length) {
-    return new ByteArraySliceBackedBinary(value, offset, length);
+    return fromReusedByteArray(value, offset, length); // Assume producer intends to reuse byte[]
   }
 
   private static class ByteArrayBackedBinary extends Binary {
     private final byte[] value;
 
-    public ByteArrayBackedBinary(byte[] value) {
+    public ByteArrayBackedBinary(byte[] value, boolean isBackingBytesReused) {
       this.value = value;
+      this.isBackingBytesReused = isBackingBytesReused;
     }
 
     @Override
@@ -185,10 +259,24 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
     @Override
     public byte[] getBytes() {
+      return Arrays.copyOfRange(value, 0, value.length);
+    }
+
+    @Override
+    public byte[] getBytesUnsafe() {
       return value;
     }
 
     @Override
+    public Binary slice(int start, int length) {
+      if (isBackingBytesReused) {
+        return Binary.fromReusedByteArray(value, start, length);
+      } else {
+        return Binary.fromConstantByteArray(value, start, length);
+      }
+    }
+
+    @Override
     public int hashCode() {
       return Binary.hashCode(value, 0, value.length);
     }
@@ -225,15 +313,29 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
   }
 
+  public static Binary fromReusedByteArray(final byte[] value) {
+    return new ByteArrayBackedBinary(value, true);
+  }
+
+  public static Binary fromConstantByteArray(final byte[] value) {
+    return new ByteArrayBackedBinary(value, false);
+  }
+
+  @Deprecated
+  /**
+   * @deprecated Use @link{fromReusedByteArray} or @link{fromConstantByteArray} instead
+   */
   public static Binary fromByteArray(final byte[] value) {
-    return new ByteArrayBackedBinary(value);
+    return fromReusedByteArray(value); // Assume producer intends to reuse byte[]
   }
 
   private static class ByteBufferBackedBinary extends Binary {
     private transient ByteBuffer value;
+    private transient byte[] cachedBytes;
 
-    public ByteBufferBackedBinary(ByteBuffer value) {
+    public ByteBufferBackedBinary(ByteBuffer value, boolean isBackingBytesReused) {
       this.value = value;
+      this.isBackingBytesReused = isBackingBytesReused;
     }
 
     @Override
@@ -249,7 +351,7 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     @Override
     public void writeTo(OutputStream out) throws IOException {
       // TODO: should not have to materialize those bytes
-      out.write(getBytes());
+      out.write(getBytesUnsafe());
     }
 
     @Override
@@ -258,16 +360,29 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
       value.mark();
       value.get(bytes).reset();
+      if (!isBackingBytesReused) { // backing buffer might change
+        cachedBytes = bytes;
+      }
       return bytes;
     }
 
     @Override
+    public byte[] getBytesUnsafe() {
+      return cachedBytes != null ? cachedBytes : getBytes();
+    }
+
+    @Override
+    public Binary slice(int start, int length) {
+      return Binary.fromConstantByteArray(getBytesUnsafe(), start, length);
+    }
+
+    @Override
     public int hashCode() {
       if (value.hasArray()) {
         return Binary.hashCode(value.array(), value.arrayOffset() + value.position(),
             value.arrayOffset() + value.remaining());
       }
-      byte[] bytes = getBytes();
+      byte[] bytes = getBytesUnsafe();
       return Binary.hashCode(bytes, 0, bytes.length);
     }
 
@@ -277,7 +392,7 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
         return other.equals(value.array(), value.arrayOffset() + value.position(),
             value.arrayOffset() + value.remaining());
       }
-      byte[] bytes = getBytes();
+      byte[] bytes = getBytesUnsafe();
       return other.equals(bytes, 0, bytes.length);
     }
 
@@ -287,7 +402,7 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
         return Binary.equals(value.array(), value.arrayOffset() + value.position(),
             value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
       }
-      byte[] bytes = getBytes();
+      byte[] bytes = getBytesUnsafe();
       return Binary.equals(bytes, 0, bytes.length, other, otherOffset, otherLength);
     }
 
@@ -297,7 +412,7 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
         return other.compareTo(value.array(), value.arrayOffset() + value.position(),
             value.arrayOffset() + value.remaining());
       }
-      byte[] bytes = getBytes();
+      byte[] bytes = getBytesUnsafe();
       return other.compareTo(bytes, 0, bytes.length);
     }
 
@@ -307,7 +422,7 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
         return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + value.position(),
             value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
       }
-      byte[] bytes = getBytes();
+      byte[] bytes = getBytesUnsafe();
       return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength);
     }
 
@@ -319,11 +434,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     @Override
     public void writeTo(DataOutput out) throws IOException {
       // TODO: should not have to materialize those bytes
-      out.write(getBytes());
+      out.write(getBytesUnsafe());
     }
 
     private void writeObject(java.io.ObjectOutputStream out) throws IOException {
-      byte[] bytes = getBytes();
+      byte[] bytes = getBytesUnsafe();
       out.writeInt(bytes.length);
       out.write(bytes);
     }
@@ -341,16 +456,24 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
   }
 
+  public static Binary fromReusedByteBuffer(final ByteBuffer value) {
+    return new ByteBufferBackedBinary(value, true);
+  }
+
+  public static Binary fromConstantByteBuffer(final ByteBuffer value) {
+    return new ByteBufferBackedBinary(value, false);
+  }
+
+  @Deprecated
+  /**
+   * @deprecated Use @link{fromReusedByteBuffer} or @link{fromConstantByteBuffer} instead
+   */
   public static Binary fromByteBuffer(final ByteBuffer value) {
-    return new ByteBufferBackedBinary(value);
+    return fromReusedByteBuffer(value); // Assume producer intends to reuse byte[]
   }
 
   public static Binary fromString(final String value) {
-    try {
-      return new FromStringBinary(value.getBytes("UTF-8"));
-    } catch (UnsupportedEncodingException e) {
-      throw new ParquetEncodingException("UTF-8 not supported.", e);
-    }
+    return new FromStringBinary(value);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
index e8d98c0..7988f4a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
@@ -204,7 +204,7 @@ public final class PrimitiveType extends Type {
     INT96("getBinary", Binary.class) {
       @Override
       public String toString(ColumnReader columnReader) {
-        return Arrays.toString(columnReader.getBinary().getBytes());
+        return Arrays.toString(columnReader.getBinary().getBytesUnsafe());
       }
       @Override
       public void addValueToRecordConsumer(RecordConsumer recordConsumer,

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/test/java/org/apache/parquet/CorruptStatisticsTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/CorruptStatisticsTest.java b/parquet-column/src/test/java/org/apache/parquet/CorruptStatisticsTest.java
new file mode 100644
index 0000000..084d63a
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/CorruptStatisticsTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CorruptStatisticsTest {
+
+  @Test
+  public void testOnlyAppliesToBinary() {
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0 (build abcd)", PrimitiveTypeName.BINARY));
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0 (build abcd)", PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY));
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0 (build abcd)", PrimitiveTypeName.DOUBLE));
+  }
+
+  @Test
+  public void testCorruptStatistics() {
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0 (build abcd)", PrimitiveTypeName.BINARY));
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.4.2 (build abcd)", PrimitiveTypeName.BINARY));
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.100 (build abcd)", PrimitiveTypeName.BINARY));
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.7.999 (build abcd)", PrimitiveTypeName.BINARY));
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.22rc99 (build abcd)", PrimitiveTypeName.BINARY));
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.22rc99-SNAPSHOT (build abcd)", PrimitiveTypeName.BINARY));
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.1-SNAPSHOT (build abcd)", PrimitiveTypeName.BINARY));
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0t-01-abcdefg (build abcd)", PrimitiveTypeName.BINARY));
+
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("unparseable string", PrimitiveTypeName.BINARY));
+
+    // missing semver
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version (build abcd)", PrimitiveTypeName.BINARY));
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version  (build abcd)", PrimitiveTypeName.BINARY));
+
+    // missing build hash
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0 (build )", PrimitiveTypeName.BINARY));
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0 (build)", PrimitiveTypeName.BINARY));
+    assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version (build)", PrimitiveTypeName.BINARY));
+
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("imapla version 1.6.0 (build abcd)", PrimitiveTypeName.BINARY));
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("imapla version 1.10.0 (build abcd)", PrimitiveTypeName.BINARY));
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.8.0 (build abcd)", PrimitiveTypeName.BINARY));
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.8.1 (build abcd)", PrimitiveTypeName.BINARY));
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.8.1rc3 (build abcd)", PrimitiveTypeName.BINARY));
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.8.1rc3-SNAPSHOT (build abcd)", PrimitiveTypeName.BINARY));
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.9.0 (build abcd)", PrimitiveTypeName.BINARY));
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 2.0.0 (build abcd)", PrimitiveTypeName.BINARY));
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.9.0t-01-abcdefg (build abcd)", PrimitiveTypeName.BINARY));
+
+    // missing semver
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("impala version (build abcd)", PrimitiveTypeName.BINARY));
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("impala version  (build abcd)", PrimitiveTypeName.BINARY));
+
+    // missing build hash
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("impala version 1.6.0 (build )", PrimitiveTypeName.BINARY));
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("impala version 1.6.0 (build)", PrimitiveTypeName.BINARY));
+    assertFalse(CorruptStatistics.shouldIgnoreStatistics("impala version (build)", PrimitiveTypeName.BINARY));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
index e0c6a10..128acb4 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
@@ -384,6 +384,24 @@ public class TestStatistics {
   }
 
   @Test
+  public void testBinaryMinMaxForReusedBackingByteArray() {
+    BinaryStatistics stats = new BinaryStatistics();
+
+    byte[] bytes = new byte[] { 10 };
+    final Binary value = Binary.fromReusedByteArray(bytes);
+    stats.updateStats(value);
+
+    bytes[0] = 20;
+    stats.updateStats(value);
+
+    bytes[0] = 15;
+    stats.updateStats(value);
+
+    assertArrayEquals(new byte[] { 20 }, stats.getMaxBytes());
+    assertArrayEquals(new byte[] { 10 }, stats.getMinBytes());
+  }
+
+  @Test
   public void testMergingStatistics() {
     testMergingIntStats();
     testMergingLongStats();

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
index e60b3ec..020868e 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
@@ -27,6 +27,7 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -512,11 +513,12 @@ public class TestDictionary {
     }
   }
 
-  private void writeRepeatedWithReuse(int COUNT, ValuesWriter cw, String prefix) {
-    Binary reused = Binary.fromString(prefix + "0");
+  private void writeRepeatedWithReuse(int COUNT, ValuesWriter cw,
+                                      String prefix) throws UnsupportedEncodingException {
+    Binary reused = Binary.fromReusedByteArray((prefix + "0").getBytes("UTF-8"));
     for (int i = 0; i < COUNT; i++) {
       Binary content = Binary.fromString(prefix + i % 10);
-      System.arraycopy(content.getBytes(), 0, reused.getBytes(), 0, reused.length());
+      System.arraycopy(content.getBytesUnsafe(), 0, reused.getBytesUnsafe(), 0, reused.length());
       cw.writeBytes(reused);
     }
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
new file mode 100644
index 0000000..bd8a69d
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.parquet.io.api.TestBinary.BinaryFactory.BinaryAndOriginal;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+public class TestBinary {
+
+  private static final String testString = "test-123";
+  private static final String UTF8 = "UTF-8";
+
+  static interface BinaryFactory {
+    static class BinaryAndOriginal {
+      public Binary binary;
+      public byte[] original;
+
+      public BinaryAndOriginal(Binary binary, byte[] original) {
+        this.binary = binary;
+        this.original = original;
+      }
+    }
+
+    BinaryAndOriginal get(byte[] bytes, boolean reused) throws Exception;
+  }
+
+  private static void mutate(byte[] bytes) {
+    for (int i = 0; i < bytes.length; i++) {
+      bytes[i] = (byte) (bytes[i] + 1);
+    }
+  }
+
+  private static final BinaryFactory BYTE_ARRAY_BACKED_BF = new BinaryFactory() {
+    @Override
+    public BinaryAndOriginal get(byte[] bytes, boolean reused) throws Exception {
+      byte[] orig = Arrays.copyOf(bytes, bytes.length);
+      if (reused) {
+        return new BinaryAndOriginal(Binary.fromReusedByteArray(orig), orig);
+      } else {
+        return new BinaryAndOriginal(Binary.fromConstantByteArray(orig), orig);
+      }
+    }
+  };
+
+  private static final BinaryFactory BYTE_ARRAY_SLICE_BACKED_BF = new BinaryFactory() {
+    @Override
+    public BinaryAndOriginal get(byte[] bytes, boolean reused) throws Exception {
+      byte [] orig = padded(bytes);
+      Binary b;
+      if (reused) {
+        b = Binary.fromReusedByteArray(orig, 5, bytes.length);
+      } else {
+        b = Binary.fromConstantByteArray(orig, 5, bytes.length);
+      }
+      assertArrayEquals(bytes, b.getBytes());
+      return new BinaryAndOriginal(b, orig);
+    }
+  };
+
+  private static final BinaryFactory BUFFER_BF = new BinaryFactory() {
+    @Override
+    public BinaryAndOriginal get(byte[] bytes, boolean reused) throws Exception {
+      byte [] orig = padded(bytes);
+      ByteBuffer buff = ByteBuffer.wrap(orig, 5, bytes.length);
+      Binary b;
+
+      if (reused) {
+        b = Binary.fromReusedByteBuffer(buff);
+      } else {
+        b = Binary.fromConstantByteBuffer(buff);
+      }
+
+      buff.mark();
+      assertArrayEquals(bytes, b.getBytes());
+      buff.reset();
+      return new BinaryAndOriginal(b, orig);
+    }
+  };
+
+  private static final BinaryFactory STRING_BF = new BinaryFactory() {
+    @Override
+    public BinaryAndOriginal get(byte[] bytes, boolean reused) throws Exception {
+      Binary b = Binary.fromString(new String(bytes, UTF8));
+      return new BinaryAndOriginal(b, b.getBytesUnsafe()); // only way to get underlying bytes for testing
+    }
+  };
+
+  private static byte[] padded(byte[] bytes) {
+    byte[] padded = new byte[bytes.length + 10];
+
+    for (int i = 0; i < 5; i++) {
+      padded[i] = (byte) i;
+    }
+
+    System.arraycopy(bytes, 0, padded, 5, bytes.length);
+
+    for (int i = 0; i < 5; i++) {
+      padded[i + 5 + bytes.length] = (byte) i;
+    }
+
+    return padded;
+  }
+
+  @Test
+  public void testByteArrayBackedBinary() throws Exception {
+    testBinary(BYTE_ARRAY_BACKED_BF, true);
+    testBinary(BYTE_ARRAY_BACKED_BF, false);
+  }
+
+  @Test
+  public void testByteArraySliceBackedBinary() throws Exception {
+    testBinary(BYTE_ARRAY_SLICE_BACKED_BF, true);
+    testBinary(BYTE_ARRAY_SLICE_BACKED_BF, false);
+  }
+
+  @Test
+  public void testByteBufferBackedBinary() throws Exception {
+    testBinary(BUFFER_BF, true);
+    testBinary(BUFFER_BF, false);
+  }
+
+  @Test
+  public void testFromStringBinary() throws Exception {
+    testBinary(STRING_BF, false);
+  }
+
+  private void testSlice(BinaryFactory bf, boolean reused) throws Exception {
+    BinaryAndOriginal bao = bf.get(testString.getBytes(UTF8), reused);
+
+    assertArrayEquals(testString.getBytes(UTF8), bao.binary.slice(0, testString.length()).getBytesUnsafe());
+    assertArrayEquals("123".getBytes(UTF8), bao.binary.slice(5, 3).getBytesUnsafe());
+  }
+
+  private void testConstantCopy(BinaryFactory bf) throws Exception {
+    BinaryAndOriginal bao = bf.get(testString.getBytes(UTF8), false);
+    assertEquals(false, bao.binary.isBackingBytesReused());
+
+    assertArrayEquals(testString.getBytes(UTF8), bao.binary.getBytes());
+    assertArrayEquals(testString.getBytes(UTF8), bao.binary.getBytesUnsafe());
+    assertArrayEquals(testString.getBytes(UTF8), bao.binary.copy().getBytesUnsafe());
+    assertArrayEquals(testString.getBytes(UTF8), bao.binary.copy().getBytes());
+
+    bao = bf.get(testString.getBytes(UTF8), false);
+    assertEquals(false, bao.binary.isBackingBytesReused());
+
+    Binary copy = bao.binary.copy();
+
+    assertSame(copy, bao.binary);
+
+    mutate(bao.original);
+
+    byte[] expected = testString.getBytes(UTF8);
+    mutate(expected);
+
+    assertArrayEquals(expected, copy.getBytes());
+    assertArrayEquals(expected, copy.getBytesUnsafe());
+    assertArrayEquals(expected, copy.copy().getBytesUnsafe());
+    assertArrayEquals(expected, copy.copy().getBytes());
+  }
+
+  private void testReusedCopy(BinaryFactory bf) throws Exception {
+    BinaryAndOriginal bao = bf.get(testString.getBytes(UTF8), true);
+    assertEquals(true, bao.binary.isBackingBytesReused());
+
+    assertArrayEquals(testString.getBytes(UTF8), bao.binary.getBytes());
+    assertArrayEquals(testString.getBytes(UTF8), bao.binary.getBytesUnsafe());
+    assertArrayEquals(testString.getBytes(UTF8), bao.binary.copy().getBytesUnsafe());
+    assertArrayEquals(testString.getBytes(UTF8), bao.binary.copy().getBytes());
+
+    bao = bf.get(testString.getBytes(UTF8), true);
+    assertEquals(true, bao.binary.isBackingBytesReused());
+
+    Binary copy = bao.binary.copy();
+    mutate(bao.original);
+
+    assertArrayEquals(testString.getBytes(UTF8), copy.getBytes());
+    assertArrayEquals(testString.getBytes(UTF8), copy.getBytesUnsafe());
+    assertArrayEquals(testString.getBytes(UTF8), copy.copy().getBytesUnsafe());
+    assertArrayEquals(testString.getBytes(UTF8), copy.copy().getBytes());
+  }
+
+  private void testBinary(BinaryFactory bf, boolean reused) throws Exception {
+    testSlice(bf, reused);
+
+    if (reused) {
+      testReusedCopy(bf);
+    } else {
+      testConstantCopy(bf);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java b/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
new file mode 100644
index 0000000..feee8de
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Very basic semver parser, only pays attention to major, minor, and patch numbers.
+ * Attempts to do a little bit of validation that the version string is valid, but
+ * is not a full implementation of the semver spec.
+ *
+ * NOTE: compareTo only respects major, minor, and patch (ignores rc numbers, SNAPSHOT, etc)
+ */
+public final class SemanticVersion implements Comparable<SemanticVersion> {
+  // (major).(minor).(patch)[(rc)(rcnum)]?(-(SNAPSHOT))?
+  private static final String FORMAT = "^(\\d+)\\.(\\d+)\\.(\\d+)((.*)(\\d+))?(\\-(.*))?$";
+  private static final Pattern PATTERN = Pattern.compile(FORMAT);
+
+  public final int major;
+  public final int minor;
+  public final int patch;
+
+  public SemanticVersion(int major, int minor, int patch) {
+    Preconditions.checkArgument(major >= 0, "major must be >= 0");
+    Preconditions.checkArgument(minor >= 0, "minor must be >= 0");
+    Preconditions.checkArgument(patch >= 0, "patch must be >= 0");
+
+    this.major = major;
+    this.minor = minor;
+    this.patch = patch;
+  }
+
+  public static SemanticVersion parse(String version) throws SemanticVersionParseException {
+    Matcher matcher = PATTERN.matcher(version);
+
+    if (!matcher.matches()) {
+      throw new SemanticVersionParseException("" + version + " does not match format " + FORMAT);
+    }
+
+    final int major;
+    final int minor;
+    final int patch;
+
+    try {
+      major = Integer.valueOf(matcher.group(1));
+      minor = Integer.valueOf(matcher.group(2));
+      patch = Integer.valueOf(matcher.group(3));
+    } catch (NumberFormatException e) {
+      throw new SemanticVersionParseException(e);
+    }
+
+    if (major < 0 || minor < 0 || patch < 0) {
+      throw new SemanticVersionParseException(
+          String.format("major(%d), minor(%d), and patch(%d) must all be >= 0", major, minor, patch));
+    }
+
+    return new SemanticVersion(major, minor, patch);
+  }
+
+  @Override
+  public int compareTo(SemanticVersion o) {
+    int cmp;
+
+    cmp = Integer.compare(major, o.major);
+    if (cmp != 0) {
+      return cmp;
+    }
+
+    cmp = Integer.compare(minor, o.minor);
+    if (cmp != 0) {
+      return cmp;
+    }
+
+    return Integer.compare(patch, o.patch);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    SemanticVersion that = (SemanticVersion) o;
+    return compareTo(that) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = major;
+    result = 31 * result + minor;
+    result = 31 * result + patch;
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return major + "." + minor + "." + patch;
+  }
+
+  public static class SemanticVersionParseException extends Exception {
+    public SemanticVersionParseException() {
+      super();
+    }
+
+    public SemanticVersionParseException(String message) {
+      super(message);
+    }
+
+    public SemanticVersionParseException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    public SemanticVersionParseException(Throwable cause) {
+      super(cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/VersionParser.java b/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
new file mode 100644
index 0000000..a0c6c3d
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.parquet.Preconditions.checkArgument;
+
+/**
+ * Parses a parquet Version string
+ * Tolerates missing semver and buildhash
+ * (semver and build hash may be null)
+ */
+public class VersionParser {
+  // example: parquet-mr version 1.8.0rc2-SNAPSHOT (build ddb469afac70404ea63b72ed2f07a911a8592ff7)
+  public static final String FORMAT = "(.+) version ((.*) )?\\(build ?(.*)\\)";
+  public static final Pattern PATTERN = Pattern.compile(FORMAT);
+
+  public static class ParsedVersion {
+    public final String application;
+    public final String semver;
+    public final String appBuildHash;
+
+    public ParsedVersion(String application, String semver, String appBuildHash) {
+      checkArgument(!Strings.isNullOrEmpty(application), "application cannot be null or empty");
+      this.application = application;
+      this.semver = Strings.isNullOrEmpty(semver) ? null : semver;
+      this.appBuildHash = Strings.isNullOrEmpty(appBuildHash) ? null : appBuildHash;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      ParsedVersion version = (ParsedVersion) o;
+
+      if (appBuildHash != null ? !appBuildHash.equals(version.appBuildHash) : version.appBuildHash != null)
+        return false;
+      if (application != null ? !application.equals(version.application) : version.application != null) return false;
+      if (semver != null ? !semver.equals(version.semver) : version.semver != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = application != null ? application.hashCode() : 0;
+      result = 31 * result + (semver != null ? semver.hashCode() : 0);
+      result = 31 * result + (appBuildHash != null ? appBuildHash.hashCode() : 0);
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "ParsedVersion(" +
+          "application=" + application +
+          ", semver=" + semver +
+          ", appBuildHash=" + appBuildHash +
+          ')';
+    }
+  }
+
+  public static ParsedVersion parse(String createdBy) throws VersionParseException {
+    Matcher matcher = PATTERN.matcher(createdBy);
+
+    if(!matcher.matches()){
+      throw new VersionParseException("Could not parse created_by: " + createdBy + " using format: " + FORMAT);
+    }
+
+    String application = matcher.group(1);
+    String semver = matcher.group(3);
+    String appBuildHash = matcher.group(4);
+
+    if (Strings.isNullOrEmpty(application)) {
+      throw new VersionParseException("application cannot be null or empty");
+    }
+
+    return new ParsedVersion(application, semver, appBuildHash);
+  }
+
+  public static class VersionParseException extends Exception {
+    public VersionParseException(String message) {
+      super(message);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java b/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java
new file mode 100644
index 0000000..6a1c47f
--- /dev/null
+++ b/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SemanticVersionTest {
+  @Test
+  public void testCompare() {
+    assertTrue(new SemanticVersion(1, 8, 1).compareTo(new SemanticVersion(1, 8, 1)) == 0);
+    assertTrue(new SemanticVersion(1, 8, 0).compareTo(new SemanticVersion(1, 8, 1)) < 0);
+    assertTrue(new SemanticVersion(1, 8, 2).compareTo(new SemanticVersion(1, 8, 1)) > 0);
+
+    assertTrue(new SemanticVersion(1, 8, 1).compareTo(new SemanticVersion(1, 8, 1)) == 0);
+    assertTrue(new SemanticVersion(1, 8, 0).compareTo(new SemanticVersion(1, 8, 1)) < 0);
+    assertTrue(new SemanticVersion(1, 8, 2).compareTo(new SemanticVersion(1, 8, 1)) > 0);
+
+    assertTrue(new SemanticVersion(1, 7, 0).compareTo(new SemanticVersion(1, 8, 0)) < 0);
+    assertTrue(new SemanticVersion(1, 9, 0).compareTo(new SemanticVersion(1, 8, 0)) > 0);
+
+    assertTrue(new SemanticVersion(0, 0, 0).compareTo(new SemanticVersion(1, 0, 0)) < 0);
+    assertTrue(new SemanticVersion(2, 0, 0).compareTo(new SemanticVersion(1, 0, 0)) > 0);
+
+    assertTrue(new SemanticVersion(1, 8, 100).compareTo(new SemanticVersion(1, 9, 0)) < 0);
+  }
+
+  @Test
+  public void testParse() throws Exception {
+    assertEquals(new SemanticVersion(1, 8, 0), SemanticVersion.parse("1.8.0"));
+    assertEquals(new SemanticVersion(1, 8, 0), SemanticVersion.parse("1.8.0rc3"));
+    assertEquals(new SemanticVersion(1, 8, 0), SemanticVersion.parse("1.8.0rc3-SNAPSHOT"));
+    assertEquals(new SemanticVersion(1, 8, 0), SemanticVersion.parse("1.8.0-SNAPSHOT"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-common/src/test/java/org/apache/parquet/VersionTest.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/test/java/org/apache/parquet/VersionTest.java b/parquet-common/src/test/java/org/apache/parquet/VersionTest.java
index e68b2f2..3720007 100644
--- a/parquet-common/src/test/java/org/apache/parquet/VersionTest.java
+++ b/parquet-common/src/test/java/org/apache/parquet/VersionTest.java
@@ -21,11 +21,14 @@ package org.apache.parquet;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.VersionParser.VersionParseException;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * This test doesn't do much, but it makes sure that the Version class
@@ -49,14 +52,37 @@ public class VersionTest {
   }
 
   @Test
-  public void testFullVersion() {
-    // example: parquet-mr version 1.8.0rc2-SNAPSHOT (build ddb469afac70404ea63b72ed2f07a911a8592ff7)
-    String regex = "parquet-mr version (.*) \\(build (.*)\\)";
-    Pattern pattern = Pattern.compile(regex);
-    Matcher m = pattern.matcher(Version.FULL_VERSION);
-    assertTrue(Version.FULL_VERSION + " did not match " + pattern, m.matches());
-    assertVersionValid(m.group(1));
-    assertEquals(Version.VERSION_NUMBER, m.group(1));
-    assertFalse(m.group(2).isEmpty());
+  public void testFullVersion() throws Exception {
+    ParsedVersion version = VersionParser.parse(Version.FULL_VERSION);
+
+    assertVersionValid(version.semver);
+    assertEquals(Version.VERSION_NUMBER, version.semver);
+    assertEquals("parquet-mr", version.application);
+  }
+  
+  @Test
+  public void testVersionParser() throws Exception {
+    assertEquals(new ParsedVersion("parquet-mr", "1.6.0", "abcd"),
+        VersionParser.parse("parquet-mr version 1.6.0 (build abcd)"));
+
+    assertEquals(new ParsedVersion("parquet-mr", "1.6.22rc99-SNAPSHOT", "abcd"),
+        VersionParser.parse("parquet-mr version 1.6.22rc99-SNAPSHOT (build abcd)"));
+
+    try {
+      VersionParser.parse("unparseable string");
+      fail("this should throw");
+    } catch (VersionParseException e) {
+      //
+    }
+
+    // missing semver
+    assertEquals(new ParsedVersion("parquet-mr", null, "abcd"), VersionParser.parse("parquet-mr version (build abcd)"));
+    assertEquals(new ParsedVersion("parquet-mr", null, "abcd"), VersionParser.parse("parquet-mr version  (build abcd)"));
+
+    // missing build hash
+    assertEquals(new ParsedVersion("parquet-mr", "1.6.0", null), VersionParser.parse("parquet-mr version 1.6.0 (build )"));
+    assertEquals(new ParsedVersion("parquet-mr", "1.6.0", null), VersionParser.parse("parquet-mr version 1.6.0 (build)"));
+    assertEquals(new ParsedVersion("parquet-mr", null, null), VersionParser.parse("parquet-mr version (build)"));
+    assertEquals(new ParsedVersion("parquet-mr", null, null), VersionParser.parse("parquet-mr version (build )"));
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 1481783..420d97e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -36,6 +36,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.Log;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.ColumnChunk;
@@ -232,23 +233,35 @@ public class ParquetMetadataConverter {
     return Encoding.valueOf(encoding.name());
   }
 
-  public static Statistics toParquetStatistics(org.apache.parquet.column.statistics.Statistics statistics) {
+  public static Statistics toParquetStatistics(
+      org.apache.parquet.column.statistics.Statistics statistics) {
     Statistics stats = new Statistics();
     if (!statistics.isEmpty()) {
       stats.setNull_count(statistics.getNumNulls());
-      if(statistics.hasNonNullValue()) {
+      if (statistics.hasNonNullValue()) {
         stats.setMax(statistics.getMaxBytes());
         stats.setMin(statistics.getMinBytes());
-     }
+      }
     }
     return stats;
   }
-
+  /**
+   * @deprecated Replaced by {@link #fromParquetStatistics(
+   * String createdBy, Statistics statistics, PrimitiveTypeName type)}
+   */
+  @Deprecated
   public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics(Statistics statistics, PrimitiveTypeName type) {
+    return fromParquetStatistics(null, statistics, type);
+  }
+
+  public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics
+      (String createdBy, Statistics statistics, PrimitiveTypeName type) {
     // create stats object based on the column type
     org.apache.parquet.column.statistics.Statistics stats = org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType(type);
     // If there was no statistics written to the footer, create an empty Statistics object and return
-    if (statistics != null) {
+
+    // NOTE: See docs in CorruptStatistics for explanation of why this check is needed
+    if (statistics != null && !CorruptStatistics.shouldIgnoreStatistics(createdBy, type)) {
       if (statistics.isSetMax() && statistics.isSetMin()) {
         stats.setMinMaxFromBytes(statistics.min.array(), statistics.max.array());
       }
@@ -517,10 +530,12 @@ public class ParquetMetadataConverter {
       public FileMetaData visit(NoFilter filter) throws IOException {
         return readFileMetaData(from);
       }
+
       @Override
       public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
         return readFileMetaData(from, true);
       }
+
       @Override
       public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
         return filterFileMetaData(readFileMetaData(from), filter);
@@ -555,7 +570,10 @@ public class ParquetMetadataConverter {
               messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
               CompressionCodecName.fromParquet(metaData.codec),
               fromFormatEncodings(metaData.encodings),
-              fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
+              fromParquetStatistics(
+                  parquetMetadata.getCreated_by(),
+                  metaData.statistics,
+                  messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
               metaData.data_page_offset,
               metaData.dictionary_page_offset,
               metaData.num_values,
@@ -672,7 +690,10 @@ public class ParquetMetadataConverter {
       org.apache.parquet.column.Encoding dlEncoding,
       org.apache.parquet.column.Encoding valuesEncoding,
       OutputStream to) throws IOException {
-    writePageHeader(newDataPageHeader(uncompressedSize, compressedSize, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding), to);
+    writePageHeader(
+        newDataPageHeader(uncompressedSize, compressedSize, valueCount, statistics,
+            rlEncoding, dlEncoding, valuesEncoding),
+        to);
   }
 
   private static PageHeader newDataPageHeader(
@@ -690,7 +711,8 @@ public class ParquetMetadataConverter {
         getEncoding(dlEncoding),
         getEncoding(rlEncoding)));
     if (!statistics.isEmpty()) {
-      pageHeader.getData_page_header().setStatistics(toParquetStatistics(statistics));
+      pageHeader.getData_page_header().setStatistics(
+          toParquetStatistics(statistics));
     }
     return pageHeader;
   }
@@ -723,7 +745,8 @@ public class ParquetMetadataConverter {
         getEncoding(dataEncoding),
         dlByteLength, rlByteLength);
     if (!statistics.isEmpty()) {
-      dataPageHeaderV2.setStatistics(toParquetStatistics(statistics));
+      dataPageHeaderV2.setStatistics(
+          toParquetStatistics(statistics));
     }
     PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
     pageHeader.setData_page_header_v2(dataPageHeaderV2);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index 6ff4eac..ce8c287 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -38,6 +38,7 @@ import org.apache.parquet.filter2.compat.FilterCompat.Filter;
 import org.apache.parquet.hadoop.api.InitContext;
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.io.ColumnIOFactory;
 import org.apache.parquet.io.MessageColumnIO;
@@ -163,10 +164,11 @@ class InternalParquetRecordReader<T> {
   }
 
   public void initialize(MessageType fileSchema,
-      Map<String, String> fileMetadata,
+      FileMetaData parquetFileMetadata,
       Path file, List<BlockMetaData> blocks, Configuration configuration)
       throws IOException {
     // initialize a ReadContext for this file
+    Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         configuration, toSetMultiMap(fileMetadata), fileSchema));
     this.requestedSchema = readContext.getRequestedSchema();
@@ -177,7 +179,7 @@ class InternalParquetRecordReader<T> {
         configuration, fileMetadata, fileSchema, readContext);
     this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    reader = new ParquetFileReader(configuration, file, blocks, columns);
+    reader = new ParquetFileReader(configuration, parquetFileMetadata, file, blocks, columns);
     for (BlockMetaData block : blocks) {
       total += block.getRowCount();
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 7f97b19..19370cd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -73,6 +73,7 @@ import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
 import org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
@@ -435,18 +436,33 @@ public class ParquetFileReader implements Closeable {
   private final List<BlockMetaData> blocks;
   private final FSDataInputStream f;
   private final Path filePath;
-  private int currentBlock = 0;
   private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();
+  private final FileMetaData fileMetaData;
+  private final String createdBy;
+
+  private int currentBlock = 0;
+
+  /**
+   * @deprecated use @link{ParquetFileReader(Configuration configuration, FileMetaData fileMetaData,
+   * Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns)} instead
+   */
+  public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
+    this(configuration, null, filePath, blocks, columns);
+  }
 
   /**
-   * @param f the Parquet file (will be opened for read in this constructor)
+   * @param configuration the Hadoop conf
+   * @param fileMetaData fileMetaData for parquet file
    * @param blocks the blocks to read
-   * @param colums the columns to read (their path)
-   * @param codecClassName the codec used to compress the blocks
+   * @param columns the columns to read (their path)
    * @throws IOException if the file can not be opened
    */
-  public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
+  public ParquetFileReader(
+      Configuration configuration, FileMetaData fileMetaData,
+      Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
     this.filePath = filePath;
+    this.fileMetaData = fileMetaData;
+    this.createdBy = fileMetaData == null ? null : fileMetaData.getCreatedBy();
     FileSystem fs = filePath.getFileSystem(configuration);
     this.f = fs.open(filePath);
     this.blocks = blocks;
@@ -456,6 +472,7 @@ public class ParquetFileReader implements Closeable {
     this.codecFactory = new CodecFactory(configuration);
   }
 
+
   /**
    * Reads all the columns requested from the row group at the current file position.
    * @throws IOException if an error occurs while reading
@@ -566,7 +583,10 @@ public class ParquetFileReader implements Closeable {
                     this.readAsBytesInput(compressedPageSize),
                     dataHeaderV1.getNum_values(),
                     uncompressedPageSize,
-                    fromParquetStatistics(dataHeaderV1.getStatistics(), descriptor.col.getType()),
+                    fromParquetStatistics(
+                        createdBy,
+                        dataHeaderV1.getStatistics(),
+                        descriptor.col.getType()),
                     ParquetMetadataConverter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
                     ParquetMetadataConverter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
                     ParquetMetadataConverter.getEncoding(dataHeaderV1.getEncoding())
@@ -586,7 +606,10 @@ public class ParquetFileReader implements Closeable {
                     ParquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
                     this.readAsBytesInput(dataSize),
                     uncompressedPageSize,
-                    fromParquetStatistics(dataHeaderV2.getStatistics(), descriptor.col.getType()),
+                    fromParquetStatistics(
+                        createdBy,
+                        dataHeaderV2.getStatistics(),
+                        descriptor.col.getType()),
                     dataHeaderV2.isIs_compressed()
                     ));
             valuesCountReadSoFar += dataHeaderV2.getNum_values();

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index a45dde5..b23273a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -149,7 +149,7 @@ public class ParquetReader<T> implements Closeable {
 
       reader = new InternalParquetRecordReader<T>(readSupport, filter);
       reader.initialize(fileSchema,
-          footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
+          footer.getParquetMetadata().getFileMetaData(),
           footer.getFile(), filteredBlocks, conf);
     }
   }


Mime
View raw message