From: xuf@apache.org
To: commits@hive.apache.org
Subject: hive git commit: Revert "HIVE-17261: Hive use deprecated ParquetInputSplit constructor which blocked parquet dictionary filter (Junjie Chen, reviewed by Ferdinand Xu)"
Date: Fri, 15 Sep 2017 00:43:25 +0000 (UTC)

Repository: hive
Updated Branches:
  refs/heads/master b0b6db730 -> 9329cd909


Revert "HIVE-17261: Hive use deprecated ParquetInputSplit constructor which blocked parquet dictionary filter (Junjie Chen, reviewed by Ferdinand Xu)"

This reverts commit 1c84e0c043d68f7ce2e1dd80e1e54ca8d615e7ab.
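For context (this sketch is not part of the commit): the revert switches ParquetRecordReaderBase back from the non-deprecated ParquetInputSplit constructor to the deprecated one that carries Hive's own list of filtered row groups. A minimal illustration of the two constructions, using only the constructors that appear in the diff below; the wrapper class, method names, and parameter names are invented for the example.

import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetInputSplit;
import org.apache.parquet.hadoop.metadata.BlockMetaData;

// Sketch only: contrasts the two split constructions this revert switches between.
public class SplitConstructionSketch {

  // Restored by this revert: the deprecated constructor. Hive filters the row
  // groups itself (RowGroupFilter.filterRowGroups in getSplit) and hands the
  // surviving BlockMetaData list to the split.
  @SuppressWarnings("deprecation")
  static ParquetInputSplit hiveSideFiltering(Path path, long start, long length,
      String[] hosts, List<BlockMetaData> filteredBlocks, String requestedSchema,
      String fileSchema, Map<String, String> extraMetadata,
      Map<String, String> readSupportMetadata) {
    return new ParquetInputSplit(path, start, length, hosts, filteredBlocks,
        requestedSchema, fileSchema, extraMetadata, readSupportMetadata);
  }

  // Removed by this revert (the HIVE-17261 approach): the non-deprecated
  // constructor with null rowGroupOffsets, so that ParquetRecordReader applies
  // the configured FilterPredicate itself in initializeInternalReader, which is
  // what HIVE-17261 relied on to enable Parquet dictionary filtering.
  static ParquetInputSplit readerSideFiltering(Path path, long start, long length,
      String[] hosts) {
    return new ParquetInputSplit(path, start, start + length, length, hosts, null);
  }
}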
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9329cd90
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9329cd90
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9329cd90

Branch: refs/heads/master
Commit: 9329cd90959d05eb4ac7e580f599ba17ad545760
Parents: b0b6db7
Author: Ferdinand Xu
Authored: Fri Sep 15 08:41:37 2017 +0800
Committer: Ferdinand Xu
Committed: Fri Sep 15 08:41:37 2017 +0800

----------------------------------------------------------------------
 .../ql/io/parquet/ParquetRecordReaderBase.java | 125 ++++++++++++-------
 .../read/ParquetRecordReaderWrapper.java       |   5 -
 .../io/parquet/TestParquetRowGroupFilter.java  |  62 ++-------
 3 files changed, 94 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9329cd90/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 6b3859a..167f9b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -23,9 +23,9 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.parquet.Preconditions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.ParquetInputSplit;
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 public class ParquetRecordReaderBase {
@@ -52,6 +53,7 @@ public class ParquetRecordReaderBase {
   protected JobConf jobConf;
   protected int schemaSize;
 
+  protected List filtedBlocks;
   protected ParquetFileReader reader;
 
   /**
@@ -62,57 +64,86 @@ public class ParquetRecordReaderBase {
    * @return a ParquetInputSplit corresponding to the oldSplit
    * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
    */
+  @SuppressWarnings("deprecation")
   protected ParquetInputSplit getSplit(
       final org.apache.hadoop.mapred.InputSplit oldSplit,
       final JobConf conf
       ) throws IOException {
-    Preconditions.checkArgument((oldSplit instanceof FileSplit), "Unknown split type:" + oldSplit);
-    final Path finalPath = ((FileSplit) oldSplit).getPath();
-    jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
-
-    // TODO enable MetadataFilter
-    final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf,
-      finalPath, ParquetMetadataConverter.NO_FILTER);
-    final List blocks = parquetMetadata.getBlocks();
-    final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-
-    final ReadSupport.ReadContext
-      readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
-      null, fileMetaData.getSchema()));
-
-    // Compute stats
-    for (BlockMetaData bmd : blocks) {
-      serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
-      serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
-    }
-
-    schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
-      .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
-
-    final long splitStart = ((FileSplit) oldSplit).getStart();
-    final long splitLength = ((FileSplit) oldSplit).getLength();
-
-    setFilter(jobConf, fileMetaData.getSchema());
-
-    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
-      skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
+    ParquetInputSplit split;
+    if (oldSplit instanceof FileSplit) {
+      final Path finalPath = ((FileSplit) oldSplit).getPath();
+      jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
+
+      // TODO enable MetadataFilter by using readFooter(Configuration configuration, Path file,
+      // MetadataFilter filter) API
+      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
+      final List blocks = parquetMetadata.getBlocks();
+      final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+      final ReadSupport.ReadContext
+        readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
+          null, fileMetaData.getSchema()));
+
+      // Compute stats
+      for (BlockMetaData bmd : blocks) {
+        serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
+        serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
+      }
+
+      schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
+        .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
+      final List splitGroup = new ArrayList();
+      final long splitStart = ((FileSplit) oldSplit).getStart();
+      final long splitLength = ((FileSplit) oldSplit).getLength();
+      for (final BlockMetaData block : blocks) {
+        final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+        if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
+          splitGroup.add(block);
+        }
+      }
+      if (splitGroup.isEmpty()) {
+        LOG.warn("Skipping split, could not find row group in: " + oldSplit);
+        return null;
+      }
+
+      FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
+      if (filter != null) {
+        filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
+        if (filtedBlocks.isEmpty()) {
+          LOG.debug("All row groups are dropped due to filter predicates");
+          return null;
+        }
+
+        long droppedBlocks = splitGroup.size() - filtedBlocks.size();
+        if (droppedBlocks > 0) {
+          LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
+        }
+      } else {
+        filtedBlocks = splitGroup;
+      }
+
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+        skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
+      }
+      split = new ParquetInputSplit(finalPath,
+        splitStart,
+        splitLength,
+        oldSplit.getLocations(),
+        filtedBlocks,
+        readContext.getRequestedSchema().toString(),
+        fileMetaData.getSchema().toString(),
+        fileMetaData.getKeyValueMetaData(),
+        readContext.getReadSupportMetadata());
+      return split;
+    } else {
+      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
     }
-
-    // rowGroupOffsets need to be set to null otherwise filter will not be applied
-    // in ParquetRecordReader#initializeInternalReader
-    return new ParquetInputSplit(finalPath,
-      splitStart,
-      splitStart + splitLength,
-      splitLength,
-      oldSplit.getLocations(),
-      null);
   }
 
-  private void setFilter(final JobConf conf, MessageType schema) {
+  public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {
     SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf);
     if (sarg == null) {
-      return;
+      return null;
     }
 
     // Create the Parquet FilterPredicate without including columns that do not exist
@@ -122,12 +153,18 @@ public class ParquetRecordReaderBase {
       // Filter may have sensitive information. Do not send to debug.
       LOG.debug("PARQUET predicate push down generated.");
       ParquetInputFormat.setFilterPredicate(conf, p);
+      return FilterCompat.get(p);
     } else {
       // Filter may have sensitive information. Do not send to debug.
       LOG.debug("No PARQUET predicate push down is generated.");
+      return null;
     }
   }
 
+  public List getFiltedBlocks() {
+    return filtedBlocks;
+  }
+
   public SerDeStats getStats() {
     return serDeStats;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/9329cd90/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index 4ea8978..ac430a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -88,11 +88,6 @@ public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
           HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION, skipTimestampConversion);
     }
 
-    if (jobConf.get(ParquetInputFormat.FILTER_PREDICATE) != null) {
-      conf.set(ParquetInputFormat.FILTER_PREDICATE,
-        jobConf.get(ParquetInputFormat.FILTER_PREDICATE));
-    }
-
     final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
     if (split != null) {
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/9329cd90/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
index c0bfc3f..bf363f3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -24,11 +24,8 @@ import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
-import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
-import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -42,15 +39,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.apache.parquet.hadoop.ParquetInputSplit;
-import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
@@ -103,14 +93,6 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
           }
         });
 
-    FileSplit testSplit = new FileSplit(testPath, 0, fileLength(testPath), (String[]) null);
-    ParquetInputSplit parquetSplit = new ParquetInputSplit(testPath,
-        testSplit.getStart(),
-        testSplit.getStart() + testSplit.getLength(),
-        testSplit.getLength(),
-        testSplit.getLocations(),
-        null);
-
     // > 50
     GenericUDF udf = new GenericUDFOPGreaterThan();
     List children = Lists.newArrayList();
@@ -121,22 +103,12 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
     ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
     String searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
-    SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf);
-    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
-    TaskAttemptID taskAttemptID = new org.apache.hadoop.mapred.TaskAttemptID();
-    TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
-    ParquetInputFormat inputFormat = new ParquetInputFormat<>(DataWritableReadSupport.class);
-    org.apache.hadoop.mapreduce.RecordReader recordReader =
-        inputFormat.createRecordReader(parquetSplit, taskContext);
-    ParquetInputFormat.setFilterPredicate(conf, p);
-
-    try {
-      recordReader.initialize(parquetSplit, taskContext);
-      boolean hasValue = recordReader.nextKeyValue();
-      Assert.assertTrue("Row groups should not be filtered.", hasValue);
-    } finally {
-      recordReader.close();
-    }
+
+    ParquetRecordReaderWrapper recordReader = (ParquetRecordReaderWrapper)
+        new MapredParquetInputFormat().getRecordReader(
+            new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+
+    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFiltedBlocks().size());
 
     // > 100
     constantDesc = new ExprNodeConstantDesc(100);
@@ -144,20 +116,12 @@
     genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
     searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
-    sarg = ConvertAstToSearchArg.createFromConf(conf);
-    p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
-    taskAttemptID = new org.apache.hadoop.mapred.TaskAttemptID();
-    taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
-    recordReader = inputFormat.createRecordReader(parquetSplit, taskContext);
-    ParquetInputFormat.setFilterPredicate(conf, p);
-
-    try {
-      recordReader.initialize(parquetSplit, taskContext);
-      boolean hasValue = recordReader.nextKeyValue();
-      Assert.assertFalse("Row groups should be filtered.", hasValue);
-    } finally {
-      recordReader.close();
-    }
+
+    recordReader = (ParquetRecordReaderWrapper)
+        new MapredParquetInputFormat().getRecordReader(
+            new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
+
+    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFiltedBlocks().size());
   }
 
   private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
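For readers following the restored Hive-side filtering path above, a small self-contained sketch (not part of the commit) of how a FilterPredicate is wrapped in a FilterCompat.Filter and applied to row-group metadata, mirroring the RowGroupFilter.filterRowGroups call in ParquetRecordReaderBase.getSplit; the class name and the example column are hypothetical.

import java.util.List;

import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;

// Illustrative sketch of Hive-side row-group filtering as restored by this revert.
public class RowGroupFilterSketch {

  static List<BlockMetaData> keepMatchingRowGroups(List<BlockMetaData> blocks, MessageType schema) {
    // Example predicate: keep only row groups that may contain rows with age > 50.
    FilterPredicate p = FilterApi.gt(FilterApi.intColumn("age"), 50);
    FilterCompat.Filter filter = FilterCompat.get(p);
    // Row groups whose statistics cannot satisfy the predicate are dropped here,
    // before the ParquetInputSplit is built with the surviving blocks.
    return RowGroupFilter.filterRowGroups(filter, blocks, schema);
  }
}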