From: ekoifman@apache.org
To: commits@hive.apache.org
Date: Wed, 12 Jul 2017 17:32:32 -0000
Subject: [2/2] hive git commit: HIVE-16177 non Acid to acid conversion doesn't handle _copy_N files (Eugene Koifman, reviewed by Sergey Shelukhin)

HIVE-16177 non Acid to acid conversion doesn't handle _copy_N files (Eugene Koifman, reviewed by Sergey Shelukhin)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0b7e7273
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0b7e7273
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0b7e7273

Branch: refs/heads/branch-2
Commit: 0b7e72731db355b07f7647bd2c575fe52ad8520b
Parents: 6a63742
Author: Eugene Koifman
Authored: Wed Jul 12 10:32:24 2017 -0700
Committer: Eugene Koifman
Committed: Wed Jul 12 10:32:24 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/Utilities.java   |   2 +-
 .../hadoop/hive/ql/io/AcidInputFormat.java      |   5 +-
 .../hadoop/hive/ql/io/AcidOutputFormat.java     |  21 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |  61 ++-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  63 +--
 .../hive/ql/io/orc/OrcRawRecordMerger.java      | 406 +++++++++++++++++--
 .../io/orc/VectorizedOrcAcidRowBatchReader.java |   9 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   4 +-
 .../hive/ql/txn/compactor/CompactorMR.java      |   9 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  54 ++-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |  62 +++
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |  19 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  18 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |  28 +-
 .../TestVectorizedOrcAcidRowBatchReader.java    |   2 +-
 .../queries/clientpositive/insert_orig_table.q  |  20 +-
 .../clientpositive/insert_values_orig_table.q   |  20 +-
 .../insert_values_orig_table_use_metadata.q     |  23 +-
 .../clientpositive/insert_orig_table.q.out      |  68 +++-
 .../insert_values_orig_table.q.out              |  68 +++-
.../insert_values_orig_table_use_metadata.q.out | 110 ++++- .../clientpositive/llap/insert_orig_table.q.out | 68 +++- 22 files changed, 984 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 79955e9..c8c2956 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1200,7 +1200,7 @@ public final class Utilities { * Group 6: copy [copy keyword] * Group 8: 2 [copy file index] */ - private static final String COPY_KEYWORD = "_copy_"; // copy keyword + public static final String COPY_KEYWORD = "_copy_"; // copy keyword private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX = Pattern.compile("^.*?"+ // any prefix "([0-9]+)"+ // taskId http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index 7c7074d..25177ef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -112,10 +112,13 @@ public interface AcidInputFormat private long minTxnId; private long maxTxnId; private List stmtIds; - + public DeltaMetaData() { this(0,0,new ArrayList()); } + /** + * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition + */ DeltaMetaData(long minTxnId, long maxTxnId, List stmtIds) { this.minTxnId = minTxnId; this.maxTxnId = maxTxnId; http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index b85b827..405cfde 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -25,6 +25,7 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.WritableComparable; @@ -51,6 +52,11 @@ public interface AcidOutputFormat extends HiveO private long minimumTransactionId; private long maximumTransactionId; private int bucket; + /** + * Based on {@link org.apache.hadoop.hive.ql.metadata.Hive#mvFile(HiveConf, FileSystem, Path, FileSystem, Path, boolean, boolean)} + * _copy_N starts with 1. 
+ */ + private int copyNumber = 0; private PrintStream dummyStream = null; private boolean oldStyle = false; private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id @@ -180,6 +186,18 @@ public interface AcidOutputFormat extends HiveO } /** + * Multiple inserts into legacy (pre-acid) tables can generate multiple copies of each bucket + * file. + * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD + * @param copyNumber the number of the copy ( > 0) + * @return this + */ + public Options copyNumber(int copyNumber) { + this.copyNumber = copyNumber; + return this; + } + + /** * Whether it should use the old style (0000000_0) filenames. * @param value should use the old style names * @return this @@ -293,6 +311,9 @@ public interface AcidOutputFormat extends HiveO public int getStatementId() { return statementId; } + public int getCopyNumber() { + return copyNumber; + } public Path getFinalDestination() { return finalDestination; } http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index da00bb3..4d23011 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.io; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -48,6 +49,8 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD; + /** * Utilities that are shared by all of the ACID input and output formats. They * are used by the compactor and cleaner and thus must be format agnostic. @@ -99,6 +102,10 @@ public class AcidUtils { public static final int MAX_STATEMENTS_PER_TXN = 10000; public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$"); public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}"); + /** + * This does not need to use ORIGINAL_PATTERN_COPY because it's used to read + * a "delta" dir written by a real Acid write - cannot have any copies + */ public static final PathFilter originalBucketFilter = new PathFilter() { @Override public boolean accept(Path path) { @@ -113,6 +120,11 @@ public class AcidUtils { private static final Pattern ORIGINAL_PATTERN = Pattern.compile("[0-9]+_[0-9]+"); + /** + * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD + */ + private static final Pattern ORIGINAL_PATTERN_COPY = + Pattern.compile("[0-9]+_[0-9]+" + COPY_KEYWORD + "[0-9]+"); public static final PathFilter hiddenFileFilter = new PathFilter(){ @Override @@ -243,7 +255,21 @@ public class AcidUtils { .maximumTransactionId(0) .bucket(bucket) .writingBase(true); - } else if (filename.startsWith(BUCKET_PREFIX)) { + } + else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) { + //todo: define groups in regex and use parseInt(Matcher.group(2)).... 
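
A minimal sketch of the grouped-regex variant hinted at by the //todo comment above; the substring-based parsing that follows does the same thing without groups. The pattern name, group numbering, and main() harness here are assumptions made for illustration only and are not part of this patch.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class CopyFileNameSketch {
      // accepts the same "<bucket>_<suffix>_copy_<N>" layout as ORIGINAL_PATTERN_COPY
      private static final Pattern GROUPED_COPY_PATTERN =
          Pattern.compile("([0-9]+)_([0-9]+)_copy_([0-9]+)");

      public static void main(String[] args) {
        Matcher m = GROUPED_COPY_PATTERN.matcher("000001_0_copy_2");
        if (m.matches()) {
          int bucket = Integer.parseInt(m.group(1));      // 1
          int copyNumber = Integer.parseInt(m.group(3));  // 2
          System.out.println("bucket=" + bucket + ", copy=" + copyNumber);
        }
      }
    }
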
+ int bucket = + Integer.parseInt(filename.substring(0, filename.indexOf('_'))); + int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1)); + result + .setOldStyle(true) + .minimumTransactionId(0) + .maximumTransactionId(0) + .bucket(bucket) + .copyNumber(copyNumber) + .writingBase(true); + } + else if (filename.startsWith(BUCKET_PREFIX)) { int bucket = Integer.parseInt(filename.substring(filename.indexOf('_') + 1)); if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) { @@ -482,7 +508,7 @@ public class AcidUtils { Path getBaseDirectory(); /** - * Get the list of original files. Not {@code null}. + * Get the list of original files. Not {@code null}. Must be sorted. * @return the list of original files (eg. 000000_0) */ List getOriginalFiles(); @@ -825,7 +851,7 @@ public class AcidUtils { // Okay, we're going to need these originals. Recurse through them and figure out what we // really need. for (FileStatus origDir : originalDirectories) { - findOriginals(fs, origDir, original, useFileIds); + findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles); } } @@ -893,7 +919,20 @@ public class AcidUtils { final Path base = bestBase.status == null ? null : bestBase.status.getPath(); LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " + deltas.size()); - + /** + * If this sort order is changed and there are tables that have been converted to transactional + * and have had any update/delete/merge operations performed but not yet MAJOR compacted, it + * may result in data loss since it may change how + * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns + * {@link RecordIdentifier#rowId} for read (that have happened) and compaction (yet to happen). 
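
The sort that follows relies on this stable, path-based ordering. A minimal sketch of the resulting order, assuming Path comparison reduces to lexicographic comparison of the path strings (plain Strings stand in for HdfsFileStatusWithId; not part of the patch):

    import java.util.Arrays;
    import java.util.List;

    public class OriginalFileOrderSketch {
      public static void main(String[] args) {
        List<String> files = Arrays.asList(
            "000001_0", "000000_0_copy_2", "000000_0", "000001_0_copy_1", "000000_0_copy_1");
        // each bucket's copy_N files sort directly after their base file, so the rowid
        // assignment derived from this order stays stable across reads and compaction
        files.stream().sorted().forEach(System.out::println);
        // prints: 000000_0, 000000_0_copy_1, 000000_0_copy_2, 000001_0, 000001_0_copy_1
      }
    }
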
+ */ + Collections.sort(original, new Comparator() { + @Override + public int compare(HdfsFileStatusWithId o1, HdfsFileStatusWithId o2) { + //this does "Path.uri.compareTo(that.uri)" + return o1.getFileStatus().compareTo(o2.getFileStatus()); + } + }); return new Directory(){ @Override @@ -1011,7 +1050,7 @@ public class AcidUtils { * @throws IOException */ private static void findOriginals(FileSystem fs, FileStatus stat, - List original, Ref useFileIds) throws IOException { + List original, Ref useFileIds, boolean ignoreEmptyFiles) throws IOException { assert stat.isDir(); List childrenWithId = null; Boolean val = useFileIds.value; @@ -1031,18 +1070,22 @@ public class AcidUtils { if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { if (child.getFileStatus().isDir()) { - findOriginals(fs, child.getFileStatus(), original, useFileIds); + findOriginals(fs, child.getFileStatus(), original, useFileIds, ignoreEmptyFiles); } else { - original.add(child); + if(!ignoreEmptyFiles || child.getFileStatus().getLen() > 0) { + original.add(child); + } } } } else { List children = HdfsUtils.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter); for (FileStatus child : children) { if (child.isDir()) { - findOriginals(fs, child, original, useFileIds); + findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles); } else { - original.add(createOriginalObj(null, child)); + if(!ignoreEmptyFiles || child.getLen() > 0) { + original.add(createOriginalObj(null, child)); + } } } } http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 8fb7211..076bac1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -1678,7 +1678,7 @@ public class OrcInputFormat implements InputFormat, // independent split strategies for them. There is a global flag 'isOriginal' that is set // on a per split strategy basis and it has to be same for all the files in that strategy. 
List> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs, - adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi, + adi.splitPath, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); for (SplitStrategy splitStrategy : splitStrategies) { @@ -1928,7 +1928,7 @@ public class OrcInputFormat implements InputFormat, } else { root = path.getParent().getParent(); } - } else { + } else {//here path is a delta/ but above it's a partition/ root = path; } @@ -1948,7 +1948,23 @@ public class OrcInputFormat implements InputFormat, final Configuration conf = options.getConfiguration(); final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split); - final int bucket = OrcInputFormat.getBucketForSplit(conf, split); + OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false); + mergerOptions.rootPath(root); + final int bucket; + if (split.hasBase()) { + AcidOutputFormat.Options acidIOOptions = + AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf); + if(acidIOOptions.getBucket() < 0) { + LOG.warn("Can't determine bucket ID for " + split.getPath() + "; ignoring"); + } + bucket = acidIOOptions.getBucket(); + if(split.isOriginal()) { + mergerOptions.copyIndex(acidIOOptions.getCopyNumber()).bucketPath(split.getPath()); + } + } else { + bucket = (int) split.getStart(); + } + final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf); readOptions.range(split.getStart(), split.getLength()); @@ -1957,7 +1973,7 @@ public class OrcInputFormat implements InputFormat, new ValidReadTxnList(txnString); final OrcRawRecordMerger records = new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket, - validTxnList, readOptions, deltas); + validTxnList, readOptions, deltas, mergerOptions); return new RowReader() { OrcStruct innerRecord = records.createValue(); @@ -2009,20 +2025,20 @@ public class OrcInputFormat implements InputFormat, } }; } - - static Path findOriginalBucket(FileSystem fs, + private static Path findOriginalBucket(FileSystem fs, Path directory, int bucket) throws IOException { for(FileStatus stat: fs.listStatus(directory)) { - String name = stat.getPath().getName(); - String numberPart = name.substring(0, name.indexOf('_')); - if (org.apache.commons.lang3.StringUtils.isNumeric(numberPart) && - Integer.parseInt(numberPart) == bucket) { + if(stat.getLen() <= 0) { + continue; + } + AcidOutputFormat.Options bucketInfo = + AcidUtils.parseBaseOrDeltaBucketFilename(stat.getPath(), fs.getConf()); + if(bucketInfo.getBucket() == bucket) { return stat.getPath(); } } - throw new IllegalArgumentException("Can't find bucket " + bucket + " in " + - directory); + throw new IllegalArgumentException("Can't find bucket " + bucket + " in " + directory); } static Reader.Options createOptionsForReader(Configuration conf) { @@ -2055,14 +2071,6 @@ public class OrcInputFormat implements InputFormat, return reader; } - static int getBucketForSplit(Configuration conf, OrcSplit orcSplit) { - if (orcSplit.hasBase()) { - return AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); - } else { - return (int) orcSplit.getStart(); - } - } - public static boolean[] pickStripesViaTranslatedSarg(SearchArgument sarg, OrcFile.WriterVersion writerVersion, List types, List stripeStats, int stripeCount) { @@ -2135,7 +2143,7 @@ public class OrcInputFormat implements InputFormat, @VisibleForTesting static List> determineSplitStrategies(CombinedCtx combinedCtx, Context 
context, - FileSystem fs, Path dir, AcidUtils.Directory dirInfo, + FileSystem fs, Path dir, List baseFiles, List parsedDeltas, List readerTypes, @@ -2146,7 +2154,7 @@ public class OrcInputFormat implements InputFormat, // When no baseFiles, we will just generate a single split strategy and return. List acidSchemaFiles = new ArrayList(); if (baseFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2166,7 +2174,7 @@ public class OrcInputFormat implements InputFormat, // Generate split strategy for non-acid schema original files, if any. if (!originalSchemaFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, originalSchemaFiles, true, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2175,7 +2183,7 @@ public class OrcInputFormat implements InputFormat, // Generate split strategy for acid schema files, if any. if (!acidSchemaFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2187,7 +2195,7 @@ public class OrcInputFormat implements InputFormat, @VisibleForTesting static SplitStrategy determineSplitStrategy(CombinedCtx combinedCtx, Context context, - FileSystem fs, Path dir, AcidUtils.Directory dirInfo, + FileSystem fs, Path dir, List baseFiles, boolean isOriginal, List parsedDeltas, @@ -2262,8 +2270,11 @@ public class OrcInputFormat implements InputFormat, } reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf)); } + OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options() + .isCompacting(true) + .rootPath(baseDirectory); return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal, - bucket, validTxnList, new Reader.Options(), deltaDirectory); + bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 95b8806..ffcdf6a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -18,11 +18,14 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; @@ -59,11 +62,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ private final long 
length; private final ValidTxnList validTxnList; private final int columns; - private ReaderKey prevKey = new ReaderKey(); + private final ReaderKey prevKey = new ReaderKey(); // this is the key less than the lowest key we need to process - private RecordIdentifier minKey; + private final RecordIdentifier minKey; // this is the last key we need to process - private RecordIdentifier maxKey; + private final RecordIdentifier maxKey; // an extra value so that we can return it while reading ahead private OrcStruct extraValue; @@ -156,7 +159,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId; } - public long getCurrentTransactionId() { + long getCurrentTransactionId() { return currentTransactionId; } @@ -165,7 +168,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ * @param other the value to compare to * @return -1, 0, +1 */ - public int compareRow(RecordIdentifier other) { + int compareRow(RecordIdentifier other) { return compareToInternal(other); } @@ -188,9 +191,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ final Reader reader; final RecordReader recordReader; final ReaderKey key; - final RecordIdentifier maxKey; + private final RecordIdentifier minKey; + private final RecordIdentifier maxKey; final int bucket; private final int statementId; + boolean advancedToMinKey = false; /** * Create a reader that reads from the first key larger than minKey to any @@ -210,21 +215,33 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ ReaderImpl.Options options, int statementId) throws IOException { this.reader = reader; this.key = key; + this.minKey = minKey; this.maxKey = maxKey; this.bucket = bucket; // TODO use stripe statistics to jump over stripes recordReader = reader.rowsOptions(options); this.statementId = statementId; + } + RecordReader getRecordReader() { + return recordReader; + } + /** + * This must be called right after the constructor but not in the constructor to make sure + * sub-classes are fully initialized before their {@link #next(OrcStruct)} is called + */ + void advnaceToMinKey() throws IOException { + advancedToMinKey = true; // advance the reader until we reach the minimum key do { next(nextRecord); } while (nextRecord != null && - (minKey != null && key.compareRow(minKey) <= 0)); + (getMinKey() != null && key.compareRow(getMinKey()) <= 0)); } void next(OrcStruct next) throws IOException { - if (recordReader.hasNext()) { - nextRecord = (OrcStruct) recordReader.next(next); + assert advancedToMinKey : "advnaceToMinKey() was not called"; + if (getRecordReader().hasNext()) { + nextRecord = (OrcStruct) getRecordReader().next(next); // set the key key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord), OrcRecordUpdater.getBucket(nextRecord), @@ -233,10 +250,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ statementId); // if this record is larger than maxKey, we need to stop - if (maxKey != null && key.compareRow(maxKey) > 0) { - LOG.debug("key " + key + " > maxkey " + maxKey); + if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) { + LOG.debug("key " + key + " > maxkey " + getMaxKey()); nextRecord = null; - recordReader.close(); + getRecordReader().close(); } } else { nextRecord = null; @@ -244,6 +261,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ } } + RecordIdentifier getMinKey() { + return minKey; + } + RecordIdentifier 
getMaxKey() { + return maxKey; + } int getColumns() { return reader.getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount(); } @@ -253,18 +276,192 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ * A reader that pretends an original base file is a new version base file. * It wraps the underlying reader's row with an ACID event object and * makes the relevant translations. + * + * Running multiple Insert statements on the same partition (of non acid table) creates files + * like so: 00000_0, 00000_0_copy1, 00000_0_copy2, etc. So the OriginalReaderPair must treat all + * of these files as part of a single logical bucket file. + * + * For Compaction, where each split includes the whole bucket, this means reading over all the + * files in order to assign ROW__ID.rowid in one sequence for the entire logical bucket. + * + * For a read after the table is marked transactional but before it's rewritten into a base/ + * by compaction, each of the original files may be split into many pieces. For each split we + * must make sure to include only the relevant part of each delta file. + * {@link OrcRawRecordMerger#minKey} and {@link OrcRawRecordMerger#maxKey} are computed for each + * split of the original file and used to filter rows from all the deltas. The ROW__ID.rowid for + * the rows of the 'original' file of course, must be assigned from the beginning of logical + * bucket. */ static final class OriginalReaderPair extends ReaderPair { + private final Options mergerOptions; + /** + * Sum total of all rows in all the files before the 'current' one in {@link #originalFiles} list + */ + private long rowIdOffset = 0; + /** + * See {@link AcidUtils.Directory#getOriginalFiles()}. This list has a fixed sort order. This + * is the full list when compacting and empty when doing a simple read. The later is because we + * only need to read the current split from 1 file for simple read. 
+ */ + private final List originalFiles; + /** + * index into {@link #originalFiles} + */ + private int nextFileIndex = 0; + private long numRowsInCurrentFile = 0; + private RecordReader originalFileRecordReader = null; + private final Configuration conf; + private final Reader.Options options; + private final RecordIdentifier minKey;//shadow parent minKey to make final + private final RecordIdentifier maxKey;//shadow parent maxKey to make final + OriginalReaderPair(ReaderKey key, Reader reader, int bucket, - RecordIdentifier minKey, RecordIdentifier maxKey, - Reader.Options options) throws IOException { + final RecordIdentifier minKey, final RecordIdentifier maxKey, + Reader.Options options, Options mergerOptions, Configuration conf, + ValidTxnList validTxnList) throws IOException { super(key, reader, bucket, minKey, maxKey, options, 0); + this.mergerOptions = mergerOptions; + this.conf = conf; + this.options = options; + assert mergerOptions.getRootPath() != null : "Since we have original files"; + assert bucket >= 0 : "don't support non-bucketed tables yet"; + + RecordIdentifier newMinKey = minKey; + RecordIdentifier newMaxKey = maxKey; + if(mergerOptions.isCompacting()) { + { + //when compacting each split needs to process the whole logical bucket + assert options.getOffset() == 0; + assert options.getMaxOffset() == Long.MAX_VALUE; + assert minKey == null; + assert maxKey == null; + } + AcidUtils.Directory directoryState = AcidUtils.getAcidState( + mergerOptions.getRootPath(), conf, validTxnList, false, true); + originalFiles = directoryState.getOriginalFiles(); + assert originalFiles.size() > 0; + /** + * when there are no copyN files, the {@link #recordReader} will be the the one and only + * file for for 'bucket' but closing here makes flow cleaner and only happens once in the + * life of the table. With copyN files, the caller may pass in any one of the copyN files. + * This is less prone to bugs than expecting the reader to pass in a Reader for the 1st file + * of a logical bucket.*/ + recordReader.close(); + reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket + if(reader == null) { + //Compactor generated a split for a bucket that has no data? + throw new IllegalStateException("No 'original' files found for bucketId=" + bucket + + " in " + mergerOptions.getRootPath()); + } + numRowsInCurrentFile = reader.getNumberOfRows(); + originalFileRecordReader = reader.rowsOptions(options); + } + else { + /** + * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copyN. etc We don't + * know N a priori so if this is true, then the current split is from 0000_0_copyN file. + * It's needed to correctly set maxKey. In particular, set maxKey==null if this split + * is the tail of the last file for this logical bucket to include all deltas written after + * non-acid to acid table conversion. 
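
The rowIdOffset bookkeeping done in this constructor can be pictured with a toy example; the row counts below stand in for Reader.getNumberOfRows() and the file names are illustrative only (not taken from the patch):

    public class RowIdOffsetSketch {
      public static void main(String[] args) {
        // files of one logical bucket, in the fixed sort order:
        // 000001_0 (3 rows), 000001_0_copy_1 (2 rows), 000001_0_copy_2 (4 rows)
        long[] rowCounts = {3, 2, 4};
        int currentFileIndex = 2;   // the split being read comes from 000001_0_copy_2
        long rowIdOffset = 0;
        for (int i = 0; i < currentFileIndex; i++) {
          rowIdOffset += rowCounts[i];  // rows in the files that logically precede this one
        }
        // the first row of 000001_0_copy_2 is assigned ROW__ID.rowid = 5, keeping rowids
        // unique and contiguous across the whole logical bucket
        System.out.println("rowIdOffset = " + rowIdOffset);
      }
    }
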
+ */ + boolean isLastFileForThisBucket = false; + boolean haveSeenCurrentFile = false; + originalFiles = Collections.emptyList(); + if (mergerOptions.getCopyIndex() > 0) { + //the split is from something other than the 1st file of the logical bucket - compute offset + + AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), + conf, validTxnList, false, true); + for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { + AcidOutputFormat.Options bucketOptions = + AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf); + if (bucketOptions.getBucket() != bucket) { + continue; + } + if(haveSeenCurrentFile) { + //if here we already saw current file and now found another file for the same bucket + //so the current file is not the last file of the logical bucket + isLastFileForThisBucket = false; + break; + } + if(f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) { + /** + * found the file whence the current split is from so we're done + * counting {@link rowIdOffset} + */ + haveSeenCurrentFile = true; + isLastFileForThisBucket = true; + continue; + } + Reader copyReader = OrcFile.createReader(f.getFileStatus().getPath(), + OrcFile.readerOptions(conf)); + rowIdOffset += copyReader.getNumberOfRows(); + } + if (rowIdOffset > 0) { + //rowIdOffset could be 0 if all files before current one are empty + /** + * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, + * int, Reader.Options)} need to fix min/max key since these are used by + * {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for + * the key. Clear? */ + if (minKey != null) { + minKey.setRowId(minKey.getRowId() + rowIdOffset); + } + else { + /** + * If this is not the 1st file, set minKey 1 less than the start of current file + * (Would not need to set minKey if we knew that there are no delta files) + * {@link #advanceToMinKey()} needs this */ + newMinKey = new RecordIdentifier(0, bucket, rowIdOffset - 1); + } + if (maxKey != null) { + maxKey.setRowId(maxKey.getRowId() + rowIdOffset); + } + } + } else { + isLastFileForThisBucket = true; + AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), + conf, validTxnList, false, true); + int numFilesInBucket= 0; + for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { + AcidOutputFormat.Options bucketOptions = + AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf); + if (bucketOptions.getBucket() == bucket) { + numFilesInBucket++; + if(numFilesInBucket > 1) { + isLastFileForThisBucket = false; + break; + } + } + } + } + originalFileRecordReader = recordReader; + if(!isLastFileForThisBucket && maxKey == null) { + /* + * If this is the last file for this bucket, maxKey == null means the split is the tail + * of the file so we want to leave it blank to make sure any insert events in delta + * files are included; Conversely, if it's not the last file, set the maxKey so that + * events from deltas that don't modify anything in the current split are excluded*/ + newMaxKey = new RecordIdentifier(0, bucket, + rowIdOffset + reader.getNumberOfRows() - 1); + } + } + this.minKey = newMinKey; + this.maxKey = newMaxKey; } - - @Override - void next(OrcStruct next) throws IOException { - if (recordReader.hasNext()) { - long nextRowId = recordReader.getRowNumber(); + @Override RecordReader getRecordReader() { + return originalFileRecordReader; + } + @Override RecordIdentifier getMinKey() { 
+ return minKey; + } + @Override RecordIdentifier getMaxKey() { + return maxKey; + } + private boolean nextFromCurrentFile(OrcStruct next) throws IOException { + if (originalFileRecordReader.hasNext()) { + //RecordReader.getRowNumber() produces a file-global row number even with PPD + long nextRowId = originalFileRecordReader.getRowNumber() + rowIdOffset; // have to do initialization here, because the super's constructor // calls next and thus we need to initialize before our constructor // runs @@ -282,7 +479,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(nextRowId)); nextRecord.setFieldValue(OrcRecordUpdater.ROW, - recordReader.next(null)); + originalFileRecordReader.next(null)); } else { nextRecord = next; ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) @@ -296,20 +493,62 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) .set(nextRowId); nextRecord.setFieldValue(OrcRecordUpdater.ROW, - recordReader.next(OrcRecordUpdater.getRow(next))); + originalFileRecordReader.next(OrcRecordUpdater.getRow(next))); } key.setValues(0L, bucket, nextRowId, 0L, 0); if (maxKey != null && key.compareRow(maxKey) > 0) { if (LOG.isDebugEnabled()) { LOG.debug("key " + key + " > maxkey " + maxKey); } - nextRecord = null; - recordReader.close(); + return false;//reached End Of Split } - } else { - nextRecord = null; - recordReader.close(); + return true; } + return false;//reached EndOfFile + } + @Override + void next(OrcStruct next) throws IOException { + assert advancedToMinKey : "advnaceToMinKey() was not called"; + while(true) { + if(nextFromCurrentFile(next)) { + return; + } else { + if (originalFiles.size() <= nextFileIndex) { + //no more original files to read + nextRecord = null; + originalFileRecordReader.close(); + return; + } else { + assert mergerOptions.isCompacting() : "originalFiles.size() should be 0 when not compacting"; + rowIdOffset += numRowsInCurrentFile; + originalFileRecordReader.close(); + Reader reader = advanceToNextFile(); + if(reader == null) { + nextRecord = null; + return; + } + numRowsInCurrentFile = reader.getNumberOfRows(); + originalFileRecordReader = reader.rowsOptions(options); + } + } + } + } + /** + * Finds the next file of the logical bucket + * @return {@code null} if there are no more files + */ + private Reader advanceToNextFile() throws IOException { + while(nextFileIndex < originalFiles.size()) { + AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf); + if (bucketOptions.getBucket() == bucket) { + break; + } + nextFileIndex++; + } + if(originalFiles.size() <= nextFileIndex) { + return null;//no more files for current bucket + } + return OrcFile.createReader(originalFiles.get(nextFileIndex++).getFileStatus().getPath(), OrcFile.readerOptions(conf)); } @Override @@ -318,6 +557,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ } } + /** + * The process here reads several (base + some deltas) files each of which is sorted on + * {@link ReaderKey} ascending. The output of this Reader should a global order across these + * files. The root of this tree is always the next 'file' to read from. + */ private final TreeMap readers = new TreeMap(); @@ -327,6 +571,20 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ // The key of the next lowest reader. 
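
A simplified sketch of the TreeMap-based merge described above (integers stand in for ReaderKey and strings for ReaderPair; the real merger re-inserts each reader under its next key after a record is consumed, which this toy drain omits; not part of the patch):

    import java.util.TreeMap;

    public class MergeOrderSketch {
      public static void main(String[] args) {
        TreeMap<Integer, String> readers = new TreeMap<>();
        readers.put(7, "delta_0000002_0000002/bucket_00000");
        readers.put(3, "original 000000_0");
        readers.put(5, "delta_0000001_0000001/bucket_00000");
        while (!readers.isEmpty()) {
          // pollFirstEntry() always yields the reader with the lowest current key,
          // which is how one globally sorted stream is produced across all files
          System.out.println(readers.pollFirstEntry().getValue());
        }
      }
    }
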
private ReaderKey secondaryKey = null; + private static final class KeyInterval { + private final RecordIdentifier minKey; + private final RecordIdentifier maxKey; + private KeyInterval(RecordIdentifier minKey, RecordIdentifier maxKey) { + this.minKey = minKey; + this.maxKey = maxKey; + } + private RecordIdentifier getMinKey() { + return minKey; + } + private RecordIdentifier getMaxKey() { + return maxKey; + } + } /** * Find the key range for original bucket files. * @param reader the reader @@ -334,14 +592,24 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ * @param options the options for reading with * @throws IOException */ - private void discoverOriginalKeyBounds(Reader reader, int bucket, + private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket, Reader.Options options ) throws IOException { long rowLength = 0; long rowOffset = 0; - long offset = options.getOffset(); - long maxOffset = options.getMaxOffset(); + long offset = options.getOffset();//this would usually be at block boundary + long maxOffset = options.getMaxOffset();//this would usually be at block boundary boolean isTail = true; + RecordIdentifier minKey = null; + RecordIdentifier maxKey = null; + /** + * options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't + * necessarily match stripe boundary. So we want to come up with minKey to be one before the 1st + * row of the first stripe that starts after getOffset() and maxKey to be the last row of the + * stripe that contains getMaxOffset(). This breaks if getOffset() and getMaxOffset() are inside + * the sames tripe - in this case we have minKey & isTail=false but rowLength is never set. + * (HIVE-16953) + */ for(StripeInformation stripe: reader.getStripes()) { if (offset > stripe.getOffset()) { rowOffset += stripe.getNumberOfRows(); @@ -358,6 +626,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ if (!isTail) { maxKey = new RecordIdentifier(0, bucket, rowOffset + rowLength - 1); } + return new KeyInterval(minKey, maxKey); } /** @@ -366,7 +635,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ * @param options the options for reading with * @throws IOException */ - private void discoverKeyBounds(Reader reader, + private KeyInterval discoverKeyBounds(Reader reader, Reader.Options options) throws IOException { RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader); long offset = options.getOffset(); @@ -374,6 +643,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ int firstStripe = 0; int stripeCount = 0; boolean isTail = true; + RecordIdentifier minKey = null; + RecordIdentifier maxKey = null; + List stripes = reader.getStripes(); for(StripeInformation stripe: stripes) { if (offset > stripe.getOffset()) { @@ -391,6 +663,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ if (!isTail) { maxKey = keyIndex[firstStripe + stripeCount - 1]; } + return new KeyInterval(minKey, maxKey); } /** @@ -416,6 +689,49 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ return result; } + static class Options { + private int copyIndex = 0; + private boolean isCompacting = false; + private Path bucketPath; + private Path rootPath; + Options copyIndex(int copyIndex) { + assert copyIndex >= 0; + this.copyIndex = copyIndex; + return this; + } + Options isCompacting(boolean isCompacting) { + this.isCompacting = isCompacting; + return this; + } + Options bucketPath(Path bucketPath) { + 
this.bucketPath = bucketPath; + return this; + } + Options rootPath(Path rootPath) { + this.rootPath = rootPath; + return this; + } + /** + * 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix + */ + int getCopyIndex() { + return copyIndex; + } + boolean isCompacting() { + return isCompacting; + } + /** + * Full path to the data file + * @return + */ + Path getBucketPath() { + return bucketPath; + } + /** + * Partition folder (Table folder if not partitioned) + */ + Path getRootPath() { return rootPath; } + } /** * Create a reader that merge sorts the ACID events together. * @param conf the configuration @@ -433,7 +749,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ int bucket, ValidTxnList validTxnList, Reader.Options options, - Path[] deltaDirectory) throws IOException { + Path[] deltaDirectory, Options mergerOptions) throws IOException { this.conf = conf; this.collapse = collapseEvents; this.offset = options.getOffset(); @@ -450,32 +766,36 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ Reader.Options eventOptions = createEventOptions(options); if (reader == null) { baseReader = null; + minKey = maxKey = null; } else { - - // find the min/max based on the offset and length + KeyInterval keyInterval; + // find the min/max based on the offset and length (and more for 'original') if (isOriginal) { - discoverOriginalKeyBounds(reader, bucket, options); + keyInterval = discoverOriginalKeyBounds(reader, bucket, options); } else { - discoverKeyBounds(reader, options); + keyInterval = discoverKeyBounds(reader, options); } - LOG.info("min key = " + minKey + ", max key = " + maxKey); + LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey()); // use the min/max instead of the byte range ReaderPair pair; ReaderKey key = new ReaderKey(); if (isOriginal) { options = options.clone(); - pair = new OriginalReaderPair(key, reader, bucket, minKey, maxKey, - options); + pair = new OriginalReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(), + options, mergerOptions, conf, validTxnList); } else { - pair = new ReaderPair(key, reader, bucket, minKey, maxKey, + pair = new ReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(), eventOptions, 0); } - + minKey = pair.getMinKey(); + maxKey = pair.getMaxKey(); + LOG.info("updated min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey()); + pair.advnaceToMinKey(); // if there is at least one record, put it in the map if (pair.nextRecord != null) { readers.put(key, pair); } - baseReader = pair.recordReader; + baseReader = pair.getRecordReader(); } // we always want to read all of the deltas @@ -504,6 +824,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ ReaderPair deltaPair; deltaPair = new ReaderPair(key, deltaReader, bucket, minKey, maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId()); + deltaPair.advnaceToMinKey(); if (deltaPair.nextRecord != null) { readers.put(key, deltaPair); } @@ -648,6 +969,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ @Override public float getProgress() throws IOException { + //this is not likely to do the right thing for Compaction of "original" files when there are copyN files return baseReader == null ? 
1 : baseReader.getProgress(); } http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 75c7680..29f5a8e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; import java.util.Arrays; import java.util.BitSet; -import java.util.List; import java.util.Map.Entry; import java.util.TreeMap; @@ -42,9 +41,6 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; -import org.apache.orc.OrcProto; -import org.apache.orc.OrcUtils; -import org.apache.orc.TypeDescription; import org.apache.orc.impl.AcidStats; import org.apache.orc.impl.OrcAcidUtils; import org.slf4j.Logger; @@ -360,8 +356,11 @@ public class VectorizedOrcAcidRowBatchReader int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString); + OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false); + assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly"; this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, - validTxnList, readerOptions, deleteDeltas); + validTxnList, readerOptions, deleteDeltas, + mergerOptions); this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey(); this.deleteRecordValue = this.deleteRecords.createValue(); // Initialize the first value in the delete reader. http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index f11feb7..68f5003 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3037,14 +3037,14 @@ private void constructOneLBLocationMap(FileStatus fSta, int counter = 1; if (!isRenameAllowed || isBlobStoragePath) { while (destFs.exists(destFilePath)) { - destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : "")); + destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + (!type.isEmpty() ? "." + type : "")); counter++; } } if (isRenameAllowed) { while (!destFs.rename(sourcePath, destFilePath)) { - destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : "")); + destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + (!type.isEmpty() ? "." 
+ type : "")); counter++; } } else if (isSrcLocal) { http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index f83b6db..bafed9e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -251,6 +251,7 @@ public class CompactorMR { // There are original format files for (HdfsFileStatusWithId stat : originalFiles) { Path path = stat.getFileStatus().getPath(); + //note that originalFiles are all original files recursively not dirs dirsToSearch.add(path); LOG.debug("Adding original file " + path + " to dirs to search"); } @@ -275,6 +276,12 @@ public class CompactorMR { su.gatherStats(); } + + /** + * @param baseDir if not null, it's either table/partition root folder or base_xxxx. + * If it's base_xxxx, it's in dirsToSearch, else the actual original files + * (all leaves recursively) are in the dirsToSearch list + */ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType, StringableList dirsToSearch, List parsedDeltas, @@ -363,7 +370,7 @@ public class CompactorMR { * @param hadoopConf * @param bucket bucket to be processed by this split * @param files actual files this split should process. It is assumed the caller has already - * parsed out the files in base and deltas to populate this list. + * parsed out the files in base and deltas to populate this list. Includes copy_N * @param base directory of the base, or the partition/table location if the files are in old * style. Can be null. * @param deltas directories of the delta files. 
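
During compaction each split covers a whole logical bucket, so the merger walks the base bucket file and all of its copy_N files in the fixed sort order and hands out one continuous rowid sequence. A toy sketch of that walk (file names and row counts are illustrative, not taken from the patch):

    import java.util.Arrays;
    import java.util.List;

    public class LogicalBucketWalkSketch {
      public static void main(String[] args) {
        List<String> logicalBucket =
            Arrays.asList("000001_0", "000001_0_copy_1", "000001_0_copy_2");
        long[] rowsPerFile = {1, 2, 1};   // stand-ins for Reader.getNumberOfRows()
        long nextRowId = 0;               // one sequence shared by every file of the bucket
        for (int i = 0; i < logicalBucket.size(); i++) {
          for (long r = 0; r < rowsPerFile[i]; r++) {
            System.out.println(logicalBucket.get(i) + " -> rowid " + nextRowId++);
          }
        }
      }
    }

This continuity of ROW__ID before and after MAJOR compaction is what the tests below assert.
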
http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index c8bc119..7a25133 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql; import org.apache.commons.io.FileUtils; -import org.apache.commons.io.output.StringBuilderWriter; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -33,6 +32,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -105,6 +105,7 @@ public class TestTxnCommands { hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); + hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); @@ -904,4 +905,55 @@ public class TestTxnCommands { runStatementOnDriver("create table if not exists e011_02 (c1 float, c2 double, c3 float)"); runStatementOnDriver("merge into merge_test using e011_02 on (merge_test.c1 = e011_02.c1) when not matched then insert values (case when e011_02.c1 > 0 then e011_02.c1 + 1 else e011_02.c1 end, e011_02.c2 + e011_02.c3, coalesce(e011_02.c3, 1))"); } + /** + * HIVE-16177 + * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()} + */ + @Test + public void testNonAcidToAcidConversion01() throws Exception { + //create 1 row in a file 000001_0 (and an empty 000000_0) + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + //create 1 row in a file 000000_0_copy1 and 1 row in a file 000001_0_copy1 + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(1,5)"); + + //convert the table to Acid + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + //create a delta directory + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,17)"); + + //make sure we assign correct Ids + List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID"); + LOG.warn("before compact"); + for(String s : rs) { + LOG.warn(s); + } + Assert.assertEquals("", 4, rs.size()); + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0")); + Assert.assertTrue(rs.get(2), 
rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":1,\"bucketid\":1,\"rowid\":0}\t1\t17")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/000001_0_copy_1")); + //run Compaction + runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID"); + LOG.warn("after compact"); + for(String s : rs) { + LOG.warn(s); + } + Assert.assertEquals("", 4, rs.size()); + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000001/bucket_00000")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000001/bucket_00001")); + Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000001/bucket_00001")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":1,\"bucketid\":1,\"rowid\":0}\t1\t17")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000001/bucket_00001")); + + //make sure they are the same before and after compaction + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index e2db5b7..6726273 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -319,6 +319,68 @@ public class TestTxnCommands2 { resultData = new int[][] {{3,8}, {5,6}, {9,20}}; Assert.assertEquals(stringifyValues(resultData), rs); } + /** + * see HIVE-16177 + * See also {@link TestTxnCommands#testNonAcidToAcidConversion01()} + */ + @Test + public void testNonAcidToAcidConversion02() throws Exception { + //create 2 rows in a file 000001_0 (and an empty 000000_0) + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2),(1,3)"); + //create 2 rows in a file 000000_0_copy1 and 2 rows in a file 000001_0_copy1 + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(0,13),(1,4),(1,5)"); + //create 1 row in a file 000001_0_copy2 (and empty 000000_0_copy2?) 
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,6)"); + + //convert the table to Acid + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + List rs1 = runStatementOnDriver("describe "+ Table.NONACIDORCTBL); + //create a some of delta directories + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,15),(1,16)"); + runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b = 120 where a = 0 and b = 12"); + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,17)"); + runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1 and b = 3"); + + List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b"); + LOG.warn("before compact"); + for(String s : rs) { + LOG.warn(s); + } + /* + * All ROW__IDs are unique on read after conversion to acid + * ROW__IDs are exactly the same before and after compaction + * Also check the file name after compaction for completeness + */ + String[][] expected = { + {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t13", "bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":0,\"rowid\":0}\t0\t15", "bucket_00000"}, + {"{\"transactionid\":3,\"bucketid\":0,\"rowid\":0}\t0\t17", "bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":1}\t0\t120", "bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t2", "bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":3}\t1\t4", "bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":2}\t1\t5", "bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":4}\t1\t6", "bucket_00001"}, + {"{\"transactionid\":1,\"bucketid\":1,\"rowid\":0}\t1\t16", "bucket_00001"} + }; + Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size()); + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + } + //run Compaction + runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b"); + LOG.warn("after compact"); + for(String s : rs) { + LOG.warn(s); + } + Assert.assertEquals("Unexpected row count after compaction", expected.length, rs.size()); + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(bucket) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + //make sure they are the same before and after compaction + } /** * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index a7ff9a3..c928732 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.common.ValidCompactorTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; 
import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile; @@ -142,6 +143,10 @@ public class TestAcidUtils { Configuration conf = new Configuration(); MockFileSystem fs = new MockFileSystem(conf, new MockFile("mock:/tbl/part1/000000_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "1", + 500, new byte[0]), + new MockFile("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "2", + 500, new byte[0]), new MockFile("mock:/tbl/part1/000001_1", 500, new byte[0]), new MockFile("mock:/tbl/part1/000002_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/random", 500, new byte[0]), @@ -154,13 +159,17 @@ public class TestAcidUtils { assertEquals(0, dir.getCurrentDirectories().size()); assertEquals(0, dir.getObsolete().size()); List result = dir.getOriginalFiles(); - assertEquals(5, result.size()); + assertEquals(7, result.size()); assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString()); - assertEquals("mock:/tbl/part1/000001_1", result.get(1).getFileStatus().getPath().toString()); - assertEquals("mock:/tbl/part1/000002_0", result.get(2).getFileStatus().getPath().toString()); - assertEquals("mock:/tbl/part1/random", result.get(3).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "1", + result.get(1).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "2", + result.get(2).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/000001_1", result.get(3).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/000002_0", result.get(4).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/random", result.get(5).getFileStatus().getPath().toString()); assertEquals("mock:/tbl/part1/subdir/000000_0", - result.get(4).getFileStatus().getPath().toString()); + result.get(6).getFileStatus().getPath().toString()); } @Test http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index b003eb8..44a76f6 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -911,7 +911,7 @@ public class TestInputOutputFormat { MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException { OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path); return OrcInputFormat.determineSplitStrategies(combineCtx, context, - adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, + adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, null, null, true); } @@ -925,7 +925,7 @@ public class TestInputOutputFormat { OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException { OrcInputFormat.AcidDirInfo adi = gen.call(); return OrcInputFormat.determineSplitStrategies( - null, context, adi.fs, adi.splitPath, adi.acidInfo, 
adi.baseFiles, adi.parsedDeltas, + null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, null, null, true); } @@ -3398,7 +3398,9 @@ public class TestInputOutputFormat { // call-2: open to read data - split 1 => mock:/mocktable5/0_0 // call-3: open to read footer - split 2 => mock:/mocktable5/0_1 // call-4: open to read data - split 2 => mock:/mocktable5/0_1 - assertEquals(4, readOpsDelta); + // call-5: AcidUtils.getAcidState - getLen() mock:/mocktable5/0_0 + // call-6: AcidUtils.getAcidState - getLen() mock:/mocktable5/0_1 + assertEquals(6, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3471,7 +3473,9 @@ public class TestInputOutputFormat { } // call-1: open to read data - split 1 => mock:/mocktable6/0_0 // call-2: open to read data - split 2 => mock:/mocktable6/0_1 - assertEquals(2, readOpsDelta); + // call-3: AcidUtils.getAcidState - getLen() mock:/mocktable6/0_0 + // call-4: AcidUtils.getAcidState - getLen() mock:/mocktable6/0_1 + assertEquals(4, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3549,7 +3553,8 @@ public class TestInputOutputFormat { // call-2: open to read data - split 1 => mock:/mocktable7/0_0 // call-3: open side file (flush length) of delta directory // call-4: fs.exists() check for delta_xxx_xxx/bucket_00000 file - assertEquals(4, readOpsDelta); + // call-5: AcidUtils.getAcidState - getLen() mock:/mocktable7/0_0 + assertEquals(5, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); @@ -3626,7 +3631,8 @@ public class TestInputOutputFormat { // call-1: open to read data - split 1 => mock:/mocktable8/0_0 // call-2: open side file (flush length) of delta directory // call-3: fs.exists() check for delta_xxx_xxx/bucket_00000 file - assertEquals(3, readOpsDelta); + // call-4: AcidUtils.getAcidState - getLen() mock:/mocktable8/0_0 + assertEquals(4, readOpsDelta); // revert back to local fs conf.set("fs.defaultFS", "file:///"); http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index 1ce1bfb..584bd3b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -189,6 +189,7 @@ public class TestOrcRawRecordMerger { RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60); ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey, new Reader.Options(), 0); + pair.advnaceToMinKey(); RecordReader recordReader = pair.recordReader; assertEquals(10, key.getTransactionId()); assertEquals(20, key.getBucketId()); @@ -215,6 +216,7 @@ public class TestOrcRawRecordMerger { ReaderPair pair = new ReaderPair(key, reader, 20, null, null, new Reader.Options(), 0); + pair.advnaceToMinKey(); RecordReader recordReader = pair.recordReader; assertEquals(10, key.getTransactionId()); assertEquals(20, key.getBucketId()); @@ -290,8 +292,14 @@ public class TestOrcRawRecordMerger { RecordIdentifier minKey = new RecordIdentifier(0, 10, 1); RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3); boolean[] includes = new boolean[]{true, true}; + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + Path root = new Path(tmpDir, 
"testOriginalReaderPair"); + fs.makeQualified(root); + fs.create(root); ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey, - new Reader.Options().include(includes)); + new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList()); + pair.advnaceToMinKey(); RecordReader recordReader = pair.recordReader; assertEquals(0, key.getTransactionId()); assertEquals(10, key.getBucketId()); @@ -319,8 +327,14 @@ public class TestOrcRawRecordMerger { public void testOriginalReaderPairNoMin() throws Exception { ReaderKey key = new ReaderKey(); Reader reader = createMockOriginalReader(); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + Path root = new Path(tmpDir, "testOriginalReaderPairNoMin"); + fs.makeQualified(root); + fs.create(root); ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null, - new Reader.Options()); + new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList()); + pair.advnaceToMinKey(); assertEquals("first", value(pair.nextRecord)); assertEquals(0, key.getTransactionId()); assertEquals(10, key.getBucketId()); @@ -423,7 +437,7 @@ public class TestOrcRawRecordMerger { OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader, false, 10, createMaximalTxnList(), - new Reader.Options().range(1000, 1000), null); + new Reader.Options().range(1000, 1000), null, new OrcRawRecordMerger.Options()); RecordReader rr = merger.getCurrentReader().recordReader; assertEquals(0, merger.getOtherReaders().size()); @@ -531,7 +545,7 @@ public class TestOrcRawRecordMerger { OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, createMaximalTxnList(), new Reader.Options(), - AcidUtils.getPaths(directory.getCurrentDirectories())); + AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false)); RecordIdentifier key = merger.createKey(); OrcStruct value = merger.createValue(); assertEquals(false, merger.next(key, value)); @@ -606,7 +620,7 @@ public class TestOrcRawRecordMerger { OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, createMaximalTxnList(), new Reader.Options(), - AcidUtils.getPaths(directory.getCurrentDirectories())); + AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false)); assertEquals(null, merger.getMinKey()); assertEquals(null, merger.getMaxKey()); RecordIdentifier id = merger.createKey(); @@ -681,7 +695,7 @@ public class TestOrcRawRecordMerger { // make a merger that doesn't collapse events merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET, createMaximalTxnList(), new Reader.Options(), - AcidUtils.getPaths(directory.getCurrentDirectories())); + AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, @@ -780,7 +794,7 @@ public class TestOrcRawRecordMerger { merger = new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, txns, new Reader.Options(), - AcidUtils.getPaths(directory.getCurrentDirectories())); + AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false)); for(int i=0; i < values.length; ++i) { assertEquals(true, merger.next(id, event)); LOG.info("id = " + id + "event = " + event); 
http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index 6bf1312..73bc1ab 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -182,7 +182,7 @@ public class TestVectorizedOrcAcidRowBatchReader { OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null); OrcInputFormat.AcidDirInfo adi = gen.call(); List> splitStrategies = OrcInputFormat.determineSplitStrategies( - null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, + null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, null, null, true); assertEquals(1, splitStrategies.size()); List splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits(); http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/test/queries/clientpositive/insert_orig_table.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/insert_orig_table.q b/ql/src/test/queries/clientpositive/insert_orig_table.q index de408d9..544fe11 100644 --- a/ql/src/test/queries/clientpositive/insert_orig_table.q +++ b/ql/src/test/queries/clientpositive/insert_orig_table.q @@ -1,9 +1,23 @@ -set hive.strict.checks.bucketing=false; - set hive.support.concurrency=true; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; +drop table if exists acid_iot_stage; +create table acid_iot_stage( + ctinyint TINYINT, + csmallint SMALLINT, + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cstring1 STRING, + cstring2 STRING, + ctimestamp1 TIMESTAMP, + ctimestamp2 TIMESTAMP, + cboolean1 BOOLEAN, + cboolean2 BOOLEAN) stored as orc; +LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot_stage; + create table acid_iot( ctinyint TINYINT, csmallint SMALLINT, @@ -18,7 +32,7 @@ create table acid_iot( cboolean1 BOOLEAN, cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true'); -LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot; +insert into acid_iot select * from acid_iot_stage; select count(*) from acid_iot; http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/test/queries/clientpositive/insert_values_orig_table.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/insert_values_orig_table.q b/ql/src/test/queries/clientpositive/insert_values_orig_table.q index db4fc82..1319a0a 100644 --- a/ql/src/test/queries/clientpositive/insert_values_orig_table.q +++ b/ql/src/test/queries/clientpositive/insert_values_orig_table.q @@ -1,9 +1,23 @@ -set hive.strict.checks.bucketing=false; - set hive.support.concurrency=true; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; +drop table if exists acid_ivot_stage; +create table acid_ivot_stage( + ctinyint TINYINT, + csmallint SMALLINT, + cint INT, + cbigint 
BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cstring1 STRING, + cstring2 STRING, + ctimestamp1 TIMESTAMP, + ctimestamp2 TIMESTAMP, + cboolean1 BOOLEAN, + cboolean2 BOOLEAN) stored as orc; +LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot_stage; + create table acid_ivot( ctinyint TINYINT, csmallint SMALLINT, @@ -18,7 +32,7 @@ create table acid_ivot( cboolean1 BOOLEAN, cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true'); -LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot; +insert into acid_ivot select * from acid_ivot_stage; select count(*) from acid_ivot; http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q b/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q index 49c5b0a..2f366fb 100644 --- a/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q +++ b/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q @@ -1,10 +1,25 @@ -set hive.strict.checks.bucketing=false; - set hive.support.concurrency=true; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; set hive.compute.query.using.stats=true; +drop table if exists acid_ivot_stage; +create table acid_ivot_stage( + ctinyint TINYINT, + csmallint SMALLINT, + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cstring1 STRING, + cstring2 STRING, + ctimestamp1 TIMESTAMP, + ctimestamp2 TIMESTAMP, + cboolean1 BOOLEAN, + cboolean2 BOOLEAN) stored as orc; + +LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot_stage; + create table acid_ivot( ctinyint TINYINT, csmallint SMALLINT, @@ -21,7 +36,7 @@ create table acid_ivot( desc formatted acid_ivot; -LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot; +insert into acid_ivot select * from acid_ivot_stage; desc formatted acid_ivot; @@ -71,7 +86,7 @@ explain select count(*) from acid_ivot; select count(*) from acid_ivot; -LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot; +insert into acid_ivot select * from acid_ivot_stage; desc formatted acid_ivot; http://git-wip-us.apache.org/repos/asf/hive/blob/0b7e7273/ql/src/test/results/clientpositive/insert_orig_table.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/insert_orig_table.q.out b/ql/src/test/results/clientpositive/insert_orig_table.q.out index 5eea74d..a62e0ec 100644 --- a/ql/src/test/results/clientpositive/insert_orig_table.q.out +++ b/ql/src/test/results/clientpositive/insert_orig_table.q.out @@ -1,3 +1,47 @@ +PREHOOK: query: drop table if exists acid_iot_stage +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists acid_iot_stage +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table acid_iot_stage( + ctinyint TINYINT, + csmallint SMALLINT, + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cstring1 STRING, + cstring2 STRING, + ctimestamp1 TIMESTAMP, + ctimestamp2 TIMESTAMP, + cboolean1 BOOLEAN, + cboolean2 BOOLEAN) stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@acid_iot_stage +POSTHOOK: query: create table acid_iot_stage( + 
ctinyint TINYINT, + csmallint SMALLINT, + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cstring1 STRING, + cstring2 STRING, + ctimestamp1 TIMESTAMP, + ctimestamp2 TIMESTAMP, + cboolean1 BOOLEAN, + cboolean2 BOOLEAN) stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@acid_iot_stage +PREHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot_stage +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@acid_iot_stage +POSTHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot_stage +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@acid_iot_stage PREHOOK: query: create table acid_iot( ctinyint TINYINT, csmallint SMALLINT, @@ -30,14 +74,26 @@ POSTHOOK: query: create table acid_iot( POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@acid_iot -PREHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot -PREHOOK: type: LOAD -#### A masked pattern was here #### +PREHOOK: query: insert into acid_iot select * from acid_iot_stage +PREHOOK: type: QUERY +PREHOOK: Input: default@acid_iot_stage PREHOOK: Output: default@acid_iot -POSTHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot -POSTHOOK: type: LOAD -#### A masked pattern was here #### +POSTHOOK: query: insert into acid_iot select * from acid_iot_stage +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acid_iot_stage POSTHOOK: Output: default@acid_iot +POSTHOOK: Lineage: acid_iot.cbigint SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: acid_iot.cboolean1 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cboolean1, type:boolean, comment:null), ] +POSTHOOK: Lineage: acid_iot.cboolean2 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cboolean2, type:boolean, comment:null), ] +POSTHOOK: Lineage: acid_iot.cdouble SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: acid_iot.cfloat SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: acid_iot.cint SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: acid_iot.csmallint SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:csmallint, type:smallint, comment:null), ] +POSTHOOK: Lineage: acid_iot.cstring1 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: acid_iot.cstring2 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cstring2, type:string, comment:null), ] +POSTHOOK: Lineage: acid_iot.ctimestamp1 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ] +POSTHOOK: Lineage: acid_iot.ctimestamp2 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ] +POSTHOOK: Lineage: acid_iot.ctinyint SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:ctinyint, type:tinyint, comment:null), ] PREHOOK: query: select count(*) from acid_iot PREHOOK: type: QUERY PREHOOK: Input: default@acid_iot