drill-commits mailing list archives

From: jacq...@apache.org
Subject: [8/9] drill git commit: DRILL-4028: Update Drill to leverage latest version of Parquet library.
Date: Thu, 05 Nov 2015 05:56:46 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 3a00a4c..f42996b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -39,7 +39,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.parquet.DirectCodecFactory;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -47,15 +46,16 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import parquet.column.ColumnDescriptor;
-import parquet.format.FileMetaData;
-import parquet.format.SchemaElement;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.ParquetFileWriter;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.schema.PrimitiveType;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.FileMetaData;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.PrimitiveType;
 
 import com.google.common.collect.Lists;
 
@@ -101,7 +101,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
   // records specified in the row group metadata
   long mockRecordsRead;
 
-  private final DirectCodecFactory codecFactory;
+  private final CodecFactory codecFactory;
   int rowGroupIndex;
   long totalRecordsRead;
   private final FragmentContext fragmentContext;
@@ -110,7 +110,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
       String path,
       int rowGroupIndex,
       FileSystem fs,
-      DirectCodecFactory codecFactory,
+      CodecFactory codecFactory,
       ParquetMetadata footer,
                              List<SchemaPath> columns) throws ExecutionSetupException {
     this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer,
@@ -123,7 +123,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
       String path,
       int rowGroupIndex,
       FileSystem fs,
-      DirectCodecFactory codecFactory,
+      CodecFactory codecFactory,
       ParquetMetadata footer,
       List<SchemaPath> columns) throws ExecutionSetupException {
     this.hadoopPath = new Path(path);
@@ -136,7 +136,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
     setColumns(columns);
   }
 
-  public DirectCodecFactory getCodecFactory() {
+  public CodecFactory getCodecFactory() {
     return codecFactory;
   }
 
@@ -471,7 +471,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
       columnStatuses = null;
     }
 
-    codecFactory.close();
+    codecFactory.release();
 
     if (varLengthReader != null) {
       for (final VarLengthColumn r : varLengthReader.columns) {

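For context: the reader now takes Parquet's own org.apache.parquet.hadoop.CodecFactory instead of Drill's removed DirectCodecFactory, and tears it down with release() rather than close(). A minimal sketch of that lifecycle, assuming a Hadoop Configuration and a Drill BufferAllocator are in scope (the helper class and method names below are illustrative, not part of the patch):

    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.hadoop.CodecFactory;

    class CodecFactoryLifecycle {                          // illustrative helper, not in the patch
      static void readRowGroup(Configuration conf, BufferAllocator allocator) {
        // Build the direct (off-heap) codec factory the same way the patched
        // DrillParquetReader wiring does; 0 is the page-size hint used in the patch.
        CodecFactory codecFactory = CodecFactory.createDirectCodecFactory(
            conf, new ParquetDirectByteBufferAllocator(allocator), 0);
        try {
          // ... pass codecFactory to new ParquetRecordReader(fragmentContext, path,
          // rowGroupIndex, fs, codecFactory, footer, columns) and drive the read ...
        } finally {
          codecFactory.release();   // replaces DirectCodecFactory.close() in the old code
        }
      }
    }
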
http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index a33e616..b6d1a72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -17,21 +17,17 @@
  ******************************************************************************/
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import static parquet.Preconditions.checkArgument;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 
 import org.apache.drill.common.util.CoreDecimalUtility;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
-import org.apache.drill.exec.work.ExecErrorConstants;
-import parquet.format.ConvertedType;
-import parquet.format.SchemaElement;
-import parquet.schema.PrimitiveType;
+import org.apache.parquet.format.ConvertedType;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.schema.PrimitiveType;
+
+import static com.google.common.base.Preconditions.checkArgument;
 
 public class ParquetToDrillTypeConverter {
 

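The static checkArgument import moves from the Parquet-bundled parquet.Preconditions to Guava's com.google.common.base.Preconditions; the contract is the same. A small illustration (the precision bound used here is made up for the example):

    import static com.google.common.base.Preconditions.checkArgument;

    class DecimalPrecisionCheck {                          // illustrative only
      static void validate(int precision) {
        // Same contract as the old parquet.Preconditions variant: throws
        // IllegalArgumentException with the formatted message when the check fails.
        checkArgument(precision > 0 && precision <= 38,
            "Unsupported decimal precision: %s", precision);
      }
    }
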
http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
index 8e72bff..a62e8c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
@@ -22,11 +22,11 @@ import java.io.IOException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.ValueVector;
 
-import parquet.column.ColumnDescriptor;
-import parquet.format.Encoding;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.io.api.Binary;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.Encoding;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.io.api.Binary;
 
 public abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLengthColumn.class);

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
index fe5266c..ba126d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 import io.netty.buffer.DrillBuf;
 
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
@@ -34,9 +35,9 @@ import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.drill.exec.vector.VarCharVector;
 
-import parquet.column.ColumnDescriptor;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 
 public class VarLengthColumnReaders {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLengthColumnReaders.class);
@@ -222,7 +223,8 @@ public class VarLengthColumnReaders {
       }
 
       if (usingDictionary) {
-        mutator.setSafe(index, currDictValToWrite.toByteBuffer(), 0, currDictValToWrite.length());
+        ByteBuffer buf = currDictValToWrite.toByteBuffer();
+        mutator.setSafe(index, buf, buf.position(), currDictValToWrite.length());
       } else {
         mutator.setSafe(index, 1, start, start + length, value);
       }
@@ -257,7 +259,8 @@ public class VarLengthColumnReaders {
 
       if (usingDictionary) {
         currDictValToWrite = pageReader.dictionaryValueReader.readBytes();
-        mutator.setSafe(index, currDictValToWrite.toByteBuffer(), 0, currDictValToWrite.length());
+        ByteBuffer buf = currDictValToWrite.toByteBuffer();
+        mutator.setSafe(index, buf, buf.position(), currDictValToWrite.length());
       } else {
         mutator.setSafe(index, start, start + length, value);
       }
@@ -294,8 +297,8 @@ public class VarLengthColumnReaders {
       }
 
       if (usingDictionary) {
-        mutator.setSafe(index, currDictValToWrite.toByteBuffer(), 0,
-            currDictValToWrite.length());
+        ByteBuffer buf = currDictValToWrite.toByteBuffer();
+        mutator.setSafe(index, buf, buf.position(), currDictValToWrite.length());
       } else {
         mutator.setSafe(index, 1, start, start + length, value);
       }

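The dictionary branches above stop hard-coding a start offset of 0 and instead pass buf.position(), because the ByteBuffer returned by Binary.toByteBuffer() may be a view whose data does not begin at position 0. A self-contained java.nio illustration of why the position matters (no Parquet types involved):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class BufferPositionDemo {
      public static void main(String[] args) {
        byte[] page = "xxxxhello".getBytes(StandardCharsets.US_ASCII);
        // A view over bytes 4..8, much like a dictionary value exposed out of a larger page.
        ByteBuffer value = ByteBuffer.wrap(page, 4, 5);
        System.out.println(value.position());             // prints 4, not 0
        byte[] copied = new byte[value.remaining()];
        value.duplicate().get(copied);                     // copy starts at position(), offset 4
        System.out.println(new String(copied, StandardCharsets.US_ASCII)); // prints "hello"
      }
    }
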
http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
index 1e14a3e..6a86cea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
@@ -25,11 +25,11 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 
-import parquet.column.ColumnDescriptor;
-import parquet.format.Encoding;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.io.api.Binary;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.Encoding;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.io.api.Binary;
 
 public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLengthColumn {
 

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index 6b8154a..5bc8ad2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -67,17 +67,17 @@ import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
 import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
 import org.joda.time.DateTimeUtils;
 
-import parquet.io.api.Binary;
-import parquet.io.api.Converter;
-import parquet.io.api.GroupConverter;
-import parquet.io.api.PrimitiveConverter;
-import parquet.schema.DecimalMetadata;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.Type;
-import parquet.schema.Type.Repetition;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
 
 import com.google.common.collect.Lists;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 01a9853..9e6919b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -42,7 +42,7 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.parquet.DirectCodecFactory;
+import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableIntVector;
@@ -51,17 +51,19 @@ import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.hadoop.fs.Path;
 
-import parquet.column.ColumnDescriptor;
-import parquet.common.schema.ColumnPath;
-import parquet.hadoop.ColumnChunkIncReadStore;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.hadoop.ColumnChunkIncReadStore;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -80,7 +82,7 @@ public class DrillParquetReader extends AbstractRecordReader {
   private RowGroupReadEntry entry;
   private VectorContainerWriter writer;
   private ColumnChunkIncReadStore pageReadStore;
-  private parquet.io.RecordReader<Void> recordReader;
+  private RecordReader<Void> recordReader;
   private DrillParquetRecordMaterializer recordMaterializer;
   private int recordCount;
   private List<ValueVector> primitiveVectors;
@@ -246,7 +248,8 @@ public class DrillParquetReader extends AbstractRecordReader {
       recordCount = (int) blockMetaData.getRowCount();
 
       pageReadStore = new ColumnChunkIncReadStore(recordCount,
-          new DirectCodecFactory(fileSystem.getConf(), operatorContext.getAllocator()), operatorContext.getAllocator(),
+          CodecFactory.createDirectCodecFactory(fileSystem.getConf(),
+              new ParquetDirectByteBufferAllocator(operatorContext.getAllocator()), 0), operatorContext.getAllocator(),
           fileSystem, filePath);
 
       for (String[] path : schema.getPaths()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
index a80eb57..6b7edc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
@@ -22,9 +22,9 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
-import parquet.io.api.GroupConverter;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
 
 import java.util.Collection;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index 22bd7df..8ade25c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -35,8 +36,6 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
-import parquet.org.codehaus.jackson.annotate.JsonCreator;
-
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;

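SystemTableScan had been importing the shaded parquet.org.codehaus.jackson.annotate.JsonCreator; the patch switches it to the standard com.fasterxml.jackson.annotation.JsonCreator used elsewhere for physical-plan serialization. A minimal, hypothetical example of how the annotation is typically applied (this is not the actual SystemTableScan constructor):

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;

    class ScanPojo {                                       // hypothetical example class
      private final String table;

      // Jackson uses the @JsonCreator-annotated constructor when deserializing the
      // plan JSON; @JsonProperty binds each argument to a JSON field.
      @JsonCreator
      public ScanPojo(@JsonProperty("table") String table) {
        this.table = table;
      }

      public String getTable() { return table; }
    }
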
http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
new file mode 100644
index 0000000..28f6390
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.store.parquet.ColumnDataReader;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.util.CompatibilityUtil;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
+
+
+public class ColumnChunkIncReadStore implements PageReadStore {
+
+  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+
+  private CodecFactory codecFactory;
+  private BufferAllocator allocator;
+  private FileSystem fs;
+  private Path path;
+  private long rowCount;
+  private List<FSDataInputStream> streams = new ArrayList();
+
+  public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator,
+      FileSystem fs, Path path) {
+    this.codecFactory = codecFactory;
+    this.allocator = allocator;
+    this.fs = fs;
+    this.path = path;
+    this.rowCount = rowCount;
+  }
+
+
+  public class ColumnChunkIncPageReader implements PageReader {
+
+    ColumnChunkMetaData metaData;
+    ColumnDescriptor columnDescriptor;
+    long fileOffset;
+    long size;
+    private long valueReadSoFar = 0;
+
+    private DictionaryPage dictionaryPage;
+    private FSDataInputStream in;
+    private BytesDecompressor decompressor;
+
+    private ByteBuf lastPage;
+
+    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, ColumnDescriptor columnDescriptor, FSDataInputStream in) throws IOException {
+      this.metaData = metaData;
+      this.columnDescriptor = columnDescriptor;
+      this.size = metaData.getTotalSize();
+      this.fileOffset = metaData.getStartingPos();
+      this.in = in;
+      this.decompressor = codecFactory.getDecompressor(metaData.getCodec());
+    }
+
+    @Override
+    public DictionaryPage readDictionaryPage() {
+      if (dictionaryPage == null) {
+        PageHeader pageHeader = new PageHeader();
+        long pos = 0;
+        try {
+          pos = in.getPos();
+          pageHeader = Util.readPageHeader(in);
+          if (pageHeader.getDictionary_page_header() == null) {
+            in.seek(pos);
+            return null;
+          }
+          dictionaryPage =
+                  new DictionaryPage(
+                          decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+                          pageHeader.getDictionary_page_header().getNum_values(),
+                          parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
+                  );
+        } catch (Exception e) {
+          throw new DrillRuntimeException("Error reading dictionary page." +
+            "\nFile path: " + path.toUri().getPath() +
+            "\nRow count: " + rowCount +
+            "\nColumn Chunk Metadata: " + metaData +
+            "\nPage Header: " + pageHeader +
+            "\nFile offset: " + fileOffset +
+            "\nSize: " + size +
+            "\nValue read so far: " + valueReadSoFar +
+            "\nPosition: " + pos, e);
+        }
+      }
+      return dictionaryPage;
+    }
+
+    @Override
+    public long getTotalValueCount() {
+      return metaData.getValueCount();
+    }
+
+    @Override
+    public DataPage readPage() {
+      PageHeader pageHeader = new PageHeader();
+      try {
+        if (lastPage != null) {
+          lastPage.release();
+          lastPage = null;
+        }
+        while(valueReadSoFar < metaData.getValueCount()) {
+          pageHeader = Util.readPageHeader(in);
+          int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+          int compressedPageSize = pageHeader.getCompressed_page_size();
+          switch (pageHeader.type) {
+            case DICTIONARY_PAGE:
+              if (dictionaryPage == null) {
+                dictionaryPage =
+                        new DictionaryPage(
+                                decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+                                pageHeader.uncompressed_page_size,
+                                parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
+                        );
+              } else {
+                in.skip(pageHeader.compressed_page_size);
+              }
+              break;
+            case DATA_PAGE:
+              valueReadSoFar += pageHeader.data_page_header.getNum_values();
+              ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size);
+              lastPage = buf;
+              ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
+              int lengthLeftToRead = pageHeader.compressed_page_size;
+              while (lengthLeftToRead > 0) {
+                lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead);
+              }
+              return new DataPageV1(
+                      decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+                      pageHeader.data_page_header.num_values,
+                      pageHeader.uncompressed_page_size,
+                      fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
+                      parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
+                      parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
+                      parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+              );
+            // TODO - finish testing this with more files
+            case DATA_PAGE_V2:
+              valueReadSoFar += pageHeader.data_page_header_v2.getNum_values();
+              buf = allocator.buffer(pageHeader.compressed_page_size);
+              lastPage = buf;
+              buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
+              lengthLeftToRead = pageHeader.compressed_page_size;
+              while (lengthLeftToRead > 0) {
+                lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead);
+              }
+              DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
+              int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
+              BytesInput decompressedPageData =
+                  decompressor.decompress(
+                      BytesInput.from(buffer, 0, pageHeader.compressed_page_size),
+                      pageHeader.uncompressed_page_size);
+              return new DataPageV2(
+                      dataHeaderV2.getNum_rows(),
+                      dataHeaderV2.getNum_nulls(),
+                      dataHeaderV2.getNum_values(),
+                      BytesInput.from(decompressedPageData.toByteBuffer(), 0, dataHeaderV2.getRepetition_levels_byte_length()),
+                      BytesInput.from(decompressedPageData.toByteBuffer(),
+                          dataHeaderV2.getRepetition_levels_byte_length(),
+                          dataHeaderV2.getDefinition_levels_byte_length()),
+                      parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
+                      BytesInput.from(decompressedPageData.toByteBuffer(),
+                          dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length(),
+                          dataSize),
+                      uncompressedPageSize,
+                      fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()),
+                      dataHeaderV2.isIs_compressed()
+                  );
+            default:
+              in.skip(pageHeader.compressed_page_size);
+              break;
+          }
+        }
+        in.close();
+        return null;
+      } catch (OutOfMemoryRuntimeException e) {
+        throw e; // throw as it is
+      } catch (Exception e) {
+        throw new DrillRuntimeException("Error reading page." +
+          "\nFile path: " + path.toUri().getPath() +
+          "\nRow count: " + rowCount +
+          "\nColumn Chunk Metadata: " + metaData +
+          "\nPage Header: " + pageHeader +
+          "\nFile offset: " + fileOffset +
+          "\nSize: " + size +
+          "\nValue read so far: " + valueReadSoFar, e);
+      }
+    }
+
+    void close() {
+      if (lastPage != null) {
+        lastPage.release();
+        lastPage = null;
+      }
+    }
+  }
+
+  private Map<ColumnDescriptor, ColumnChunkIncPageReader> columns = new HashMap();
+
+  public void addColumn(ColumnDescriptor descriptor, ColumnChunkMetaData metaData) throws IOException {
+    FSDataInputStream in = fs.open(path);
+    streams.add(in);
+    in.seek(metaData.getStartingPos());
+    ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, descriptor, in);
+
+    columns.put(descriptor, reader);
+  }
+
+  public void close() throws IOException {
+    for (FSDataInputStream stream : streams) {
+      stream.close();
+    }
+    for (ColumnChunkIncPageReader reader : columns.values()) {
+      reader.close();
+    }
+  }
+
+  @Override
+  public PageReader getPageReader(ColumnDescriptor descriptor) {
+    return columns.get(descriptor);
+  }
+
+  @Override
+  public long getRowCount() {
+    return rowCount;
+  }
+}

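Relative to the deleted parquet.hadoop copy further down, the page-read loop above now tracks lengthLeftToRead and subtracts whatever each call actually returned, because the Hadoop FileSystem API may deliver fewer bytes than requested (see the DFS test added later in this commit). A standalone sketch of the same pattern; CompatibilityUtil.getBuf is Parquet's internal helper, so plain FSDataInputStream.read(ByteBuffer) stands in for it here:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;

    class PageBytesReader {                                // illustrative only
      // Loop until the whole compressed page is in the buffer. On a real DFS a single
      // read may return fewer bytes than requested, which is exactly the case the
      // lengthLeftToRead bookkeeping above guards against.
      static void readFully(FSDataInputStream in, ByteBuffer buffer, int compressedPageSize)
          throws IOException {
        int lengthLeftToRead = compressedPageSize;
        while (lengthLeftToRead > 0) {
          int bytesRead = in.read(buffer);                 // may be a short read
          if (bytesRead < 0) {
            throw new IOException("Stream ended before the full page was read");
          }
          lengthLeftToRead -= bytesRead;
        }
      }
    }
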
http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
new file mode 100644
index 0000000..564a0a4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
+
+import org.apache.parquet.column.page.PageWriteStore;
+import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.schema.MessageType;
+
+public class ColumnChunkPageWriteStoreExposer {
+
+  public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(
+      OperatorContext oContext,
+      BytesCompressor compressor,
+      MessageType schema
+      ) {
+    return new ColumnChunkPageWriteStore(compressor, schema, new ParquetDirectByteBufferAllocator(oContext));
+  }
+
+  public static void flushPageStore(PageWriteStore pageStore, ParquetFileWriter w) throws IOException {
+    ((ColumnChunkPageWriteStore) pageStore).flushToFileWriter(w);
+  }
+
+  // TODO(jaltekruse) - review, this used to have a method for closing a pageStore
+  // the parquet code once rebased did not include this close method, make sure it isn't needed
+  // I might have messed up the merge
+
+}

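ColumnChunkPageWriteStore is package-private in parquet-hadoop, so this exposer lives in org.apache.parquet.hadoop solely to hand Drill a reference to it; note that the rebased constructor no longer takes the initialSize argument the old exposer passed. A hedged usage sketch from the caller's side (oContext, compressor, schema and fileWriter are assumed to come from the surrounding writer code, which is not in this hunk):

    import java.io.IOException;
    import org.apache.drill.exec.ops.OperatorContext;
    import org.apache.parquet.column.page.PageWriteStore;
    import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
    import org.apache.parquet.hadoop.ColumnChunkPageWriteStoreExposer;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.schema.MessageType;

    class RowGroupFlush {                                  // illustrative caller, not in this hunk
      static void writeAndFlush(OperatorContext oContext, BytesCompressor compressor,
          MessageType schema, ParquetFileWriter fileWriter) throws IOException {
        PageWriteStore pageStore =
            ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(oContext, compressor, schema);
        // ... column writers push pages into pageStore here ...
        ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, fileWriter);
      }
    }
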
http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
deleted file mode 100644
index 4559083..0000000
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop;
-
-import io.netty.buffer.ByteBuf;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
-import org.apache.drill.exec.store.parquet.DirectCodecFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.bytes.BytesInput;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.DataPage;
-import parquet.column.page.DataPageV1;
-import parquet.column.page.DataPageV2;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageReadStore;
-import parquet.column.page.PageReader;
-import parquet.format.DataPageHeaderV2;
-import parquet.format.PageHeader;
-import parquet.format.Util;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactory.BytesDecompressor;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.util.CompatibilityUtil;
-
-import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
-
-
-public class ColumnChunkIncReadStore implements PageReadStore {
-
-  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
-
-  private DirectCodecFactory codecFactory;
-  private BufferAllocator allocator;
-  private FileSystem fs;
-  private Path path;
-  private long rowCount;
-  private List<FSDataInputStream> streams = new ArrayList();
-
-  public ColumnChunkIncReadStore(long rowCount, DirectCodecFactory codecFactory, BufferAllocator allocator,
-      FileSystem fs, Path path) {
-    this.codecFactory = codecFactory;
-    this.allocator = allocator;
-    this.fs = fs;
-    this.path = path;
-    this.rowCount = rowCount;
-  }
-
-
-  public class ColumnChunkIncPageReader implements PageReader {
-
-    ColumnChunkMetaData metaData;
-    ColumnDescriptor columnDescriptor;
-    long fileOffset;
-    long size;
-    private long valueReadSoFar = 0;
-
-    private DictionaryPage dictionaryPage;
-    private FSDataInputStream in;
-    private BytesDecompressor decompressor;
-
-    private ByteBuf lastPage;
-
-    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, ColumnDescriptor columnDescriptor, FSDataInputStream in) {
-      this.metaData = metaData;
-      this.columnDescriptor = columnDescriptor;
-      this.size = metaData.getTotalSize();
-      this.fileOffset = metaData.getStartingPos();
-      this.in = in;
-      this.decompressor = codecFactory.getDecompressor(metaData.getCodec());
-    }
-
-    @Override
-    public DictionaryPage readDictionaryPage() {
-      if (dictionaryPage == null) {
-        PageHeader pageHeader = new PageHeader();
-        long pos = 0;
-        try {
-          pos = in.getPos();
-          pageHeader = Util.readPageHeader(in);
-          if (pageHeader.getDictionary_page_header() == null) {
-            in.seek(pos);
-            return null;
-          }
-          dictionaryPage =
-                  new DictionaryPage(
-                          decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
-                          pageHeader.getDictionary_page_header().getNum_values(),
-                          parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
-                  );
-        } catch (Exception e) {
-          throw new DrillRuntimeException("Error reading dictionary page." +
-            "\nFile path: " + path.toUri().getPath() +
-            "\nRow count: " + rowCount +
-            "\nColumn Chunk Metadata: " + metaData +
-            "\nPage Header: " + pageHeader +
-            "\nFile offset: " + fileOffset +
-            "\nSize: " + size +
-            "\nValue read so far: " + valueReadSoFar +
-            "\nPosition: " + pos, e);
-        }
-      }
-      return dictionaryPage;
-    }
-
-    @Override
-    public long getTotalValueCount() {
-      return metaData.getValueCount();
-    }
-
-    @Override
-    public DataPage readPage() {
-      PageHeader pageHeader = new PageHeader();
-      try {
-        if (lastPage != null) {
-          lastPage.release();
-          lastPage = null;
-        }
-        while(valueReadSoFar < metaData.getValueCount()) {
-          pageHeader = Util.readPageHeader(in);
-          int uncompressedPageSize = pageHeader.getUncompressed_page_size();
-          int compressedPageSize = pageHeader.getCompressed_page_size();
-          switch (pageHeader.type) {
-            case DICTIONARY_PAGE:
-              if (dictionaryPage == null) {
-                dictionaryPage =
-                        new DictionaryPage(
-                                decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
-                                pageHeader.uncompressed_page_size,
-                                parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
-                        );
-              } else {
-                in.skip(pageHeader.compressed_page_size);
-              }
-              break;
-            case DATA_PAGE:
-              valueReadSoFar += pageHeader.data_page_header.getNum_values();
-              ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size);
-              lastPage = buf;
-              ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
-              while (buffer.remaining() > 0) {
-                CompatibilityUtil.getBuf(in, buffer, pageHeader.compressed_page_size);
-              }
-              return new DataPageV1(
-                      decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
-                      pageHeader.data_page_header.num_values,
-                      pageHeader.uncompressed_page_size,
-                      fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
-                      parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
-                      parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
-                      parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
-              );
-            // TODO - finish testing this with more files
-            case DATA_PAGE_V2:
-              valueReadSoFar += pageHeader.data_page_header_v2.getNum_values();
-              buf = allocator.buffer(pageHeader.compressed_page_size);
-              lastPage = buf;
-              buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
-              while (buffer.remaining() > 0) {
-                CompatibilityUtil.getBuf(in, buffer, pageHeader.compressed_page_size);
-              }
-              DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
-              int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
-              BytesInput decompressedPageData =
-                  decompressor.decompress(
-                      BytesInput.from(buffer, 0, pageHeader.compressed_page_size),
-                      pageHeader.uncompressed_page_size);
-              return new DataPageV2(
-                      dataHeaderV2.getNum_rows(),
-                      dataHeaderV2.getNum_nulls(),
-                      dataHeaderV2.getNum_values(),
-                      BytesInput.from(decompressedPageData.toByteBuffer(), 0, dataHeaderV2.getRepetition_levels_byte_length()),
-                      BytesInput.from(decompressedPageData.toByteBuffer(),
-                          dataHeaderV2.getRepetition_levels_byte_length(),
-                          dataHeaderV2.getDefinition_levels_byte_length()),
-                      parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
-                      BytesInput.from(decompressedPageData.toByteBuffer(),
-                          dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length(),
-                          dataSize),
-                      uncompressedPageSize,
-                      fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()),
-                      dataHeaderV2.isIs_compressed()
-                  );
-            default:
-              in.skip(pageHeader.compressed_page_size);
-              break;
-          }
-        }
-        in.close();
-        return null;
-      } catch (OutOfMemoryRuntimeException e) {
-        throw e; // throw as it is
-      } catch (Exception e) {
-        throw new DrillRuntimeException("Error reading page." +
-          "\nFile path: " + path.toUri().getPath() +
-          "\nRow count: " + rowCount +
-          "\nColumn Chunk Metadata: " + metaData +
-          "\nPage Header: " + pageHeader +
-          "\nFile offset: " + fileOffset +
-          "\nSize: " + size +
-          "\nValue read so far: " + valueReadSoFar, e);
-      }
-    }
-
-    void close() {
-      if (lastPage != null) {
-        lastPage.release();
-        lastPage = null;
-      }
-    }
-  }
-
-  private Map<ColumnDescriptor, ColumnChunkIncPageReader> columns = new HashMap();
-
-  public void addColumn(ColumnDescriptor descriptor, ColumnChunkMetaData metaData) throws IOException {
-    FSDataInputStream in = fs.open(path);
-    streams.add(in);
-    in.seek(metaData.getStartingPos());
-    ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, descriptor, in);
-
-    columns.put(descriptor, reader);
-  }
-
-  public void close() throws IOException {
-    for (FSDataInputStream stream : streams) {
-      stream.close();
-    }
-    for (ColumnChunkIncPageReader reader : columns.values()) {
-      reader.close();
-    }
-  }
-
-  @Override
-  public PageReader getPageReader(ColumnDescriptor descriptor) {
-    return columns.get(descriptor);
-  }
-
-  @Override
-  public long getRowCount() {
-    return rowCount;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
deleted file mode 100644
index 743d185..0000000
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
-
-import parquet.column.page.PageWriteStore;
-import parquet.hadoop.CodecFactory.BytesCompressor;
-import parquet.schema.MessageType;
-
-public class ColumnChunkPageWriteStoreExposer {
-
-  public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(
-      OperatorContext oContext,
-      BytesCompressor compressor,
-      MessageType schema,
-      int initialSize
-      ) {
-    return new ColumnChunkPageWriteStore(compressor, schema, initialSize, new ParquetDirectByteBufferAllocator(oContext));
-  }
-
-  public static void flushPageStore(PageWriteStore pageStore, ParquetFileWriter w) throws IOException {
-    ((ColumnChunkPageWriteStore) pageStore).flushToFileWriter(w);
-  }
-
-  public static void close(PageWriteStore pageStore) throws IOException {
-    ((ColumnChunkPageWriteStore) pageStore).close();
-
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 60116e2..352f487 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -166,7 +166,11 @@ public class DrillTestWrapper {
       assertEquals("Different number of records returned", expectedValues.size(), actualValues.size());
 
       for (int i = 0; i < expectedValues.size(); i++) {
-        compareValuesErrorOnMismatch(expectedValues.get(i), actualValues.get(i), i, s);
+        try {
+          compareValuesErrorOnMismatch(expectedValues.get(i), actualValues.get(i), i, s);
+        } catch (Exception ex) {
+          throw new Exception(ex.getMessage() + "\n\n" + printNearbyRecords(expectedRecords, actualRecords, i), ex);
+        }
       }
     }
     if (actualRecords.size() < expectedRecords.size()) {
@@ -174,6 +178,34 @@ public class DrillTestWrapper {
     }
   }
 
+  private String printNearbyRecords(Map<String, List> expectedRecords, Map<String, List> actualRecords, int offset) {
+    StringBuilder expected = new StringBuilder();
+    StringBuilder actual = new StringBuilder();
+    expected.append("Expected Records near verification failure:\n");
+    actual.append("Actual Records near verification failure:\n");
+    int firstRecordToPrint = Math.max(0, offset - 5);
+    List<Object> expectedValuesInFirstColumn = expectedRecords.get(expectedRecords.keySet().iterator().next());
+    List<Object> actualValuesInFirstColumn = actualRecords.get(actualRecords.keySet().iterator().next());
+    int numberOfRecordsToPrint = Math.min(Math.min(10, expectedValuesInFirstColumn.size()), actualValuesInFirstColumn.size());
+    for (int i = firstRecordToPrint; i < numberOfRecordsToPrint; i++) {
+      expected.append("Record Number: ").append(i).append(" { ");
+      actual.append("Record Number: ").append(i).append(" { ");
+      for (String s : actualRecords.keySet()) {
+        List actualValues = actualRecords.get(s);
+        actual.append(s).append(" : ").append(actualValues.get(i)).append(",");
+      }
+      for (String s : expectedRecords.keySet()) {
+        List expectedValues = expectedRecords.get(s);
+        expected.append(s).append(" : ").append(expectedValues.get(i)).append(",");
+      }
+      expected.append(" }\n");
+      actual.append(" }\n");
+    }
+
+    return expected.append("\n\n").append(actual).toString();
+
+  }
+
   private Map<String, HyperVectorValueIterator> addToHyperVectorMap(List<QueryDataBatch> records, RecordBatchLoader loader,
                                                                       BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java b/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java
index f9cfdb5..204eeb0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java
@@ -17,11 +17,11 @@
  */
 package org.apache.drill;
 
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
 
 public class ParquetSchemaMerge {
   public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java b/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java
index deeb7cb..9d9c1a7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java
@@ -23,6 +23,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.hamcrest.CoreMatchers;
 import org.junit.Test;
 
 import java.math.BigDecimal;
@@ -30,6 +31,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 // TODO - update framework to remove any dependency on the Drill engine for reading baseline result sets
 // currently using it with the assumption that the csv and json readers are well tested, and handling diverse
@@ -237,7 +239,8 @@ public class TestFrameworkTest extends BaseTestQuery{
         .baselineColumns("employee_id", "first_name", "last_name")
         .build().run();
     } catch (Exception ex) {
-      assertEquals("at position 0 column '`employee_id`' mismatched values, expected: 12(Integer) but received 12(Long)", ex.getMessage());
+      assertThat(ex.getMessage(), CoreMatchers.containsString(
+          "at position 0 column '`employee_id`' mismatched values, expected: 12(Integer) but received 12(Long)"));
       // this indicates successful completion of the test
       return;
     }
@@ -254,7 +257,8 @@ public class TestFrameworkTest extends BaseTestQuery{
           .baselineColumns("employee_id", "first_name", "last_name")
           .build().run();
     } catch (Exception ex) {
-      assertEquals("at position 0 column '`first_name`' mismatched values, expected: Jewel(String) but received Peggy(String)", ex.getMessage());
+      assertThat(ex.getMessage(), CoreMatchers.containsString(
+          "at position 0 column '`first_name`' mismatched values, expected: Jewel(String) but received Peggy(String)"));
       // this indicates successful completion of the test
       return;
     }
@@ -319,9 +323,9 @@ public class TestFrameworkTest extends BaseTestQuery{
           .optionSettingQueriesForBaseline("alter system set `store.json.all_text_mode` = true")
           .build().run();
     } catch (Exception ex) {
-      assertEquals("at position 1 column '`field_1`' mismatched values, " +
-          "expected: [\"5\",\"2\",\"3\",\"4\",\"1\",\"2\"](JsonStringArrayList) but received [\"5\"](JsonStringArrayList)",
-          ex.getMessage());
+      assertThat(ex.getMessage(), CoreMatchers.containsString(
+          "at position 1 column '`field_1`' mismatched values, " +
+          "expected: [\"5\",\"2\",\"3\",\"4\",\"1\",\"2\"](JsonStringArrayList) but received [\"5\"](JsonStringArrayList)"));
       // this indicates successful completion of the test
       return;
     }

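These assertions switch from an exact assertEquals on the full exception message to assertThat(..., containsString(...)), so the extra nearby-record diagnostics now appended by DrillTestWrapper no longer break the negative tests. The pattern in isolation (class and method names below are illustrative):

    import static org.hamcrest.CoreMatchers.containsString;
    import static org.junit.Assert.assertThat;

    class MessageAssertionExample {                        // illustrative only
      static void verifyMismatchMessage(Exception ex) {
        // Passes as long as the fragment appears anywhere in the message, so extra
        // diagnostic text appended after it does not affect the check.
        assertThat(ex.getMessage(), containsString("mismatched values, expected:"));
      }
    }
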
http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
index e01e45d..df63d7e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java
@@ -18,11 +18,18 @@
 package org.apache.drill.exec.impersonation;
 
 import com.google.common.collect.Maps;
+import org.apache.drill.exec.physical.impl.writer.TestParquetWriter;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+/**
+ * Note to future devs: please do not put random tests here. Make sure they actually require
+ * access to a DFS rather than the local filesystem implementation used by default in the rest
+ * of the tests. Running this mini cluster is slow, so these tests should cover only the
+ * necessary cases.
+ */
 public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation {
 
   @BeforeClass
@@ -37,6 +44,38 @@ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation
     // Create test table in minidfs.tmp schema for use in test queries
     test(String.format("CREATE TABLE %s.tmp.dfsRegion AS SELECT * FROM cp.`region.json`",
         MINIDFS_STORAGE_PLUGIN_NAME));
+
+    // Generate a file large enough that the DFS will not fulfill a request to read a
+    // page of data all at once; see the notes above testReadLargeParquetFileFromDFS().
+    test(String.format(
+        "CREATE TABLE %s.tmp.large_employee AS " +
+            "(SELECT employee_id, full_name FROM cp.`/employee.json`) " +
+            "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" +
+            "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" +
+            "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" +
+            "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" +
+            "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" +
+            "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" +
+        "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)",
+        MINIDFS_STORAGE_PLUGIN_NAME));
+  }
+
+  /**
+   * While merging the Drill fork of Parquet, a bug was found that only manifested when
+   * run on a cluster. The local implementation of the Hadoop FileSystem API appears to
+   * never return fewer bytes than were requested in a single read. The API is designed
+   * to allow a subset of the requested bytes to be returned, so a client can decide
+   * whether to process the bytes available now before requesting the rest.
+   *
+   * For Parquet's block compression of page data, we need all of the bytes. This test is a
+   * sanity check to make sure we don't accidentally introduce an issue where only a subset
+   * of the bytes is read; without it, exercising the full contract of the read method we
+   * use would require testing on a cluster.
+   */
+  @Test
+  public void testReadLargeParquetFileFromDFS() throws Exception {
+    test(String.format("USE %s", MINIDFS_STORAGE_PLUGIN_NAME));
+    test("SELECT * FROM tmp.`large_employee`");
   }
 
   @Test // DRILL-3037

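The javadoc added above documents the Hadoop FileSystem read contract: a single read call is allowed to return fewer bytes than were requested, and in practice only the distributed implementation exercises that path. Below is a minimal sketch of the loop a caller needs when every byte of a page is required (a hypothetical helper, not part of this commit; it assumes only the standard FSDataInputStream.read(byte[], int, int) contract):

    import java.io.EOFException;
    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadExactlyExample {
      // Reads exactly 'length' bytes from the start of the file, looping because a DFS
      // may legally return fewer bytes per read() call than were requested.
      public static byte[] readExactly(FileSystem fs, Path file, int length) throws IOException {
        final byte[] buffer = new byte[length];
        try (FSDataInputStream in = fs.open(file)) {
          int filled = 0;
          while (filled < length) {
            final int n = in.read(buffer, filled, length - filled);
            if (n < 0) {
              throw new EOFException("Stream ended after " + filled + " of " + length + " bytes");
            }
            filled += n;
          }
        }
        return buffer;
      }
    }

FSDataInputStream also provides readFully(), which performs the same loop internally; the test above guards against code paths that assume a single read() returns everything, an assumption that only fails once the file is large enough and is served by a real DFS.
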
http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 7c4ac1e..51d5d08 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.writer;
 
 import java.io.File;
 import java.io.FileWriter;
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Date;
 
@@ -33,7 +32,6 @@ import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTime;
 import org.joda.time.Period;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -47,6 +45,28 @@ public class TestParquetWriter extends BaseTestQuery {
   public TemporaryFolder folder = new TemporaryFolder();
   static FileSystem fs;
 
+  private String allTypesSelection =
+      "cast( `int_col` AS             int)                `int_col`, " +
+          "cast( `bigint_col` AS          bigint)             `bigint_col`, " +
+          // TODO(DRILL-3367)
+//          "cast( `decimal9_col` AS        decimal(9, 4))      `decimal9_col`, " +
+//          "cast( `decimal18_col` AS       decimal(18,9))      `decimal18_col`, " +
+//          "cast( `decimal28sparse_col` AS decimal(28, 14))    `decimal28sparse_col`, " +
+//          "cast( `decimal38sparse_col` AS decimal(38, 19))    `decimal38sparse_col`, " +
+          "cast( `date_col` AS            date)               `date_col`, " +
+          "cast( `timestamp_col` AS       timestamp)          `timestamp_col`, " +
+          "cast( `float4_col` AS          float)              `float4_col`, " +
+          "cast( `float8_col` AS          double)             `float8_col`, " +
+          "cast( `varbinary_col` AS       varbinary(65000))   `varbinary_col`, " +
+          // TODO(DRILL-2297)
+//        "cast( `intervalyear_col` AS    interval year)      `intervalyear_col`, " +
+          "cast( `intervalday_col` AS     interval day)       `intervalday_col`, " +
+          "cast( `bit_col` AS             boolean)            `bit_col`, " +
+          "      `varchar_col`                                `varchar_col`, " +
+          "cast( `time_col` AS            time)               `time_col` ";
+
+  private String allTypesTable = "cp.`/parquet/alltypes.json`";
+
   @BeforeClass
   public static void initFs() throws Exception {
     Configuration conf = new Configuration();
@@ -61,6 +81,12 @@ public class TestParquetWriter extends BaseTestQuery {
     test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
   }
 
+  @Test
+  public void testSmallFileValueReadWrite() throws Exception {
+    String selection = "key";
+    String inputTable = "cp.`/store/json/intData.json`";
+    runTestAndValidate(selection, selection, inputTable, "smallFileTest");
+  }
 
   @Test
   public void testSimple() throws Exception {
@@ -103,6 +129,47 @@ public class TestParquetWriter extends BaseTestQuery {
   }
 
   @Test
+  public void testAllScalarTypes() throws Exception {
+    // read once with the flat reader
+    runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json");
+
+    try {
+      // read all of the types with the complex reader
+      test(String.format("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER));
+      runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json");
+    } finally {
+      test(String.format("alter session set %s = false", ExecConstants.PARQUET_NEW_RECORD_READER));
+    }
+  }
+
+  @Test
+  public void testAllScalarTypesDictionary() throws Exception {
+    try {
+      test(String.format("alter session set %s = true", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
+      // read once with the flat reader
+      runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json");
+
+      // read all of the types with the complex reader
+      test(String.format("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER));
+      runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json");
+    } finally {
+      test(String.format("alter session set %s = false", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
+    }
+  }
+
+  @Test
+  public void testDictionaryEncoding() throws Exception {
+    String selection = "type";
+    String inputTable = "cp.`donuts.json`";
+    try {
+      test(String.format("alter session set %s = true", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
+      runTestAndValidate(selection, selection, inputTable, "donuts_json");
+    } finally {
+      test(String.format("alter session set %s = false", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
+    }
+  }
+
+  @Test
   public void testComplex() throws Exception {
     String selection = "*";
     String inputTable = "cp.`donuts.json`";

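Each of the new tests above follows the same shape: set a session option to true, run the Parquet round-trip validation, and reset the option to false in a finally block so later tests are unaffected. A small sketch of how that pattern could be factored out (a hypothetical helper, not part of this commit; it assumes it lives in a BaseTestQuery subclass such as this one, so the static test(String) helper used throughout the diff is in scope):

      // Hypothetical helper: runs a body with a boolean session option enabled and
      // always resets the option afterwards, even if the body throws.
      private static void withSessionOption(String optionName, CheckedBody body) throws Exception {
        test(String.format("alter session set `%s` = true", optionName));
        try {
          body.run();
        } finally {
          test(String.format("alter session set `%s` = false", optionName));
        }
      }

      // Minimal functional interface so the body may throw checked exceptions.
      private interface CheckedBody {
        void run() throws Exception;
      }

Each test body would then pass its runTestAndValidate(...) call as an anonymous CheckedBody, keeping the option cleanup in one place.
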
http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java
deleted file mode 100644
index 004a8d0..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store;
-
-import io.netty.buffer.DrillBuf;
-
-import java.nio.ByteBuffer;
-import java.util.Random;
-
-import org.apache.drill.common.DeferredException;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecTest;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.RootAllocatorFactory;
-import org.apache.drill.exec.store.parquet.DirectCodecFactory;
-import org.apache.drill.exec.store.parquet.DirectCodecFactory.ByteBufBytesInput;
-import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.Test;
-
-import parquet.bytes.BytesInput;
-import parquet.hadoop.CodecFactory.BytesCompressor;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-public class TestDirectCodecFactory extends ExecTest {
-  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestDirectCodecFactory.class);
-  private final DrillConfig drillConfig = DrillConfig.create();
-
-  private static enum Decompression {
-    ON_HEAP, OFF_HEAP, DRILLBUF
-  }
-
-  private void test(int size, CompressionCodecName codec, boolean useOnHeapCompression, Decompression decomp) throws Exception {
-    DrillBuf rawBuf = null;
-    DrillBuf outBuf = null;
-    try (final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConfig);
-        final DirectCodecFactory codecFactory = new DirectCodecFactory(new Configuration(), allocator)) {
-      try {
-        rawBuf = allocator.buffer(size);
-        final byte[] rawArr = new byte[size];
-        outBuf = allocator.buffer(size * 2);
-        final Random r = new Random();
-        final byte[] random = new byte[1024];
-        int pos = 0;
-        while (pos < size) {
-          r.nextBytes(random);
-          rawBuf.writeBytes(random);
-          System.arraycopy(random, 0, rawArr, pos, random.length);
-          pos += random.length;
-        }
-
-        final BytesCompressor c = codecFactory.getCompressor(codec, 64 * 1024);
-        final DirectBytesDecompressor d = codecFactory.getDecompressor(codec);
-
-        final BytesInput compressed;
-        if (useOnHeapCompression) {
-          compressed = c.compress(BytesInput.from(rawArr));
-        } else {
-          compressed = c.compress(new ByteBufBytesInput(rawBuf));
-        }
-
-        switch (decomp) {
-        case DRILLBUF: {
-          final ByteBuffer buf = compressed.toByteBuffer();
-          final DrillBuf b = allocator.buffer(buf.capacity());
-          try {
-            b.writeBytes(buf);
-            d.decompress(b, (int) compressed.size(), outBuf, size);
-            for (int i = 0; i < size; i++) {
-              Assert.assertTrue("Data didn't match at " + i, outBuf.getByte(i) == rawBuf.getByte(i));
-            }
-          } finally {
-            b.release();
-          }
-          break;
-        }
-
-        case OFF_HEAP: {
-          final ByteBuffer buf = compressed.toByteBuffer();
-          final DrillBuf b = allocator.buffer(buf.capacity());
-          try {
-            b.writeBytes(buf);
-            final BytesInput input = d.decompress(new ByteBufBytesInput(b), size);
-            Assert.assertArrayEquals(input.toByteArray(), rawArr);
-          } finally {
-            b.release();
-          }
-          break;
-        }
-        case ON_HEAP: {
-          final byte[] buf = compressed.toByteArray();
-          final BytesInput input = d.decompress(BytesInput.from(buf), size);
-          Assert.assertArrayEquals(input.toByteArray(), rawArr);
-          break;
-        }
-        }
-      } catch (Exception e) {
-        final String msg = String.format(
-            "Failure while testing Codec: %s, OnHeapCompressionInput: %s, Decompression Mode: %s, Data Size: %d",
-            codec.name(),
-            useOnHeapCompression, decomp.name(), size);
-        System.out.println(msg);
-        throw new RuntimeException(msg, e);
-      } finally {
-        if (rawBuf != null) {
-          rawBuf.release();
-        }
-        if (outBuf != null) {
-          outBuf.release();
-        }
-      }
-    }
-  }
-
-  @Test
-  public void compressionCodecs() throws Exception {
-    final int[] sizes = { 4 * 1024, 1 * 1024 * 1024 };
-    final boolean[] comp = { true, false };
-
-    try (final DeferredException ex = new DeferredException()) {
-      for (final int size : sizes) {
-        for (final boolean useOnHeapComp : comp) {
-          for (final Decompression decomp : Decompression.values()) {
-            for (final CompressionCodecName codec : CompressionCodecName.values()) {
-              if (codec == CompressionCodecName.LZO) {
-                // not installed as gpl.
-                continue;
-              }
-              try {
-                test(size, codec, useOnHeapComp, decomp);
-              } catch (Exception e) {
-                ex.addException(e);
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 14cfd8e..d5f7352 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -63,18 +63,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.parquet.hadoop.CodecFactory;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import parquet.bytes.BytesInput;
-import parquet.column.page.DataPageV1;
-import parquet.column.page.PageReadStore;
-import parquet.column.page.PageReader;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileReader;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.schema.MessageType;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Stopwatch;
@@ -638,7 +639,8 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     final BufferAllocator allocator = RootAllocatorFactory.newRoot(c);
     for(int i = 0; i < 25; i++) {
       final ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs,
-          new DirectCodecFactory(dfsConfig, allocator), f.getParquetMetadata(), columns);
+          CodecFactory.createDirectCodecFactory(dfsConfig, new ParquetDirectByteBufferAllocator(allocator), 0),
+          f.getParquetMetadata(), columns);
       final TestOutputMutator mutator = new TestOutputMutator(allocator);
       rr.setup(null, mutator);
       final Stopwatch watch = new Stopwatch();

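Here the reader is constructed with the Parquet library's own codec factory instead of Drill's removed DirectCodecFactory (whose test is deleted above). A minimal sketch of how such a factory might be created and released (hypothetical usage, not part of this commit; it assumes the 1.8.x org.apache.parquet.hadoop.CodecFactory surface, i.e. createDirectCodecFactory(Configuration, ByteBufferAllocator, int) as used in the hunk above, plus getCompressor(CompressionCodecName)/BytesCompressor and release(), which should be checked against the exact library version):

    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.hadoop.CodecFactory;
    import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class DirectCodecFactoryUsage {
      // A page size of 0 mirrors the call in ParquetRecordReaderTest above; buffers are
      // allocated off-heap through the supplied Drill allocator.
      public static void withSnappyCompressor(Configuration conf, BufferAllocator allocator) {
        final CodecFactory codecFactory = CodecFactory.createDirectCodecFactory(
            conf, new ParquetDirectByteBufferAllocator(allocator), 0);
        try {
          final BytesCompressor compressor = codecFactory.getCompressor(CompressionCodecName.SNAPPY);
          // ... compress page bytes here, e.g. compressor.compress(someBytesInput) ...
        } finally {
          codecFactory.release();
        }
      }
    }
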
http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
index 013ea95..593e0db 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
@@ -17,25 +17,26 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import static parquet.column.Encoding.PLAIN;
-import static parquet.column.Encoding.RLE;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.RLE;
 
 import java.util.HashMap;
 
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.store.ByteArrayUtil;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import parquet.bytes.BytesInput;
-import parquet.bytes.DirectByteBufferAllocator;
-import parquet.column.ColumnDescriptor;
-import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
-import parquet.hadoop.ParquetFileWriter;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
 
 public class TestFileGenerator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestFileGenerator.class);
@@ -185,16 +186,19 @@ public class TestFileGenerator {
         ColumnDescriptor c1 = schema.getColumnDescription(path1);
 
         w.startColumn(c1, props.recordsPerRowGroup, codec);
-        int valsPerPage = (int) Math.ceil(props.recordsPerRowGroup / (float) fieldInfo.numberOfPages);
+        final int valsPerPage = (int) Math.ceil(props.recordsPerRowGroup / (float) fieldInfo.numberOfPages);
+        final int PAGE_SIZE = 1024 * 1024; // 1 MB
         byte[] bytes;
         RunLengthBitPackingHybridValuesWriter defLevels = new RunLengthBitPackingHybridValuesWriter(
-          MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS,
-          valsPerPage,
-          new DirectByteBufferAllocator());
+            MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS,
+            valsPerPage,
+            PAGE_SIZE,
+            new DirectByteBufferAllocator());
         RunLengthBitPackingHybridValuesWriter repLevels = new RunLengthBitPackingHybridValuesWriter(
-          MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS,
-          valsPerPage,
-          new DirectByteBufferAllocator());
+            MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS,
+            valsPerPage,
+            PAGE_SIZE,
+            new DirectByteBufferAllocator());
         // for variable length binary fields
         int bytesNeededToEncodeLength = 4;
         if ((int) fieldInfo.bitLength > 0) {

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/resources/store/json/donuts_short.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/json/donuts_short.json b/exec/java-exec/src/test/resources/store/json/donuts_short.json
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java
index d119ec8..286ebf4 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.jdbc;
 
+import com.google.common.base.Preconditions;
+
 import java.sql.Array;
 import java.sql.Blob;
 import java.sql.CallableStatement;
@@ -36,8 +38,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executor;
 
-import parquet.Preconditions;
-
 /**
  * A connection decorator that ignores {@link Connection#close} calls.
  *

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 93b710c..05d8631 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
     <dep.junit.version>4.11</dep.junit.version>
     <dep.slf4j.version>1.7.6</dep.slf4j.version>
     <forkCount>2</forkCount>
-    <parquet.version>1.6.0rc3-drill-r0.3</parquet.version>
+    <parquet.version>1.8.1-drill-r0</parquet.version>
     <sqlline.version>1.1.9-drill-r7</sqlline.version>
 
     <!--

