From: blue@apache.org
To: commits@parquet.apache.org
Date: Thu, 19 Jan 2017 01:27:53 -0000
Subject: [42/50] [abbrv] parquet-mr git commit: PARQUET-751: Add setRequestedSchema to ParquetFileReader.

PARQUET-751: Add setRequestedSchema to ParquetFileReader.

This fixes a bug introduced by dictionary filters, which reused an existing file reader to avoid opening multiple input streams. Before that commit, a new file reader was opened and passed the projection columns from the read context. The fix is to set the requested schema on the file reader instead of creating a new instance.

This also adds a test to ensure that column projection works, to catch bugs like this in the future.

Author: Ryan Blue

Closes #379 from rdblue/PARQUET-751-fix-column-projection and squashes the following commits:

7ea0c16 [Ryan Blue] PARQUET-751: Fix column projection test.
1da507e [Ryan Blue] PARQUET-751: Add setRequestedSchema to ParquetFileReader.
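For readers following the fix, a minimal sketch of how the new method is meant to be used by code that already holds an open ParquetFileReader, mirroring the one-line change to InternalParquetRecordReader in the diff below. The helper readProjected and its projection schema are illustrative assumptions, not part of this commit:

import java.io.IOException;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

// Sketch only: narrow an already-open reader to a projection before reading row groups.
static PageReadStore readProjected(ParquetFileReader reader) throws IOException {
  // hypothetical projection: only the "char" column of the test schema added below
  MessageType projection = Types.buildMessage()
      .required(BINARY).as(UTF8).named("char")
      .named("FormatTestObject");
  reader.setRequestedSchema(projection);  // replaces the reader's requested column paths
  return reader.readNextRowGroup();       // returns pages for the projected columns only
}
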
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/6294fc0a
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/6294fc0a
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/6294fc0a

Branch: refs/heads/parquet-1.8.x
Commit: 6294fc0ae8c60bccdaf984af0a222de0d45c9575
Parents: cec2570
Author: Ryan Blue
Authored: Tue Oct 18 17:45:32 2016 -0700
Committer: Ryan Blue
Committed: Mon Jan 9 16:54:54 2017 -0800

----------------------------------------------------------------------
 .../hadoop/InternalParquetRecordReader.java     |   1 +
 .../parquet/hadoop/ParquetFileReader.java       |   8 +
 .../hadoop/TestInputFormatColumnProjection.java | 180 +++++++++++++++++++
 3 files changed, 189 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6294fc0a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index d43fd7d..85b6691 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -179,6 +179,7 @@ class InternalParquetRecordReader {
     this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
     this.filterRecords = configuration.getBoolean(
         RECORD_FILTERING_ENABLED, RECORD_FILTERING_ENABLED_DEFAULT);
+    reader.setRequestedSchema(requestedSchema);
     LOG.info("RecordReader initialized will read a total of " + total + " records.");
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6294fc0a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index ebc8ca2..9f8eb07 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -92,6 +92,7 @@ import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 
 /**
@@ -656,6 +657,13 @@ public class ParquetFileReader implements Closeable {
     return blocks;
   }
 
+  public void setRequestedSchema(MessageType projection) {
+    paths.clear();
+    for (ColumnDescriptor col : projection.getColumns()) {
+      paths.put(ColumnPath.get(col.getPath()), col);
+    }
+  }
+
   public void appendTo(ParquetFileWriter writer) throws IOException {
     writer.appendRowGroups(f, blocks, true);
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6294fc0a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java
new file mode 100644
index 0000000..a6d2732
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleInputFormat;
+import org.apache.parquet.hadoop.example.ExampleOutputFormat;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+
+import static java.lang.Thread.sleep;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+
+public class TestInputFormatColumnProjection {
+  public static final String FILE_CONTENT = "" +
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ," +
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ," +
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+  public static MessageType PARQUET_TYPE = Types.buildMessage()
+      .required(BINARY).as(UTF8).named("uuid")
+      .required(BINARY).as(UTF8).named("char")
+      .named("FormatTestObject");
+
+  public static class Writer extends Mapper<LongWritable, Text, Void, Group> {
+    public static final SimpleGroupFactory GROUP_FACTORY =
+        new SimpleGroupFactory(PARQUET_TYPE);
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      // writes each character of the line with a UUID
+      String line = value.toString();
+      for (int i = 0; i < line.length(); i += 1) {
+        Group group = GROUP_FACTORY.newGroup();
+        group.add(0, Binary.fromString(UUID.randomUUID().toString()));
+        group.add(1, Binary.fromString(line.substring(i, i+1)));
+        context.write(null, group);
+      }
+    }
+  }
+
+  public static class Reader extends Mapper<Void, Group, LongWritable, Text> {
+
+    public static Counter bytesReadCounter = null;
+
+    public static void setBytesReadCounter(Counter bytesRead) {
+      bytesReadCounter = bytesRead;
+    }
+
+    @Override
+    protected void map(Void key, Group value, Context context)
+        throws IOException, InterruptedException {
+      // Do nothing. The test uses Hadoop FS counters for verification.
+      setBytesReadCounter(ContextUtil.getCounter(
+          context, "parquet", "bytesread"));
+    }
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testProjectionSize() throws Exception {
+    Assume.assumeTrue( // only run this test for Hadoop 2
+        org.apache.hadoop.mapreduce.JobContext.class.isInterface());
+
+    File inputFile = temp.newFile();
+    FileOutputStream out = new FileOutputStream(inputFile);
+    out.write(FILE_CONTENT.getBytes("UTF-8"));
+    out.close();
+
+    File tempFolder = temp.newFolder();
+    tempFolder.delete();
+    Path tempPath = new Path(tempFolder.toURI());
+
+    File outputFolder = temp.newFile();
+    outputFolder.delete();
+
+    Configuration conf = new Configuration();
+    // set the projection schema
+    conf.set("parquet.read.schema", Types.buildMessage()
+        .required(BINARY).as(UTF8).named("char")
+        .named("FormatTestObject").toString());
+
+    // disable summary metadata, it isn't needed
+    conf.set("parquet.enable.summary-metadata", "false");
+    conf.set("parquet.example.schema", PARQUET_TYPE.toString());
+
+    {
+      Job writeJob = new Job(conf, "write");
+      writeJob.setInputFormatClass(TextInputFormat.class);
+      TextInputFormat.addInputPath(writeJob, new Path(inputFile.toString()));
+
+      writeJob.setOutputFormatClass(ExampleOutputFormat.class);
+      writeJob.setMapperClass(Writer.class);
+      writeJob.setNumReduceTasks(0); // write directly to Parquet without reduce
+      ParquetOutputFormat.setBlockSize(writeJob, 10240);
+      ParquetOutputFormat.setPageSize(writeJob, 512);
+      ParquetOutputFormat.setDictionaryPageSize(writeJob, 1024);
+      ParquetOutputFormat.setEnableDictionary(writeJob, true);
+      ParquetOutputFormat.setMaxPaddingSize(writeJob, 1023); // always pad
+      ParquetOutputFormat.setOutputPath(writeJob, tempPath);
+
+      waitForJob(writeJob);
+    }
+
+    long bytesWritten = 0;
+    FileSystem fs = FileSystem.getLocal(conf);
+    for (FileStatus file : fs.listStatus(tempPath)) {
+      bytesWritten += file.getLen();
+    }
+
+    long bytesRead;
+    {
+      Job readJob = new Job(conf, "read");
+      readJob.setInputFormatClass(ExampleInputFormat.class);
+      TextInputFormat.addInputPath(readJob, tempPath);
+
+      readJob.setOutputFormatClass(TextOutputFormat.class);
+      readJob.setMapperClass(Reader.class);
+      readJob.setNumReduceTasks(0); // no reduce phase
+      TextOutputFormat.setOutputPath(readJob, new Path(outputFolder.toString()));
+
+      waitForJob(readJob);
+
+      bytesRead = Reader.bytesReadCounter.getValue();
+    }
+
+    Assert.assertTrue("Should read less than 10% of the input file size",
+        bytesRead < (bytesWritten / 10));
+  }
+
+  private void waitForJob(Job job) throws Exception {
+    job.submit();
+    while (!job.isComplete()) {
+      sleep(100);
+    }
+    if (!job.isSuccessful()) {
+      throw new RuntimeException("job failed " + job.getJobName());
+    }
+  }
+}
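
For completeness, the same projection can be exercised outside MapReduce with the example Group API. This is a hedged sketch, not part of the commit; the input path below is hypothetical and only "parquet.read.schema" (the property the test sets above) drives the projection:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.schema.Types;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

Configuration conf = new Configuration();
// same property the test sets: project down to the "char" column
conf.set("parquet.read.schema", Types.buildMessage()
    .required(BINARY).as(UTF8).named("char")
    .named("FormatTestObject").toString());

ParquetReader<Group> reader = ParquetReader
    .builder(new GroupReadSupport(), new Path("file:///tmp/part-m-00000.parquet")) // hypothetical file
    .withConf(conf)
    .build();
try {
  for (Group g = reader.read(); g != null; g = reader.read()) {
    System.out.println(g.getString("char", 0)); // only the projected column is materialized
  }
} finally {
  reader.close();
}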