parquet-commits mailing list archives

From b...@apache.org
Subject parquet-mr git commit: PARQUET-246: File recovery and work-arounds
Date Thu, 09 Jul 2015 17:20:05 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master f4e754e66 -> 043fcde30


PARQUET-246: File recovery and work-arounds

This is another way to recover data written with the delta byte array problem in PARQUET-246. It builds on @isnotinvain's strategy of solving the problem by adding a method to the encoding that detects it. This version is more similar to the fix for PARQUET-251 and includes a CorruptDeltaByteArrays helper class that uses the writer version. Most of the file changes plumb the file writer version through to Encoding and ColumnReaderImpl.
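The version gate in CorruptDeltaByteArrays boils down to: only files written by parquet-mr older than 1.8.0 (the release carrying the fix) need the sequential-read recovery path. A minimal standalone sketch of that check, with hypothetical names (the real class parses a full created_by string via VersionParser):

```java
// Simplified, hypothetical sketch of the PARQUET-246 version gate.
public class ParquetMr246Check {
  // Version in which PARQUET-246 was fixed.
  static final int[] FIXED = {1, 8, 0};

  // Returns true when the writer version cannot be ruled out as buggy.
  public static boolean requiresSequentialReads(String application,
                                                int major, int minor, int patch) {
    if (!"parquet-mr".equals(application)) {
      return false; // other writers are assumed not to have the bug
    }
    int[] v = {major, minor, patch};
    for (int i = 0; i < 3; i++) {
      if (v[i] != FIXED[i]) {
        return v[i] < FIXED[i]; // strictly older than 1.8.0 -> may be corrupt
      }
    }
    return false; // exactly 1.8.0: fixed
  }

  public static void main(String[] args) {
    System.out.println(requiresSequentialReads("parquet-mr", 1, 6, 0)); // true
    System.out.println(requiresSequentialReads("parquet-mr", 1, 8, 0)); // false
    System.out.println(requiresSequentialReads("impala", 1, 2, 0));     // false
  }
}
```

Note that, as in the real helper, an unparseable or missing version must be treated as potentially corrupt (return true); the sketch above assumes the version was already parsed successfully.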

This also repairs the problem by using a new interface, RequiresPreviousReader, to pass the previous ValuesReader to the new one, which is slightly cleaner because the reader doesn't need to expose getter and setter methods.
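The repair works because the buggy writer encoded the first value of each page as a delta against the last value of the previous page, so a reader that inherits that last value can still reconstruct it. A toy model with hypothetical names (the real readers decode binary pages, not strings):

```java
// Toy model of the RequiresPreviousReader handoff: each page decodes
// (prefixLength, suffix) pairs against a running "previous" value, and a
// reader for page N seeds its state from the reader of page N-1.
public class PreviousReaderSketch {
  interface RequiresPreviousReader {
    void setPreviousReader(PageReader previous);
  }

  static class PageReader implements RequiresPreviousReader {
    String previous = "";

    public void setPreviousReader(PageReader p) {
      if (p != null) this.previous = p.previous; // recover carried-over state
    }

    // Decode one (prefixLength, suffix) pair against the previous value.
    String read(int prefixLength, String suffix) {
      previous = previous.substring(0, prefixLength) + suffix;
      return previous;
    }
  }

  public static void main(String[] args) {
    PageReader page1 = new PageReader();
    page1.read(0, "apple");
    page1.read(4, "y"); // "apply"

    // The buggy writer encoded page 2's first value as a delta against
    // page 1's last value: (4, "ied") -> "applied".
    PageReader page2 = new PageReader();
    page2.setPreviousReader(page1); // the repair: carry the state forward
    System.out.println(page2.read(4, "ied")); // prints "applied"
  }
}
```

Without the setPreviousReader call, page 2 would decode (4, "ied") against an empty previous value and produce garbage, which is exactly the corruption this commit works around.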

The problem affects pages written to different row groups, so it was necessary to detect the problem in parquet-hadoop and fail jobs that cannot reconstruct the data. The work-around is to set "parquet.split.files" to false so that files are read sequentially. This could be set automatically in isSplittable, but that would require reading all file footers before submitting jobs, a costly step that was recently eliminated. I think it is a fair compromise to detect the error case and recommend a solution.
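Applying the work-around from job setup code looks roughly like the following configuration fragment (not runnable standalone; the job name is a placeholder, but "parquet.split.files" is the property named above):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Disable file splitting so that each affected file is read sequentially
// by a single task, allowing the previous-reader recovery to work.
Configuration conf = new Configuration();
conf.setBoolean("parquet.split.files", false);
Job job = Job.getInstance(conf, "read-parquet-246-files");
```

The same property can be passed on the command line of a Tool-based job with -Dparquet.split.files=false.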

This also includes tests for the problem to verify the fix.

Replaces old pull requests: closes #217 closes #235

Author: Ryan Blue <blue@apache.org>

Closes #235 from rdblue/PARQUET-246-recover-files and squashes the following commits:

067d5ca [Ryan Blue] PARQUET-246: Refactor after review comments.
3236a3b [Ryan Blue] PARQUET-246: Fix ParquetInputFormat for delta byte[] corruption.
3107362 [Ryan Blue] PARQUET-246: Add tests for delta byte array fix.
a10b157 [Ryan Blue] PARQUET-246: Fix reading for corrupt delta byte arrays.
5c9497c [Ryan Blue] PARQUET-246: Parse semantic version with full version.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/043fcde3
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/043fcde3
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/043fcde3

Branch: refs/heads/master
Commit: 043fcde300267e183972056a007bcf406e5c484a
Parents: f4e754e
Author: Ryan Blue <blue@apache.org>
Authored: Thu Jul 9 10:19:51 2015 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Thu Jul 9 10:19:51 2015 -0700

----------------------------------------------------------------------
 .../apache/parquet/CorruptDeltaByteArrays.java  |  99 +++++++
 .../org/apache/parquet/CorruptStatistics.java   |   4 +-
 .../column/impl/ColumnReadStoreImpl.java        |  22 +-
 .../parquet/column/impl/ColumnReaderImpl.java   |  42 ++-
 .../column/values/RequiresPreviousReader.java   |  23 ++
 .../deltastrings/DeltaByteArrayReader.java      |  25 +-
 .../org/apache/parquet/io/ColumnIOFactory.java  |  38 ++-
 .../org/apache/parquet/io/MessageColumnIO.java  |  11 +-
 .../column/impl/TestColumnReaderImpl.java       |  10 +-
 .../column/impl/TestCorruptDeltaByteArrays.java | 259 +++++++++++++++++++
 .../parquet/column/mem/TestMemColumn.java       |   3 +-
 .../org/apache/parquet/SemanticVersion.java     |  29 ++-
 .../java/org/apache/parquet/VersionParser.java  |  39 ++-
 .../org/apache/parquet/SemanticVersionTest.java |  10 +-
 .../java/org/apache/parquet/VersionTest.java    |   9 +-
 .../hadoop/InternalParquetRecordReader.java     |   3 +-
 .../parquet/hadoop/ParquetInputFormat.java      |  10 +
 .../parquet/hadoop/ParquetRecordReader.java     |  27 +-
 .../parquet/hadoop/TestParquetFileWriter.java   |  11 +
 .../parquet/tools/command/DumpCommand.java      |   4 +-
 pom.xml                                         |   2 +
 21 files changed, 626 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-column/src/main/java/org/apache/parquet/CorruptDeltaByteArrays.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/CorruptDeltaByteArrays.java b/parquet-column/src/main/java/org/apache/parquet/CorruptDeltaByteArrays.java
new file mode 100644
index 0000000..258c9ee
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/CorruptDeltaByteArrays.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.column.Encoding;
+
+public class CorruptDeltaByteArrays {
+  private static final Log LOG = Log.getLog(CorruptDeltaByteArrays.class);
+
+  private static final SemanticVersion PARQUET_246_FIXED_VERSION =
+      new SemanticVersion(1, 8, 0);
+
+  public static boolean requiresSequentialReads(ParsedVersion version, Encoding encoding) {
+    if (encoding != Encoding.DELTA_BYTE_ARRAY) {
+      return false;
+    }
+
+    if (version == null) {
+      return true;
+    }
+
+    if (!"parquet-mr".equals(version.application)) {
+      // assume other applications don't have this bug
+      return false;
+    }
+
+    if (!version.hasSemanticVersion()) {
+      LOG.warn("Requiring sequential reads because created_by did not " +
+          "contain a valid version (see PARQUET-246): " + version.version);
+      return true;
+    }
+
+    return requiresSequentialReads(version.getSemanticVersion(), encoding);
+  }
+
+  public static boolean requiresSequentialReads(SemanticVersion semver, Encoding encoding) {
+    if (encoding != Encoding.DELTA_BYTE_ARRAY) {
+      return false;
+    }
+
+    if (semver == null) {
+      return true;
+    }
+
+    if (semver.compareTo(PARQUET_246_FIXED_VERSION) < 0) {
+      LOG.info("Requiring sequential reads because this file was created " +
+          "prior to " + PARQUET_246_FIXED_VERSION + ". See PARQUET-246");
+      return true;
+    }
+
+    // this file was created after the fix
+    return false;
+  }
+
+  public static boolean requiresSequentialReads(String createdBy, Encoding encoding) {
+    if (encoding != Encoding.DELTA_BYTE_ARRAY) {
+      return false;
+    }
+
+    if (Strings.isNullOrEmpty(createdBy)) {
+      LOG.info("Requiring sequential reads because file version is empty. " +
+          "See PARQUET-246");
+      return true;
+    }
+
+    try {
+      return requiresSequentialReads(VersionParser.parse(createdBy), encoding);
+
+    } catch (RuntimeException e) {
+      warnParseError(createdBy, e);
+      return true;
+    } catch (VersionParser.VersionParseException e) {
+      warnParseError(createdBy, e);
+      return true;
+    }
+  }
+
+  private static void warnParseError(String createdBy, Throwable e) {
+    LOG.warn("Requiring sequential reads because created_by could not be " +
+        "parsed (see PARQUET-246): " + createdBy, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java b/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
index 8e15d01..e2b8114 100644
--- a/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
@@ -64,12 +64,12 @@ public class CorruptStatistics {
         return false;
       }
 
-      if (Strings.isNullOrEmpty(version.semver)) {
+      if (Strings.isNullOrEmpty(version.version)) {
         LOG.warn("Ignoring statistics because created_by did not contain a semver (see PARQUET-251): " + createdBy);
         return true;
       }
 
-      SemanticVersion semver = SemanticVersion.parse(version.semver);
+      SemanticVersion semver = SemanticVersion.parse(version.version);
 
       if (semver.compareTo(PARQUET_251_FIXED_VERSION) < 0) {
         LOG.info("Ignoring statistics because this file was created prior to "

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
index bfbcdb9..3217b94 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
@@ -18,6 +18,9 @@
  */
 package org.apache.parquet.column.impl;
 
+import org.apache.parquet.VersionParser;
+import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.VersionParser.VersionParseException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnReadStore;
 import org.apache.parquet.column.ColumnReader;
@@ -43,17 +46,30 @@ public class ColumnReadStoreImpl implements ColumnReadStore {
   private final PageReadStore pageReadStore;
   private final GroupConverter recordConverter;
   private final MessageType schema;
+  private final ParsedVersion writerVersion;
 
   /**
-   * @param pageReadStore uderlying page storage
+   * @param pageReadStore underlying page storage
    * @param recordConverter the user provided converter to materialize records
    * @param schema the schema we are reading
    */
-  public ColumnReadStoreImpl(PageReadStore pageReadStore, GroupConverter recordConverter, MessageType schema) {
+  public ColumnReadStoreImpl(PageReadStore pageReadStore,
+                             GroupConverter recordConverter,
+                             MessageType schema, String createdBy) {
     super();
     this.pageReadStore = pageReadStore;
     this.recordConverter = recordConverter;
     this.schema = schema;
+
+    ParsedVersion version;
+    try {
+      version = VersionParser.parse(createdBy);
+    } catch (RuntimeException e) {
+      version = null;
+    } catch (VersionParseException e) {
+      version = null;
+    }
+    this.writerVersion = version;
   }
 
   @Override
@@ -63,7 +79,7 @@ public class ColumnReadStoreImpl implements ColumnReadStore {
 
   private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
     PrimitiveConverter converter = getPrimitiveConverter(path);
-    return new ColumnReaderImpl(path, pageReader, converter);
+    return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
   }
 
   private PrimitiveConverter getPrimitiveConverter(ColumnDescriptor path) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
index 2fa63a8..09b1bdf 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
@@ -28,7 +28,9 @@ import static org.apache.parquet.column.ValuesType.VALUES;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
+import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.Log;
+import org.apache.parquet.VersionParser.ParsedVersion;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -40,6 +42,7 @@ import org.apache.parquet.column.page.DataPageV1;
 import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.RequiresPreviousReader;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -130,6 +133,7 @@ class ColumnReaderImpl implements ColumnReader {
     }
   }
 
+  private final ParsedVersion writerVersion;
   private final ColumnDescriptor path;
   private final long totalValueCount;
   private final PageReader pageReader;
@@ -138,6 +142,7 @@ class ColumnReaderImpl implements ColumnReader {
   private IntIterator repetitionLevelColumn;
   private IntIterator definitionLevelColumn;
   protected ValuesReader dataColumn;
+  private Encoding currentEncoding;
 
   private int repetitionLevel;
   private int definitionLevel;
@@ -327,10 +332,11 @@ class ColumnReaderImpl implements ColumnReader {
    * @param path the descriptor for the corresponding column
    * @param pageReader the underlying store to read from
    */
-  public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter) {
+  public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter, ParsedVersion writerVersion) {
     this.path = checkNotNull(path, "path");
     this.pageReader = checkNotNull(pageReader, "pageReader");
     this.converter = checkNotNull(converter, "converter");
+    this.writerVersion = writerVersion;
     DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
     if (dictionaryPage != null) {
       try {
@@ -459,10 +465,28 @@ class ColumnReaderImpl implements ColumnReader {
         valueRead = true;
       }
     } catch (RuntimeException e) {
+      if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, currentEncoding) &&
+          e instanceof ArrayIndexOutOfBoundsException) {
+        // this is probably PARQUET-246, which may happen if reading data with
+        // MR because this can't be detected without reading all footers
+        throw new ParquetDecodingException("Read failure possibly due to " +
+            "PARQUET-246: try setting parquet.split.files to false",
+            new ParquetDecodingException(
+                format("Can't read value in column %s at value %d out of %d, " +
+                        "%d out of %d in currentPage. repetition level: " +
+                        "%d, definition level: %d",
+                    path, readValues, totalValueCount,
+                    readValues - (endOfPageValueCount - pageValueCount),
+                    pageValueCount, repetitionLevel, definitionLevel),
+                e));
+      }
       throw new ParquetDecodingException(
-          format(
-              "Can't read value in column %s at value %d out of %d, %d out of %d in currentPage. repetition level: %d, definition level: %d",
-              path, readValues, totalValueCount, readValues - (endOfPageValueCount - pageValueCount), pageValueCount, repetitionLevel, definitionLevel),
+          format("Can't read value in column %s at value %d out of %d, " +
+                  "%d out of %d in currentPage. repetition level: " +
+                  "%d, definition level: %d",
+              path, readValues, totalValueCount,
+              readValues - (endOfPageValueCount - pageValueCount),
+              pageValueCount, repetitionLevel, definitionLevel),
           e);
     }
   }
@@ -525,8 +549,12 @@ class ColumnReaderImpl implements ColumnReader {
   }
 
   private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) {
+    ValuesReader previousReader = this.dataColumn;
+
+    this.currentEncoding = dataEncoding;
     this.pageValueCount = valueCount;
     this.endOfPageValueCount = readValues + pageValueCount;
+
     if (dataEncoding.usesDictionary()) {
       if (dictionary == null) {
         throw new ParquetDecodingException(
@@ -546,6 +574,12 @@ class ColumnReaderImpl implements ColumnReader {
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page in col " + path, e);
     }
+
+    if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
+        previousReader != null && previousReader instanceof RequiresPreviousReader) {
+      // previous reader can only be set if reading sequentially
+      ((RequiresPreviousReader) dataColumn).setPreviousReader(previousReader);
+    }
   }
 
   private void readPageV1(DataPageV1 page) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresPreviousReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresPreviousReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresPreviousReader.java
new file mode 100644
index 0000000..538d586
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresPreviousReader.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values;
+
+public interface RequiresPreviousReader {
+  void setPreviousReader(ValuesReader reader);
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
index fd55035..87ec08e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.deltastrings;
 
 import java.io.IOException;
 
+import org.apache.parquet.column.values.RequiresPreviousReader;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
 import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
@@ -31,7 +32,7 @@ import org.apache.parquet.io.api.Binary;
  * @author Aniket Mokashi
  *
  */
-public class DeltaByteArrayReader extends ValuesReader {
+public class DeltaByteArrayReader extends ValuesReader implements RequiresPreviousReader {
   private ValuesReader prefixLengthReader;
   private ValuesReader suffixReader;
 
@@ -63,7 +64,13 @@ public class DeltaByteArrayReader extends ValuesReader {
     // This does not copy bytes
     Binary suffix = suffixReader.readBytes();
     int length = prefixLength + suffix.length();
-    
+
+    // NOTE: due to PARQUET-246, it is important that we
+    // respect prefixLength which was read from prefixLengthReader,
+    // even for the *first* value of a page. Even though the first
+    // value of the page should have an empty prefix, it may not
+    // because of PARQUET-246.
+
     // We have to do this to materialize the output
     if(prefixLength != 0) {
       byte[] out = new byte[length];
@@ -75,4 +82,18 @@ public class DeltaByteArrayReader extends ValuesReader {
     }
     return previous;
   }
+
+  /**
+   * There was a bug (PARQUET-246) in which DeltaByteArrayWriter's reset() method did not
+   * clear the previous value state that it tracks internally. This resulted in the first
+   * value of all pages (except for the first page) to be a delta from the last value of the
+   * previous page. In order to read corrupted files written with this bug, when reading a
+   * new page we need to recover the previous page's last value to use it (if needed) to
+   * read the first value.
+   */
+  public void setPreviousReader(ValuesReader reader) {
+    if (reader != null) {
+      this.previous = ((DeltaByteArrayReader) reader).previous;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java
index 71af780..aeef510 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java
@@ -35,30 +35,28 @@ import org.apache.parquet.schema.TypeVisitor;
  */
 public class ColumnIOFactory {
 
-  public class ColumnIOCreatorVisitor implements TypeVisitor {
+  private class ColumnIOCreatorVisitor implements TypeVisitor {
 
     private MessageColumnIO columnIO;
     private GroupColumnIO current;
     private List<PrimitiveColumnIO> leaves = new ArrayList<PrimitiveColumnIO>();
     private final boolean validating;
     private final MessageType requestedSchema;
+    private final String createdBy;
     private int currentRequestedIndex;
     private Type currentRequestedType;
     private boolean strictTypeChecking;
-
-    public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema) {
-      this(validating, requestedSchema, true);
-    }
     
-    public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema, boolean strictTypeChecking) {
+    private ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema, String createdBy, boolean strictTypeChecking) {
       this.validating = validating;
       this.requestedSchema = requestedSchema;
+      this.createdBy = createdBy;
       this.strictTypeChecking = strictTypeChecking;
     }
 
     @Override
     public void visit(MessageType messageType) {
-      columnIO = new MessageColumnIO(requestedSchema, validating);
+      columnIO = new MessageColumnIO(requestedSchema, validating, createdBy);
       visitChildren(columnIO, messageType, requestedSchema);
       columnIO.setLevels();
       columnIO.setLeaves(leaves);
@@ -112,25 +110,43 @@ public class ColumnIOFactory {
 
   }
 
+  private final String createdBy;
   private final boolean validating;
 
   /**
    * validation is off by default
    */
   public ColumnIOFactory() {
-    this(false);
+    this(null, false);
+  }
+
+  /**
+   * validation is off by default
+   * @param createdBy createdBy string for readers
+   */
+  public ColumnIOFactory(String createdBy) {
+    this(createdBy, false);
   }
 
   /**
    * @param validating to turn validation on
    */
   public ColumnIOFactory(boolean validating) {
+    this(null, validating);
+  }
+
+  /**
+   * @param createdBy createdBy string for readers
+   * @param validating to turn validation on
+   */
+  public ColumnIOFactory(String createdBy, boolean validating) {
     super();
+    this.createdBy = createdBy;
     this.validating = validating;
   }
 
   /**
-   * @param schema the requestedSchema we want to read/write
+   * @param requestedSchema the requestedSchema we want to read/write
    * @param fileSchema the file schema (when reading it can be different from the requested schema)
    * @return the corresponding serializing/deserializing structure
    */
@@ -139,13 +155,13 @@ public class ColumnIOFactory {
   }
   
   /**
-   * @param schema the requestedSchema we want to read/write
+   * @param requestedSchema the requestedSchema we want to read/write
    * @param fileSchema the file schema (when reading it can be different from the requested schema)
    * @param strict should file type and requested primitive types match
    * @return the corresponding serializing/deserializing structure
    */
   public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema, boolean strict) {
-    ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema, strict);
+    ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema, createdBy, strict);
     fileSchema.accept(visitor);
     return visitor.getColumnIO();
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
index e24aedb..9a8f88e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -60,10 +60,12 @@ public class MessageColumnIO extends GroupColumnIO {
   private List<PrimitiveColumnIO> leaves;
 
   private final boolean validating;
+  private final String createdBy;
 
-  MessageColumnIO(MessageType messageType, boolean validating) {
+  MessageColumnIO(MessageType messageType, boolean validating, String createdBy) {
     super(messageType, null, 0);
     this.validating = validating;
+    this.createdBy = createdBy;
   }
 
   public List<String[]> getColumnNames() {
@@ -113,7 +115,7 @@ public class MessageColumnIO extends GroupColumnIO {
             MessageColumnIO.this,
             filteringRecordMaterializer,
             validating,
-            new ColumnReadStoreImpl(columns, filteringRecordMaterializer.getRootConverter(), getType()));
+            new ColumnReadStoreImpl(columns, filteringRecordMaterializer.getRootConverter(), getType(), createdBy));
       }
 
       @Override
@@ -122,11 +124,10 @@ public class MessageColumnIO extends GroupColumnIO {
             MessageColumnIO.this,
             recordMaterializer,
             validating,
-            new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()),
+            new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType(), createdBy),
             unboundRecordFilterCompat.getUnboundRecordFilter(),
             columns.getRowCount()
         );
-
       }
 
       @Override
@@ -135,7 +136,7 @@ public class MessageColumnIO extends GroupColumnIO {
             MessageColumnIO.this,
             recordMaterializer,
             validating,
-            new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()));
+            new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType(), createdBy));
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
index ebdbdf8..a1820e6 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
@@ -23,6 +23,8 @@ import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_
 
 import java.util.List;
 
+import org.apache.parquet.Version;
+import org.apache.parquet.VersionParser;
 import org.junit.Test;
 
 import org.apache.parquet.column.ColumnDescriptor;
@@ -52,7 +54,7 @@ public class TestColumnReaderImpl {
   }
 
   @Test
-  public void test() {
+  public void test() throws Exception {
     MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
     ColumnDescriptor col = schema.getColumns().get(0);
     MemPageWriter pageWriter = new MemPageWriter();
@@ -76,7 +78,7 @@ public class TestColumnReaderImpl {
     assertEquals(rows, valueCount);
     MemPageReader pageReader = new MemPageReader((long)rows, pages.iterator(), pageWriter.getDictionaryPage());
     ValidatingConverter converter = new ValidatingConverter();
-    ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter);
+    ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter, VersionParser.parse(Version.FULL_VERSION));
     for (int i = 0; i < rows; i++) {
       assertEquals(0, columnReader.getCurrentRepetitionLevel());
       assertEquals(0, columnReader.getCurrentDefinitionLevel());
@@ -87,7 +89,7 @@ public class TestColumnReaderImpl {
   }
 
   @Test
-  public void testOptional() {
+  public void testOptional() throws Exception {
     MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
     ColumnDescriptor col = schema.getColumns().get(0);
     MemPageWriter pageWriter = new MemPageWriter();
@@ -111,7 +113,7 @@ public class TestColumnReaderImpl {
     assertEquals(rows, valueCount);
     MemPageReader pageReader = new MemPageReader((long)rows, pages.iterator(), pageWriter.getDictionaryPage());
     ValidatingConverter converter = new ValidatingConverter();
-    ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter);
+    ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter, VersionParser.parse(Version.FULL_VERSION));
     for (int i = 0; i < rows; i++) {
       assertEquals(0, columnReader.getCurrentRepetitionLevel());
       assertEquals(0, columnReader.getCurrentDefinitionLevel());

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
new file mode 100644
index 0000000..0327948
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import org.apache.parquet.CorruptDeltaByteArrays;
+import org.apache.parquet.SemanticVersion;
+import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.page.mem.MemPageStore;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestCorruptDeltaByteArrays {
+  @Test
+  public void testCorruptDeltaByteArrayVersions() {
+    assertTrue(CorruptDeltaByteArrays.requiresSequentialReads("parquet-mr version 1.6.0 (build abcd)", Encoding.DELTA_BYTE_ARRAY));
+    assertTrue(CorruptDeltaByteArrays.requiresSequentialReads((String) null, Encoding.DELTA_BYTE_ARRAY));
+    assertTrue(CorruptDeltaByteArrays.requiresSequentialReads((ParsedVersion) null, Encoding.DELTA_BYTE_ARRAY));
+    assertTrue(CorruptDeltaByteArrays.requiresSequentialReads((SemanticVersion) null, Encoding.DELTA_BYTE_ARRAY));
+    assertTrue(CorruptDeltaByteArrays.requiresSequentialReads("parquet-mr version 1.8.0-SNAPSHOT (build abcd)", Encoding.DELTA_BYTE_ARRAY));
+    assertFalse(CorruptDeltaByteArrays.requiresSequentialReads("parquet-mr version 1.6.0 (build abcd)", Encoding.DELTA_BINARY_PACKED));
+    assertFalse(CorruptDeltaByteArrays.requiresSequentialReads((String) null, Encoding.DELTA_LENGTH_BYTE_ARRAY));
+    assertFalse(CorruptDeltaByteArrays.requiresSequentialReads((ParsedVersion) null, Encoding.PLAIN));
+    assertFalse(CorruptDeltaByteArrays.requiresSequentialReads((SemanticVersion) null, Encoding.RLE));
+    assertFalse(CorruptDeltaByteArrays.requiresSequentialReads("parquet-mr version 1.8.0-SNAPSHOT (build abcd)", Encoding.RLE_DICTIONARY));
+    assertFalse(CorruptDeltaByteArrays.requiresSequentialReads("parquet-mr version 1.8.0-SNAPSHOT (build abcd)", Encoding.PLAIN_DICTIONARY));
+    assertFalse(CorruptDeltaByteArrays.requiresSequentialReads("parquet-mr version 1.8.0-SNAPSHOT (build abcd)", Encoding.BIT_PACKED));
+    assertFalse(CorruptDeltaByteArrays.requiresSequentialReads("parquet-mr version 1.8.0 (build abcd)", Encoding.DELTA_BYTE_ARRAY));
+  }
+
+  @Test
+  public void testEncodingRequiresSequentialReads() {
+    ParsedVersion impala = new ParsedVersion("impala", "1.2.0", "abcd");
+    assertFalse(CorruptDeltaByteArrays.requiresSequentialReads(impala, Encoding.DELTA_BYTE_ARRAY));
+    ParsedVersion broken = new ParsedVersion("parquet-mr", "1.8.0-SNAPSHOT", "abcd");
+    assertTrue(CorruptDeltaByteArrays.requiresSequentialReads(broken, Encoding.DELTA_BYTE_ARRAY));
+    ParsedVersion fixed = new ParsedVersion("parquet-mr", "1.8.0", "abcd");
+    assertFalse(CorruptDeltaByteArrays.requiresSequentialReads(fixed, Encoding.DELTA_BYTE_ARRAY));
+  }
+
+  @Test
+  public void testReassemblyWithCorruptPage() throws Exception {
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100);
+
+    String lastValue = null;
+    for (int i = 0; i < 10; i += 1) {
+      lastValue = str(i);
+      writer.writeBytes(Binary.fromString(lastValue));
+    }
+    byte[] firstPageBytes = writer.getBytes().toByteArray();
+
+    writer.reset(); // sets previous to new byte[0]
+    corruptWriter(writer, lastValue);
+
+    for (int i = 10; i < 20; i += 1) {
+      writer.writeBytes(Binary.fromString(str(i)));
+    }
+    byte[] corruptPageBytes = writer.getBytes().toByteArray();
+
+    DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
+    firstPageReader.initFromPage(10, firstPageBytes, 0);
+    for (int i = 0; i < 10; i += 1) {
+      assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i));
+    }
+
+    DeltaByteArrayReader corruptPageReader = new DeltaByteArrayReader();
+    corruptPageReader.initFromPage(10, corruptPageBytes, 0);
+    try {
+      corruptPageReader.readBytes();
+      fail("Corrupt page did not throw an exception when read");
+    } catch (ArrayIndexOutOfBoundsException e) {
+      // expected, this is a corrupt page
+    }
+
+    DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader();
+    secondPageReader.initFromPage(10, corruptPageBytes, 0);
+    secondPageReader.setPreviousReader(firstPageReader);
+
+    for (int i = 10; i < 20; i += 1) {
+      assertEquals(secondPageReader.readBytes().toStringUsingUTF8(), str(i));
+    }
+  }
+
+  @Test
+  public void testReassemblyWithoutCorruption() throws Exception {
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100);
+
+    for (int i = 0; i < 10; i += 1) {
+      writer.writeBytes(Binary.fromString(str(i)));
+    }
+    byte[] firstPageBytes = writer.getBytes().toByteArray();
+
+    writer.reset(); // sets previous to new byte[0]
+
+    for (int i = 10; i < 20; i += 1) {
+      writer.writeBytes(Binary.fromString(str(i)));
+    }
+    byte[] secondPageBytes = writer.getBytes().toByteArray();
+
+    DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
+    firstPageReader.initFromPage(10, firstPageBytes, 0);
+    for (int i = 0; i < 10; i += 1) {
+      assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i));
+    }
+
+    DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader();
+    secondPageReader.initFromPage(10, secondPageBytes, 0);
+    secondPageReader.setPreviousReader(firstPageReader);
+
+    for (int i = 10; i < 20; i += 1) {
+      assertEquals(secondPageReader.readBytes().toStringUsingUTF8(), str(i));
+    }
+  }
+
+  @Test
+  public void testOldReassemblyWithoutCorruption() throws Exception {
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100);
+
+    for (int i = 0; i < 10; i += 1) {
+      writer.writeBytes(Binary.fromString(str(i)));
+    }
+    byte[] firstPageBytes = writer.getBytes().toByteArray();
+
+    writer.reset(); // sets previous to new byte[0]
+
+    for (int i = 10; i < 20; i += 1) {
+      writer.writeBytes(Binary.fromString(str(i)));
+    }
+    byte[] secondPageBytes = writer.getBytes().toByteArray();
+
+    DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
+    firstPageReader.initFromPage(10, firstPageBytes, 0);
+    for (int i = 0; i < 10; i += 1) {
+      assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i));
+    }
+
+    DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader();
+    secondPageReader.initFromPage(10, secondPageBytes, 0);
+
+    for (int i = 10; i < 20; i += 1) {
+      assertEquals(secondPageReader.readBytes().toStringUsingUTF8(), str(i));
+    }
+  }
+
+  @Test
+  public void testColumnReaderImplWithCorruptPage() throws Exception {
+    ColumnDescriptor column = new ColumnDescriptor(
+        new String[] {"s"}, PrimitiveType.PrimitiveTypeName.BINARY, 0, 0);
+    MemPageStore pages = new MemPageStore(0);
+    PageWriter memWriter = pages.getPageWriter(column);
+
+    // get generic repetition and definition level bytes to use for pages
+    ValuesWriter rdValues = ParquetProperties
+        .getColumnDescriptorValuesWriter(0, 10, 100);
+    for (int i = 0; i < 10; i += 1) {
+      rdValues.writeInteger(0);
+    }
+    // use a byte array backed BytesInput because it is reused
+    BytesInput rd = BytesInput.from(rdValues.getBytes().toByteArray());
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100);
+    String lastValue = null;
+    List<String> values = new ArrayList<String>();
+    for (int i = 0; i < 10; i += 1) {
+      lastValue = str(i);
+      writer.writeBytes(Binary.fromString(lastValue));
+      values.add(lastValue);
+    }
+
+    memWriter.writePage(BytesInput.concat(rd, rd, writer.getBytes()),
+        10, /* number of values in the page */
+        new BinaryStatistics(),
+        rdValues.getEncoding(),
+        rdValues.getEncoding(),
+        writer.getEncoding());
+    pages.addRowCount(10);
+
+    writer.reset(); // sets previous to new byte[0]
+    corruptWriter(writer, lastValue);
+    for (int i = 10; i < 20; i += 1) {
+      String value = str(i);
+      writer.writeBytes(Binary.fromString(value));
+      values.add(value);
+    }
+
+    memWriter.writePage(BytesInput.concat(rd, rd, writer.getBytes()),
+        10, /* number of values in the page */
+        new BinaryStatistics(),
+        rdValues.getEncoding(),
+        rdValues.getEncoding(),
+        writer.getEncoding());
+    pages.addRowCount(10);
+
+    final List<String> actualValues = new ArrayList<String>();
+    PrimitiveConverter converter = new PrimitiveConverter() {
+      @Override
+      public void addBinary(Binary value) {
+        actualValues.add(value.toStringUsingUTF8());
+      }
+    };
+
+    ColumnReaderImpl columnReader = new ColumnReaderImpl(
+        column, pages.getPageReader(column), converter,
+        new ParsedVersion("parquet-mr", "1.6.0", "abcd"));
+
+    while (actualValues.size() < columnReader.getTotalValueCount()) {
+      columnReader.writeCurrentValueToConverter();
+      columnReader.consume();
+    }
+
+    Assert.assertEquals(values, actualValues);
+  }
+
+  public void corruptWriter(DeltaByteArrayWriter writer, String data) throws Exception {
+    Field previous = writer.getClass().getDeclaredField("previous");
+    previous.setAccessible(true);
+    previous.set(writer, Binary.fromString(data).getBytesUnsafe());
+  }
+
+  public String str(int i) {
+    char c = 'a';
+    return "aaaaaaaaaaa" + (char) (c + i);
+  }
+}
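The tests above pin down the detection behavior: only `DELTA_BYTE_ARRAY` pages from parquet-mr builds older than the final 1.8.0 release (or from an unknown writer) need sequential reads. A minimal standalone sketch of that decision, not the actual `CorruptDeltaByteArrays` class (the string parsing here is an assumption about the `created_by` format; only the true/false outcomes are taken from the tests):

```java
public class RequiresSequentialReadsSketch {
  static boolean requiresSequentialReads(String createdBy, String encoding) {
    if (!"DELTA_BYTE_ARRAY".equals(encoding)) {
      return false; // only DELTA_BYTE_ARRAY carries state across page boundaries
    }
    if (createdBy == null) {
      return true; // unknown writer: assume the broken behavior
    }
    // expected shape: "parquet-mr version 1.8.0-SNAPSHOT (build abcd)"
    String[] parts = createdBy.split(" ");
    if (parts.length < 3 || !"parquet-mr".equals(parts[0])) {
      return false; // other writers (e.g. impala) never had the bug
    }
    String version = parts[2];
    boolean prerelease = version.contains("-") || version.contains("rc");
    String[] nums = version.split("[-r]")[0].split("\\.");
    if (nums.length < 3) {
      return true; // unparseable version: assume broken
    }
    int major = Integer.parseInt(nums[0]);
    int minor = Integer.parseInt(nums[1]);
    // fixed in the final 1.8.0 release; 1.8.0 snapshots and rcs are still broken
    return !(major > 1 || (major == 1 && minor >= 8 && !prerelease));
  }

  public static void main(String[] args) {
    System.out.println(requiresSequentialReads(
        "parquet-mr version 1.6.0 (build abcd)", "DELTA_BYTE_ARRAY")); // true
    System.out.println(requiresSequentialReads(
        "parquet-mr version 1.8.0 (build abcd)", "DELTA_BYTE_ARRAY")); // false
  }
}
```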

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index d801442..135123f 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -67,7 +67,8 @@ public class TestMemColumn {
     return new ColumnReadStoreImpl(
         memPageStore,
         new DummyRecordConverter(schema).getRootConverter(),
-        schema
+        schema,
+        null
         ).getColumnReader(path);
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java b/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
index feee8de..d573a81 100644
--- a/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
+++ b/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
@@ -26,7 +26,8 @@ import java.util.regex.Pattern;
  * Attempts to do a little bit of validation that the version string is valid, but
  * is not a full implementation of the semver spec.
  *
- * NOTE: compareTo only respects major, minor, and patch (ignores rc numbers, SNAPSHOT, etc)
+ * NOTE: compareTo only respects major, minor, patch, and whether this is a
+ * prerelease version. All prerelease versions are considered equivalent.
  */
 public final class SemanticVersion implements Comparable<SemanticVersion> {
   // (major).(minor).(patch)[(rc)(rcnum)]?(-(SNAPSHOT))?
@@ -36,6 +37,7 @@ public final class SemanticVersion implements Comparable<SemanticVersion> {
   public final int major;
   public final int minor;
   public final int patch;
+  public final boolean prerelease;
 
   public SemanticVersion(int major, int minor, int patch) {
     Preconditions.checkArgument(major >= 0, "major must be >= 0");
@@ -45,6 +47,18 @@ public final class SemanticVersion implements Comparable<SemanticVersion> {
     this.major = major;
     this.minor = minor;
     this.patch = patch;
+    this.prerelease = false;
+  }
+
+  public SemanticVersion(int major, int minor, int patch, boolean isPrerelease) {
+    Preconditions.checkArgument(major >= 0, "major must be >= 0");
+    Preconditions.checkArgument(minor >= 0, "minor must be >= 0");
+    Preconditions.checkArgument(patch >= 0, "patch must be >= 0");
+
+    this.major = major;
+    this.minor = minor;
+    this.patch = patch;
+    this.prerelease = isPrerelease;
   }
 
   public static SemanticVersion parse(String version) throws SemanticVersionParseException {
@@ -57,11 +71,15 @@ public final class SemanticVersion implements Comparable<SemanticVersion> {
     final int major;
     final int minor;
     final int patch;
+    boolean prerelease = false;
 
     try {
       major = Integer.valueOf(matcher.group(1));
       minor = Integer.valueOf(matcher.group(2));
       patch = Integer.valueOf(matcher.group(3));
+      for (int g = 4; g <= matcher.groupCount(); g += 1) {
+        prerelease |= (matcher.group(g) != null);
+      }
     } catch (NumberFormatException e) {
       throw new SemanticVersionParseException(e);
     }
@@ -71,7 +89,7 @@ public final class SemanticVersion implements Comparable<SemanticVersion> {
           String.format("major(%d), minor(%d), and patch(%d) must all be >= 0", major, minor, patch));
     }
 
-    return new SemanticVersion(major, minor, patch);
+    return new SemanticVersion(major, minor, patch, prerelease);
   }
 
   @Override
@@ -88,7 +106,12 @@ public final class SemanticVersion implements Comparable<SemanticVersion> {
       return cmp;
     }
 
-    return Integer.compare(patch, o.patch);
+    cmp = Integer.compare(patch, o.patch);
+    if (cmp != 0) {
+      return cmp;
+    }
+
+    return Boolean.compare(o.prerelease, prerelease);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/VersionParser.java b/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
index a0c6c3d..c1a94d3 100644
--- a/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
+++ b/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet;
 
+import org.apache.parquet.SemanticVersion.SemanticVersionParseException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -35,14 +36,40 @@ public class VersionParser {
 
   public static class ParsedVersion {
     public final String application;
-    public final String semver;
+    public final String version;
     public final String appBuildHash;
 
-    public ParsedVersion(String application, String semver, String appBuildHash) {
+    private final boolean hasSemver;
+    private final SemanticVersion semver;
+
+    public ParsedVersion(String application, String version, String appBuildHash) {
       checkArgument(!Strings.isNullOrEmpty(application), "application cannot be null or empty");
       this.application = application;
-      this.semver = Strings.isNullOrEmpty(semver) ? null : semver;
+      this.version = Strings.isNullOrEmpty(version) ? null : version;
       this.appBuildHash = Strings.isNullOrEmpty(appBuildHash) ? null : appBuildHash;
+
+      SemanticVersion sv;
+      boolean hasSemver;
+      try {
+        sv = SemanticVersion.parse(version);
+        hasSemver = true;
+      } catch (RuntimeException e) {
+        sv = null;
+        hasSemver = false;
+      } catch (SemanticVersionParseException e) {
+        sv = null;
+        hasSemver = false;
+      }
+      this.semver = sv;
+      this.hasSemver = hasSemver;
+    }
+
+    public boolean hasSemanticVersion() {
+      return hasSemver;
+    }
+
+    public SemanticVersion getSemanticVersion() {
+      return semver;
     }
 
     @Override
@@ -55,7 +82,7 @@ public class VersionParser {
       if (appBuildHash != null ? !appBuildHash.equals(version.appBuildHash) : version.appBuildHash != null)
         return false;
       if (application != null ? !application.equals(version.application) : version.application != null) return false;
-      if (semver != null ? !semver.equals(version.semver) : version.semver != null) return false;
+      if (this.version != null ? !this.version.equals(version.version) : version.version != null) return false;
 
       return true;
     }
@@ -63,7 +90,7 @@ public class VersionParser {
     @Override
     public int hashCode() {
       int result = application != null ? application.hashCode() : 0;
-      result = 31 * result + (semver != null ? semver.hashCode() : 0);
+      result = 31 * result + (version != null ? version.hashCode() : 0);
       result = 31 * result + (appBuildHash != null ? appBuildHash.hashCode() : 0);
       return result;
     }
@@ -72,7 +99,7 @@ public class VersionParser {
     public String toString() {
       return "ParsedVersion(" +
           "application=" + application +
-          ", semver=" + semver +
+          ", semver=" + version +
           ", appBuildHash=" + appBuildHash +
           ')';
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java b/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java
index 6a1c47f..e969aab 100644
--- a/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java
+++ b/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java
@@ -41,13 +41,17 @@ public class SemanticVersionTest {
     assertTrue(new SemanticVersion(2, 0, 0).compareTo(new SemanticVersion(1, 0, 0)) > 0);
 
     assertTrue(new SemanticVersion(1, 8, 100).compareTo(new SemanticVersion(1, 9, 0)) < 0);
+
+    assertTrue(new SemanticVersion(1, 8, 0).compareTo(new SemanticVersion(1, 8, 0, true)) > 0);
+    assertTrue(new SemanticVersion(1, 8, 0, true).compareTo(new SemanticVersion(1, 8, 0, true)) == 0);
+    assertTrue(new SemanticVersion(1, 8, 0, true).compareTo(new SemanticVersion(1, 8, 0)) < 0);
   }
 
   @Test
   public void testParse() throws Exception {
     assertEquals(new SemanticVersion(1, 8, 0), SemanticVersion.parse("1.8.0"));
-    assertEquals(new SemanticVersion(1, 8, 0), SemanticVersion.parse("1.8.0rc3"));
-    assertEquals(new SemanticVersion(1, 8, 0), SemanticVersion.parse("1.8.0rc3-SNAPSHOT"));
-    assertEquals(new SemanticVersion(1, 8, 0), SemanticVersion.parse("1.8.0-SNAPSHOT"));
+    assertEquals(new SemanticVersion(1, 8, 0, true), SemanticVersion.parse("1.8.0rc3"));
+    assertEquals(new SemanticVersion(1, 8, 0, true), SemanticVersion.parse("1.8.0rc3-SNAPSHOT"));
+    assertEquals(new SemanticVersion(1, 8, 0, true), SemanticVersion.parse("1.8.0-SNAPSHOT"));
   }
 }
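The new comparisons above follow from the `compareTo` change in SemanticVersion.java: numeric fields are compared first, then a final release outranks any prerelease of the same version. A self-contained sketch of that ordering (tuple-based for brevity; the real class compares `SemanticVersion` fields):

```java
public class SemverOrderSketch {
  // compare (major, minor, patch) plus a prerelease flag the way the
  // patched SemanticVersion.compareTo does
  static int compare(int[] a, boolean aPrerelease, int[] b, boolean bPrerelease) {
    for (int i = 0; i < 3; i++) {
      int cmp = Integer.compare(a[i], b[i]);
      if (cmp != 0) {
        return cmp;
      }
    }
    // mirrors Boolean.compare(o.prerelease, prerelease) in the diff:
    // a final release (false) sorts after any prerelease (true)
    return Boolean.compare(bPrerelease, aPrerelease);
  }

  public static void main(String[] args) {
    int[] v = {1, 8, 0};
    System.out.println(compare(v, false, v, true));  // 1:  1.8.0 > 1.8.0-SNAPSHOT
    System.out.println(compare(v, true, v, true));   // 0:  all 1.8.0 prereleases equal
    System.out.println(compare(v, true, v, false));  // -1: 1.8.0-SNAPSHOT < 1.8.0
  }
}
```

Note that, as the class javadoc says, this is coarser than full semver precedence: all prereleases of the same x.y.z compare equal, so 1.8.0rc1 and 1.8.0rc3-SNAPSHOT are indistinguishable.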

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-common/src/test/java/org/apache/parquet/VersionTest.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/test/java/org/apache/parquet/VersionTest.java b/parquet-common/src/test/java/org/apache/parquet/VersionTest.java
index 3720007..75f8b76 100644
--- a/parquet-common/src/test/java/org/apache/parquet/VersionTest.java
+++ b/parquet-common/src/test/java/org/apache/parquet/VersionTest.java
@@ -18,16 +18,11 @@
  */
 package org.apache.parquet;
 
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.apache.parquet.VersionParser.ParsedVersion;
 import org.apache.parquet.VersionParser.VersionParseException;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -55,8 +50,8 @@ public class VersionTest {
   public void testFullVersion() throws Exception {
     ParsedVersion version = VersionParser.parse(Version.FULL_VERSION);
 
-    assertVersionValid(version.semver);
-    assertEquals(Version.VERSION_NUMBER, version.semver);
+    assertVersionValid(version.version);
+    assertEquals(Version.VERSION_NUMBER, version.version);
     assertEquals("parquet-mr", version.application);
   }
   

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index ce8c287..c1bd037 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -57,7 +57,7 @@ import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
 class InternalParquetRecordReader<T> {
   private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
 
-  private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
+  private ColumnIOFactory columnIOFactory = null;
   private final Filter filter;
 
   private MessageType requestedSchema;
@@ -171,6 +171,7 @@ class InternalParquetRecordReader<T> {
     Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         configuration, toSetMultiMap(fileMetadata), fileSchema));
+    this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
     this.requestedSchema = readContext.getRequestedSchema();
     this.fileSchema = fileSchema;
     this.file = file;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index a4baf98..4848f22 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -120,6 +120,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public static final String TASK_SIDE_METADATA = "parquet.task.side.metadata";
 
+  /**
+   * key to turn off file splitting. See PARQUET-246.
+   */
+  public static final String SPLIT_FILES = "parquet.split.files";
+
   private static final int MIN_FOOTER_CACHE_SIZE = 100;
 
   public static void setTaskSideMetaData(Job job,  boolean taskSideMetadata) {
@@ -280,6 +285,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
     }
   }
 
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
+    return ContextUtil.getConfiguration(context).getBoolean(SPLIT_FILES, true);
+  }
+
   /**
    * {@inheritDoc}
    */
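The `SPLIT_FILES` key added above is the recovery knob from the commit message: turning it off makes each file a single split, so pages are always read after the page they depend on. A minimal sketch of applying the work-around when configuring a job (the class and method names here are hypothetical; only the `"parquet.split.files"` key comes from this commit):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class Parquet246Workaround {
  // disable file splitting so every page in a file is read sequentially by
  // one task, letting corrupt DELTA_BYTE_ARRAY pages be reassembled from
  // the reader of the preceding page
  public static void configure(Job job) {
    Configuration conf = job.getConfiguration();
    conf.setBoolean("parquet.split.files", false); // ParquetInputFormat.SPLIT_FILES
  }
}
```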

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index beb7dd6..0a31f9c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -22,6 +22,7 @@ import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.SPLIT_FILES;
 import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 import java.io.IOException;
@@ -29,7 +30,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,15 +41,20 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.Log;
+import org.apache.parquet.column.Encoding;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.schema.MessageType;
 
 /**
@@ -187,11 +192,31 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
             + " in range " + split.getStart() + ", " + split.getEnd());
       }
     }
+
+    checkDeltaByteArrayProblem(footer.getFileMetaData(), configuration, filteredBlocks.get(0));
+
     MessageType fileSchema = footer.getFileMetaData().getSchema();
     internalReader.initialize(
         fileSchema, footer.getFileMetaData(), path, filteredBlocks, configuration);
   }
 
+  private void checkDeltaByteArrayProblem(FileMetaData meta, Configuration conf, BlockMetaData block) {
+    // splitting files?
+    if (conf.getBoolean(ParquetInputFormat.SPLIT_FILES, true)) {
+      // this is okay if not using DELTA_BYTE_ARRAY with the bug
+      Set<Encoding> encodings = new HashSet<Encoding>();
+      for (ColumnChunkMetaData column : block.getColumns()) {
+        encodings.addAll(column.getEncodings());
+      }
+      for (Encoding encoding : encodings) {
+        if (CorruptDeltaByteArrays.requiresSequentialReads(meta.getCreatedBy(), encoding)) {
+          throw new ParquetDecodingException("Cannot read data due to " +
+              "PARQUET-246: to read safely, set " + SPLIT_FILES + " to false");
+        }
+      }
+    }
+  }
+
   /**
    * {@inheritDoc}
    */

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 6151f48..d22b657 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -24,7 +24,11 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.CorruptStatistics;
+import org.apache.parquet.Version;
+import org.apache.parquet.VersionParser;
 import org.apache.parquet.bytes.BytesUtils;
+import org.junit.Assume;
 import org.junit.Rule;
 import org.junit.Test;
 import org.apache.parquet.Log;
@@ -50,6 +54,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
+import static org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics;
 import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
 import static org.junit.Assert.*;
 import static org.apache.parquet.column.Encoding.BIT_PACKED;
@@ -438,6 +443,9 @@ public class TestParquetFileWriter {
 
   @Test
   public void testWriteReadStatistics() throws Exception {
+    // this test assumes statistics will be read
+    Assume.assumeTrue(!shouldIgnoreStatistics(Version.FULL_VERSION, BINARY));
+
     File testFile = temp.newFile();
     testFile.delete();
 
@@ -568,6 +576,9 @@ public class TestParquetFileWriter {
 
   @Test
   public void testWriteReadStatisticsAllNulls() throws Exception {
+    // this test assumes statistics will be read
+    Assume.assumeTrue(!shouldIgnoreStatistics(Version.FULL_VERSION, BINARY));
+
     File testFile = temp.newFile();
     testFile.delete();
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
----------------------------------------------------------------------
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
index 837a46a..1388ed3 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
@@ -216,7 +216,9 @@ public class DumpCommand extends ArgsOnlyCommand {
                         conf, meta.getFileMetaData(), inpath, blocks, Collections.singletonList(column));
                     PageReadStore store = freader.readNextRowGroup();
                     while (store != null) {
-                        ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DumpGroupConverter(), schema);
+                        ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(
+                            store, new DumpGroupConverter(), schema,
+                            meta.getFileMetaData().getCreatedBy());
                         dump(out, crstore, column, page++, total, offset);
 
                         offset += store.getRowCount();

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/043fcde3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 177c649..271d40f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -232,6 +232,8 @@
                      <exclude>org/apache/parquet/thrift/ThriftSchemaConvertVisitor</exclude> <!-- not public -->
                      <exclude>org/apache/parquet/avro/AvroParquetReader</exclude> <!-- returns subclass of old return class -->
                      <exclude>org/apache/parquet/avro/SpecificDataSupplier</exclude> <!-- made public -->
+                     <exclude>org/apache/parquet/io/ColumnIOFactory$ColumnIOCreatorVisitor</exclude> <!-- removed non-API class -->
+                     <exclude>org/apache/parquet/io/ColumnIOFactory/**</exclude> <!-- removed non-API class and methods -->
                    </excludes>
                  </requireBackwardCompatibility>
                </rules>

