From: prasanthj@apache.org
To: commits@hive.apache.org
Date: Thu, 05 Oct 2017 17:52:01 -0000
Subject: [1/4] hive git commit: HIVE-17562: ACID 1.0 + ETL strategy should treat empty compacted files as uncovered deltas (Prasanth Jayachandran reviewed by Eugene Koifman)

Repository: hive
Updated Branches:
  refs/heads/branch-2.2 0e795debd -> c6e60d1e3
  refs/heads/branch-2.3 f4d288fad -> 0c56cf696


HIVE-17562: ACID 1.0 + ETL strategy should treat empty compacted files as uncovered deltas (Prasanth Jayachandran reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b71a88f8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b71a88f8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b71a88f8

Branch: refs/heads/branch-2.2
Commit: b71a88f8e049bf2996fdf51de2aba0d68f4f28bb
Parents: 0e795de
Author: Prasanth Jayachandran
Authored: Wed Sep 27 12:57:33 2017 -0700
Committer: Prasanth Jayachandran
Committed: Thu Oct 5 10:37:39 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  58 +++++---
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 135 +++++++++++++++++++
 2 files changed, 172 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
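Context for the change: with ACID 1.0, a major compaction of a bucket whose rows have all been deleted produces a base file with zero ORC stripes. The ETL split strategy nevertheless treated that empty base as "covering" its bucket, so delta files written by later inserts never produced splits and their rows were invisible to reads. The patch below treats a stripe-less base as if it covered nothing. A minimal, self-contained Java sketch of that covered-bucket rule follows; only the array name "covered" mirrors the patch (SplitInfo.covered), while the class name and counts are illustrative assumptions, not Hive code:

import java.util.Arrays;

public class EmptyBaseSketch {
  public static void main(String[] args) {
    int numBuckets = 2;
    // covered[b] == true means bucket b's base file satisfies reads, so no
    // delta-only splits are generated for that bucket.
    boolean[] covered = new boolean[numBuckets];
    Arrays.fill(covered, true); // every bucket has a base after major compaction

    // Suppose bucket 0's base holds zero stripes because every row was
    // deleted before the compaction ran. Without the fix it stays covered,
    // hiding rows inserted afterwards.
    int stripesInBucket0Base = 0;
    if (stripesInBucket0Base == 0) {
      covered[0] = false; // the fix: an empty base covers nothing
    }

    for (int b = 0; b < numBuckets; b++) {
      System.out.println("bucket " + b + ": " + (covered[b]
          ? "read base (deltas merge into it)"
          : "uncovered -> generate splits from deltas alone"));
    }
  }
}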
http://git-wip-us.apache.org/repos/asf/hive/blob/b71a88f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index fc7bbe4..44b5011 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1122,7 +1122,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final boolean hasBase;
     private OrcFile.WriterVersion writerVersion;
     private long projColsUncompressedSize;
-    private final List<OrcSplit> deltaSplits;
+    private List<OrcSplit> deltaSplits;
+    private final SplitInfo splitInfo;
     private final ByteBuffer ppdResult;
     private final UserGroupInformation ugi;
     private final boolean allowSyntheticFileIds;
@@ -1145,6 +1146,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.hasBase = splitInfo.hasBase;
       this.projColsUncompressedSize = -1;
       this.deltaSplits = splitInfo.getSplits();
+      this.splitInfo = splitInfo;
       this.allowSyntheticFileIds = allowSyntheticFileIds;
       this.ppdResult = splitInfo.ppdResult;
     }
@@ -1319,6 +1321,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             stripeStats, stripes.size(), file.getPath(), evolution);
         }
       }
+
       return generateSplitsFromStripes(includeStripe);
     }
   }
@@ -1351,31 +1354,44 @@ private List<OrcSplit> generateSplitsFromStripes(boolean[] includeStripe) throws IOException {
       List<OrcSplit> splits = new ArrayList<>(stripes.size());
-      // if we didn't have predicate pushdown, read everything
-      if (includeStripe == null) {
-        includeStripe = new boolean[stripes.size()];
-        Arrays.fill(includeStripe, true);
-      }
-      OffsetAndLength current = new OffsetAndLength();
-      int idx = -1;
-      for (StripeInformation stripe : stripes) {
-        idx++;
-
-        if (!includeStripe[idx]) {
-          // create split for the previous unfinished stripe
-          if (current.offset != -1) {
-            splits.add(createSplit(current.offset, current.length, orcTail));
-            current.offset = -1;
-          }
-          continue;
+      // after major compaction, base files may become empty base files. Following sequence is an example
+      // 1) insert some rows
+      // 2) delete all rows
+      // 3) major compaction
+      // 4) insert some rows
+      // In such cases, consider base files without any stripes as uncovered delta
+      if (stripes == null || stripes.isEmpty()) {
+        AcidOutputFormat.Options options = AcidUtils.parseBaseOrDeltaBucketFilename(file.getPath(), context.conf);
+        int bucket = options.getBucket();
+        splitInfo.covered[bucket] = false;
+        deltaSplits = splitInfo.getSplits();
+      } else {
+        // if we didn't have predicate pushdown, read everything
+        if (includeStripe == null) {
+          includeStripe = new boolean[stripes.size()];
+          Arrays.fill(includeStripe, true);
        }
-        current = generateOrUpdateSplit(
+        OffsetAndLength current = new OffsetAndLength();
+        int idx = -1;
+        for (StripeInformation stripe : stripes) {
+          idx++;
+
+          if (!includeStripe[idx]) {
+            // create split for the previous unfinished stripe
+            if (current.offset != -1) {
+              splits.add(createSplit(current.offset, current.length, orcTail));
+              current.offset = -1;
+            }
+            continue;
+          }
+
+          current = generateOrUpdateSplit(
             splits, current, stripe.getOffset(), stripe.getLength(), orcTail);
+        }
+        generateLastSplit(splits, current, orcTail);
       }
-      generateLastSplit(splits, current, orcTail);
-
       // Add uncovered ACID delta splits.
       splits.addAll(deltaSplits);
       return splits;
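The new branch in generateSplitsFromStripes relies on AcidUtils.parseBaseOrDeltaBucketFilename to recover the bucket id of the empty base file before clearing its covered flag. As a rough, self-contained illustration of that single step (the class name and regex below are assumptions made for this sketch; Hive's real parser in org.apache.hadoop.hive.ql.io.AcidUtils also handles original files, copy_N files, and delta directories):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BucketFilenameSketch {
  // Compacted ACID bucket files are named bucket_NNNNN; extract NNNNN.
  private static final Pattern BUCKET = Pattern.compile("bucket_(\\d+)");

  static int bucketOf(String path) {
    Matcher m = BUCKET.matcher(path);
    if (!m.find()) {
      throw new IllegalArgumentException("not a bucket file: " + path);
    }
    return Integer.parseInt(m.group(1));
  }

  public static void main(String[] args) {
    // For an empty base like this, the patch effectively does
    // covered[bucketOf(path)] = false, so later deltas for bucket 0
    // still become splits.
    System.out.println(bucketOf("warehouse/t/base_0000005/bucket_00000")); // prints 0
  }
}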
http://git-wip-us.apache.org/repos/asf/hive/blob/b71a88f8/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index ddd59a1..6d6b54a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -312,6 +312,141 @@ public class TestTxnCommands2 {
     Assert.assertEquals(stringifyValues(resultData), rs);
   }
 
+  @Test
+  public void testBICompactedNoStripes() throws Exception {
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] resultData = new int[][] {{1,2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    resultData = new int[][] {{3,4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+
+    runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+    Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+    Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    resultData = new int[][] {{3,4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "HYBRID");
+  }
+
+  @Test
+  public void testETLCompactedNoStripes() throws Exception {
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] resultData = new int[][] {{1,2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    resultData = new int[][] {{3,4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
runStatementOnDriver("delete from " + Table.ACIDTBL); + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals(0, rs.size()); + + runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); + Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); + Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local")); + + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)"); + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + resultData = new int[][] {{3,4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + runStatementOnDriver("delete from " + Table.ACIDTBL); + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals(0, rs.size()); + hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "HYBRID"); + } + + /** + * see HIVE-16177 + * See also {@link TestTxnCommands#testNonAcidToAcidConversion01()} + */ + @Test + public void testNonAcidToAcidConversion02() throws Exception { + //create 2 rows in a file 000001_0 (and an empty 000000_0) + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2),(1,3)"); + //create 2 rows in a file 000000_0_copy1 and 2 rows in a file 000001_0_copy1 + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(0,13),(1,4),(1,5)"); + //create 1 row in a file 000001_0_copy2 (and empty 000000_0_copy2?) 
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,6)"); + + //convert the table to Acid + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + List rs1 = runStatementOnDriver("describe "+ Table.NONACIDORCTBL); + //create a some of delta directories + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,15),(1,16)"); + runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b = 120 where a = 0 and b = 12"); + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,17)"); + runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1 and b = 3"); + + List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b"); + LOG.warn("before compact"); + for(String s : rs) { + LOG.warn(s); + } + /* + * All ROW__IDs are unique on read after conversion to acid + * ROW__IDs are exactly the same before and after compaction + * Also check the file name after compaction for completeness + */ + String[][] expected = { + {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t13", "bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":0,\"rowid\":0}\t0\t15", "bucket_00000"}, + {"{\"transactionid\":3,\"bucketid\":0,\"rowid\":0}\t0\t17", "bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":1}\t0\t120", "bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t2", "bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":3}\t1\t4", "bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":2}\t1\t5", "bucket_00001"}, + {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":4}\t1\t6", "bucket_00001"}, + {"{\"transactionid\":1,\"bucketid\":1,\"rowid\":0}\t1\t16", "bucket_00001"} + }; + Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size()); + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + } + //run Compaction + runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b"); + LOG.warn("after compact"); + for(String s : rs) { + LOG.warn(s); + } + Assert.assertEquals("Unexpected row count after compaction", expected.length, rs.size()); + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(bucket) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + //make sure they are the same before and after compaction + } + /** * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction * 1. Insert a row to Non-ACID table