From: ekoifman@apache.org
To: commits@hive.apache.org
Reply-To: hive-dev@hive.apache.org
Date: Wed, 21 Nov 2018 23:06:32 -0000
Subject: [3/3] hive git commit: HIVE-20823: Make Compactor run in a transaction (Eugene Koifman, reviewed by Vaibhav Gumashta)

HIVE-20823: Make Compactor run in a transaction (Eugene Koifman, reviewed by Vaibhav Gumashta)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ddf3b6cd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ddf3b6cd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ddf3b6cd

Branch: refs/heads/master
Commit: ddf3b6cd041a9eae67e70074cc1ab7c6bd413613
Parents: f5b14fc
Author: Eugene Koifman
Authored: Wed Nov 21 15:06:13 2018 -0800
Committer: Eugene Koifman
Committed: Wed Nov 21 15:06:13 2018 -0800

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/TestStreaming.java  |  34 ++-
 .../streaming/mutate/StreamingAssert.java       |  14 +-
 .../apache/hadoop/hive/ql/TestAcidOnTez.java    |  15 +-
 .../hive/ql/txn/compactor/TestCompactor.java    |  51 ++--
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   1 +
 .../hadoop/hive/ql/io/AcidInputFormat.java      |  29 +-
 .../hadoop/hive/ql/io/AcidOutputFormat.java     |  17 ++
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 188 +++++++++---
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |   6 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   4 +-
 .../hadoop/hive/ql/parse/ReplicationSpec.java   |  17 +-
 .../hadoop/hive/ql/parse/repl/dump/Utils.java   |   2 +
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   | 283 +++++----------
 .../hive/ql/txn/compactor/CompactorMR.java      |  36 ++-
 .../hive/ql/txn/compactor/CompactorThread.java  |   7 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  10 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java    |  73 +++--
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  64 ++---
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 118 ++++----
 .../apache/hadoop/hive/ql/TestTxnCommands3.java | 155 ++++++++--
 .../hive/ql/TestTxnCommandsForMmTable.java      |  34 +--
 .../hadoop/hive/ql/TestTxnConcatenate.java      |  12 +-
 .../org/apache/hadoop/hive/ql/TestTxnExIm.java  |   6 +-
 .../apache/hadoop/hive/ql/TestTxnLoadData.java  |  50 ++--
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java |  78 ++---
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 10 +- .../hadoop/hive/ql/io/TestAcidInputFormat.java | 6 +- .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 41 ++- .../hive/ql/io/orc/TestInputOutputFormat.java | 18 ++ .../hive/ql/io/orc/TestOrcRawRecordMerger.java | 16 +- .../TestVectorizedOrcAcidRowBatchReader.java | 11 + .../hive/ql/lockmgr/TestDbTxnManager2.java | 19 +- .../hive/ql/txn/compactor/CompactorTest.java | 2 +- .../hive/ql/txn/compactor/TestCleaner.java | 239 ---------------- .../hive/ql/txn/compactor/TestWorker.java | 157 +++++----- .../hive/metastore/txn/TxnCommonUtils.java | 2 +- .../metastore/txn/CompactionTxnHandler.java | 87 ++++-- .../hadoop/hive/metastore/txn/TxnStore.java | 10 + .../hadoop/hive/metastore/txn/TxnUtils.java | 45 +-- .../hive/metastore/TestHiveMetaStoreTxns.java | 57 ---- .../hive/common/ValidCompactorWriteIdList.java | 3 + .../hive/common/ValidReaderWriteIdList.java | 6 + .../apache/hive/streaming/TestStreaming.java | 48 ++-- 43 files changed, 1066 insertions(+), 1015 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 137323c..b290a40 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -46,6 +47,8 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.Validator; @@ -61,11 +64,13 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnInfo; import org.apache.hadoop.hive.metastore.api.TxnState; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService; +import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -407,13 +412,13 @@ public class TestStreaming { rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); - 
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000")); Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); - Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005/bucket_00000")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000")); Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6")); - Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005/bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000")); Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0")); - Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000")); } /** @@ -553,7 +558,7 @@ public class TestStreaming { @Deprecated private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles, String... records) throws Exception { - ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)); + ValidWriteIdList writeIds = getTransactionContext(conf); AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); @@ -587,6 +592,7 @@ public class TestStreaming { AcidUtils.setAcidOperationalProperties(job, true, null); job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true); job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString()); + job.set(ValidTxnList.VALID_TXNS_KEY, conf.get(ValidTxnList.VALID_TXNS_KEY)); InputSplit[] splits = inf.getSplits(job, buckets); Assert.assertEquals(numExpectedFiles, splits.length); org.apache.hadoop.mapred.RecordReader rr = @@ -606,8 +612,7 @@ public class TestStreaming { */ private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles, String validationQuery, boolean vectorize, String... 
records) throws Exception { - ValidWriteIdList txns = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)); - AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns); + AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf)); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); List current = dir.getCurrentDirectories(); @@ -649,9 +654,15 @@ public class TestStreaming { conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled); } + private ValidWriteIdList getTransactionContext(Configuration conf) throws Exception { + ValidTxnList validTxnList = msClient.getValidTxns(); + conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); + List v = msClient.getValidWriteIds(Collections + .singletonList(TableName.getDbTable(dbName, tblName)), validTxnList.writeToString()); + return TxnCommonUtils.createValidReaderWriteIdList(v.get(0)); + } private void checkNothingWritten(Path partitionPath) throws Exception { - ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)); - AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds); + AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf)); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); List current = dir.getCurrentDirectories(); @@ -1234,8 +1245,7 @@ public class TestStreaming { /*now both batches have committed (but not closed) so we for each primary file we expect a side file to exist and indicate the true length of primary file*/ FileSystem fs = partLoc.getFileSystem(conf); - AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, - msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName))); + AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf)); for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath()); @@ -1260,7 +1270,7 @@ public class TestStreaming { //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0 //has now received more data(logically - it's buffered) but it is not yet committed. 
//lets check that side files exist, etc - dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName))); + dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf)); for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath()); http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java index 0edf1cd..78cae72 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java @@ -25,6 +25,8 @@ import java.util.Collections; import java.util.List; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -32,7 +34,9 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.IOConstants; @@ -72,6 +76,7 @@ public class StreamingAssert { private IMetaStoreClient metaStoreClient; private Directory dir; private ValidWriteIdList writeIds; + private ValidTxnList validTxnList; private List currentDeltas; private long min; private long max; @@ -83,7 +88,13 @@ public class StreamingAssert { this.table = table; this.partition = partition; - writeIds = metaStoreClient.getValidWriteIds(AcidUtils.getFullTableName(table.getDbName(), table.getTableName())); + + validTxnList = metaStoreClient.getValidTxns(); + conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); + List v = metaStoreClient.getValidWriteIds(Collections + .singletonList(TableName.getDbTable(table.getDbName(), table.getTableName())), validTxnList.writeToString()); + writeIds = TxnCommonUtils.createValidReaderWriteIdList(v.get(0)); + partitionLocation = getPartitionLocation(); dir = AcidUtils.getAcidState(partitionLocation, conf, writeIds); assertEquals(0, dir.getObsolete().size()); @@ -146,6 +157,7 @@ public class StreamingAssert { AcidUtils.setAcidOperationalProperties(job, true, null); job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true); job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString()); + job.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); InputSplit[] splits = inputFormat.getSplits(job, 1); assertEquals(numSplitsExpected, splits.length); 
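
The updated tests above no longer call msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)) directly. They first capture a transaction snapshot, publish it in the Configuration under ValidTxnList.VALID_TXNS_KEY (so that AcidUtils.getAcidState() can evaluate the new visibility suffixes on compactor output), and only then resolve the table's write-id list. A minimal sketch of that pattern, assuming an open IMetaStoreClient; the helper class and method names below are illustrative, not part of the patch:

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;

final class SnapshotHelper {
  /**
   * Captures the current txn snapshot, stores it in the conf (where
   * AcidUtils.getAcidState() looks for it) and returns the write-id list
   * for the given table under that snapshot.
   */
  static ValidWriteIdList resolveWriteIds(IMetaStoreClient msClient, Configuration conf,
      String dbName, String tblName) throws Exception {
    ValidTxnList validTxnList = msClient.getValidTxns();
    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
    List<TableValidWriteIds> writeIds = msClient.getValidWriteIds(
        Collections.singletonList(TableName.getDbTable(dbName, tblName)),
        validTxnList.writeToString());
    return TxnCommonUtils.createValidReaderWriteIdList(writeIds.get(0));
  }
}
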
http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index 40dd992..d6a4191 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -608,11 +608,16 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h LOG.warn(s); } String[][] expected2 = { - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/t/base_-9223372036854775808/bucket_00000"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "warehouse/t/base_-9223372036854775808/bucket_00000"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "warehouse/t/base_-9223372036854775808/bucket_00000"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t7\t8", "warehouse/t/base_-9223372036854775808/bucket_00000"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10", "warehouse/t/base_-9223372036854775808/bucket_00000"} + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "warehouse/t/base_-9223372036854775808_v0000024/bucket_00000"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", + "warehouse/t/base_-9223372036854775808_v0000024/bucket_00000"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", + "warehouse/t/base_-9223372036854775808_v0000024/bucket_00000"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t7\t8", + "warehouse/t/base_-9223372036854775808_v0000024/bucket_00000"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10", + "warehouse/t/base_-9223372036854775808_v0000024/bucket_00000"} }; Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size()); for(int i = 0; i < expected2.length; i++) { http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 9648645..beb36d7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -713,15 +713,15 @@ public class TestCompactor { Path resultFile = null; for (int i = 0; i < names.length; i++) { names[i] = stat[i].getPath().getName(); - if (names[i].equals("delta_0000001_0000004")) { + if (names[i].equals("delta_0000001_0000004_v0000009")) { resultFile = stat[i].getPath(); } } Arrays.sort(names); String[] expected = new String[]{"delta_0000001_0000002", - "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}; + "delta_0000001_0000004_v0000009", "delta_0000003_0000004", "delta_0000005_0000006"}; if (!Arrays.deepEquals(expected, names)) { - Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); + Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names) + ",stat=" + toString(stat)); } 
checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1); @@ -767,7 +767,7 @@ public class TestCompactor { Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); } String name = stat[0].getPath().getName(); - Assert.assertEquals(name, "base_0000004"); + Assert.assertEquals("base_0000004_v0000009", name); checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1); } finally { if (connection != null) { @@ -858,13 +858,13 @@ public class TestCompactor { Path resultDelta = null; for (int i = 0; i < names.length; i++) { names[i] = stat[i].getPath().getName(); - if (names[i].equals("delta_0000001_0000004")) { + if (names[i].equals("delta_0000001_0000004_v0000009")) { resultDelta = stat[i].getPath(); } } Arrays.sort(names); String[] expected = new String[]{"delta_0000001_0000002", - "delta_0000001_0000004", "delta_0000003_0000004"}; + "delta_0000001_0000004_v0000009", "delta_0000003_0000004"}; if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } @@ -947,7 +947,7 @@ public class TestCompactor { Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); } String name = stat[0].getPath().getName(); - if (!name.equals("base_0000004")) { + if (!name.equals("base_0000004_v0000009")) { Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000004"); } checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1); @@ -1312,7 +1312,7 @@ public class TestCompactor { Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); } String name = stat[0].getPath().getName(); - Assert.assertEquals(name, "base_0000004"); + Assert.assertEquals("base_0000004_v0000009", name); checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 1, 1L, 4L, 2); if (connection1 != null) { connection1.close(); @@ -1368,12 +1368,12 @@ public class TestCompactor { Path minorCompactedDelta = null; for (int i = 0; i < deltas.length; i++) { deltas[i] = stat[i].getPath().getName(); - if (deltas[i].equals("delta_0000001_0000003")) { + if (deltas[i].equals("delta_0000001_0000003_v0000006")) { minorCompactedDelta = stat[i].getPath(); } } Arrays.sort(deltas); - String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003", + String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003_v0000006", "delta_0000002_0000002_0000"}; if (!Arrays.deepEquals(expectedDeltas, deltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); @@ -1388,12 +1388,12 @@ public class TestCompactor { Path minorCompactedDeleteDelta = null; for (int i = 0; i < deleteDeltas.length; i++) { deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); - if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) { + if (deleteDeltas[i].equals("delete_delta_0000001_0000003_v0000006")) { minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); } } Arrays.sort(deleteDeltas); - String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"}; + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003_v0000006", "delete_delta_0000003_0000003_0000"}; if 
(!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); } @@ -1446,12 +1446,12 @@ public class TestCompactor { Path minorCompactedDelta = null; for (int i = 0; i < deltas.length; i++) { deltas[i] = stat[i].getPath().getName(); - if (deltas[i].equals("delta_0000001_0000002")) { + if (deltas[i].equals("delta_0000001_0000002_v0000005")) { minorCompactedDelta = stat[i].getPath(); } } Arrays.sort(deltas); - String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002", + String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002_v0000005", "delta_0000002_0000002_0000"}; if (!Arrays.deepEquals(expectedDeltas, deltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); @@ -1466,12 +1466,12 @@ public class TestCompactor { Path minorCompactedDeleteDelta = null; for (int i = 0; i < deleteDeltas.length; i++) { deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); - if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) { + if (deleteDeltas[i].equals("delete_delta_0000001_0000002_v0000005")) { minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); } } Arrays.sort(deleteDeltas); - String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"}; + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002_v0000005"}; if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); } @@ -1537,13 +1537,13 @@ public class TestCompactor { Path resultFile = null; for (int i = 0; i < names.length; i++) { names[i] = stat[i].getPath().getName(); - if (names[i].equals("delta_0000001_0000004")) { + if (names[i].equals("delta_0000001_0000004_v0000009")) { resultFile = stat[i].getPath(); } } Arrays.sort(names); String[] expected = new String[]{"delta_0000001_0000002", - "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}; + "delta_0000001_0000004_v0000009", "delta_0000003_0000004", "delta_0000005_0000006"}; if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } @@ -1557,12 +1557,12 @@ public class TestCompactor { Path minorCompactedDeleteDelta = null; for (int i = 0; i < deleteDeltas.length; i++) { deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); - if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) { + if (deleteDeltas[i].equals("delete_delta_0000001_0000004_v0000009")) { minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); } } Arrays.sort(deleteDeltas); - String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004"}; + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004_v0000009"}; if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); } @@ -1968,4 +1968,15 @@ public class TestCompactor { t.init(stop, looped); t.run(); } + private static String toString(FileStatus[] stat) { + StringBuilder sb = new StringBuilder("stat{"); + if(stat == null) { + return sb.toString(); + } + for(FileStatus f : stat) { + sb.append(f.getPath()).append(","); + } + sb.setCharAt(sb.length() - 1, '}'); + return sb.toString(); + } } 
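
The expected directory names above now end in a _vNNNNNNN suffix (base_0000004 becomes base_0000004_v0000009, delta_0000001_0000004 becomes delta_0000001_0000004_v0000009, and so on); the suffix is the visibilityTxnId of the transaction in which the compactor ran, so readers ignore compactor output until that transaction commits. A small standalone sketch of how the suffix is appended and parsed; it mirrors AcidUtils.addVisibilitySuffix()/ParsedBase introduced further down in this patch, but the class below is illustrative rather than Hive code:

final class CompactorDirNameSketch {
  private static final String VISIBILITY_PREFIX = "_v";

  /** e.g. writeId=4, visibilityTxnId=9 -> "base_0000004_v0000009" */
  static String addVisibilitySuffix(String baseOrDeltaDir, long visibilityTxnId) {
    return visibilityTxnId == 0          // _v0 means "visible to everyone", i.e. no suffix
        ? baseOrDeltaDir
        : baseOrDeltaDir + VISIBILITY_PREFIX + String.format("%07d", visibilityTxnId);
  }

  /** e.g. "base_0000004_v0000009" -> { writeId=4, visibilityTxnId=9 } */
  static long[] parseBase(String dirName) {
    if (!dirName.startsWith("base_")) {
      throw new IllegalArgumentException(dirName + " is not a base directory");
    }
    String rest = dirName.substring("base_".length());
    long visibilityTxnId = 0;            // pre-4.0 layout: no suffix, always visible
    int v = rest.indexOf(VISIBILITY_PREFIX);
    if (v >= 0) {
      visibilityTxnId = Long.parseLong(rest.substring(v + VISIBILITY_PREFIX.length()));
      rest = rest.substring(0, v);
    }
    return new long[] { Long.parseLong(rest), visibilityTxnId };
  }
}
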
http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 28d61f9..5d6ae7f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -302,6 +302,7 @@ public class ReplDumpTask extends Task implements Serializable { String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) { + tuple.replicationSpec.setValidTxnList(validTxnList); tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList)); // For transactional table, data would be valid snapshot for current txn and doesn't include data http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index 41007e2..bba3960 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -112,21 +112,26 @@ public interface AcidInputFormat private long minWriteId; private long maxWriteId; private List stmtIds; - //would be useful to have enum for Type: insert/delete/load data + /** + * {@link AcidUtils#?} + */ + private long visibilityTxnId; public DeltaMetaData() { - this(0,0,new ArrayList()); + this(0,0,new ArrayList(), 0); } /** * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition + * @param visibilityTxnId maybe 0, if the dir name didn't have it. 
txnid:0 is always visible */ - DeltaMetaData(long minWriteId, long maxWriteId, List stmtIds) { + DeltaMetaData(long minWriteId, long maxWriteId, List stmtIds, long visibilityTxnId) { this.minWriteId = minWriteId; this.maxWriteId = maxWriteId; if (stmtIds == null) { throw new IllegalArgumentException("stmtIds == null"); } this.stmtIds = stmtIds; + this.visibilityTxnId = visibilityTxnId; } long getMinWriteId() { return minWriteId; @@ -137,6 +142,9 @@ public interface AcidInputFormat List getStmtIds() { return stmtIds; } + long getVisibilityTxnId() { + return visibilityTxnId; + } @Override public void write(DataOutput out) throws IOException { out.writeLong(minWriteId); @@ -145,6 +153,7 @@ public interface AcidInputFormat for(Integer id : stmtIds) { out.writeInt(id); } + out.writeLong(visibilityTxnId); } @Override public void readFields(DataInput in) throws IOException { @@ -155,11 +164,21 @@ public interface AcidInputFormat for(int i = 0; i < numStatements; i++) { stmtIds.add(in.readInt()); } + visibilityTxnId = in.readLong(); + } + String getName() { + assert stmtIds.isEmpty() : "use getName(int)"; + return AcidUtils.addVisibilitySuffix(AcidUtils + .deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId); + } + String getName(int stmtId) { + assert !stmtIds.isEmpty() : "use getName()"; + return AcidUtils.addVisibilitySuffix(AcidUtils + .deleteDeltaSubdir(minWriteId, maxWriteId, stmtId), visibilityTxnId); } @Override public String toString() { - //? is Type - when implemented - return "Delta(?," + minWriteId + "," + maxWriteId + "," + stmtIds + ")"; + return "Delta(?," + minWriteId + "," + maxWriteId + "," + stmtIds + "," + visibilityTxnId + ")"; } } /** http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index 05beafe..b45cc8c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -51,6 +51,9 @@ public interface AcidOutputFormat extends HiveO private Reporter reporter; private long minimumWriteId; private long maximumWriteId; + /** + * actual bucketId (as opposed to bucket property via BucketCodec) + */ private int bucketId; /** * Based on {@link org.apache.hadoop.hive.ql.metadata.Hive#mvFile(HiveConf, FileSystem, Path, FileSystem, Path, boolean, boolean)} @@ -61,9 +64,16 @@ public interface AcidOutputFormat extends HiveO private boolean oldStyle = false; private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id //unique within a transaction + /** + * todo: Link to AcidUtils? + */ private int statementId = 0; private Path finalDestination; /** + * todo: link to AcidUtils? + */ + private long visibilityTxnId = 0; + /** * Create the options object. 
* @param conf Use the given configuration */ @@ -252,6 +262,10 @@ public interface AcidOutputFormat extends HiveO this.finalDestination = p; return this; } + public Options visibilityTxnId(long visibilityTxnId) { + this.visibilityTxnId = visibilityTxnId; + return this; + } public Configuration getConfiguration() { return configuration; @@ -317,6 +331,9 @@ public interface AcidOutputFormat extends HiveO public Path getFinalDestination() { return finalDestination; } + public long getVisibilityTxnId() { + return visibilityTxnId; + } } /** http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 71e5131..d36b4d1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -32,7 +32,7 @@ import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; -import org.apache.avro.generic.GenericData; +import com.google.common.base.Strings; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.LockComponentBuilder; import org.apache.hadoop.hive.metastore.TransactionalValidationListener; -import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.LockType; @@ -57,7 +56,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; -import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; @@ -181,6 +179,8 @@ public class AcidUtils { return !name.startsWith("_") && !name.startsWith("."); } }; + public static final String VISIBILITY_PREFIX = "_v"; + public static final Pattern VISIBILITY_PATTERN = Pattern.compile(VISIBILITY_PREFIX + "\\d+"); private static final HadoopShims SHIMS = ShimLoader.getHadoopShims(); @@ -294,8 +294,21 @@ public class AcidUtils { options.getMaximumWriteId(), options.getStatementId()); } + subdir = addVisibilitySuffix(subdir, options.getVisibilityTxnId()); return createBucketFile(new Path(directory, subdir), options.getBucketId()); } + + /** + * Since Hive 4.0, compactor produces directories with {@link #VISIBILITY_PATTERN} suffix. + * _v0 is equivalent to no suffix, for backwards compatibility. + */ + static String addVisibilitySuffix(String baseOrDeltaDir, long visibilityTxnId) { + if(visibilityTxnId == 0) { + return baseOrDeltaDir; + } + return baseOrDeltaDir + VISIBILITY_PREFIX + + String.format(DELTA_DIGITS, visibilityTxnId); + } /** * Represents bucketId and copy_N suffix */ @@ -340,20 +353,6 @@ public class AcidUtils { } } /** - * Get the write id from a base directory name. 
- * @param path the base directory name - * @return the maximum write id that is included - */ - public static long parseBase(Path path) { - String filename = path.getName(); - if (filename.startsWith(BASE_PREFIX)) { - return Long.parseLong(filename.substring(BASE_PREFIX.length())); - } - throw new IllegalArgumentException(filename + " does not start with " + - BASE_PREFIX); - } - - /** * Get the bucket id from the file path * @param bucketFile - bucket file path * @return - bucket id @@ -405,7 +404,7 @@ public class AcidUtils { result .setOldStyle(false) .minimumWriteId(0) - .maximumWriteId(parseBase(bucketFile.getParent())) + .maximumWriteId(ParsedBase.parseBase(bucketFile.getParent()).getWriteId()) .bucket(bucket) .writingBase(true); } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) { @@ -443,11 +442,12 @@ public class AcidUtils { public DirectoryImpl(List abortedDirectories, boolean isBaseInRawFormat, List original, List obsolete, List deltas, Path base) { - this.abortedDirectories = abortedDirectories; + this.abortedDirectories = abortedDirectories == null ? + Collections.emptyList() : abortedDirectories; this.isBaseInRawFormat = isBaseInRawFormat; - this.original = original; - this.obsolete = obsolete; - this.deltas = deltas; + this.original = original == null ? Collections.emptyList() : original; + this.obsolete = obsolete == null ? Collections.emptyList() : obsolete; + this.deltas = deltas == null ? Collections.emptyList() : deltas; this.base = base; } @@ -757,7 +757,45 @@ public class AcidUtils { */ List getAbortedDirectories(); } - + /** + * Since version 3 but prior to version 4, format of a base is "base_X" where X is a writeId. + * If this base was produced by a compactor, X is the highest writeId that the compactor included. + * If this base is produced by Insert Overwrite stmt, X is a writeId of the transaction that + * executed the insert. + * Since Hive Version 4.0, the format of a base produced by a compactor is + * base_X_vY. X is like before, i.e. the highest writeId compactor included and Y is the + * visibilityTxnId of the transaction in which the compactor ran. + * (v(isibility) is a literal to help parsing). + */ + public static final class ParsedBase { + private final long writeId; + private final long visibilityTxnId; + ParsedBase(long writeId) { + this(writeId, 0); + } + ParsedBase(long writeId, long visibilityTxnId) { + this.writeId = writeId; + this.visibilityTxnId = visibilityTxnId; + } + public long getWriteId() { + return writeId; + } + public long getVisibilityTxnId() { + return visibilityTxnId; + } + public static ParsedBase parseBase(Path path) { + String filename = path.getName(); + if(!filename.startsWith(BASE_PREFIX)) { + throw new IllegalArgumentException(filename + " does not start with " + BASE_PREFIX); + } + int idxOfv = filename.indexOf(VISIBILITY_PREFIX); + if(idxOfv < 0) { + return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length()))); + } + return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length(), idxOfv)), + Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length()))); + } + } /** * Immutable */ @@ -770,16 +808,23 @@ public class AcidUtils { private final int statementId; private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...' private final boolean isRawFormat; - + /** + * transaction Id of txn which created this delta. 
This dir should be considered + * invisible unless this txn is committed + * + * TODO: define TransactionallyVisible interface - add getVisibilityTxnId() etc and all comments + * use in {@link ParsedBase}, {@link ParsedDelta}, {@link AcidInputFormat.Options}, AcidInputFormat.DeltaMetaData etc + */ + private final long visibilityTxnId; /** * for pre 1.3.x delta files */ private ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta, - boolean isRawFormat) { - this(min, max, path, -1, isDeleteDelta, isRawFormat); + boolean isRawFormat, long visibilityTxnId) { + this(min, max, path, -1, isDeleteDelta, isRawFormat, visibilityTxnId); } private ParsedDelta(long min, long max, FileStatus path, int statementId, - boolean isDeleteDelta, boolean isRawFormat) { + boolean isDeleteDelta, boolean isRawFormat, long visibilityTxnId) { this.minWriteId = min; this.maxWriteId = max; this.path = path; @@ -787,6 +832,7 @@ public class AcidUtils { this.isDeleteDelta = isDeleteDelta; this.isRawFormat = isRawFormat; assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format"; + this.visibilityTxnId = visibilityTxnId; } public long getMinWriteId() { @@ -814,6 +860,9 @@ public class AcidUtils { public boolean isRawFormat() { return isRawFormat; } + public long getVisibilityTxnId() { + return visibilityTxnId; + } /** * Compactions (Major/Minor) merge deltas/bases but delete of old files * happens in a different process; thus it's possible to have bases/deltas with @@ -870,6 +919,7 @@ public class AcidUtils { } /** + * todo: rename serializeDeleteDelta()? * Convert the list of deltas into an equivalent list of begin/end * write id pairs. Assumes {@code deltas} is sorted. * @param deltas @@ -879,6 +929,7 @@ public class AcidUtils { List result = new ArrayList<>(deltas.size()); AcidInputFormat.DeltaMetaData last = null; for (ParsedDelta parsedDelta : deltas) { + assert parsedDelta.isDeleteDelta() : "expected delete_delta, got " + parsedDelta.getPath(); if ((last != null) && (last.getMinWriteId() == parsedDelta.getMinWriteId()) && (last.getMaxWriteId() == parsedDelta.getMaxWriteId())) { @@ -886,7 +937,7 @@ public class AcidUtils { continue; } last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinWriteId(), - parsedDelta.getMaxWriteId(), new ArrayList()); + parsedDelta.getMaxWriteId(), new ArrayList<>(), parsedDelta.getVisibilityTxnId()); result.add(last); if (parsedDelta.statementId >= 0) { last.getStmtIds().add(parsedDelta.getStatementId()); @@ -905,14 +956,14 @@ public class AcidUtils { * @return the list of delta paths */ public static Path[] deserializeDeleteDeltas(Path root, final List deleteDeltas) throws IOException { - List results = new ArrayList(deleteDeltas.size()); + List results = new ArrayList<>(deleteDeltas.size()); for(AcidInputFormat.DeltaMetaData dmd : deleteDeltas) { if(dmd.getStmtIds().isEmpty()) { - results.add(new Path(root, deleteDeltaSubdir(dmd.getMinWriteId(), dmd.getMaxWriteId()))); + results.add(new Path(root, dmd.getName())); continue; } for(Integer stmtId : dmd.getStmtIds()) { - results.add(new Path(root, deleteDeltaSubdir(dmd.getMinWriteId(), dmd.getMaxWriteId(), stmtId))); + results.add(new Path(root, dmd.getName(stmtId))); } } return results.toArray(new Path[results.size()]); @@ -936,7 +987,7 @@ public class AcidUtils { ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix, fs); boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); return new ParsedDelta(p.getMinWriteId(), - p.getMaxWriteId(), path, p.statementId, isDeleteDelta, 
p.isRawFormat()); + p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat(), p.visibilityTxnId); } public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs) @@ -959,6 +1010,12 @@ public class AcidUtils { */ public static ParsedDelta parsedDelta(Path deltaDir, boolean isRawFormat) { String filename = deltaDir.getName(); + int idxOfVis = filename.indexOf(VISIBILITY_PREFIX); + long visibilityTxnId = 0;//visibilityTxnId:0 is always visible + if(idxOfVis >= 0) { + visibilityTxnId = Long.parseLong(filename.substring(idxOfVis + VISIBILITY_PREFIX.length())); + filename = filename.substring(0, idxOfVis); + } boolean isDeleteDelta = filename.startsWith(DELETE_DELTA_PREFIX); //make sure it's null for delete delta no matter what was passed in - this //doesn't apply to delete deltas @@ -966,18 +1023,18 @@ public class AcidUtils { String rest = filename.substring((isDeleteDelta ? DELETE_DELTA_PREFIX : DELTA_PREFIX).length()); int split = rest.indexOf('_'); - //may be -1 if no statementId + //split2 may be -1 if no statementId int split2 = rest.indexOf('_', split + 1); long min = Long.parseLong(rest.substring(0, split)); long max = split2 == -1 ? Long.parseLong(rest.substring(split + 1)) : Long.parseLong(rest.substring(split + 1, split2)); if(split2 == -1) { - return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat); + return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat, visibilityTxnId); } int statementId = Integer.parseInt(rest.substring(split2 + 1)); return new ParsedDelta(min, max, null, statementId, isDeleteDelta, - isRawFormat); + isRawFormat, visibilityTxnId); } @@ -1046,6 +1103,23 @@ public class AcidUtils { Ref useFileIds, boolean ignoreEmptyFiles, Map tblproperties) throws IOException { + ValidTxnList validTxnList = null; + String s = conf.get(ValidTxnList.VALID_TXNS_KEY); + if(!Strings.isNullOrEmpty(s)) { + /** + * getAcidState() is sometimes called on non-transactional tables, e.g. + * OrcInputFileFormat.FileGenerator.callInternal(). e.g. orc_merge3.q In that case + * writeIdList is bogus - doesn't even have a table name. + * see https://issues.apache.org/jira/browse/HIVE-20856. + * + * For now, assert that ValidTxnList.VALID_TXNS_KEY is set only if this is really a read + * of a transactional table. + * see {@link #getChildState(FileStatus, HdfsFileStatusWithId, ValidWriteIdList, List, List, List, List, TxnBase, boolean, List, Map, FileSystem, ValidTxnList)} + */ + validTxnList = new ValidReadTxnList(); + validTxnList.readFromString(s); + } + FileSystem fs = directory.getFileSystem(conf); // The following 'deltas' includes all kinds of delta files including insert & delete deltas. 
final List deltas = new ArrayList(); @@ -1073,13 +1147,13 @@ public class AcidUtils { if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, - obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs); + obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); } } else { List children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter); for (FileStatus child : children) { getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete, - bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs); + bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); } } @@ -1219,7 +1293,7 @@ public class AcidUtils { ValidWriteIdList writeIdList, List working, List originalDirectories, List original, List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List aborted, Map tblproperties, - FileSystem fs) throws IOException { + FileSystem fs, ValidTxnList validTxnList) throws IOException { Path p = child.getPath(); String fn = p.getName(); if (!child.isDirectory()) { @@ -1229,7 +1303,11 @@ public class AcidUtils { return; } if (fn.startsWith(BASE_PREFIX)) { - long writeId = parseBase(p); + ParsedBase parsedBase = ParsedBase.parseBase(p); + if(!isDirUsable(child, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) { + return; + } + long writeId = parsedBase.getWriteId(); if(bestBase.oldestBaseWriteId > writeId) { //keep track for error reporting bestBase.oldestBase = p; @@ -1252,13 +1330,14 @@ public class AcidUtils { } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) { String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; ParsedDelta delta = parseDelta(child, deltaPrefix, fs); - // Handle aborted deltas. Currently this can only happen for MM tables. - if (tblproperties != null && isTransactionalTable(tblproperties) && - ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted( - delta.minWriteId, delta.maxWriteId)) { + if(!isDirUsable(child, delta.getVisibilityTxnId(), aborted, validTxnList)) { + return; + } + if(ValidWriteIdList.RangeResponse.ALL == + writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) { aborted.add(child); } - if (writeIdList.isWriteIdRangeValid( + else if (writeIdList.isWriteIdRangeValid( delta.minWriteId, delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) { working.add(delta); } @@ -1270,7 +1349,24 @@ public class AcidUtils { originalDirectories.add(child); } } - + /** + * checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot + */ + private static boolean isDirUsable(FileStatus child, long visibilityTxnId, + List aborted, ValidTxnList validTxnList) { + if(validTxnList == null) { + throw new IllegalArgumentException("No ValidTxnList for " + child.getPath()); + } + if(!validTxnList.isTxnValid(visibilityTxnId)) { + boolean isAborted = validTxnList.isTxnAborted(visibilityTxnId); + if(isAborted) { + aborted.add(child);//so we can clean it up + } + LOG.debug("getChildState() ignoring(" + aborted + ") " + child); + return false; + } + return true; + } public static HdfsFileStatusWithId createOriginalObj( HdfsFileStatusWithId childWithId, FileStatus child) { return childWithId != null ? 
childWithId : new HdfsFileStatusWithoutId(child); @@ -1818,7 +1914,7 @@ public class AcidUtils { } public static String getFullTableName(String dbName, String tableName) { - return dbName.toLowerCase() + "." + tableName.toLowerCase(); + return TableName.getDbTable(dbName.toLowerCase(), tableName.toLowerCase()); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 8cabf96..fbb931c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -1035,7 +1035,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ Options readerPairOptions = mergerOptions; if(mergerOptions.getBaseDir().getName().startsWith(AcidUtils.BASE_PREFIX)) { readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions, - AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir()); + AcidUtils.ParsedBase.parseBase(mergerOptions.getBaseDir()).getWriteId(), + mergerOptions.getBaseDir()); } pair = new OriginalReaderPairToCompact(baseKey, bucket, options, readerPairOptions, conf, validWriteIdList, @@ -1223,7 +1224,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader{ boolean isDelta = parent.getName().startsWith(AcidUtils.DELTA_PREFIX); if(isBase || isDelta) { if(isBase) { - return new TransactionMetaData(AcidUtils.parseBase(parent), parent); + return new TransactionMetaData(AcidUtils.ParsedBase.parseBase(parent).getWriteId(), + parent); } else { AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX, http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 374e973..46c51eb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -403,13 +403,13 @@ public final class DbTxnManager extends HiveTxnManagerImpl { // Make sure we need locks. It's possible there's nothing to lock in // this operation. if(plan.getInputs().isEmpty() && plan.getOutputs().isEmpty()) { - LOG.debug("No locks needed for queryId" + queryId); + LOG.debug("No locks needed for queryId=" + queryId); return null; } List lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(), conf); //It's possible there's nothing to lock even if we have w/r entities. 
if(lockComponents.isEmpty()) { - LOG.debug("No locks needed for queryId" + queryId); + LOG.debug("No locks needed for queryId=" + queryId); return null; } rqstBuilder.addLockComponents(lockComponents); http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 7d901f9..3115e83 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -44,6 +44,8 @@ public class ReplicationSpec { private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in. private boolean isReplace = true; // default is that the import mode is insert overwrite private String validWriteIdList = null; // WriteIds snapshot for replicating ACID/MM tables. + //TxnIds snapshot + private String validTxnList = null; private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT // Key definitions related to replication @@ -54,7 +56,8 @@ public class ReplicationSpec { NOOP("repl.noop"), LAZY("repl.lazy"), IS_REPLACE("repl.is.replace"), - VALID_WRITEID_LIST("repl.valid.writeid.list") + VALID_WRITEID_LIST("repl.valid.writeid.list"), + VALID_TXN_LIST("repl.valid.txnid.list") ; private final String keyName; @@ -143,6 +146,7 @@ public class ReplicationSpec { this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString())); this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString())); this.validWriteIdList = keyFetcher.apply(ReplicationSpec.KEY.VALID_WRITEID_LIST.toString()); + this.validTxnList = keyFetcher.apply(KEY.VALID_TXN_LIST.toString()); } /** @@ -342,6 +346,15 @@ public class ReplicationSpec { this.validWriteIdList = validWriteIdList; } + public String getValidTxnList() { + return validTxnList; + } + + public void setValidTxnList(String validTxnList) { + this.validTxnList = validTxnList; + } + + /** * @return whether the current replication dumped object related to ACID/Mm table */ @@ -372,6 +385,8 @@ public class ReplicationSpec { return String.valueOf(isReplace()); case VALID_WRITEID_LIST: return getValidWriteIdList(); + case VALID_TXN_LIST: + return getValidTxnList(); } return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index 59ffb90..83a9642 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -209,6 +210,7 @@ public class Utils { throws IOException { if (replicationSpec.isTransactionalTableDump()) { try { + conf.set(ValidTxnList.VALID_TXNS_KEY, 
replicationSpec.getValidTxnList()); return AcidUtils.getValidDataPaths(fromPath, conf, replicationSpec.getValidWriteIdList()); } catch (FileNotFoundException e) { throw new IOException(ErrorMsg.FILE_NOT_FOUND.format(e.getMessage()), e); http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 3bc1f8a..18253c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -17,10 +17,17 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.ReplChangeManager; -import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; @@ -31,9 +38,6 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; -import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; -import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.Database; @@ -45,15 +49,8 @@ import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.BitSet; import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -68,9 +65,6 @@ public class Cleaner extends CompactorThread { private long cleanerCheckInterval = 0; private ReplChangeManager replChangeManager; - // List of compactions to clean. - private Map> compactId2LockMap = new HashMap<>(); - private Map compactId2CompactInfoMap = new HashMap<>(); @Override public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException { @@ -97,95 +91,9 @@ public class Cleaner extends CompactorThread { try { handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name()); startedAt = System.currentTimeMillis(); - // First look for all the compactions that are waiting to be cleaned. If we have not - // seen an entry before, look for all the locks held on that table or partition and - // record them. 
We will then only clean the partition once all of those locks have been - // released. This way we avoid removing the files while they are in use, - // while at the same time avoiding starving the cleaner as new readers come along. - // This works because we know that any reader who comes along after the worker thread has - // done the compaction will read the more up to date version of the data (either in a - // newer delta or in a newer base). - List toClean = txnHandler.findReadyToClean(); - { - /** - * Since there may be more than 1 instance of Cleaner running we may have state info - * for items which were cleaned by instances. Here we remove them. - * - * In the long run if we add end_time to compaction_queue, then we can check that - * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which case - * we know the lock owner is reading files created by this compaction or later. - * The advantage is that we don't have to store the locks. - */ - Set currentToCleanSet = new HashSet<>(); - for (CompactionInfo ci : toClean) { - currentToCleanSet.add(ci.id); - } - Set cleanPerformedByOthers = new HashSet<>(); - for (long id : compactId2CompactInfoMap.keySet()) { - if (!currentToCleanSet.contains(id)) { - cleanPerformedByOthers.add(id); - } - } - for (long id : cleanPerformedByOthers) { - compactId2CompactInfoMap.remove(id); - compactId2LockMap.remove(id); - } - } - if (toClean.size() > 0 || compactId2LockMap.size() > 0) { - ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest()); - if(LOG.isDebugEnabled()) { - dumpLockState(locksResponse); - } - for (CompactionInfo ci : toClean) { - // Check to see if we have seen this request before. If so, ignore it. If not, - // add it to our queue. - if (!compactId2LockMap.containsKey(ci.id)) { - compactId2LockMap.put(ci.id, findRelatedLocks(ci, locksResponse)); - compactId2CompactInfoMap.put(ci.id, ci); - } - } - - // Now, for each entry in the queue, see if all of the associated locks are clear so we - // can clean - Set currentLocks = buildCurrentLockSet(locksResponse); - List expiredLocks = new ArrayList(); - List compactionsCleaned = new ArrayList(); - try { - for (Map.Entry> queueEntry : compactId2LockMap.entrySet()) { - boolean sawLock = false; - for (Long lockId : queueEntry.getValue()) { - if (currentLocks.contains(lockId)) { - sawLock = true; - break; - } else { - expiredLocks.add(lockId); - } - } - - if (!sawLock) { - // Remember to remove this when we're out of the loop, - // we can't do it in the loop or we'll get a concurrent modification exception. 
- compactionsCleaned.add(queueEntry.getKey()); - //Future thought: this may be expensive so consider having a thread pool run in parallel - clean(compactId2CompactInfoMap.get(queueEntry.getKey())); - } else { - // Remove the locks we didn't see so we don't look for them again next time - for (Long lockId : expiredLocks) { - queueEntry.getValue().remove(lockId); - } - LOG.info("Skipping cleaning of " + - idWatermark(compactId2CompactInfoMap.get(queueEntry.getKey())) + - " due to reader present: " + queueEntry.getValue()); - } - } - } finally { - if (compactionsCleaned.size() > 0) { - for (Long compactId : compactionsCleaned) { - compactId2LockMap.remove(compactId); - compactId2CompactInfoMap.remove(compactId); - } - } - } + long minOpenTxnId = txnHandler.findMinOpenTxnId(); + for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { + clean(compactionInfo, minOpenTxnId); } } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor cleaner, " + @@ -213,41 +121,7 @@ public class Cleaner extends CompactorThread { } while (!stop.get()); } - private Set findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) { - Set relatedLocks = new HashSet(); - for (ShowLocksResponseElement lock : locksResponse.getLocks()) { - /** - * Hive QL is not case sensitive wrt db/table/column names - * Partition names get - * normalized (as far as I can tell) by lower casing column name but not partition value. - * {@link org.apache.hadoop.hive.metastore.Warehouse#makePartName(List, List, String)} - * {@link org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer#getPartSpec(ASTNode)} - * Since user input may start out in any case, compare here case-insensitive for db/table - * but leave partition name as is. - */ - if (ci.dbname.equalsIgnoreCase(lock.getDbname())) { - if ((ci.tableName == null && lock.getTablename() == null) || - (ci.tableName != null && ci.tableName.equalsIgnoreCase(lock.getTablename()))) { - if ((ci.partName == null && lock.getPartname() == null) || - (ci.partName != null && ci.partName.equals(lock.getPartname()))) { - relatedLocks.add(lock.getLockid()); - } - } - } - } - - return relatedLocks; - } - - private Set buildCurrentLockSet(ShowLocksResponse locksResponse) { - Set currentLocks = new HashSet(locksResponse.getLocks().size()); - for (ShowLocksResponseElement lock : locksResponse.getLocks()) { - currentLocks.add(lock.getLockid()); - } - return currentLocks; - } - - private void clean(CompactionInfo ci) throws MetaException { + private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException { LOG.info("Starting cleaning for " + ci); try { Table t = resolveTable(ci); @@ -271,27 +145,52 @@ public class Cleaner extends CompactorThread { } StorageDescriptor sd = resolveStorageDescriptor(t, p); final String location = sd.getLocation(); - + ValidTxnList validTxnList = + TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB); + //save it so that getAcidState() sees it + conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); /** - * Each Compaction only compacts as far as the highest txn id such that all txns below it - * are resolved (i.e. not opened). This is what "highestWriteId" tracks. This is only tracked - * since Hive 1.3.0/2.0 - thus may be 0. See ValidCompactorWriteIdList and uses for more info. 
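Note on the hunk above and the new javadoc that follows: the Cleaner no longer tracks per-compaction locks; it makes one pass over txnHandler.findReadyToClean() with a transaction snapshot capped at txnHandler.findMinOpenTxnId(). Below is a minimal, self-contained sketch of the visibility rule that capping implies, assuming (as the javadoc below reads) that a snapshot capped at minOpenTxnGLB can only treat txns below it as committed; the class and method names are hypothetical, and the real logic lives in TxnUtils.createValidTxnListForCleaner() and AcidUtils.getAcidState():

    // Illustrative sketch only, not part of the patch.
    final class CleanerVisibilitySketch {
      /**
       * A base/delta written by a compactor carries the compactor's txn id as a
       * visibility marker (e.g. base_17_c160 in the javadoc below). With the
       * Cleaner's ValidTxnList capped at minOpenTxnGLB, such a directory, and
       * therefore the files it shadows, is only acted on once the compactor's
       * txn is below every txn that may still be open.
       */
      static boolean compactorOutputVisibleToCleaner(long compactorTxnId, long minOpenTxnGLB) {
        // The capped snapshot's high watermark sits below minOpenTxnGLB, so only
        // txns under the least open txn can appear committed to the Cleaner.
        return compactorTxnId < minOpenTxnGLB;
      }
    }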
+ * {@code validTxnList} is capped by minOpenTxnGLB so if + * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta + * produced by a compactor, that means every reader that could be active right now sees it + * as well. That means if this base/delta shadows some earlier base/delta, then it will be + * used in favor of any files that it shadows. Thus the shadowed files are safe to delete. * - * We only want to clean up to the highestWriteId - otherwise we risk deleting deltas from - * under an active reader. * - * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a - * clean request for D2. - * Cleaner checks existing locks and finds none. - * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction - * completes which creates D4. - * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete - * unless ValidWriteIdList is "capped" at highestWriteId. + * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted + * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID. + * See {@link TxnStore#markCleaned(CompactionInfo)} for details. + * For example, given partition P1, txnid:150 starts and sees txnid:149 as open. + * Say the compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved + * writeId:17. The compactor will produce base_17_c160. + * Suppose txnid:149 writes delta_18_18 + * to P1 and aborts. The compactor can only remove TXN_COMPONENTS entries + * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but + * not visible based on 'validTxnList' capped at minOpenTxn, so it will not be cleaned by + * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the + * metadata that says that 18 is aborted. + * In a slightly different case, whatever txn created delta_18 (and all other txns) may have + * committed by the time the cleaner runs, and so the cleaner will indeed see delta_18_18 and remove + * it (since it has nothing but aborted data). But we can't tell which actually happened + * in markCleaned(), so make sure it doesn't delete metadata above CQ_HIGHEST_WRITE_ID. + * + * We could perhaps combine cleaning of aborted and obsolete files and remove all aborted files up + * to the current Min Open Write Id; this way aborted TXN_COMPONENTS metadata can be removed + * as well up to that point, which may be higher than CQ_HIGHEST_WRITE_ID. This could be + * useful if there is suddenly a flood of aborted txns. (For another day.) */ - final ValidWriteIdList validWriteIdList = (ci.highestWriteId > 0) - ?
new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(), - ci.highestWriteId) - : new ValidReaderWriteIdList(); + List tblNames = Collections.singletonList( + TableName.getDbTable(t.getDbName(), t.getTableName())); + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tblNames); + rqst.setValidTxnList(validTxnList.writeToString()); + GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(rqst); + //we could have no write IDs for a table if it was never written to but + // since we are in the Cleaner phase of compactions, there must have + // been some delta/base dirs + assert rsp != null && rsp.getTblValidWriteIdsSize() == 1; + //Creating 'reader' list since we are interested in the set of 'obsolete' files + ValidReaderWriteIdList validWriteIdList = + TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0)); if (runJobAsSelf(ci.runAs)) { removeFiles(location, validWriteIdList, ci); @@ -328,7 +227,17 @@ public class Cleaner extends CompactorThread { Path locPath = new Path(location); AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList); List obsoleteDirs = dir.getObsolete(); - List filesToDelete = new ArrayList(obsoleteDirs.size()); + /** + * add anything in 'dir' that only has data from aborted transactions - no one should be + * trying to read anything in that dir (except getAcidState() that only reads the name of + * this dir itself) + * So this may run ahead of {@link CompactionInfo#highestWriteId} but it's ok (suppose there + * are no active txns when cleaner runs). The key is to not delete metadata about aborted + * txns with write IDs > {@link CompactionInfo#highestWriteId}. + * See {@link TxnStore#markCleaned(CompactionInfo)} + */ + obsoleteDirs.addAll(dir.getAbortedDirectories()); + List filesToDelete = new ArrayList<>(obsoleteDirs.size()); StringBuilder extraDebugInfo = new StringBuilder("["); for (FileStatus stat : obsoleteDirs) { filesToDelete.add(stat.getPath()); @@ -338,9 +247,6 @@ public class Cleaner extends CompactorThread { } } extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']'); - List compactIds = new ArrayList<>(compactId2CompactInfoMap.keySet()); - Collections.sort(compactIds); - extraDebugInfo.append("compactId2CompactInfoMap.keySet(").append(compactIds).append(")"); LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() + " obsolete directories from " + location + ". 
" + extraDebugInfo.toString()); if (filesToDelete.size() < 1) { @@ -361,63 +267,4 @@ public class Cleaner extends CompactorThread { fs.delete(dead, true); } } - private static class LockComparator implements Comparator { - //sort ascending by resource, nulls first - @Override - public int compare(ShowLocksResponseElement o1, ShowLocksResponseElement o2) { - if(o1 == o2) { - return 0; - } - if(o1 == null) { - return -1; - } - if(o2 == null) { - return 1; - } - int v = o1.getDbname().compareToIgnoreCase(o2.getDbname()); - if(v != 0) { - return v; - } - if(o1.getTablename() == null) { - return -1; - } - if(o2.getTablename() == null) { - return 1; - } - v = o1.getTablename().compareToIgnoreCase(o2.getTablename()); - if(v != 0) { - return v; - } - if(o1.getPartname() == null) { - return -1; - } - if(o2.getPartname() == null) { - return 1; - } - v = o1.getPartname().compareToIgnoreCase(o2.getPartname()); - if(v != 0) { - return v; - } - //if still equal, compare by lock ids - v = Long.compare(o1.getLockid(), o2.getLockid()); - if(v != 0) { - return v; - } - return Long.compare(o1.getLockIdInternal(), o2.getLockIdInternal()); - - } - } - private void dumpLockState(ShowLocksResponse slr) { - Iterator l = slr.getLocksIterator(); - List sortedList = new ArrayList<>(); - while(l.hasNext()) { - sortedList.add(l.next()); - } - //sort for readability - sortedList.sort(new LockComparator()); - LOG.info("dumping locks"); - for(ShowLocksResponseElement lock : sortedList) { - LOG.info(lock.toString()); - } - } } http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 92c74e1..c6cb7c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -42,14 +42,14 @@ import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.StringableMap; import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; -import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.SerDeInfo; @@ -59,10 +59,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; -import org.apache.hadoop.hive.metastore.txn.TxnUtils; -import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.DriverUtils; -import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.DDLTask; import 
org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.io.AcidInputFormat; @@ -74,8 +71,6 @@ import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; @@ -100,6 +95,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.Ref; +import org.apache.parquet.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -351,6 +347,7 @@ public class CompactorMR { // Set up the session for driver. conf = new HiveConf(conf); conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); + conf.unset(ValidTxnList.VALID_TXNS_KEY);//so Driver doesn't get confused String user = UserGroupInformation.getCurrentUser().getShortUserName(); SessionState sessionState = DriverUtils.setUpSessionState(conf, user, false); @@ -599,7 +596,6 @@ public class CompactorMR { * to use. * @param job the job to update * @param cols the columns of the table - * @param map */ private void setColumnTypes(JobConf job, List cols) { StringBuilder colNames = new StringBuilder(); @@ -1006,7 +1002,17 @@ public class CompactorMR { deleteEventWriter.close(false); } } - + private long getCompactorTxnId() { + String snapshot = jobConf.get(ValidTxnList.VALID_TXNS_KEY); + if(Strings.isNullOrEmpty(snapshot)) { + throw new IllegalStateException(ValidTxnList.VALID_TXNS_KEY + " not found for writing to " + + jobConf.get(FINAL_LOCATION)); + } + ValidTxnList validTxnList = new ValidReadTxnList(); + validTxnList.readFromString(snapshot); + //this is id of the current txn + return validTxnList.getHighWatermark(); + } private void getWriter(Reporter reporter, ObjectInspector inspector, int bucket) throws IOException { if (writer == null) { @@ -1019,7 +1025,8 @@ public class CompactorMR { .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) .bucket(bucket) - .statementId(-1);//setting statementId == -1 makes compacted delta files use + .statementId(-1)//setting statementId == -1 makes compacted delta files use + .visibilityTxnId(getCompactorTxnId()); //delta_xxxx_yyyy format // Instantiate the underlying output format @@ -1044,8 +1051,9 @@ public class CompactorMR { .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) .bucket(bucket) - .statementId(-1);//setting statementId == -1 makes compacted delta files use - //delta_xxxx_yyyy format + .statementId(-1)//setting statementId == -1 makes compacted delta files use + // delta_xxxx_yyyy format + .visibilityTxnId(getCompactorTxnId()); // Instantiate the underlying output format @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class @@ -1178,7 +1186,7 @@ public class CompactorMR { FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir //name is that we want to 
rename; leave it for another day - // TODO: if we expect one dir why don't we enforce it? + //todo: may actually have delta_x_y and delete_delta_x_y for (FileStatus fileStatus : contents) { //newPath is the base/delta dir Path newPath = new Path(finalLocation, fileStatus.getPath().getName()); @@ -1218,6 +1226,8 @@ public class CompactorMR { ValidWriteIdList actualWriteIds) throws IOException { Path fromPath = new Path(from), toPath = new Path(to); FileSystem fs = fromPath.getFileSystem(conf); + //todo: is that true? can it be aborted? does it matter for compaction? probably OK since + //getAcidState() doesn't check if X is valid in base_X_cY for compacted base dirs. // Assume the high watermark can be used as maximum transaction ID. long maxTxn = actualWriteIds.getHighWatermark(); AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index dd0929f..f5b901d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -51,7 +51,7 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa /** * Superclass for all threads in the compactor. */ -abstract class CompactorThread extends Thread implements MetaStoreThread { +public abstract class CompactorThread extends Thread implements MetaStoreThread { static final private String CLASS_NAME = CompactorThread.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); @@ -67,8 +67,9 @@ abstract class CompactorThread extends Thread implements MetaStoreThread { // TODO MS-SPLIT for now, keep a copy of HiveConf around as we need to call other methods with // it. This should be changed to Configuration once everything that this calls that requires // HiveConf is moved to the standalone metastore. - conf = (configuration instanceof HiveConf) ? 
(HiveConf)configuration : - new HiveConf(configuration, HiveConf.class); + //clone the conf - compactor needs to set properties in it which we don't + // want to bleed into the caller + conf = new HiveConf(configuration, HiveConf.class); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index beb6902..cdcc0e9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CompactionRequest; @@ -36,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -143,13 +145,15 @@ public class Initiator extends CompactorThread { ", assuming it has been dropped and moving on."); continue; } - - // Compaction doesn't work under a transaction and hence pass null for validTxnList + ValidTxnList validTxnList = TxnCommonUtils + .createValidReadTxnList(txnHandler.getOpenTxns(), 0); + conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); // The response will have one entry per table and hence we get only one ValidWriteIdList String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); - ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList( + rqst.setValidTxnList(validTxnList.writeToString()); + final ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList( txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0)); StorageDescriptor sd = resolveStorageDescriptor(t, p);
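The Initiator hunk above and the CompactorMR changes earlier in this patch share one pattern: the transaction snapshot is stored in the conf under ValidTxnList.VALID_TXNS_KEY, and getCompactorTxnId() later reads it back and takes getHighWatermark() as the compactor's own txn id, which feeds AcidOutputFormat.Options.visibilityTxnId(). Below is a small usage sketch of that round trip, using only calls that appear in this diff; the class name and standalone packaging are illustrative, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.common.ValidReadTxnList;
    import org.apache.hadoop.hive.common.ValidTxnList;

    final class VisibilityTxnRoundTrip {
      /** Producer side: store the snapshot before launching the compaction job. */
      static void storeSnapshot(Configuration conf, ValidTxnList snapshot) {
        conf.set(ValidTxnList.VALID_TXNS_KEY, snapshot.writeToString());
      }

      /** Consumer side: mirrors CompactorMR#getCompactorTxnId(). */
      static long compactorTxnId(Configuration conf) {
        String snapshot = conf.get(ValidTxnList.VALID_TXNS_KEY);
        if (snapshot == null || snapshot.isEmpty()) {
          throw new IllegalStateException(ValidTxnList.VALID_TXNS_KEY + " not set");
        }
        ValidTxnList validTxnList = new ValidReadTxnList();
        validTxnList.readFromString(snapshot);
        // Per the comment in getCompactorTxnId(), the high watermark of the
        // compactor's own snapshot is the id of the compactor's transaction; it
        // becomes the visibility suffix on the base/delta dirs the job writes.
        return validTxnList.getHighWatermark();
      }
    }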