Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D16E9200BC0 for ; Tue, 15 Nov 2016 21:20:41 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id CFE84160B1E; Tue, 15 Nov 2016 20:20:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D7F2F160B19 for ; Tue, 15 Nov 2016 21:20:39 +0100 (CET) Received: (qmail 52702 invoked by uid 500); 15 Nov 2016 20:20:34 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 51840 invoked by uid 99); 15 Nov 2016 20:20:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Nov 2016 20:20:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DECA5EF9A9; Tue, 15 Nov 2016 20:20:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sershe@apache.org To: commits@hive.apache.org Date: Tue, 15 Nov 2016 20:21:00 -0000 Message-Id: In-Reply-To: <5971ac757d5c46249bed5820e3e14550@git.apache.org> References: <5971ac757d5c46249bed5820e3e14550@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [28/50] [abbrv] hive git commit: HIVE-14943 Base Implementation (of HIVE-10924) (Eugene Koifman, reviewed by Alan Gates) archived-at: Tue, 15 Nov 2016 20:20:42 -0000 http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 4e1122d..637a01a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -45,6 +46,7 @@ import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -99,6 +101,7 @@ public class TestDbTxnManager2 { } @Test public void testLocksInSubquery() throws Exception { + dropTable(new String[] {"T","S", "R"}); checkCmdOnDriver(driver.run("create table if not exists T (a int, b int)")); checkCmdOnDriver(driver.run("create table if not exists S (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); @@ -132,6 +135,7 @@ public class TestDbTxnManager2 { } @Test public void createTable() throws Exception { + dropTable(new String[] {"T"}); CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)"); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); @@ -143,6 +147,7 @@ public class TestDbTxnManager2 { } @Test public void insertOverwriteCreate() throws Exception { + dropTable(new String[] {"T2", "T3"}); CommandProcessorResponse cpr = driver.run("create table if not exists T2(a int)"); checkCmdOnDriver(cpr); cpr = driver.run("create table if not exists T3(a int)"); @@ -163,6 +168,7 @@ public class TestDbTxnManager2 { } @Test public void insertOverwritePartitionedCreate() throws Exception { + dropTable(new String[] {"T4"}); CommandProcessorResponse cpr = driver.run("create table if not exists T4 (name string, gpa double) partitioned by (age int)"); checkCmdOnDriver(cpr); cpr = driver.run("create table if not exists T5(name string, age int, gpa double)"); @@ -183,6 +189,7 @@ public class TestDbTxnManager2 { } @Test public void basicBlocking() throws Exception { + dropTable(new String[] {"T6"}); CommandProcessorResponse cpr = driver.run("create table if not exists T6(a int)"); checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("select a from T6"); @@ -213,6 +220,7 @@ public class TestDbTxnManager2 { } @Test public void lockConflictDbTable() throws Exception { + dropTable(new String[] {"temp.T7"}); CommandProcessorResponse cpr = driver.run("create database if not exists temp"); checkCmdOnDriver(cpr); cpr = driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); @@ -240,6 +248,7 @@ public class TestDbTxnManager2 { } @Test public void updateSelectUpdate() throws Exception { + dropTable(new String[] {"T8"}); CommandProcessorResponse cpr = driver.run("create table T8(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("delete from T8 where b = 89"); @@ -273,6 +282,7 @@ public class TestDbTxnManager2 { @Test public void testLockRetryLimit() throws Exception { + dropTable(new String[] {"T9"}); conf.setIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES, 2); conf.setBoolVar(HiveConf.ConfVars.TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT, true); HiveTxnManager otherTxnMgr = new DbTxnManager(); @@ -309,6 +319,7 @@ public class TestDbTxnManager2 { */ @Test public void testLockBlockedBy() throws Exception { + dropTable(new String[] {"TAB_BLOCKED"}); CommandProcessorResponse cpr = driver.run("create table TAB_BLOCKED (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("select * from TAB_BLOCKED"); @@ -330,6 +341,7 @@ public class TestDbTxnManager2 { @Test public void testDummyTxnManagerOnAcidTable() throws Exception { + dropTable(new String[] {"T10", "T11"}); // Create an ACID table with DbTxnManager CommandProcessorResponse cpr = driver.run("create table T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); @@ -389,6 +401,8 @@ public class TestDbTxnManager2 { */ @Test public void testMetastoreTablesCleanup() throws Exception { + dropTable(new String[] {"temp.T10", "temp.T11", "temp.T12p", "temp.T13p"}); + CommandProcessorResponse cpr = driver.run("create database if not exists temp"); checkCmdOnDriver(cpr); @@ -569,6 +583,7 @@ public class TestDbTxnManager2 { */ @Test public void checkExpectedLocks() throws Exception { + dropTable(new String[] {"acidPart", "nonAcidPart"}); CommandProcessorResponse cpr = null; cpr = driver.run("create table acidPart(a int, b int) partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); @@ -640,8 +655,7 @@ public class TestDbTxnManager2 { */ @Test public void checkExpectedLocks2() throws Exception { - checkCmdOnDriver(driver.run("drop table if exists tab_acid")); - checkCmdOnDriver(driver.run("drop table if exists tab_not_acid")); + dropTable(new String[] {"tab_acid", "tab_not_acid"}); checkCmdOnDriver(driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); checkCmdOnDriver(driver.run("create table if not exists tab_not_acid (na int, nb int) partitioned by (np string) " + @@ -676,7 +690,7 @@ public class TestDbTxnManager2 { } /** The list is small, and the object is generated, so we don't use sets/equals/etc. */ - public static void checkLock(LockType expectedType, LockState expectedState, String expectedDb, + public static ShowLocksResponseElement checkLock(LockType expectedType, LockState expectedState, String expectedDb, String expectedTable, String expectedPartition, List actuals) { for (ShowLocksResponseElement actual : actuals) { if (expectedType == actual.getType() && expectedState == actual.getState() @@ -684,11 +698,12 @@ public class TestDbTxnManager2 { && StringUtils.equals(normalizeCase(expectedTable), normalizeCase(actual.getTablename())) && StringUtils.equals( normalizeCase(expectedPartition), normalizeCase(actual.getPartname()))) { - return; + return actual; } } Assert.fail("Could't find {" + expectedType + ", " + expectedState + ", " + expectedDb + ", " + expectedTable + ", " + expectedPartition + "} in " + actuals); + throw new IllegalStateException("How did it get here?!"); } @Test @@ -878,6 +893,7 @@ public class TestDbTxnManager2 { */ @Test public void testWriteSetTracking1() throws Exception { + dropTable(new String[] {"TAB_PART"}); CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " + "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); @@ -893,11 +909,17 @@ public class TestDbTxnManager2 { txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas"); txnMgr2.commitTxn(); } + private void dropTable(String[] tabs) throws Exception { + for(String tab : tabs) { + driver.run("drop table if exists " + tab); + } + } /** * txns overlap in time but do not update same resource - no conflict */ @Test public void testWriteSetTracking2() throws Exception { + dropTable(new String[] {"TAB_PART", "TAB2"}); CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " + "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); @@ -919,33 +941,42 @@ public class TestDbTxnManager2 { txnMgr2.acquireLocks(driver.getPlan(), ctx, "Catherine"); txnMgr2.commitTxn(); } - /** * txns overlap and update the same resource - can't commit 2nd txn */ @Test public void testWriteSetTracking3() throws Exception { + dropTable(new String[] {"TAB_PART"}); CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " + "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr.openTxn("Known"); - txnMgr2.openTxn("Unknown"); + long txnId = txnMgr.openTxn("Known"); + long txnId2 = txnMgr2.openTxn("Unknown"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false); locks = getLocks(txnMgr2);//should not matter which txnMgr is used here Assert.assertEquals("Unexpected lock count", 2, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); + checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", "p=blah", locks); + AddDynamicPartitions adp = new AddDynamicPartitions(txnId, "default", "TAB_PART", + Collections.singletonList("p=blah")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn(); + + adp.setTxnid(txnId2); + txnHandler.addDynamicPartitions(adp); LockException expectedException = null; try { + //with HIVE-15032 this should use static parts and thus not need addDynamicPartitions txnMgr2.commitTxn(); } catch (LockException e) { @@ -953,8 +984,8 @@ public class TestDbTxnManager2 { } Assert.assertTrue("Didn't get exception", expectedException != null); Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); - Assert.assertEquals("Exception msg didn't match", - "Aborting [txnid:2,2] due to a write conflict on default/tab_part committed by [txnid:1,2]", + Assert.assertEquals("Exception msg didn't match", + "Aborting [txnid:3,3] due to a write conflict on default/TAB_PART/p=blah committed by [txnid:2,3] u/u", expectedException.getCause().getMessage()); } /** @@ -963,6 +994,7 @@ public class TestDbTxnManager2 { */ @Test public void testWriteSetTracking4() throws Exception { + dropTable(new String[] {"TAB_PART", "TAB2"}); Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " + "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); @@ -1040,26 +1072,33 @@ public class TestDbTxnManager2 { */ @Test public void testWriteSetTracking5() throws Exception { + dropTable(new String[] {"TAB_PART"}); Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " + "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr.openTxn("Known"); - txnMgr2.openTxn("Unknown"); + long txnId = txnMgr2.openTxn("Unknown"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false); locks = getLocks(txnMgr2);//should not matter which txnMgr is used here Assert.assertEquals("Unexpected lock count", 2, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); + checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", "p=blah", locks); txnMgr.rollbackTxn(); + + AddDynamicPartitions adp = new AddDynamicPartitions(txnId, "default", "TAB_PART", + Arrays.asList("p=blah")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); txnMgr2.commitTxn();//since conflicting txn rolled back, commit succeeds Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); @@ -1069,6 +1108,7 @@ public class TestDbTxnManager2 { */ @Test public void testWriteSetTracking6() throws Exception { + dropTable(new String[] {"TAB2"}); Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); CommandProcessorResponse cpr = driver.run("create table if not exists TAB2(a int, b int) clustered " + "by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); @@ -1102,6 +1142,7 @@ public class TestDbTxnManager2 { */ @Test public void testWriteSetTracking7() throws Exception { + dropTable(new String[] {"tab2", "TAB2"}); Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); CommandProcessorResponse cpr = driver.run("create table if not exists tab2 (a int, b int) " + "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); @@ -1209,6 +1250,7 @@ public class TestDbTxnManager2 { */ @Test public void testWriteSetTracking8() throws Exception { + dropTable(new String[] {"tab1", "TAB1"}); CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); @@ -1262,6 +1304,7 @@ public class TestDbTxnManager2 { */ @Test public void testWriteSetTracking9() throws Exception { + dropTable(new String[] {"TAB1"}); CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); @@ -1321,6 +1364,7 @@ public class TestDbTxnManager2 { */ @Test public void testWriteSetTracking10() throws Exception { + dropTable(new String[] {"TAB1"}); CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); @@ -1369,7 +1413,7 @@ public class TestDbTxnManager2 { } Assert.assertNotEquals("Expected exception", null, exception); Assert.assertEquals("Exception msg doesn't match", - "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]", + "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3] d/u", exception.getCause().getMessage()); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), @@ -1382,6 +1426,7 @@ public class TestDbTxnManager2 { */ @Test public void testWriteSetTracking11() throws Exception { + dropTable(new String[] {"TAB1"}); CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); @@ -1442,6 +1487,7 @@ public class TestDbTxnManager2 { } @Test public void testCompletedTxnComponents() throws Exception { + dropTable(new String[] {"TAB1", "tab_not_acid2"}); CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); @@ -1462,6 +1508,7 @@ public class TestDbTxnManager2 { */ @Test public void testMultiInsert() throws Exception { + dropTable(new String[] {"TAB1", "tab_not_acid"}); checkCmdOnDriver(driver.run("drop table if exists tab1")); checkCmdOnDriver(driver.run("drop table if exists tab_not_acid")); CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + @@ -1518,4 +1565,568 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "XYZ", null, locks); Assert.assertEquals("Wrong AgentInfo", driver.getPlan().getQueryId(), locks.get(0).getAgentInfo()); } + @Test + public void testMerge3Way01() throws Exception { + testMerge3Way(false); + } + @Test + public void testMerge3Way02() throws Exception { + testMerge3Way(true); + } + + /** + * @param cc whether to cause a WW conflict or not + * @throws Exception + */ + private void testMerge3Way(boolean cc) throws Exception { + dropTable(new String[] {"target","source", "source2"}); + checkCmdOnDriver(driver.run("create table target (a int, b int) " + + "partitioned by (p int, q int) clustered by (a) into 2 buckets " + + "stored as orc TBLPROPERTIES ('transactional'='true')")); + //in practice we don't really care about the data in any of these tables (except as far as + //it creates partitions, the SQL being test is not actually executed and results of the + //wrt ACID metadata is supplied manually via addDynamicPartitions(). But having data makes + //it easier to follow the intent + checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)")); + checkCmdOnDriver(driver.run("create table source (a int, b int, p int, q int)")); + checkCmdOnDriver(driver.run("insert into source values " + + // I-(1/2) D-(1/2) I-(1/3) U-(1/3) D-(2/2) I-(1/1) - new part + "(9,10,1,2), (3,4,1,2), (11,12,1,3), (5,13,1,3), (7,8,2,2), (14,15,1,1)")); + checkCmdOnDriver(driver.run("create table source2 (a int, b int, p int, q int)")); + checkCmdOnDriver(driver.run("insert into source2 values " + + //cc ? -:U-(1/2) D-(1/2) cc ? U-(1/3):- D-(2/2) I-(1/1) - new part 2 + "(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)")); + + + long txnId1 = txnMgr.openTxn("T1"); + checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " + + "when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3 + "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 + "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1 + txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); + List locks = getLocks(txnMgr, true); + Assert.assertEquals("Unexpected lock count", 5, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks); + + //start concurrent txn + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + long txnId2 = txnMgr2.openTxn("T2"); + checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " + + "when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2 + "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 + "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1 + txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false); + locks = getLocks(txnMgr2, true); + Assert.assertEquals("Unexpected lock count", 10, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks); + + long extLockId = checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "target", null, locks).getLockid(); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "source2", null, locks); + checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=2", locks); + checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=3", locks); + checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=2/q=2", locks); + + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 0, + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1)); + //complete 1st txn + AddDynamicPartitions adp = new AddDynamicPartitions(txnId1, "default", "target", + Collections.singletonList("p=1/q=3"));//update clause + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); + adp = new AddDynamicPartitions(txnId1, "default", "target", + Arrays.asList("p=1/q=2","p=2/q=2"));//delete clause + adp.setOperationType(DataOperationType.DELETE); + txnHandler.addDynamicPartitions(adp); + adp = new AddDynamicPartitions(txnId1, "default", "target", + Arrays.asList("p=1/q=2","p=1/q=3","p=1/q=1"));//insert clause + adp.setOperationType(DataOperationType.INSERT); + txnHandler.addDynamicPartitions(adp); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 1, + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 + + " and tc_operation_type='u'")); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 2, + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 + + " and tc_operation_type='d'")); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 3, + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 + + " and tc_operation_type='i'")); + txnMgr.commitTxn();//commit T1 + Assert.assertEquals( + "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 6, + TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId1)); + Assert.assertEquals( + "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + + TxnDbUtil.queryToString("select * from WRITE_SET"), + 1, + TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId1 + + " and ws_operation_type='u'")); + Assert.assertEquals( + "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + + TxnDbUtil.queryToString("select * from WRITE_SET"), + 2, + TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId1 + + " and ws_operation_type='d'")); + + //re-check locks which were in Waiting state - should now be Acquired + ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId); + locks = getLocks(txnMgr2, true); + Assert.assertEquals("Unexpected lock count", 5, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source2", null, locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks); + + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 0, + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2)); + //complete 2nd txn + adp = new AddDynamicPartitions(txnId2, "default", "target", + Collections.singletonList(cc ? "p=1/q=3" : "p=1/p=2"));//update clause + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); + adp = new AddDynamicPartitions(txnId2, "default", "target", + Arrays.asList("p=1/q=2","p=2/q=2"));//delete clause + adp.setOperationType(DataOperationType.DELETE); + txnHandler.addDynamicPartitions(adp); + adp = new AddDynamicPartitions(txnId2, "default", "target", + Arrays.asList("p=1/q=2","p=1/q=3","p=1/q=1"));//insert clause + adp.setOperationType(DataOperationType.INSERT); + txnHandler.addDynamicPartitions(adp); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 1, + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 + + " and tc_operation_type='u'")); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 2, + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 + + " and tc_operation_type='d'")); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 3, + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 + + " and tc_operation_type='i'")); + + LockException expectedException = null; + try { + txnMgr2.commitTxn();//commit T2 + } + catch (LockException e) { + expectedException = e; + } + if(cc) { + Assert.assertNotNull("didn't get exception", expectedException); + Assert.assertEquals("Transaction manager has aborted the transaction txnid:3. Reason: " + + "Aborting [txnid:3,3] due to a write conflict on default/target/p=1/q=3 " + + "committed by [txnid:2,3] u/u", expectedException.getMessage()); + Assert.assertEquals( + "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 0, + TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2)); + Assert.assertEquals( + "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + + TxnDbUtil.queryToString("select * from WRITE_SET"), + 0, + TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId2)); + } + else { + Assert.assertNull("Unexpected exception " + expectedException, expectedException); + Assert.assertEquals( + "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 6, + TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2)); + Assert.assertEquals( + "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + + TxnDbUtil.queryToString("select * from WRITE_SET"), + 1, + TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId2 + + " and ws_operation_type='u'")); + Assert.assertEquals( + "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + + TxnDbUtil.queryToString("select * from WRITE_SET"), + 2, + TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId2 + + " and ws_operation_type='d'")); + } + + + } + @Test + public void testMergeUnpartitioned01() throws Exception { + testMergeUnpartitioned(true); + } + @Test + public void testMergeUnpartitioned02() throws Exception { + testMergeUnpartitioned(false); + } + + /** + * run a merge statement using un-partitioned target table and a concurrent op on the target + * Check that proper locks are acquired and Write conflict detection works and the state + * of internal table. + * @param causeConflict true to make 2 operations such that they update the same entity + * @throws Exception + */ + private void testMergeUnpartitioned(boolean causeConflict) throws Exception { + dropTable(new String[] {"target","source"}); + checkCmdOnDriver(driver.run("create table target (a int, b int) " + + "clustered by (a) into 2 buckets " + + "stored as orc TBLPROPERTIES ('transactional'='true')")); + checkCmdOnDriver(driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)")); + checkCmdOnDriver(driver.run("create table source (a int, b int)")); + + long txnid1 = txnMgr.openTxn("T1"); + if(causeConflict) { + checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1")); + } + else { + checkCmdOnDriver(driver.compileAndRespond("insert into target values(9,10),(11,12)")); + } + txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 1,//no DP, so it's populated from lock info + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid1)); + + List locks = getLocks(txnMgr, true); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(causeConflict ? LockType.SHARED_WRITE : LockType.SHARED_READ, + LockState.ACQUIRED, "default", "target", null, locks); + + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + //start a 2nd (overlapping) txn + long txnid2 = txnMgr2.openTxn("T2"); + checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s " + + "on t.a=s.a " + + "when matched then delete " + + "when not matched then insert values(s.a,s.b)")); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); + locks = getLocks(txnMgr, true); + + Assert.assertEquals("Unexpected lock count", 3, locks.size()); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks); + checkLock(LockType.SHARED_READ, causeConflict ? LockState.WAITING : LockState.ACQUIRED, + "default", "source", null, locks); + long extLockId = checkLock(LockType.SHARED_WRITE, causeConflict ? LockState.WAITING : LockState.ACQUIRED, + "default", "target", null, locks).getLockid(); + + txnMgr.commitTxn();//commit T1 + + Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + + TxnDbUtil.queryToString("select * from WRITE_SET"), + causeConflict ? 1 : 0,//Inserts are not tracked by WRITE_SET + TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid1 + + " and ws_operation_type=" + (causeConflict ? "'u'" : "'i'"))); + + + //re-check locks which were in Waiting state - should now be Acquired + ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId); + locks = getLocks(txnMgr, true); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks); + + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 1,// + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2)); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 1,// + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + + "and tc_operation_type='d'")); + + //complete T2 txn + LockException expectedException = null; + try { + txnMgr2.commitTxn(); + } + catch (LockException e) { + expectedException = e; + } + if(causeConflict) { + Assert.assertTrue("Didn't get exception", expectedException != null); + Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); + Assert.assertEquals("Exception msg didn't match", + "Aborting [txnid:3,3] due to a write conflict on default/target committed by [txnid:2,3] d/u", + expectedException.getCause().getMessage()); + } + else { + Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + + TxnDbUtil.queryToString("select * from WRITE_SET"), + 1,//Unpartitioned table: 1 row for Delete; Inserts are not tracked in WRITE_SET + TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2 + + " and ws_operation_type='d'")); + } + } + /** + * Check that DP with partial spec properly updates TXN_COMPONENTS + * @throws Exception + */ + @Test + public void testDynamicPartitionInsert() throws Exception { + dropTable(new String[] {"target"}); + checkCmdOnDriver(driver.run("create table target (a int, b int) " + + "partitioned by (p int, q int) clustered by (a) into 2 buckets " + + "stored as orc TBLPROPERTIES ('transactional'='true')")); + long txnid1 = txnMgr.openTxn("T1"); + checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)")); + txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); + List locks = getLocks(txnMgr, true); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + //table is empty, so can only lock the table + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); + Assert.assertEquals( + "HIVE_LOCKS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + + TxnDbUtil.queryToString("select * from HIVE_LOCKS"), + 1, + TxnDbUtil.countQueryAgent("select count(*) from HIVE_LOCKS where hl_txnid=" + txnid1)); + txnMgr.rollbackTxn(); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 0, + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid1)); + //now actually write to table to generate some partitions + checkCmdOnDriver(driver.run("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)")); + driver.run("select count(*) from target"); + List r = new ArrayList<>(); + driver.getResults(r); + Assert.assertEquals("", "4", r.get(0)); + Assert.assertEquals(//look in COMPLETED_TXN_COMPONENTS because driver.run() committed!!!! + "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1 + 1) + "): " + + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 2,//2 distinct partitions created + //txnid+1 because we want txn used by previous driver.run("insert....) + TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (txnid1 + 1))); + + + long txnid2 = txnMgr.openTxn("T1"); + checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (10,2,2), (30,4,2), (50,6,3), (70,8,2)")); + txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); + locks = getLocks(txnMgr, true); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + //Plan is using DummyPartition, so can only lock the table... unfortunately + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); + AddDynamicPartitions adp = new AddDynamicPartitions(txnid2, "default", "target", Arrays.asList("p=1/q=2","p=1/q=2")); + adp.setOperationType(DataOperationType.INSERT); + txnHandler.addDynamicPartitions(adp); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 2,//2 distinct partitions modified + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2)); + txnMgr.commitTxn(); + } + @Test + public void testMergePartitioned01() throws Exception { + testMergePartitioned(false); + } + @Test + public void testMergePartitioned02() throws Exception { + testMergePartitioned(true); + } + /** + * "run" an Update and Merge concurrently; Check that correct locks are acquired. + * Check state of auxiliary ACID tables. + * @param causeConflict - true to make the operations cause a Write conflict + * @throws Exception + */ + private void testMergePartitioned(boolean causeConflict) throws Exception { + dropTable(new String[] {"target","source"}); + checkCmdOnDriver(driver.run("create table target (a int, b int) " + + "partitioned by (p int, q int) clustered by (a) into 2 buckets " + + "stored as orc TBLPROPERTIES ('transactional'='true')")); + checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)")); + checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int)")); + + long txnId1 = txnMgr.openTxn("T1"); + checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1")); + txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); + List locks = getLocks(txnMgr, true); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); + + DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + //start a 2nd (overlapping) txn + long txnid2 = txnMgr2.openTxn("T2"); + checkCmdOnDriver(driver.compileAndRespond("merge into target using source " + + "on target.p=source.p1 and target.a=source.a1 " + + "when matched then update set b = 11 " + + "when not matched then insert values(a1,b1,p1,q1)")); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); + locks = getLocks(txnMgr, true); + Assert.assertEquals("Unexpected lock count", 7, locks.size()); + /** + * W locks from T1 are still there, so all locks from T2 block. + * The Update part of Merge requests W locks for each existing partition in target. + * The Insert part doesn't know which partitions may be written to: thus R lock on target table. + * */ + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "source", null, locks); + long extLockId = checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "target", null, locks).getLockid(); + + checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=2", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); + + checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=3", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); + + checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=2/q=2", locks); + + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 0,//because it's using a DP write + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1)); + //complete T1 transaction (simulate writing to 2 partitions) + AddDynamicPartitions adp = new AddDynamicPartitions(txnId1, "default", "target", + Arrays.asList("p=1/q=2","p=1/q=3")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 2, + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 + + " and tc_operation_type='u'")); + txnMgr.commitTxn();//commit T1 + Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + + TxnDbUtil.queryToString("select * from WRITE_SET"), + 2,//2 partitions updated + TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId1 + + " and ws_operation_type='u'")); + + + //re-check locks which were in Waiting state - should now be Acquired + ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId); + locks = getLocks(txnMgr, true); + Assert.assertEquals("Unexpected lock count", 5, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks); + + + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 0,//because it's using a DP write + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2)); + //complete T2 txn + //simulate Insert into 2 partitions + adp = new AddDynamicPartitions(txnid2, "default", "target", + Arrays.asList("p=1/q=2","p=1/q=3")); + adp.setOperationType(DataOperationType.INSERT); + txnHandler.addDynamicPartitions(adp); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 2, + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + " and tc_operation_type='i'")); + //simulate Update of 1 partitions; depending on causeConflict, choose one of the partitions + //which was modified by the T1 update stmt or choose a non-conflicting one + adp = new AddDynamicPartitions(txnid2, "default", "target", + Collections.singletonList(causeConflict ? "p=1/q=2" : "p=1/q=1")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); + Assert.assertEquals( + "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + + TxnDbUtil.queryToString("select * from TXN_COMPONENTS"), + 1, + TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + " and tc_operation_type='u'")); + + + LockException expectedException = null; + try { + txnMgr2.commitTxn(); + } + catch (LockException e) { + expectedException = e; + } + if(causeConflict) { + Assert.assertTrue("Didn't get exception", expectedException != null); + Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); + Assert.assertEquals("Exception msg didn't match", + "Aborting [txnid:3,3] due to a write conflict on default/target/p=1/q=2 committed by [txnid:2,3] u/u", + expectedException.getCause().getMessage()); + } + else { + Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + + TxnDbUtil.queryToString("select * from WRITE_SET"), + 1,//1 partitions updated + TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2 + + " and ws_operation_type='u'")); + Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + + TxnDbUtil.queryToString("select * from WRITE_SET"), + 1,//1 partitions updated (and no other entries) + TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2)); + } + } + //https://issues.apache.org/jira/browse/HIVE-15048 + @Test + @Ignore("for some reason this fails with NPE in setUp() when run as part of the suite, but not standalone..") + public void testUpdateWithSubquery() throws Exception { + dropTable(new String[] {"target", "source"}); + checkCmdOnDriver(driver.run("create table target (a int, b int) " + + "partitioned by (p int, q int) clustered by (a) into 2 buckets " + + "stored as orc TBLPROPERTIES ('transactional'='true')")); + checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); + + checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)")); + + checkCmdOnDriver(driver.run( +"update target set b = 1 where p in (select t.q1 from source t where t.a1=5)")); +/** + * So the above query fails with invalid reference 'p' (in subquery) (as as if u use t.p) + * But before it fails, here is inputs/outpus before/after UpdateDeleteSemanticAnalyzer +* Before UDSA +* inputs: [default@target, default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2] +* outputs: [default@target] +* +* after UDSA +* inputs: [default@target, default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2] +* outputs: [default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2] +* +* So it looks like.... +*/ + checkCmdOnDriver(driver.run( + "update source set b1 = 1 where p1 in (select t.q from target t where t.p=2)")); + + } } http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java index 3d2e648..467de26 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java @@ -32,7 +32,6 @@ import org.junit.Test; */ public class TestIUD { private static HiveConf conf; - private ParseDriver pd; @BeforeClass @@ -47,6 +46,9 @@ public class TestIUD { } ASTNode parse(String query) throws ParseException { + return parse(query, pd, conf); + } + static ASTNode parse(String query, ParseDriver pd, HiveConf conf) throws ParseException { ASTNode nd = null; try { nd = pd.parse(query, new Context(conf)); http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMergeStatement.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMergeStatement.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMergeStatement.java new file mode 100644 index 0000000..7481e1a --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMergeStatement.java @@ -0,0 +1,246 @@ +package org.apache.hadoop.hive.ql.parse; + +import org.antlr.runtime.tree.RewriteEmptyStreamException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; + +/** + * Testing parsing for SQL Merge statement + */ +public class TestMergeStatement { + private static HiveConf conf; + private ParseDriver pd; + + @BeforeClass + public static void initialize() { + conf = new HiveConf(SemanticAnalyzer.class); + SessionState.start(conf); + } + + @Before + public void setup() throws SemanticException, IOException { + pd = new ParseDriver(); + } + + ASTNode parse(String query) throws ParseException { + return TestIUD.parse(query, pd, conf); + } + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void test() throws ParseException { + ASTNode ast = parse(//using target.a breaks this + "MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED THEN UPDATE set a = source.b, c=d+1"); + Assert.assertEquals( + "(tok_merge " + + "(tok_tabref (tok_tabname target)) " + + "(tok_tabref (tok_tabname source)) " + + "(= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " + + "(tok_matched " + + "(tok_update " + + "(tok_set_columns_clause " + + "(= (tok_table_or_col a) (. (tok_table_or_col source) b)) " + + "(= (tok_table_or_col c) (+ (tok_table_or_col d) 1))" + + ")" + + ")" + + ")" + + ")", ast.toStringTree()); + } + @Test + public void test1() throws ParseException { + //testing MATCHED AND with CASE statement + ASTNode ast = parse(//using target.a breaks this + "MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED " + + "AND source.c2 < current_time() " + + "THEN UPDATE set a = source.b, b = case when c1 is null then c1 else c1 end"); + Assert.assertEquals( + "(tok_merge " + + "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " + + "(tok_matched " + + "(tok_update " + + "(tok_set_columns_clause " + + "(= (tok_table_or_col a) (. (tok_table_or_col source) b)) " + + "(= (tok_table_or_col b) (tok_function when (tok_function tok_isnull (tok_table_or_col c1)) (tok_table_or_col c1) (tok_table_or_col c1)))" + + ")" + + ") " + + "(< (. (tok_table_or_col source) c2) (tok_function current_time)))" + + ")", ast.toStringTree()); + } + @Test + public void test2() throws ParseException { + ASTNode + ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED THEN DELETE"); + Assert.assertEquals( + "(tok_merge " + + "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " + + "(tok_matched " + + "tok_delete)" + + ")", ast.toStringTree()); + } + @Test + public void test3() throws ParseException { + ASTNode + ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED AND target.a + source.b > 8 THEN DELETE"); + Assert.assertEquals( + "(tok_merge " + + "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " + + "(tok_matched " + + "tok_delete " + + "(> (+ (. (tok_table_or_col target) a) (. (tok_table_or_col source) b)) 8))" + + ")", ast.toStringTree()); + } + @Test + public void test4() throws ParseException { + ASTNode + ast = parse( + "MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN INSERT VALUES(source.a, case when source.b is null then target.b else source.b end)"); + Assert.assertEquals( + "(tok_merge " + + "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " + + "(tok_not_matched " + + "(tok_insert " + + "(tok_value_row " + + "(. (tok_table_or_col source) a) " + + "(tok_function when " + + "(tok_function tok_isnull (. (tok_table_or_col source) b)) (. (tok_table_or_col target) b) " + + "(. (tok_table_or_col source) b)" + + ")" + + ")" + + ")" + + ")" + + ")", ast.toStringTree()); + + } + /** + * both UPDATE and INSERT + * @throws ParseException + */ + @Test + public void test5() throws ParseException { + ASTNode + ast = parse( + "MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED THEN UPDATE set a = source.b, c=d+1 WHEN NOT MATCHED THEN INSERT VALUES(source.a, 2, current_date())"); + Assert.assertEquals( + "(tok_merge " + + "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " + + "(tok_matched " + + "(tok_update " + + "(tok_set_columns_clause (= (tok_table_or_col a) (. (tok_table_or_col source) b)) (= (tok_table_or_col c) (+ (tok_table_or_col d) 1)))" + + ")" + + ") " + + "(tok_not_matched " + + "(tok_insert " + + "(tok_value_row " + + "(. (tok_table_or_col source) a) " + + "2 " + + "(tok_function current_date)" + + ")" + + ")" + + ")" + + ")", ast.toStringTree()); + + } + @Test + public void testNegative() throws ParseException { + expectedException.expect(ParseException.class); + expectedException.expectMessage("line 1:74 cannot recognize input near 'INSERT' '' '' in WHEN MATCHED THEN clause"); + ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED THEN INSERT"); + } + @Test + public void testNegative1() throws ParseException { + expectedException.expect(ParseException.class); + expectedException.expectMessage("line 1:78 mismatched input 'DELETE' expecting INSERT near 'THEN' in WHEN NOT MATCHED clause"); + ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN DELETE"); + } + @Test + public void test8() throws ParseException { + ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED AND a = 1 THEN UPDATE set a = b WHEN MATCHED THEN DELETE"); + } + @Test + public void test9() throws ParseException { + ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk " + + "WHEN MATCHED AND a = 1 THEN UPDATE set a = b " + + "WHEN MATCHED THEN DELETE " + + "WHEN NOT MATCHED AND d < e THEN INSERT VALUES(1,2)"); + } + @Test + public void test10() throws ParseException { + ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk " + + "WHEN MATCHED AND a = 1 THEN DELETE " + + "WHEN MATCHED THEN UPDATE set a = b " + + "WHEN NOT MATCHED AND d < e THEN INSERT VALUES(1,2)"); + } + /** + * we always expect 0 or 1 update/delete WHEN clause and 0 or 1 insert WHEN clause (and 1 or 2 WHEN clauses altogether) + * @throws ParseException + */ + @Test + public void testNegative3() throws ParseException { + expectedException.expect(ParseException.class); + expectedException.expectMessage("line 1:119 cannot recognize input near 'INSERT' 'VALUES' '(' in WHEN MATCHED THEN clause"); + ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED AND a = 1 THEN UPDATE set a = b WHEN MATCHED THEN INSERT VALUES(1,2)"); + } + /** + * here we reverse the order of WHEN MATCHED/WHEN NOT MATCHED - should we allow it? + * @throws ParseException + */ + @Test + public void testNegative4() throws ParseException { + expectedException.expect(ParseException.class); + expectedException.expectMessage("line 1:104 missing EOF at 'WHEN' near ')'"); + ASTNode ast = parse( + "MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN INSERT VALUES(a,source.b) WHEN MATCHED THEN DELETE"); + } + + /** + * why does this fail but next one passes + * @throws ParseException + */ + @Test + public void testNegative5() throws ParseException { + expectedException.expect(ParseException.class); + expectedException.expectMessage("line 1:103 mismatched input '+' expecting ) near 'b' in value row constructor"); + //todo: why does this fail but next one passes? + ASTNode ast = parse( + "MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN INSERT VALUES(a,source.b + 1)"); + } + @Test + public void test6() throws ParseException { + ASTNode ast = parse( + "MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN INSERT VALUES(a,(source.b + 1))"); + } + @Test + public void testNegative6() throws ParseException { + expectedException.expect(RewriteEmptyStreamException.class); + expectedException.expectMessage("rule whenClauses"); + ASTNode ast = parse( + "MERGE INTO target USING source ON target.pk = source.pk"); + } + @Test + public void test7() throws ParseException { + ASTNode ast = parse("merge into acidTbl" + + " using nonAcidPart2 source ON acidTbl.a = source.a2 " + + "WHEN MATCHED THEN UPDATE set b = source.b2 " + + "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)"); + Assert.assertEquals(ast.toStringTree(), + "(tok_merge " + + "(tok_tabref (tok_tabname acidtbl)) (tok_tabref (tok_tabname nonacidpart2) source) " + + "(= (. (tok_table_or_col acidtbl) a) (. (tok_table_or_col source) a2)) " + + "(tok_matched " + + "(tok_update " + + "(tok_set_columns_clause (= (tok_table_or_col b) (. (tok_table_or_col source) b2))))) " + + "(tok_not_matched " + + "(tok_insert " + + "(tok_value_row (. (tok_table_or_col source) a2) (. (tok_table_or_col source) b2)))))"); + } +}