Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AC949200B61 for ; Tue, 9 Aug 2016 21:44:05 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AB235160AA5; Tue, 9 Aug 2016 19:44:05 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0BE3A160A6B for ; Tue, 9 Aug 2016 21:44:03 +0200 (CEST) Received: (qmail 48111 invoked by uid 500); 9 Aug 2016 19:44:03 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 48100 invoked by uid 99); 9 Aug 2016 19:44:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Aug 2016 19:44:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EBBF4DFCC0; Tue, 9 Aug 2016 19:44:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hashutosh@apache.org To: commits@hive.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-7239 Fix bug in HiveIndexedInputFormat implementation that causes incorrect query result when input backed by Sequence/RC files (Illya Yalovyy via Ashutosh Chauhan) Date: Tue, 9 Aug 2016 19:44:02 +0000 (UTC) archived-at: Tue, 09 Aug 2016 19:44:05 -0000 Repository: hive Updated Branches: refs/heads/master ca807c63c -> 2ef3ab855 HIVE-7239 Fix bug in HiveIndexedInputFormat implementation that causes incorrect query result when input backed by Sequence/RC files (Illya Yalovyy via Ashutosh 
Chauhan) Signed-off-by: Ashutosh Chauhan Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2ef3ab85 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2ef3ab85 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2ef3ab85 Branch: refs/heads/master Commit: 2ef3ab855c725829cabf2c93b9fc9e31553ca643 Parents: ca807c6 Author: Illya Yalovyy Authored: Tue Jul 26 22:09:00 2016 -0800 Committer: Ashutosh Chauhan Committed: Tue Aug 9 12:43:39 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/index/HiveIndexResult.java | 3 +- .../hive/ql/index/HiveIndexedInputFormat.java | 43 +-- .../hadoop/hive/ql/index/IndexResult.java | 25 ++ .../hadoop/hive/ql/index/SplitFilter.java | 125 ++++++++ .../hive/ql/index/MockHiveInputSplits.java | 37 +++ .../hadoop/hive/ql/index/MockIndexResult.java | 38 +++ .../hadoop/hive/ql/index/MockInputFile.java | 130 ++++++++ .../hive/ql/index/SplitFilterTestCase.java | 153 ++++++++++ .../ql/index/TestHiveInputSplitComparator.java | 64 ++++ .../hadoop/hive/ql/index/TestSplitFilter.java | 296 +++++++++++++++++++ 10 files changed, 874 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java index 33cc5c3..586cd68 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java @@ -45,7 +45,7 @@ import org.apache.hadoop.mapred.LineRecordReader.LineReader; * HiveIndexResult parses the input stream from an index query * to generate a list of file splits to query. 
*/ -public class HiveIndexResult { +public class HiveIndexResult implements IndexResult { public static final Logger l4j = LoggerFactory.getLogger(HiveIndexResult.class.getSimpleName()); @@ -182,6 +182,7 @@ public class HiveIndexResult { bucket.getOffsets().add(Long.parseLong(one_offset)); } + @Override public boolean contains(FileSplit split) throws HiveException { if (buckets == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java index 5247ece..0e6ec84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Iterator; import java.util.Set; import java.util.Arrays; +import java.util.Collection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +37,6 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.IOPrepareCache; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; @@ -144,49 +144,14 @@ public class HiveIndexedInputFormat extends HiveInputFormat { HiveInputSplit[] splits = (HiveInputSplit[]) this.doGetSplits(job, numSplits); - ArrayList newSplits = new ArrayList( - numSplits); - long maxInputSize = HiveConf.getLongVar(job, ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE); if (maxInputSize < 0) { maxInputSize=Long.MAX_VALUE; } - long sumSplitLengths = 0; - for (HiveInputSplit split : splits) { - 
l4j.info("split start : " + split.getStart()); - l4j.info("split end : " + (split.getStart() + split.getLength())); + SplitFilter filter = new SplitFilter(hiveIndexResult, maxInputSize); + Collection newSplits = filter.filter(splits); - try { - if (hiveIndexResult.contains(split)) { - // we may miss a sync here - HiveInputSplit newSplit = split; - if (split.inputFormatClassName().contains("RCFile") - || split.inputFormatClassName().contains("SequenceFile")) { - if (split.getStart() > SequenceFile.SYNC_INTERVAL) { - newSplit = new HiveInputSplit(new FileSplit(split.getPath(), - split.getStart() - SequenceFile.SYNC_INTERVAL, - split.getLength() + SequenceFile.SYNC_INTERVAL, - split.getLocations()), - split.inputFormatClassName()); - } - } - sumSplitLengths += newSplit.getLength(); - if (sumSplitLengths > maxInputSize) { - throw new IOException( - "Size of data to read during a compact-index-based query exceeded the maximum of " - + maxInputSize + " set in " + ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE.varname); - } - newSplits.add(newSplit); - } - } catch (HiveException e) { - throw new RuntimeException( - "Unable to get metadata for input table split" + split.getPath(), e); - } - } - InputSplit retA[] = newSplits.toArray((new FileSplit[newSplits.size()])); - l4j.info("Number of input splits: " + splits.length + " new input splits: " - + retA.length + ", sum of split lengths: " + sumSplitLengths); - return retA; + return newSplits.toArray(new FileSplit[newSplits.size()]); } } http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/java/org/apache/hadoop/hive/ql/index/IndexResult.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/IndexResult.java b/ql/src/java/org/apache/hadoop/hive/ql/index/IndexResult.java new file mode 100644 index 0000000..44e4f76 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/IndexResult.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.index; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.mapred.FileSplit; + +public interface IndexResult { + boolean contains(FileSplit split) throws HiveException; +} http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/java/org/apache/hadoop/hive/ql/index/SplitFilter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/SplitFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/index/SplitFilter.java new file mode 100644 index 0000000..8b339ec --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/SplitFilter.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.index; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.mapred.FileSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class SplitFilter { + public static final Logger LOG = LoggerFactory.getLogger(SplitFilter.class); + + private final IndexResult indexResult; + private final long maxInputSize; + + public SplitFilter(IndexResult indexResult, long maxInputSize) { + this.indexResult = indexResult; + this.maxInputSize = maxInputSize; + } + + public List filter(HiveInputSplit[] splits) throws IOException { + long sumSplitLengths = 0; + List newSplits = new ArrayList<>(); + + Arrays.sort(splits, new HiveInputSplitComparator()); + + for (HiveInputSplit split : splits) { + LOG.info("split start : " + split.getStart()); + LOG.info("split end : " + (split.getStart() + split.getLength())); + + try { + if (indexResult.contains(split)) { + HiveInputSplit newSplit = split; + if (isAdjustmentRequired(newSplits, split)) { + newSplit = adjustSplit(split); + } + sumSplitLengths += newSplit.getLength(); + if (sumSplitLengths > maxInputSize) 
{ + String messageTemplate = "Size of data to read during a compact-index-based query " + + "exceeded the maximum of %d set in %s"; + throw new IOException(String.format(messageTemplate, maxInputSize, + HiveConf.ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE.varname)); + } + newSplits.add(newSplit); + } + } catch (HiveException e) { + throw new RuntimeException("Unable to get metadata for input table split " + + split.getPath(), e); + } + } + LOG.info("Number of input splits: {}, new input splits: {}, sum of split lengths: {}", + splits.length, newSplits.size(), sumSplitLengths); + return newSplits; + } + + private boolean isAdjustmentRequired(List newSplits, HiveInputSplit split) { + return (split.inputFormatClassName().contains("RCFile") || + split.inputFormatClassName().contains("SequenceFile")) && split.getStart() > 0 && + !doesOverlap(newSplits, split.getPath(), adjustStart(split.getStart())); + } + + private boolean doesOverlap(List newSplits, Path path, long start) { + if (newSplits.isEmpty()) { + return false; + } + HiveInputSplit lastSplit = Iterables.getLast(newSplits); + if (lastSplit.getPath().equals(path)) { + return lastSplit.getStart() + lastSplit.getLength() > start; + } + return false; + } + + private long adjustStart(long start) { + return start > SequenceFile.SYNC_INTERVAL ? 
start - SequenceFile.SYNC_INTERVAL : 0; + } + + private HiveInputSplit adjustSplit(HiveInputSplit split) throws IOException { + long adjustedStart = adjustStart(split.getStart()); + return new HiveInputSplit(new FileSplit(split.getPath(), adjustedStart, + split.getStart() - adjustedStart + split.getLength(), split.getLocations()), + split.inputFormatClassName()); + } + + @VisibleForTesting + static final class HiveInputSplitComparator implements Comparator { + @Override + public int compare(HiveInputSplit o1, HiveInputSplit o2) { + int pathCompare = comparePath(o1.getPath(), o2.getPath()); + if (pathCompare != 0) { + return pathCompare; + } + return Long.compare(o1.getStart(), o2.getStart()); + } + + private int comparePath(Path p1, Path p2) { + return p1.compareTo(p2); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/test/org/apache/hadoop/hive/ql/index/MockHiveInputSplits.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/index/MockHiveInputSplits.java b/ql/src/test/org/apache/hadoop/hive/ql/index/MockHiveInputSplits.java new file mode 100644 index 0000000..7815ed0 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/index/MockHiveInputSplits.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.index; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.SequenceFileInputFormat; + +public final class MockHiveInputSplits { + private static final String[] HOSTS = {}; + private static final String INPUT_FORMAT_CLASS_NAME = SequenceFileInputFormat.class.getCanonicalName(); + + private MockHiveInputSplits() { + } + + public static HiveInputSplit createMockSplit(String pathString, long start, long length) { + InputSplit inputSplit = new FileSplit(new Path(pathString), start, length, HOSTS); + return new HiveInputSplit(inputSplit, INPUT_FORMAT_CLASS_NAME); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/test/org/apache/hadoop/hive/ql/index/MockIndexResult.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/index/MockIndexResult.java b/ql/src/test/org/apache/hadoop/hive/ql/index/MockIndexResult.java new file mode 100644 index 0000000..95d069c --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/index/MockIndexResult.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.index; + +import com.google.common.collect.ImmutableSet; +import java.util.Collection; +import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.mapred.FileSplit; + +public final class MockIndexResult implements IndexResult { + + private final ImmutableSet selectedSplits; + + public MockIndexResult(Collection selectedSplits) { + this.selectedSplits = ImmutableSet.copyOf(selectedSplits); + } + + @Override + public boolean contains(FileSplit split) throws HiveException { + return selectedSplits.contains(split); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/test/org/apache/hadoop/hive/ql/index/MockInputFile.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/index/MockInputFile.java b/ql/src/test/org/apache/hadoop/hive/ql/index/MockInputFile.java new file mode 100644 index 0000000..4619b6c --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/index/MockInputFile.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.index; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit; + +public final class MockInputFile { + private final String path; + private final ImmutableList splits; + private final ImmutableList selectedSplits; + + private MockInputFile(String path, List splits, + List selectedSplits) { + this.path = path; + this.splits = ImmutableList.copyOf(splits); + this.selectedSplits = ImmutableList.copyOf(selectedSplits); + } + + public String getPath() { + return path; + } + + public List getSplits() { + return splits; + } + + public List getSelectedSplits() { + return selectedSplits; + } + + public static PathStep builder() { + return new MockInputFileBuilder(); + } + + public static interface PathStep { + DefaultSplitLengthStep path(String path); + } + + public static interface DefaultSplitLengthStep extends SplitStep { + SplitStep defaultSplitLength(long defaultSplitLength); + } + + public static interface SplitStep { + SplitStep split(); + SplitStep selectedSplit(); + LastSplitStep split(long lastSplitSize); + LastSplitStep selectedSplit(long lastSplitSize); + MockInputFile build(); + } + + public static interface LastSplitStep { + MockInputFile build(); + } + + private static final class MockInputFileBuilder implements PathStep, SplitStep, LastSplitStep, + DefaultSplitLengthStep { + + private String path; + private long defaultSplitSize = SplitFilterTestCase.DEFAULT_SPLIT_SIZE;; + private final List 
splits = new ArrayList<>(); + private final List selectedSplits = new ArrayList<>(); + private long position = 0; + + @Override + public DefaultSplitLengthStep path(String path) { + this.path = path; + return this; + } + + @Override + public SplitStep split() { + nextSplit(defaultSplitSize); + return this; + } + + @Override + public LastSplitStep split(long lastSplitSize) { + nextSplit(lastSplitSize); + return this; + } + + @Override + public SplitStep selectedSplit() { + selectedSplits.add(nextSplit(defaultSplitSize)); + return this; + } + + @Override + public LastSplitStep selectedSplit(long lastSplitSize) { + selectedSplits.add(nextSplit(lastSplitSize)); + return this; + } + + @Override + public SplitStep defaultSplitLength(long defaultSplitLength) { + this.defaultSplitSize = defaultSplitLength; + return this; + } + + private HiveInputSplit nextSplit(long splitSize) { + HiveInputSplit split = MockHiveInputSplits.createMockSplit(path, position, splitSize); + position += splitSize; + splits.add(split); + return split; + } + + @Override + public MockInputFile build() { + return new MockInputFile(path, splits, selectedSplits); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/test/org/apache/hadoop/hive/ql/index/SplitFilterTestCase.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/index/SplitFilterTestCase.java b/ql/src/test/org/apache/hadoop/hive/ql/index/SplitFilterTestCase.java new file mode 100644 index 0000000..394dc74 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/index/SplitFilterTestCase.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.index; + +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public final class SplitFilterTestCase { + public static final long DEFAULT_SPLIT_SIZE = 1024 * 1024; + public static final long SMALL_SPLIT_SIZE = 500; + + private final Set allSplits; + private final Set selectedSplits; + private final Set expectedSplits; + private final long maxInputSize; + + private SplitFilterTestCase(Iterable allSplits, + Iterable selectedSplits, Iterable expectedSplits, + long maxInputSize) { + + this.allSplits = ImmutableSet.copyOf(allSplits); + this.selectedSplits = ImmutableSet.copyOf(selectedSplits); + this.expectedSplits = ImmutableSet.copyOf(expectedSplits); + this.maxInputSize = maxInputSize; + } + + private HiveInputSplit[] toArray(Collection splits) { + return splits.toArray(new HiveInputSplit[splits.size()]); + } + + public void executeAndValidate() throws IOException { + SplitFilter filter = new SplitFilter(new MockIndexResult(selectedSplits), maxInputSize); + List actualSplits = 
filter.filter(toArray(allSplits)); + assertSplits(expectedSplits, actualSplits); + } + + private void assertSplits(Collection expectedSplits, + Collection actualSplits) { + SplitFilter.HiveInputSplitComparator hiveInputSplitComparator = + new SplitFilter.HiveInputSplitComparator(); + + List sortedExpectedSplits = new ArrayList<>(expectedSplits); + Collections.sort(sortedExpectedSplits, hiveInputSplitComparator); + + List sortedActualSplits = new ArrayList<>(actualSplits); + Collections.sort(sortedActualSplits, hiveInputSplitComparator); + + assertEquals("Number of selected splits.", sortedExpectedSplits.size(), + sortedActualSplits.size()); + + for (int i = 0; i < sortedExpectedSplits.size(); i++) { + HiveInputSplit expectedSplit = sortedExpectedSplits.get(i); + HiveInputSplit actualSplit = sortedActualSplits.get(i); + + String splitName = "Split #" + i; + + assertEquals(splitName + " path.", expectedSplit.getPath(), actualSplit.getPath()); + assertEquals(splitName + " start.", expectedSplit.getStart(), actualSplit.getStart()); + assertEquals(splitName + " length.", expectedSplit.getLength(), actualSplit.getLength()); + } + } + + public static MaxInputSizeStep builder() { + return new SplitFilterTestCaseBuilder(); + } + + public static interface MaxInputSizeStep extends InputFilesStep { + InputFilesStep maxInputSize(long maxInputSize); + } + + public static interface InputFilesStep { + ExpectedSplitsStep inputFiles(MockInputFile... inputFiles); + } + + public static interface ExpectedSplitsStep { + BuildStep expectedSplits(HiveInputSplit... 
expectedSplits); + } + + public static interface BuildStep { + SplitFilterTestCase build(); + } + + private static final class SplitFilterTestCaseBuilder implements MaxInputSizeStep, InputFilesStep, + ExpectedSplitsStep, BuildStep { + + private long maxInputSize = Long.MAX_VALUE; + private List inputFiles; + private List expectedSplits; + + @Override + public InputFilesStep maxInputSize(long maxInputSize) { + this.maxInputSize = maxInputSize; + return this; + } + + @Override + public ExpectedSplitsStep inputFiles(MockInputFile... inputFiles) { + this.inputFiles = Arrays.asList(inputFiles); + return this; + } + + @Override + public BuildStep expectedSplits(HiveInputSplit... expectedSplits) { + this.expectedSplits = Arrays.asList(expectedSplits); + return this; + } + + @Override + public SplitFilterTestCase build() { + List allSplits = new ArrayList<>(); + List selectedSplits = new ArrayList<>(); + Set seenPaths = new HashSet(); + + for (MockInputFile inputFile : inputFiles) { + if (seenPaths.add(inputFile.getPath())) { + allSplits.addAll(inputFile.getSplits()); + selectedSplits.addAll(inputFile.getSelectedSplits()); + } else { + fail(String.format("Cannot add 2 input files with the same path to a test case. 
" + + "The duplicated path is '%s'.", inputFile.getPath())); + } + } + + return new SplitFilterTestCase(allSplits, selectedSplits, expectedSplits, maxInputSize); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/test/org/apache/hadoop/hive/ql/index/TestHiveInputSplitComparator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/index/TestHiveInputSplitComparator.java b/ql/src/test/org/apache/hadoop/hive/ql/index/TestHiveInputSplitComparator.java new file mode 100644 index 0000000..ec8eb3a --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/index/TestHiveInputSplitComparator.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.index; + +import java.util.Arrays; +import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit; +import org.apache.hadoop.hive.ql.index.SplitFilter.HiveInputSplitComparator; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import static org.apache.hadoop.hive.ql.index.MockHiveInputSplits.createMockSplit; +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class TestHiveInputSplitComparator { + + @Parameter(0) + public HiveInputSplit split1; + @Parameter(1) + public HiveInputSplit split2; + @Parameter(2) + public Integer expected; + + @Parameters(name = "{index}: {0}<=>{1} ") + public static Iterable data() { + return Arrays.asList(new Object[][]{ + {createMockSplit("A", 0, 100), createMockSplit("A", 1000, 100), -1}, + {createMockSplit("A", 1000, 100), createMockSplit("A", 100, 100), 1}, + {createMockSplit("A", 0, 100), createMockSplit("A", 0, 100), 0}, + {createMockSplit("A", 0, 100), createMockSplit("B", 0, 100), -1}, + {createMockSplit("A", 100, 100), createMockSplit("B", 0, 100), -1}, + {createMockSplit("A", 100, 100), createMockSplit("B", 0, 100), -1}} + ); + } + + @Test + public void testCompare() { + HiveInputSplitComparator cmp = new HiveInputSplitComparator(); + int actual = cmp.compare(split1, split2); + assertCompareResult(expected, actual); + } + + private void assertCompareResult(int expected, int actual) { + assertEquals(expected, (int) Math.signum(actual)); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2ef3ab85/ql/src/test/org/apache/hadoop/hive/ql/index/TestSplitFilter.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/index/TestSplitFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/index/TestSplitFilter.java new file mode 100644 
index 0000000..f2e2c8b --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/index/TestSplitFilter.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.index; + +import java.io.IOException; +import org.junit.Test; + +import static org.apache.hadoop.hive.ql.index.MockHiveInputSplits.createMockSplit; +import static org.apache.hadoop.io.SequenceFile.SYNC_INTERVAL; +import static org.apache.hadoop.hive.ql.index.SplitFilterTestCase.DEFAULT_SPLIT_SIZE; +import static org.apache.hadoop.hive.ql.index.SplitFilterTestCase.SMALL_SPLIT_SIZE; + +/** Scenario tests for split filtering. Every test describes its input files as an ordered sequence of split() and selectedSplit() entries built with MockInputFile, lists the splits it expects back via createMockSplit, and hands execution and checking to SplitFilterTestCase.executeAndValidate(). NOTE(review): the createMockSplit arguments look like (path, start, length) - confirm against MockHiveInputSplits. */ public class TestSplitFilter { + /* Scenario under test; assigned afresh at the top of every test method. */ private SplitFilterTestCase testCase; + + /** File A is built as split, selectedSplit, split. Expects one split for A whose second argument is DEFAULT_SPLIT_SIZE minus SYNC_INTERVAL and whose third is DEFAULT_SPLIT_SIZE plus SYNC_INTERVAL. */ @Test + public void testOneSelectedSplitsInMiddle() throws Exception { + testCase = SplitFilterTestCase.builder() + .inputFiles( + MockInputFile.builder() + .path("A") + .split() + .selectedSplit() + .split() + .build() + ) + .expectedSplits( + createMockSplit("A", DEFAULT_SPLIT_SIZE - SYNC_INTERVAL, DEFAULT_SPLIT_SIZE + SYNC_INTERVAL) + ) + .build(); + + testCase.executeAndValidate(); + } + + /** File A is built as selectedSplit, split, split. Expects one split for A with arguments (0, DEFAULT_SPLIT_SIZE), i.e. no SYNC_INTERVAL adjustment. */ @Test + public void testSelectedFirstSplit() throws Exception { + testCase = SplitFilterTestCase.builder() + .inputFiles( + MockInputFile.builder() + .path("A") +
.selectedSplit() + .split() + .split() + .build() + ) + .expectedSplits( + createMockSplit("A", 0, DEFAULT_SPLIT_SIZE) + ) + .build(); + + testCase.executeAndValidate(); + } + + /** File A is built as split, selectedSplit(lastSplitSize) with lastSplitSize = 1234. Expects one split for A whose second argument is DEFAULT_SPLIT_SIZE minus SYNC_INTERVAL and whose third is lastSplitSize plus SYNC_INTERVAL. */ @Test + public void testSelectedLastSplit() throws Exception { + int lastSplitSize = 1234; + + testCase = SplitFilterTestCase.builder() + .inputFiles( + MockInputFile.builder() + .path("A") + .split() + .selectedSplit(lastSplitSize) + .build() + ) + .expectedSplits( + createMockSplit("A", DEFAULT_SPLIT_SIZE - SYNC_INTERVAL, lastSplitSize + SYNC_INTERVAL) + ) + .build(); + + testCase.executeAndValidate(); + } + + /** File A is built as selectedSplit, selectedSplit, split. Expects two splits for A, (0, DEFAULT_SPLIT_SIZE) and (DEFAULT_SPLIT_SIZE, DEFAULT_SPLIT_SIZE); neither carries a SYNC_INTERVAL adjustment. */ @Test + public void testSelectedTwoAdjacentSplits() throws Exception { + + testCase = SplitFilterTestCase.builder() + .inputFiles( + MockInputFile.builder() + .path("A") + .selectedSplit() + .selectedSplit() + .split() + .build() + ) + .expectedSplits( + createMockSplit("A", 0, DEFAULT_SPLIT_SIZE), + createMockSplit("A", DEFAULT_SPLIT_SIZE, DEFAULT_SPLIT_SIZE) + ) + .build(); + + testCase.executeAndValidate(); + } + + /** File A is built as three selectedSplit entries followed by one split. Expects three splits for A with second arguments 0, DEFAULT_SPLIT_SIZE and DEFAULT_SPLIT_SIZE * 2, each with third argument DEFAULT_SPLIT_SIZE. */ @Test + public void testSelectedThreeAdjacentSplits() throws Exception { + + testCase = SplitFilterTestCase.builder() + .inputFiles( + MockInputFile.builder() + .path("A") + .selectedSplit() + .selectedSplit() + .selectedSplit() + .split() + .build() + ) + .expectedSplits( + createMockSplit("A", 0, DEFAULT_SPLIT_SIZE), + createMockSplit("A", DEFAULT_SPLIT_SIZE, DEFAULT_SPLIT_SIZE), + createMockSplit("A", DEFAULT_SPLIT_SIZE * 2, DEFAULT_SPLIT_SIZE) + ) + .build(); + + testCase.executeAndValidate(); + } + + /** File A is built as selectedSplit, split and file B as a single selectedSplit. Expects one split per file, each with arguments (0, DEFAULT_SPLIT_SIZE). */ @Test + public void testSelectedSplitsInTwoFiles() throws Exception { + + testCase = SplitFilterTestCase.builder() + .inputFiles( + MockInputFile.builder() + .path("A") + .selectedSplit() + .split() + .build(), + MockInputFile.builder() + .path("B") + .selectedSplit() + .build() + ) + .expectedSplits( + createMockSplit("A", 0, DEFAULT_SPLIT_SIZE), + createMockSplit("B", 0, DEFAULT_SPLIT_SIZE) + ) + .build(); + + testCase.executeAndValidate(); + } + + /** File A is a single selectedSplit; file B is built as split, selectedSplit. Expects A as (0, DEFAULT_SPLIT_SIZE) and B with the SYNC_INTERVAL adjustment (second argument reduced, third increased), so the selected split in A does not remove the adjustment in B. */ @Test + public
void testOverlapWithPreviousFile() throws Exception { + + testCase = SplitFilterTestCase.builder() + .inputFiles( + MockInputFile.builder() + .path("A") + .selectedSplit() + .build(), + MockInputFile.builder() + .path("B") + .split() + .selectedSplit() + .build() + ) + .expectedSplits( + createMockSplit("A", 0, DEFAULT_SPLIT_SIZE), + createMockSplit("B", DEFAULT_SPLIT_SIZE - SYNC_INTERVAL, DEFAULT_SPLIT_SIZE + SYNC_INTERVAL) + ) + .build(); + + testCase.executeAndValidate(); + } + + /** File A is a single selectedSplit; file B is built as split, selectedSplit, selectedSplit. Expects A as (0, DEFAULT_SPLIT_SIZE), the first selected split of B with the SYNC_INTERVAL adjustment, and the second selected split of B unadjusted as (DEFAULT_SPLIT_SIZE * 2, DEFAULT_SPLIT_SIZE). */ @Test + public void testOverlapInSecondFile() throws Exception { + + testCase = SplitFilterTestCase.builder() + .inputFiles( + MockInputFile.builder() + .path("A") + .selectedSplit() + .build(), + MockInputFile.builder() + .path("B") + .split() + .selectedSplit() + .selectedSplit() + .build() + ) + .expectedSplits( + createMockSplit("A", 0, DEFAULT_SPLIT_SIZE), + createMockSplit("B", DEFAULT_SPLIT_SIZE - SYNC_INTERVAL, DEFAULT_SPLIT_SIZE + SYNC_INTERVAL), + createMockSplit("B", DEFAULT_SPLIT_SIZE * 2, DEFAULT_SPLIT_SIZE) + ) + .build(); + + testCase.executeAndValidate(); + } + + /** File A uses defaultSplitLength(SMALL_SPLIT_SIZE) and is built as split, selectedSplit. Expects a single split for A with arguments (0, SMALL_SPLIT_SIZE * 2). NOTE(review): presumably SMALL_SPLIT_SIZE is smaller than SYNC_INTERVAL so the adjusted start bottoms out at 0 - confirm against SplitFilterTestCase. */ @Test + public void testSmallSplitsLengthAdjustment() throws Exception { + + testCase = SplitFilterTestCase.builder() + .inputFiles( + MockInputFile.builder() + .path("A") + .defaultSplitLength(SMALL_SPLIT_SIZE) + .split() + .selectedSplit() + .build() + ) + .expectedSplits( + createMockSplit("A", 0, SMALL_SPLIT_SIZE * 2) + ) + .build(); + + testCase.executeAndValidate(); + } + + /** File A uses defaultSplitLength(SMALL_SPLIT_SIZE) and alternates selectedSplit, split, selectedSplit, split, selectedSplit. Expects three splits for A with second arguments 0, SMALL_SPLIT_SIZE * 2 and SMALL_SPLIT_SIZE * 4, each with third argument SMALL_SPLIT_SIZE (no SYNC_INTERVAL adjustment in the expectation). */ @Test + public void testSmallSplitsOverlap() throws Exception { + + testCase = SplitFilterTestCase.builder() + .inputFiles( + MockInputFile.builder() + .path("A") + .defaultSplitLength(SMALL_SPLIT_SIZE) + .selectedSplit() + .split() + .selectedSplit() + .split() + .selectedSplit() + .build() + ) + .expectedSplits( + createMockSplit("A", 0, SMALL_SPLIT_SIZE), + createMockSplit("A", SMALL_SPLIT_SIZE * 2, SMALL_SPLIT_SIZE), + createMockSplit("A", SMALL_SPLIT_SIZE * 4, SMALL_SPLIT_SIZE) + ) + .build(); + + testCase.executeAndValidate(); + } 
+ + /** Sets maxInputSize to DEFAULT_SPLIT_SIZE * 3 plus SYNC_INTERVAL * 2 (presumably an upper bound on the total selected input - confirm against SplitFilterTestCase). File A alternates selectedSplit, split, selectedSplit, split, selectedSplit. Expects three splits whose third arguments add up to exactly that maxInputSize value; the second and third expected splits carry the SYNC_INTERVAL adjustment. */ @Test + public void testMaxSplitsSizePositive() throws Exception { + + testCase = SplitFilterTestCase.builder() + .maxInputSize(DEFAULT_SPLIT_SIZE * 3 + SYNC_INTERVAL * 2) + .inputFiles( + MockInputFile.builder() + .path("A") + .selectedSplit() + .split() + .selectedSplit() + .split() + .selectedSplit() + .build() + ) + .expectedSplits( + createMockSplit("A", 0, DEFAULT_SPLIT_SIZE), + createMockSplit("A", DEFAULT_SPLIT_SIZE * 2 - SYNC_INTERVAL, DEFAULT_SPLIT_SIZE + SYNC_INTERVAL), + createMockSplit("A", DEFAULT_SPLIT_SIZE * 4 - SYNC_INTERVAL, DEFAULT_SPLIT_SIZE + SYNC_INTERVAL) + ) + .build(); + + testCase.executeAndValidate(); + } + + /** Same input layout as testMaxSplitsSizePositive but with maxInputSize set to DEFAULT_SPLIT_SIZE * 3 and expectedSplits() called with no arguments. The test passes only when an IOException is thrown, as declared by the expected attribute of the Test annotation. */ @Test(expected = IOException.class) + public void testMaxSplitsSizeNegative() throws Exception { + testCase = SplitFilterTestCase.builder() + .maxInputSize(DEFAULT_SPLIT_SIZE * 3) + .inputFiles( + MockInputFile.builder() + .path("A") + .selectedSplit() + .split() + .selectedSplit() + .split() + .selectedSplit() + .build() + ) + .expectedSplits() + .build(); + + testCase.executeAndValidate(); + } +}