From: jacques@apache.org
To: commits@drill.apache.org
Date: Thu, 05 Nov 2015 05:56:46 -0000
Message-Id: <31ad5b35f19e4ec09cb809c0a207716f@git.apache.org>
Subject: [8/9] drill git commit: DRILL-4028: Update Drill to leverage latest version of Parquet library.

http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 3a00a4c..f42996b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -39,7 +39,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.MaterializedField.Key; import org.apache.drill.exec.store.AbstractRecordReader; -import org.apache.drill.exec.store.parquet.DirectCodecFactory; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableIntVector; import org.apache.drill.exec.vector.complex.RepeatedValueVector; @@ -47,15 +46,16 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import parquet.column.ColumnDescriptor; -import parquet.format.FileMetaData; -import parquet.format.SchemaElement; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.ParquetFileWriter; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.schema.PrimitiveType; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.FileMetaData; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.CodecFactory; +import 
org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.PrimitiveType; import com.google.common.collect.Lists; @@ -101,7 +101,7 @@ public class ParquetRecordReader extends AbstractRecordReader { // records specified in the row group metadata long mockRecordsRead; - private final DirectCodecFactory codecFactory; + private final CodecFactory codecFactory; int rowGroupIndex; long totalRecordsRead; private final FragmentContext fragmentContext; @@ -110,7 +110,7 @@ public class ParquetRecordReader extends AbstractRecordReader { String path, int rowGroupIndex, FileSystem fs, - DirectCodecFactory codecFactory, + CodecFactory codecFactory, ParquetMetadata footer, List columns) throws ExecutionSetupException { this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer, @@ -123,7 +123,7 @@ public class ParquetRecordReader extends AbstractRecordReader { String path, int rowGroupIndex, FileSystem fs, - DirectCodecFactory codecFactory, + CodecFactory codecFactory, ParquetMetadata footer, List columns) throws ExecutionSetupException { this.hadoopPath = new Path(path); @@ -136,7 +136,7 @@ public class ParquetRecordReader extends AbstractRecordReader { setColumns(columns); } - public DirectCodecFactory getCodecFactory() { + public CodecFactory getCodecFactory() { return codecFactory; } @@ -471,7 +471,7 @@ public class ParquetRecordReader extends AbstractRecordReader { columnStatuses = null; } - codecFactory.close(); + codecFactory.release(); if (varLengthReader != null) { for (final VarLengthColumn r : varLengthReader.columns) { http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java index a33e616..b6d1a72 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java @@ -17,21 +17,17 @@ ******************************************************************************/ package org.apache.drill.exec.store.parquet.columnreaders; -import static parquet.Preconditions.checkArgument; - -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.util.CoreDecimalUtility; -import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; -import org.apache.drill.exec.work.ExecErrorConstants; -import parquet.format.ConvertedType; -import parquet.format.SchemaElement; -import parquet.schema.PrimitiveType; +import org.apache.parquet.format.ConvertedType; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.schema.PrimitiveType; + +import static 
com.google.common.base.Preconditions.checkArgument; public class ParquetToDrillTypeConverter { http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java index 8e72bff..a62e8c5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java @@ -22,11 +22,11 @@ import java.io.IOException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.ValueVector; -import parquet.column.ColumnDescriptor; -import parquet.format.Encoding; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.io.api.Binary; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.Encoding; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.io.api.Binary; public abstract class VarLengthColumn extends ColumnReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLengthColumn.class); http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java index fe5266c..ba126d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.parquet.columnreaders; import io.netty.buffer.DrillBuf; import java.math.BigDecimal; +import java.nio.ByteBuffer; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; @@ -34,9 +35,9 @@ import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.drill.exec.vector.VarCharVector; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; public class VarLengthColumnReaders { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLengthColumnReaders.class); @@ -222,7 +223,8 @@ public class VarLengthColumnReaders { } if (usingDictionary) { - mutator.setSafe(index, currDictValToWrite.toByteBuffer(), 0, currDictValToWrite.length()); + ByteBuffer buf = currDictValToWrite.toByteBuffer(); + mutator.setSafe(index, buf, buf.position(), currDictValToWrite.length()); } else { mutator.setSafe(index, 1, start, start + 
length, value); } @@ -257,7 +259,8 @@ public class VarLengthColumnReaders { if (usingDictionary) { currDictValToWrite = pageReader.dictionaryValueReader.readBytes(); - mutator.setSafe(index, currDictValToWrite.toByteBuffer(), 0, currDictValToWrite.length()); + ByteBuffer buf = currDictValToWrite.toByteBuffer(); + mutator.setSafe(index, buf, buf.position(), currDictValToWrite.length()); } else { mutator.setSafe(index, start, start + length, value); } @@ -294,8 +297,8 @@ public class VarLengthColumnReaders { } if (usingDictionary) { - mutator.setSafe(index, currDictValToWrite.toByteBuffer(), 0, - currDictValToWrite.length()); + ByteBuffer buf = currDictValToWrite.toByteBuffer(); + mutator.setSafe(index, buf, buf.position(), currDictValToWrite.length()); } else { mutator.setSafe(index, 1, start, start + length, value); } http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java index 1e14a3e..6a86cea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java @@ -25,11 +25,11 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VariableWidthVector; -import parquet.column.ColumnDescriptor; -import parquet.format.Encoding; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.io.api.Binary; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.format.Encoding; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.io.api.Binary; public abstract class VarLengthValuesColumn extends VarLengthColumn { http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java index 6b8154a..5bc8ad2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java @@ -67,17 +67,17 @@ import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter; import org.apache.drill.exec.vector.complex.writer.VarCharWriter; import org.joda.time.DateTimeUtils; -import parquet.io.api.Binary; -import parquet.io.api.Converter; -import parquet.io.api.GroupConverter; -import parquet.io.api.PrimitiveConverter; -import parquet.schema.DecimalMetadata; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.OriginalType; -import parquet.schema.PrimitiveType; -import parquet.schema.Type; -import parquet.schema.Type.Repetition; +import 
org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.DecimalMetadata; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; import com.google.common.collect.Lists; http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 01a9853..9e6919b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -42,7 +42,7 @@ import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.MaterializedField.Key; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.store.parquet.DirectCodecFactory; +import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator; import org.apache.drill.exec.store.parquet.RowGroupReadEntry; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableIntVector; @@ -51,17 +51,19 @@ import org.apache.drill.exec.vector.VariableWidthVector; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; import org.apache.hadoop.fs.Path; -import parquet.column.ColumnDescriptor; -import parquet.common.schema.ColumnPath; -import parquet.hadoop.ColumnChunkIncReadStore; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.io.ColumnIOFactory; -import parquet.io.MessageColumnIO; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.Type; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.hadoop.ColumnChunkIncReadStore; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -80,7 +82,7 @@ public class DrillParquetReader extends AbstractRecordReader { private RowGroupReadEntry entry; private VectorContainerWriter writer; private ColumnChunkIncReadStore pageReadStore; - private parquet.io.RecordReader recordReader; + private RecordReader recordReader; private DrillParquetRecordMaterializer recordMaterializer; private int recordCount; private List primitiveVectors; @@ -246,7 +248,8 @@ 
public class DrillParquetReader extends AbstractRecordReader { recordCount = (int) blockMetaData.getRowCount(); pageReadStore = new ColumnChunkIncReadStore(recordCount, - new DirectCodecFactory(fileSystem.getConf(), operatorContext.getAllocator()), operatorContext.getAllocator(), + CodecFactory.createDirectCodecFactory(fileSystem.getConf(), + new ParquetDirectByteBufferAllocator(operatorContext.getAllocator()), 0), operatorContext.getAllocator(), fileSystem, filePath); for (String[] path : schema.getPaths()) { http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java index a80eb57..6b7edc4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java @@ -22,9 +22,9 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; -import parquet.io.api.GroupConverter; -import parquet.io.api.RecordMaterializer; -import parquet.schema.MessageType; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.MessageType; import java.util.Collection; import java.util.List; http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java index 22bd7df..8ade25c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -35,8 +36,6 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import org.apache.drill.exec.store.StoragePluginRegistry; -import parquet.org.codehaus.jackson.annotate.JsonCreator; - import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java new file mode 100644 index 0000000..28f6390 --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.parquet.hadoop; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; +import org.apache.drill.exec.store.parquet.ColumnDataReader; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.Util; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics; + + +public class ColumnChunkIncReadStore implements PageReadStore { + + private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); + + private CodecFactory codecFactory; + private BufferAllocator allocator; + private FileSystem fs; + private Path path; + private long rowCount; + private List streams = new ArrayList(); + + public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator, + FileSystem fs, Path path) { + this.codecFactory = codecFactory; + this.allocator = allocator; + this.fs = fs; + this.path = path; + this.rowCount = rowCount; + } + + + public class ColumnChunkIncPageReader implements PageReader { + + ColumnChunkMetaData metaData; + ColumnDescriptor columnDescriptor; + long fileOffset; + long size; + private long valueReadSoFar = 0; + + private DictionaryPage dictionaryPage; + private FSDataInputStream in; + private BytesDecompressor decompressor; + + private ByteBuf lastPage; + + public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, ColumnDescriptor columnDescriptor, FSDataInputStream in) throws 
IOException { + this.metaData = metaData; + this.columnDescriptor = columnDescriptor; + this.size = metaData.getTotalSize(); + this.fileOffset = metaData.getStartingPos(); + this.in = in; + this.decompressor = codecFactory.getDecompressor(metaData.getCodec()); + } + + @Override + public DictionaryPage readDictionaryPage() { + if (dictionaryPage == null) { + PageHeader pageHeader = new PageHeader(); + long pos = 0; + try { + pos = in.getPos(); + pageHeader = Util.readPageHeader(in); + if (pageHeader.getDictionary_page_header() == null) { + in.seek(pos); + return null; + } + dictionaryPage = + new DictionaryPage( + decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()), + pageHeader.getDictionary_page_header().getNum_values(), + parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding) + ); + } catch (Exception e) { + throw new DrillRuntimeException("Error reading dictionary page." + + "\nFile path: " + path.toUri().getPath() + + "\nRow count: " + rowCount + + "\nColumn Chunk Metadata: " + metaData + + "\nPage Header: " + pageHeader + + "\nFile offset: " + fileOffset + + "\nSize: " + size + + "\nValue read so far: " + valueReadSoFar + + "\nPosition: " + pos, e); + } + } + return dictionaryPage; + } + + @Override + public long getTotalValueCount() { + return metaData.getValueCount(); + } + + @Override + public DataPage readPage() { + PageHeader pageHeader = new PageHeader(); + try { + if (lastPage != null) { + lastPage.release(); + lastPage = null; + } + while(valueReadSoFar < metaData.getValueCount()) { + pageHeader = Util.readPageHeader(in); + int uncompressedPageSize = pageHeader.getUncompressed_page_size(); + int compressedPageSize = pageHeader.getCompressed_page_size(); + switch (pageHeader.type) { + case DICTIONARY_PAGE: + if (dictionaryPage == null) { + dictionaryPage = + new DictionaryPage( + decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()), + pageHeader.uncompressed_page_size, + parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding) + ); + } else { + in.skip(pageHeader.compressed_page_size); + } + break; + case DATA_PAGE: + valueReadSoFar += pageHeader.data_page_header.getNum_values(); + ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size); + lastPage = buf; + ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size); + int lengthLeftToRead = pageHeader.compressed_page_size; + while (lengthLeftToRead > 0) { + lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead); + } + return new DataPageV1( + decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()), + pageHeader.data_page_header.num_values, + pageHeader.uncompressed_page_size, + fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()), + parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding), + parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding), + parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding) + ); + // TODO - finish testing this with more files + case DATA_PAGE_V2: + valueReadSoFar += pageHeader.data_page_header_v2.getNum_values(); + buf = allocator.buffer(pageHeader.compressed_page_size); + lastPage = buf; + buffer = buf.nioBuffer(0, pageHeader.compressed_page_size); + lengthLeftToRead = 
pageHeader.compressed_page_size; + while (lengthLeftToRead > 0) { + lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead); + } + DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2(); + int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length(); + BytesInput decompressedPageData = + decompressor.decompress( + BytesInput.from(buffer, 0, pageHeader.compressed_page_size), + pageHeader.uncompressed_page_size); + return new DataPageV2( + dataHeaderV2.getNum_rows(), + dataHeaderV2.getNum_nulls(), + dataHeaderV2.getNum_values(), + BytesInput.from(decompressedPageData.toByteBuffer(), 0, dataHeaderV2.getRepetition_levels_byte_length()), + BytesInput.from(decompressedPageData.toByteBuffer(), + dataHeaderV2.getRepetition_levels_byte_length(), + dataHeaderV2.getDefinition_levels_byte_length()), + parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()), + BytesInput.from(decompressedPageData.toByteBuffer(), + dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length(), + dataSize), + uncompressedPageSize, + fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()), + dataHeaderV2.isIs_compressed() + ); + default: + in.skip(pageHeader.compressed_page_size); + break; + } + } + in.close(); + return null; + } catch (OutOfMemoryRuntimeException e) { + throw e; // throw as it is + } catch (Exception e) { + throw new DrillRuntimeException("Error reading page." + + "\nFile path: " + path.toUri().getPath() + + "\nRow count: " + rowCount + + "\nColumn Chunk Metadata: " + metaData + + "\nPage Header: " + pageHeader + + "\nFile offset: " + fileOffset + + "\nSize: " + size + + "\nValue read so far: " + valueReadSoFar, e); + } + } + + void close() { + if (lastPage != null) { + lastPage.release(); + lastPage = null; + } + } + } + + private Map columns = new HashMap(); + + public void addColumn(ColumnDescriptor descriptor, ColumnChunkMetaData metaData) throws IOException { + FSDataInputStream in = fs.open(path); + streams.add(in); + in.seek(metaData.getStartingPos()); + ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, descriptor, in); + + columns.put(descriptor, reader); + } + + public void close() throws IOException { + for (FSDataInputStream stream : streams) { + stream.close(); + } + for (ColumnChunkIncPageReader reader : columns.values()) { + reader.close(); + } + } + + @Override + public PageReader getPageReader(ColumnDescriptor descriptor) { + return columns.get(descriptor); + } + + @Override + public long getRowCount() { + return rowCount; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java new file mode 100644 index 0000000..564a0a4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.parquet.hadoop; + +import java.io.IOException; + +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator; + +import org.apache.parquet.column.page.PageWriteStore; +import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; +import org.apache.parquet.schema.MessageType; + +public class ColumnChunkPageWriteStoreExposer { + + public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore( + OperatorContext oContext, + BytesCompressor compressor, + MessageType schema + ) { + return new ColumnChunkPageWriteStore(compressor, schema, new ParquetDirectByteBufferAllocator(oContext)); + } + + public static void flushPageStore(PageWriteStore pageStore, ParquetFileWriter w) throws IOException { + ((ColumnChunkPageWriteStore) pageStore).flushToFileWriter(w); + } + + // TODO(jaltekruse) - review, this used to have a method for closing a pageStore + // the parquet code once rebased did not include this close method, make sure it isn't needed + // I might have messed up the merge + +} http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java deleted file mode 100644 index 4559083..0000000 --- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package parquet.hadoop; - -import io.netty.buffer.ByteBuf; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; -import org.apache.drill.exec.store.parquet.DirectCodecFactory; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import parquet.bytes.BytesInput; -import parquet.column.ColumnDescriptor; -import parquet.column.page.DataPage; -import parquet.column.page.DataPageV1; -import parquet.column.page.DataPageV2; -import parquet.column.page.DictionaryPage; -import parquet.column.page.PageReadStore; -import parquet.column.page.PageReader; -import parquet.format.DataPageHeaderV2; -import parquet.format.PageHeader; -import parquet.format.Util; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.CodecFactory.BytesDecompressor; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.hadoop.util.CompatibilityUtil; - -import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics; - - -public class ColumnChunkIncReadStore implements PageReadStore { - - private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - - private DirectCodecFactory codecFactory; - private BufferAllocator allocator; - private FileSystem fs; - private Path path; - private long rowCount; - private List streams = new ArrayList(); - - public ColumnChunkIncReadStore(long rowCount, DirectCodecFactory codecFactory, BufferAllocator allocator, - FileSystem fs, Path path) { - this.codecFactory = codecFactory; - this.allocator = allocator; - this.fs = fs; - this.path = path; - this.rowCount = rowCount; - } - - - public class ColumnChunkIncPageReader implements PageReader { - - ColumnChunkMetaData metaData; - ColumnDescriptor columnDescriptor; - long fileOffset; - long size; - private long valueReadSoFar = 0; - - private DictionaryPage dictionaryPage; - private FSDataInputStream in; - private BytesDecompressor decompressor; - - private ByteBuf lastPage; - - public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, ColumnDescriptor columnDescriptor, FSDataInputStream in) { - this.metaData = metaData; - this.columnDescriptor = columnDescriptor; - this.size = metaData.getTotalSize(); - this.fileOffset = metaData.getStartingPos(); - this.in = in; - this.decompressor = codecFactory.getDecompressor(metaData.getCodec()); - } - - @Override - public DictionaryPage readDictionaryPage() { - if (dictionaryPage == null) { - PageHeader pageHeader = new PageHeader(); - long pos = 0; - try { - pos = in.getPos(); - pageHeader = Util.readPageHeader(in); - if (pageHeader.getDictionary_page_header() == null) { - in.seek(pos); - return null; - } - dictionaryPage = - new DictionaryPage( - decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()), - pageHeader.getDictionary_page_header().getNum_values(), - parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding) - ); - } catch (Exception e) { - throw new DrillRuntimeException("Error reading dictionary page." 
+ - "\nFile path: " + path.toUri().getPath() + - "\nRow count: " + rowCount + - "\nColumn Chunk Metadata: " + metaData + - "\nPage Header: " + pageHeader + - "\nFile offset: " + fileOffset + - "\nSize: " + size + - "\nValue read so far: " + valueReadSoFar + - "\nPosition: " + pos, e); - } - } - return dictionaryPage; - } - - @Override - public long getTotalValueCount() { - return metaData.getValueCount(); - } - - @Override - public DataPage readPage() { - PageHeader pageHeader = new PageHeader(); - try { - if (lastPage != null) { - lastPage.release(); - lastPage = null; - } - while(valueReadSoFar < metaData.getValueCount()) { - pageHeader = Util.readPageHeader(in); - int uncompressedPageSize = pageHeader.getUncompressed_page_size(); - int compressedPageSize = pageHeader.getCompressed_page_size(); - switch (pageHeader.type) { - case DICTIONARY_PAGE: - if (dictionaryPage == null) { - dictionaryPage = - new DictionaryPage( - decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()), - pageHeader.uncompressed_page_size, - parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding) - ); - } else { - in.skip(pageHeader.compressed_page_size); - } - break; - case DATA_PAGE: - valueReadSoFar += pageHeader.data_page_header.getNum_values(); - ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size); - lastPage = buf; - ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size); - while (buffer.remaining() > 0) { - CompatibilityUtil.getBuf(in, buffer, pageHeader.compressed_page_size); - } - return new DataPageV1( - decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()), - pageHeader.data_page_header.num_values, - pageHeader.uncompressed_page_size, - fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()), - parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding), - parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding), - parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding) - ); - // TODO - finish testing this with more files - case DATA_PAGE_V2: - valueReadSoFar += pageHeader.data_page_header_v2.getNum_values(); - buf = allocator.buffer(pageHeader.compressed_page_size); - lastPage = buf; - buffer = buf.nioBuffer(0, pageHeader.compressed_page_size); - while (buffer.remaining() > 0) { - CompatibilityUtil.getBuf(in, buffer, pageHeader.compressed_page_size); - } - DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2(); - int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length(); - BytesInput decompressedPageData = - decompressor.decompress( - BytesInput.from(buffer, 0, pageHeader.compressed_page_size), - pageHeader.uncompressed_page_size); - return new DataPageV2( - dataHeaderV2.getNum_rows(), - dataHeaderV2.getNum_nulls(), - dataHeaderV2.getNum_values(), - BytesInput.from(decompressedPageData.toByteBuffer(), 0, dataHeaderV2.getRepetition_levels_byte_length()), - BytesInput.from(decompressedPageData.toByteBuffer(), - dataHeaderV2.getRepetition_levels_byte_length(), - dataHeaderV2.getDefinition_levels_byte_length()), - parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()), - BytesInput.from(decompressedPageData.toByteBuffer(), - dataHeaderV2.getRepetition_levels_byte_length() + 
dataHeaderV2.getDefinition_levels_byte_length(), - dataSize), - uncompressedPageSize, - fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()), - dataHeaderV2.isIs_compressed() - ); - default: - in.skip(pageHeader.compressed_page_size); - break; - } - } - in.close(); - return null; - } catch (OutOfMemoryRuntimeException e) { - throw e; // throw as it is - } catch (Exception e) { - throw new DrillRuntimeException("Error reading page." + - "\nFile path: " + path.toUri().getPath() + - "\nRow count: " + rowCount + - "\nColumn Chunk Metadata: " + metaData + - "\nPage Header: " + pageHeader + - "\nFile offset: " + fileOffset + - "\nSize: " + size + - "\nValue read so far: " + valueReadSoFar, e); - } - } - - void close() { - if (lastPage != null) { - lastPage.release(); - lastPage = null; - } - } - } - - private Map columns = new HashMap(); - - public void addColumn(ColumnDescriptor descriptor, ColumnChunkMetaData metaData) throws IOException { - FSDataInputStream in = fs.open(path); - streams.add(in); - in.seek(metaData.getStartingPos()); - ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, descriptor, in); - - columns.put(descriptor, reader); - } - - public void close() throws IOException { - for (FSDataInputStream stream : streams) { - stream.close(); - } - for (ColumnChunkIncPageReader reader : columns.values()) { - reader.close(); - } - } - - @Override - public PageReader getPageReader(ColumnDescriptor descriptor) { - return columns.get(descriptor); - } - - @Override - public long getRowCount() { - return rowCount; - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java deleted file mode 100644 index 743d185..0000000 --- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package parquet.hadoop; - -import java.io.IOException; - -import org.apache.drill.exec.ops.OperatorContext; -import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator; - -import parquet.column.page.PageWriteStore; -import parquet.hadoop.CodecFactory.BytesCompressor; -import parquet.schema.MessageType; - -public class ColumnChunkPageWriteStoreExposer { - - public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore( - OperatorContext oContext, - BytesCompressor compressor, - MessageType schema, - int initialSize - ) { - return new ColumnChunkPageWriteStore(compressor, schema, initialSize, new ParquetDirectByteBufferAllocator(oContext)); - } - - public static void flushPageStore(PageWriteStore pageStore, ParquetFileWriter w) throws IOException { - ((ColumnChunkPageWriteStore) pageStore).flushToFileWriter(w); - } - - public static void close(PageWriteStore pageStore) throws IOException { - ((ColumnChunkPageWriteStore) pageStore).close(); - - } - - -} http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java index 60116e2..352f487 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java +++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java @@ -166,7 +166,11 @@ public class DrillTestWrapper { assertEquals("Different number of records returned", expectedValues.size(), actualValues.size()); for (int i = 0; i < expectedValues.size(); i++) { - compareValuesErrorOnMismatch(expectedValues.get(i), actualValues.get(i), i, s); + try { + compareValuesErrorOnMismatch(expectedValues.get(i), actualValues.get(i), i, s); + } catch (Exception ex) { + throw new Exception(ex.getMessage() + "\n\n" + printNearbyRecords(expectedRecords, actualRecords, i), ex); + } } } if (actualRecords.size() < expectedRecords.size()) { @@ -174,6 +178,34 @@ public class DrillTestWrapper { } } + private String printNearbyRecords(Map expectedRecords, Map actualRecords, int offset) { + StringBuilder expected = new StringBuilder(); + StringBuilder actual = new StringBuilder(); + expected.append("Expected Records near verification failure:\n"); + actual.append("Actual Records near verification failure:\n"); + int firstRecordToPrint = Math.max(0, offset - 5); + List expectedValuesInFirstColumn = expectedRecords.get(expectedRecords.keySet().iterator().next()); + List actualValuesInFirstColumn = expectedRecords.get(expectedRecords.keySet().iterator().next()); + int numberOfRecordsToPrint = Math.min(Math.min(10, expectedValuesInFirstColumn.size()), actualValuesInFirstColumn.size()); + for (int i = firstRecordToPrint; i < numberOfRecordsToPrint; i++) { + expected.append("Record Number: ").append(i).append(" { "); + actual.append("Record Number: ").append(i).append(" { "); + for (String s : actualRecords.keySet()) { + List actualValues = actualRecords.get(s); + actual.append(s).append(" : ").append(actualValues.get(i)).append(","); + } + for (String s : expectedRecords.keySet()) { + List expectedValues = expectedRecords.get(s); + expected.append(s).append(" : ").append(expectedValues.get(i)).append(","); + } + expected.append(" }\n"); + actual.append(" }\n"); + } + + return expected.append("\n\n").append(actual).toString(); + + } + private Map addToHyperVectorMap(List records, 
RecordBatchLoader loader, BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException { // TODO - this does not handle schema changes http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java b/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java index f9cfdb5..204eeb0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java +++ b/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java @@ -17,11 +17,11 @@ */ package org.apache.drill; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.PrimitiveType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; -import parquet.schema.Type.Repetition; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; public class ParquetSchemaMerge { public static void main(String[] args) { http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java b/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java index deeb7cb..9d9c1a7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java @@ -23,6 +23,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.hamcrest.CoreMatchers; import org.junit.Test; import java.math.BigDecimal; @@ -30,6 +31,7 @@ import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; // TODO - update framework to remove any dependency on the Drill engine for reading baseline result sets // currently using it with the assumption that the csv and json readers are well tested, and handling diverse @@ -237,7 +239,8 @@ public class TestFrameworkTest extends BaseTestQuery{ .baselineColumns("employee_id", "first_name", "last_name") .build().run(); } catch (Exception ex) { - assertEquals("at position 0 column '`employee_id`' mismatched values, expected: 12(Integer) but received 12(Long)", ex.getMessage()); + assertThat(ex.getMessage(), CoreMatchers.containsString( + "at position 0 column '`employee_id`' mismatched values, expected: 12(Integer) but received 12(Long)")); // this indicates successful completion of the test return; } @@ -254,7 +257,8 @@ public class TestFrameworkTest extends BaseTestQuery{ .baselineColumns("employee_id", "first_name", "last_name") .build().run(); } catch (Exception ex) { - assertEquals("at position 0 column '`first_name`' mismatched values, expected: Jewel(String) but received Peggy(String)", ex.getMessage()); + assertThat(ex.getMessage(), CoreMatchers.containsString( + "at position 0 column '`first_name`' mismatched values, expected: Jewel(String) but received Peggy(String)")); // this indicates successful 
completion of the test return; } @@ -319,9 +323,9 @@ public class TestFrameworkTest extends BaseTestQuery{ .optionSettingQueriesForBaseline("alter system set `store.json.all_text_mode` = true") .build().run(); } catch (Exception ex) { - assertEquals("at position 1 column '`field_1`' mismatched values, " + - "expected: [\"5\",\"2\",\"3\",\"4\",\"1\",\"2\"](JsonStringArrayList) but received [\"5\"](JsonStringArrayList)", - ex.getMessage()); + assertThat(ex.getMessage(), CoreMatchers.containsString( + "at position 1 column '`field_1`' mismatched values, " + + "expected: [\"5\",\"2\",\"3\",\"4\",\"1\",\"2\"](JsonStringArrayList) but received [\"5\"](JsonStringArrayList)")); // this indicates successful completion of the test return; } http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java index e01e45d..df63d7e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationDisabledWithMiniDFS.java @@ -18,11 +18,18 @@ package org.apache.drill.exec.impersonation; import com.google.common.collect.Maps; +import org.apache.drill.exec.physical.impl.writer.TestParquetWriter; import org.apache.drill.exec.store.dfs.WorkspaceConfig; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +/** + * Note to future devs, please do not put random tests here. Make sure that they actually require + * access to a DFS instead of the local filesystem implementation used by default in the rest of + * the tests. Running this mini cluster is slow and it is best for these tests to only cover + * necessary cases. + */ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation { @BeforeClass @@ -37,6 +44,38 @@ public class TestImpersonationDisabledWithMiniDFS extends BaseTestImpersonation // Create test table in minidfs.tmp schema for use in test queries test(String.format("CREATE TABLE %s.tmp.dfsRegion AS SELECT * FROM cp.`region.json`", MINIDFS_STORAGE_PLUGIN_NAME)); + + // generate a large enough file that the DFS will not fulfill requests to read a + // page of data all at once, see notes above testReadLargeParquetFileFromDFS() + test(String.format( + "CREATE TABLE %s.tmp.large_employee AS " + + "(SELECT employee_id, full_name FROM cp.`/employee.json`) " + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)" + + "UNION ALL (SELECT employee_id, full_name FROM cp.`/employee.json`)", + MINIDFS_STORAGE_PLUGIN_NAME)); + } + + /** + * When working on merging the Drill fork of parquet a bug was found that only manifested when + * run on a cluster. 
It appears that the local implementation of the Hadoop FileSystem API + * never fails to provide all of the bytes that are requested in a single read. The API is + * designed to allow for a subset of the requested bytes to be returned, and a client can decide + * if they want to do processing on the subset that is available now before requesting the rest. + * + * For parquet's block compression of page data, we need all of the bytes. This test is here as + * a sanity check to make sure we don't accidentally introduce an issue where a subset of the bytes + * are read and would otherwise require testing on a cluster for the full contract of the read method + * we are using to be exercised. + */ + @Test + public void testReadLargeParquetFileFromDFS() throws Exception { + test(String.format("USE %s", MINIDFS_STORAGE_PLUGIN_NAME)); + test("SELECT * FROM tmp.`large_employee`"); } @Test // DRILL-3037 http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index 7c4ac1e..51d5d08 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.writer; import java.io.File; import java.io.FileWriter; -import java.io.IOException; import java.math.BigDecimal; import java.sql.Date; @@ -33,7 +32,6 @@ import org.apache.hadoop.fs.Path; import org.joda.time.DateTime; import org.joda.time.Period; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; @@ -47,6 +45,28 @@ public class TestParquetWriter extends BaseTestQuery { public TemporaryFolder folder = new TemporaryFolder(); static FileSystem fs; + private String allTypesSelection = + "cast( `int_col` AS int) `int_col`, " + + "cast( `bigint_col` AS bigint) `bigint_col`, " + + // TODO(DRILL-3367) +// "cast( `decimal9_col` AS decimal(9, 4)) `decimal9_col`, " + +// "cast( `decimal18_col` AS decimal(18,9)) `decimal18_col`, " + +// "cast( `decimal28sparse_col` AS decimal(28, 14)) `decimal28sparse_col`, " + +// "cast( `decimal38sparse_col` AS decimal(38, 19)) `decimal38sparse_col`, " + + "cast( `date_col` AS date) `date_col`, " + + "cast( `timestamp_col` AS timestamp) `timestamp_col`, " + + "cast( `float4_col` AS float) `float4_col`, " + + "cast( `float8_col` AS double) `float8_col`, " + + "cast( `varbinary_col` AS varbinary(65000)) `varbinary_col`, " + + // TODO(DRILL-2297) +// "cast( `intervalyear_col` AS interval year) `intervalyear_col`, " + + "cast( `intervalday_col` AS interval day) `intervalday_col`, " + + "cast( `bit_col` AS boolean) `bit_col`, " + + " `varchar_col` `varchar_col`, " + + "cast( `time_col` AS time) `time_col` "; + + private String allTypesTable = "cp.`/parquet/alltypes.json`"; + @BeforeClass public static void initFs() throws Exception { Configuration conf = new Configuration(); @@ -61,6 +81,12 @@ public class TestParquetWriter extends BaseTestQuery { test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY)); }
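A minimal sketch (not part of this patch) of the short-read handling described in the javadoc for testReadLargeParquetFileFromDFS above -- the same pattern the new lengthLeftToRead loop in ColumnChunkIncReadStore.readPage() relies on. The class and method names below (FullReadExample, readFully) are hypothetical, and it assumes the input stream supports ByteBuffer reads (ByteBufferReadable):

  import java.io.EOFException;
  import java.io.IOException;
  import java.nio.ByteBuffer;

  import org.apache.hadoop.fs.FSDataInputStream;

  public class FullReadExample {
    /** Keeps reading until exactly 'length' bytes have been placed into 'buffer'. */
    public static void readFully(FSDataInputStream in, ByteBuffer buffer, int length) throws IOException {
      int remaining = length;
      while (remaining > 0) {
        // A single read may return fewer bytes than requested; count down only what actually arrived.
        int bytesRead = in.read(buffer);
        if (bytesRead < 0) {
          throw new EOFException("Stream ended with " + remaining + " bytes still outstanding");
        }
        remaining -= bytesRead;
      }
    }
  }

+ @Test + public void 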
testSmallFileValueReadWrite() throws Exception { + String selection = "key"; + String inputTable = "cp.`/store/json/intData.json`"; + runTestAndValidate(selection, selection, inputTable, "smallFileTest"); + } @Test public void testSimple() throws Exception { @@ -103,6 +129,47 @@ public class TestParquetWriter extends BaseTestQuery { } @Test + public void testAllScalarTypes() throws Exception { + /// read once with the flat reader + runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json"); + + try { + // read all of the types with the complex reader + test(String.format("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER)); + runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json"); + } finally { + test(String.format("alter session set %s = false", ExecConstants.PARQUET_NEW_RECORD_READER)); + } + } + + @Test + public void testAllScalarTypesDictionary() throws Exception { + try { + test(String.format("alter session set %s = true", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); + /// read once with the flat reader + runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json"); + + // read all of the types with the complex reader + test(String.format("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER)); + runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json"); + } finally { + test(String.format("alter session set %s = false", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); + } + } + + @Test + public void testDictionaryEncoding() throws Exception { + String selection = "type"; + String inputTable = "cp.`donuts.json`"; + try { + test(String.format("alter session set %s = true", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); + runTestAndValidate(selection, selection, inputTable, "donuts_json"); + } finally { + test(String.format("alter session set %s = false", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); + } + } + + @Test public void testComplex() throws Exception { String selection = "*"; String inputTable = "cp.`donuts.json`"; http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java deleted file mode 100644 index 004a8d0..0000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store; - -import io.netty.buffer.DrillBuf; - -import java.nio.ByteBuffer; -import java.util.Random; - -import org.apache.drill.common.DeferredException; -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.ExecTest; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.memory.RootAllocatorFactory; -import org.apache.drill.exec.store.parquet.DirectCodecFactory; -import org.apache.drill.exec.store.parquet.DirectCodecFactory.ByteBufBytesInput; -import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor; -import org.apache.hadoop.conf.Configuration; -import org.junit.Assert; -import org.junit.Test; - -import parquet.bytes.BytesInput; -import parquet.hadoop.CodecFactory.BytesCompressor; -import parquet.hadoop.metadata.CompressionCodecName; - -public class TestDirectCodecFactory extends ExecTest { - //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestDirectCodecFactory.class); - private final DrillConfig drillConfig = DrillConfig.create(); - - private static enum Decompression { - ON_HEAP, OFF_HEAP, DRILLBUF - } - - private void test(int size, CompressionCodecName codec, boolean useOnHeapCompression, Decompression decomp) throws Exception { - DrillBuf rawBuf = null; - DrillBuf outBuf = null; - try (final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConfig); - final DirectCodecFactory codecFactory = new DirectCodecFactory(new Configuration(), allocator)) { - try { - rawBuf = allocator.buffer(size); - final byte[] rawArr = new byte[size]; - outBuf = allocator.buffer(size * 2); - final Random r = new Random(); - final byte[] random = new byte[1024]; - int pos = 0; - while (pos < size) { - r.nextBytes(random); - rawBuf.writeBytes(random); - System.arraycopy(random, 0, rawArr, pos, random.length); - pos += random.length; - } - - final BytesCompressor c = codecFactory.getCompressor(codec, 64 * 1024); - final DirectBytesDecompressor d = codecFactory.getDecompressor(codec); - - final BytesInput compressed; - if (useOnHeapCompression) { - compressed = c.compress(BytesInput.from(rawArr)); - } else { - compressed = c.compress(new ByteBufBytesInput(rawBuf)); - } - - switch (decomp) { - case DRILLBUF: { - final ByteBuffer buf = compressed.toByteBuffer(); - final DrillBuf b = allocator.buffer(buf.capacity()); - try { - b.writeBytes(buf); - d.decompress(b, (int) compressed.size(), outBuf, size); - for (int i = 0; i < size; i++) { - Assert.assertTrue("Data didn't match at " + i, outBuf.getByte(i) == rawBuf.getByte(i)); - } - } finally { - b.release(); - } - break; - } - - case OFF_HEAP: { - final ByteBuffer buf = compressed.toByteBuffer(); - final DrillBuf b = allocator.buffer(buf.capacity()); - try { - b.writeBytes(buf); - final BytesInput input = d.decompress(new ByteBufBytesInput(b), size); - Assert.assertArrayEquals(input.toByteArray(), rawArr); - } finally { - b.release(); - } - break; - } - case ON_HEAP: { - final byte[] buf = compressed.toByteArray(); - final BytesInput input = d.decompress(BytesInput.from(buf), size); - Assert.assertArrayEquals(input.toByteArray(), rawArr); - break; - } - } - } catch (Exception e) { - final String msg = String.format( - "Failure while testing Codec: %s, OnHeapCompressionInput: %s, Decompression Mode: %s, Data Size: %d", - codec.name(), - useOnHeapCompression, decomp.name(), size); - System.out.println(msg); - throw new RuntimeException(msg, e); - } finally { - if (rawBuf != null) { - 
rawBuf.release(); - } - if (outBuf != null) { - outBuf.release(); - } - } - } - } - - @Test - public void compressionCodecs() throws Exception { - final int[] sizes = { 4 * 1024, 1 * 1024 * 1024 }; - final boolean[] comp = { true, false }; - - try (final DeferredException ex = new DeferredException()) { - for (final int size : sizes) { - for (final boolean useOnHeapComp : comp) { - for (final Decompression decomp : Decompression.values()) { - for (final CompressionCodecName codec : CompressionCodecName.values()) { - if (codec == CompressionCodecName.LZO) { - // not installed as gpl. - continue; - } - try { - test(size, codec, useOnHeapComp, decomp); - } catch (Exception e) { - ex.addException(e); - } - } - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 14cfd8e..d5f7352 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -63,18 +63,19 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.parquet.hadoop.CodecFactory; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import parquet.bytes.BytesInput; -import parquet.column.page.DataPageV1; -import parquet.column.page.PageReadStore; -import parquet.column.page.PageReader; -import parquet.hadoop.Footer; -import parquet.hadoop.ParquetFileReader; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.schema.MessageType; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; import com.google.common.base.Charsets; import com.google.common.base.Stopwatch; @@ -638,7 +639,8 @@ public class ParquetRecordReaderTest extends BaseTestQuery { final BufferAllocator allocator = RootAllocatorFactory.newRoot(c); for(int i = 0; i < 25; i++) { final ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs, - new DirectCodecFactory(dfsConfig, allocator), f.getParquetMetadata(), columns); + CodecFactory.createDirectCodecFactory(dfsConfig, new ParquetDirectByteBufferAllocator(allocator), 0), + f.getParquetMetadata(), columns); final TestOutputMutator mutator = new TestOutputMutator(allocator); rr.setup(null, mutator); final Stopwatch watch = new Stopwatch(); http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java index 013ea95..593e0db 100644 
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java @@ -17,25 +17,26 @@ */ package org.apache.drill.exec.store.parquet; -import static parquet.column.Encoding.PLAIN; -import static parquet.column.Encoding.RLE; +import static org.apache.parquet.column.Encoding.PLAIN; +import static org.apache.parquet.column.Encoding.RLE; import java.util.HashMap; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.store.ByteArrayUtil; +import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import parquet.bytes.BytesInput; -import parquet.bytes.DirectByteBufferAllocator; -import parquet.column.ColumnDescriptor; -import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; -import parquet.hadoop.ParquetFileWriter; -import parquet.hadoop.metadata.CompressionCodecName; -import parquet.schema.MessageType; -import parquet.schema.MessageTypeParser; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; public class TestFileGenerator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestFileGenerator.class); @@ -185,16 +186,19 @@ public class TestFileGenerator { ColumnDescriptor c1 = schema.getColumnDescription(path1); w.startColumn(c1, props.recordsPerRowGroup, codec); - int valsPerPage = (int) Math.ceil(props.recordsPerRowGroup / (float) fieldInfo.numberOfPages); + final int valsPerPage = (int) Math.ceil(props.recordsPerRowGroup / (float) fieldInfo.numberOfPages); + final int PAGE_SIZE = 1024 * 1024; // 1 MB byte[] bytes; RunLengthBitPackingHybridValuesWriter defLevels = new RunLengthBitPackingHybridValuesWriter( - MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, - valsPerPage, - new DirectByteBufferAllocator()); + MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, + valsPerPage, + PAGE_SIZE, + new DirectByteBufferAllocator()); RunLengthBitPackingHybridValuesWriter repLevels = new RunLengthBitPackingHybridValuesWriter( - MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, - valsPerPage, - new DirectByteBufferAllocator()); + MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, + valsPerPage, + PAGE_SIZE, + new DirectByteBufferAllocator()); // for variable length binary fields int bytesNeededToEncodeLength = 4; if ((int) fieldInfo.bitLength > 0) { http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/java-exec/src/test/resources/store/json/donuts_short.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/store/json/donuts_short.json b/exec/java-exec/src/test/resources/store/json/donuts_short.json new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java 
b/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java index d119ec8..286ebf4 100644 --- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java +++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/NonClosableConnection.java @@ -17,6 +17,8 @@ */ package org.apache.drill.jdbc; +import com.google.common.base.Preconditions; + import java.sql.Array; import java.sql.Blob; import java.sql.CallableStatement; @@ -36,8 +38,6 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; -import parquet.Preconditions; - /** * A connection decorator that ignores {@link Connection#close} calls. * http://git-wip-us.apache.org/repos/asf/drill/blob/39582bd6/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 93b710c..05d8631 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ 4.11 1.7.6 2 - 1.6.0rc3-drill-r0.3 + 1.8.1-drill-r0 1.1.9-drill-r7
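The javadoc added to TestImpersonationDisabledWithMiniDFS above hinges on the read contract of the Hadoop FileSystem API: a single read() call may legally return fewer bytes than requested, while Parquet's page decompression needs the complete compressed page. The sketch below is not part of the patch; the class and method names (ReadFullyExample, readFully) are illustrative stand-ins, and Hadoop's FSDataInputStream already offers an equivalent readFully(). It only demonstrates the read-until-full loop that, as the comment notes, the local filesystem never forces a caller to exercise.

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class ReadFullyExample {

  // Keep reading until 'len' bytes have arrived; any single read() may return a partial count.
  static void readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
    int done = 0;
    while (done < len) {
      int n = in.read(buf, off + done, len - done);
      if (n < 0) {
        throw new EOFException("stream ended after " + done + " of " + len + " bytes");
      }
      done += n;
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] page = new byte[8192];           // stand-in for a compressed Parquet page
    byte[] copy = new byte[page.length];
    readFully(new ByteArrayInputStream(page), copy, 0, copy.length);
    System.out.println("read " + copy.length + " bytes");
  }
}

A reader that assumed every read() fills its buffer would pass against the local filesystem but could truncate pages against a real DFS, which is the gap testReadLargeParquetFileFromDFS is meant to cover.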
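The ParquetRecordReaderTest hunk above constructs the reader with CodecFactory.createDirectCodecFactory(...) from the relocated org.apache.parquet packages. The stand-alone sketch below shows how a caller might obtain, use, and release such a factory; it is a sketch, not code from this patch. It assumes the 1.8.x CodecFactory exposes getCompressor, getDecompressor, and release as used elsewhere in this commit; the class name CodecFactoryRoundTrip and the sample payload are invented, and GZIP is chosen only because it needs no native codec library.

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class CodecFactoryRoundTrip {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // a pageSize of 0 mirrors the value passed in the ParquetRecordReaderTest hunk above
    CodecFactory factory =
        CodecFactory.createDirectCodecFactory(conf, new DirectByteBufferAllocator(), 0);
    try {
      CodecFactory.BytesCompressor compressor = factory.getCompressor(CompressionCodecName.GZIP);
      CodecFactory.BytesDecompressor decompressor = factory.getDecompressor(CompressionCodecName.GZIP);

      byte[] raw = "some page bytes to round-trip".getBytes(StandardCharsets.UTF_8);
      BytesInput compressed = compressor.compress(BytesInput.from(raw));
      BytesInput uncompressed = decompressor.decompress(compressed, raw.length);

      System.out.println(uncompressed.size() == raw.length);  // expect true
    } finally {
      factory.release();  // release() takes over the cleanup previously done by DirectCodecFactory.close()
    }
  }
}

The factory caches compressors and decompressors per codec, so release() on the factory is the single cleanup point rather than closing individual codecs.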