Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9B404200C7D for ; Mon, 1 May 2017 18:43:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 99E6B160BAE; Mon, 1 May 2017 16:43:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 242C2160BA0 for ; Mon, 1 May 2017 18:43:35 +0200 (CEST) Received: (qmail 77016 invoked by uid 500); 1 May 2017 16:43:35 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 76994 invoked by uid 99); 1 May 2017 16:43:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 May 2017 16:43:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F0CF4E0896; Mon, 1 May 2017 16:43:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ekoifman@apache.org To: commits@hive.apache.org Date: Mon, 01 May 2017 16:43:34 -0000 Message-Id: <6eca0cdbdeaf4e6a85a63f312ae00942@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] hive git commit: HIVE-12636 Ensure that all queries (with DbTxnManager) run in a transaction (Eugene Koifman, reviewed by Wei Zheng) archived-at: Mon, 01 May 2017 16:43:38 -0000 Repository: hive Updated Branches: refs/heads/master 41c383287 -> 21909601f http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java 
---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index afebf03..c31241a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -44,32 +45,51 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; /** * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager} * Tests here are "end-to-end"ish and simulate concurrent queries. * * The general approach is to use an instance of Driver to use Driver.run() to create tables - * Use Driver.compile() to generate QueryPlan which can then be passed to HiveTxnManager.acquireLocks(). - * Same HiveTxnManager is used to openTxn()/commitTxn() etc. This can exercise almost the entire + * Use Driver.compileAndRespond() (which also starts a txn) to generate QueryPlan which can then be + * passed to HiveTxnManager.acquireLocks(). 
+ * Same HiveTxnManager is used to commitTxn()/rollback etc. This can exercise almost the entire * code path that CLI would but with the advantage that you can create a 2nd HiveTxnManager and then * simulate interleaved transactional/locking operations but all from within a single thread. * The later not only controls concurrency precisely but is the only way to run in UT env with DerbyDB. + * + * A slightly different (and simpler) approach is to use "start transaction/(commit/rollback)" + * command with the Driver.run(). This allows you to "see" the state of the Lock Manager after + * each statement and can also simulate concurrent (but very controlled) work but w/o forking any + * threads. The limitation here is that not all statements are allowed in an explicit transaction. + * For example, "drop table foo". This approach will also cause the query to execute which will + * make tests slower but will exercise the code path that is much closer to the actual user calls. + * + * In either approach, each logical "session" should use its own Transaction Manager. This requires + * using {@link #swapTxnManager(HiveTxnManager)} since in the SessionState the TM is associated with + * each thread. 
*/ public class TestDbTxnManager2 { + private static final Logger LOG = LoggerFactory.getLogger(TestDbTxnManager2.class); private static HiveConf conf = new HiveConf(Driver.class); private HiveTxnManager txnMgr; private Context ctx; private Driver driver; - TxnStore txnHandler; + private TxnStore txnHandler; public TestDbTxnManager2() throws Exception { conf @@ -106,7 +126,6 @@ public class TestDbTxnManager2 { checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)")); - txnMgr.openTxn(ctx, "one"); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -115,7 +134,6 @@ public class TestDbTxnManager2 { txnMgr.rollbackTxn(); checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)")); - txnMgr.openTxn(ctx, "one"); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -124,13 +142,13 @@ public class TestDbTxnManager2 { txnMgr.rollbackTxn(); checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)")); - txnMgr.openTxn(ctx, "three"); txnMgr.acquireLocks(driver.getPlan(), ctx, "three"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "S", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "R", null, locks); + txnMgr.rollbackTxn(); } @Test public void createTable() throws Exception { @@ -141,7 +159,7 @@ public class TestDbTxnManager2 { List locks = getLocks(); Assert.assertEquals("Unexpected lock 
count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", null, null, locks); - txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks()); + txnMgr.commitTxn(); Assert.assertEquals("Lock remained", 0, getLocks().size()); } @Test @@ -158,7 +176,7 @@ public class TestDbTxnManager2 { Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T2", null, locks); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T3", null, locks); - txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks()); + txnMgr.commitTxn(); Assert.assertEquals("Lock remained", 0, getLocks().size()); cpr = driver.run("drop table if exists T1"); checkCmdOnDriver(cpr); @@ -179,7 +197,7 @@ public class TestDbTxnManager2 { Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T5", null, locks); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T4", null, locks); - txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks()); + txnMgr.commitTxn(); Assert.assertEquals("Lock remained", 0, getLocks().size()); cpr = driver.run("drop table if exists T5"); checkCmdOnDriver(cpr); @@ -195,23 +213,23 @@ public class TestDbTxnManager2 { checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T6 List selectLocks = ctx.getHiveLocks(); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); cpr = driver.compileAndRespond("drop table if exists T6"); checkCmdOnDriver(cpr); //tries to get X lock on T1 and gets Waiting state - LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); + LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); 
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks); checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks); - txnMgr.getLockManager().releaseLocks(selectLocks);//release S on T6 + txnMgr.rollbackTxn();//release S on T6 //attempt to X on T6 again - succeed lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid()); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T6", null, locks); - List xLock = new ArrayList(0); - xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(xLock); + txnMgr2.rollbackTxn(); cpr = driver.run("drop table if exists T6"); locks = getLocks(); Assert.assertEquals("Unexpected number of locks found", 0, locks.size()); @@ -224,26 +242,23 @@ public class TestDbTxnManager2 { checkCmdOnDriver(cpr); cpr = driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6"); + cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");//gets SS lock on T7 checkCmdOnDriver(cpr); - txnMgr.openTxn(ctx, "Fifer"); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); - checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - //txnMgr2.openTxn("Fiddler"); - ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7 + swapTxnManager(txnMgr2); + checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp")); + ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); 
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "temp", "T7", null, locks); checkLock(LockType.EXCLUSIVE, LockState.WAITING, "temp", null, null, locks); txnMgr.commitTxn(); - ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid()); + ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid()); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "temp", null, null, locks); - List xLock = new ArrayList(0); - xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr2.getLockManager().releaseLocks(xLock); + txnMgr2.commitTxn(); } @Test public void updateSelectUpdate() throws Exception { @@ -252,12 +267,12 @@ public class TestDbTxnManager2 { checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("delete from T8 where b = 89"); checkCmdOnDriver(cpr); - txnMgr.openTxn(ctx, "Fifer"); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8 + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + checkCmdOnDriver(driver.run("start transaction")); cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8 checkCmdOnDriver(cpr); - HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn(ctx, "Fiddler"); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler"); checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1")); ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer @@ -266,13 +281,14 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks); checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "T8", null, locks); - txnMgr.rollbackTxn(); + 
driver.releaseLocksAndCommitOrRollback(false, txnMgr); ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(2).getLockid()); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks); - txnMgr2.commitTxn(); + driver.releaseLocksAndCommitOrRollback(true, txnMgr2); + swapTxnManager(txnMgr); cpr = driver.run("drop table if exists T6"); locks = getLocks(); Assert.assertEquals("Unexpected number of locks found", 0, locks.size()); @@ -284,21 +300,20 @@ public class TestDbTxnManager2 { dropTable(new String[] {"T9"}); conf.setIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES, 2); conf.setBoolVar(HiveConf.ConfVars.TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT, true); - HiveTxnManager otherTxnMgr = new DbTxnManager(); - ((DbTxnManager)otherTxnMgr).setHiveConf(conf); CommandProcessorResponse cpr = driver.run("create table T9(a int)"); checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("select * from T9"); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Vincent Vega"); - List locks = getLocks(txnMgr); + List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T9", null, locks); - + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); cpr = driver.compileAndRespond("drop table T9"); checkCmdOnDriver(cpr); try { - otherTxnMgr.acquireLocks(driver.getPlan(), ctx, "Winston Winnfield"); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "Winston Winnfield"); } catch(LockException ex) { Assert.assertEquals("Got wrong lock exception", ErrorMsg.LOCK_ACQUIRE_TIMEDOUT, ex.getCanonicalErrorMsg()); @@ -306,7 +321,7 @@ public class TestDbTxnManager2 { locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, 
locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T9", null, locks); - otherTxnMgr.closeTxnManager(); + txnMgr2.closeTxnManager(); } /** @@ -324,13 +339,15 @@ public class TestDbTxnManager2 { cpr = driver.compileAndRespond("select * from TAB_BLOCKED"); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "I AM SAM"); - List locks = getLocks(txnMgr); + List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); cpr = driver.compileAndRespond("drop table TAB_BLOCKED"); checkCmdOnDriver(cpr); - ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false);//make non-blocking - locks = getLocks(txnMgr); + ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false);//make non-blocking + locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks); checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "TAB_BLOCKED", null, locks); @@ -597,9 +614,7 @@ public class TestDbTxnManager2 { List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", null, locks); - List relLocks = new ArrayList(1); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(relLocks); + txnMgr.rollbackTxn();; cpr = driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)"); checkCmdOnDriver(cpr); @@ -607,9 +622,7 @@ public class TestDbTxnManager2 { locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", 
"nonAcidPart", "p=1", locks); - relLocks = new ArrayList(1); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(relLocks); + txnMgr.rollbackTxn(); cpr = driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)"); checkCmdOnDriver(cpr); @@ -617,9 +630,7 @@ public class TestDbTxnManager2 { locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", null, locks); - relLocks = new ArrayList(1); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(relLocks); + txnMgr.rollbackTxn(); cpr = driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)"); checkCmdOnDriver(cpr); @@ -627,19 +638,15 @@ public class TestDbTxnManager2 { locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", "p=1", locks); - relLocks = new ArrayList(1); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(relLocks); - + txnMgr.rollbackTxn(); + cpr = driver.compileAndRespond("update acidPart set b = 17 where a = 1"); checkCmdOnDriver(cpr); lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks); - relLocks = new ArrayList(1); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(relLocks); + txnMgr.rollbackTxn(); cpr = driver.compileAndRespond("update acidPart set b = 17 where p = 1"); checkCmdOnDriver(cpr); @@ -647,9 +654,7 @@ public class TestDbTxnManager2 { locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, 
locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks);//https://issues.apache.org/jira/browse/HIVE-13212 - relLocks = new ArrayList(1); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(relLocks); + txnMgr.rollbackTxn(); } /** * Check to make sure we acquire proper locks for queries involving acid and non-acid tables @@ -726,6 +731,16 @@ public class TestDbTxnManager2 { throw new IllegalStateException("How did it get here?!"); } + /** + * SessionState is stored in ThreadLocal; UnitTest runs in a single thread (otherwise Derby wedges) + * {@link HiveTxnManager} instances are per SessionState. + * So to be able to simulate concurrent locks/transactions w/o forking threads we just swap + * the TxnManager instance in the session (hacky but nothing is actually threading so it allows us + * to write good tests) + */ + private static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) { + return SessionState.get().setTxnMgr(txnMgr); + } @Test public void testShowLocksFilterOptions() throws Exception { CommandProcessorResponse cpr = driver.run("drop table if exists db1.t14"); @@ -756,26 +771,32 @@ public class TestDbTxnManager2 { // Acquire different locks at different levels + HiveTxnManager txnMgr1 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr1); cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='today') values (1, 2)"); checkCmdOnDriver(cpr); - txnMgr.acquireLocks(driver.getPlan(), ctx, "Tom"); + txnMgr1.acquireLocks(driver.getPlan(), ctx, "Tom"); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='tomorrow') values (3, 4)"); checkCmdOnDriver(cpr); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Jerry"); HiveTxnManager txnMgr3 = 
TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr3); cpr = driver.compileAndRespond("select * from db2.t15"); checkCmdOnDriver(cpr); txnMgr3.acquireLocks(driver.getPlan(), ctx, "Donald"); HiveTxnManager txnMgr4 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr4); cpr = driver.compileAndRespond("select * from db2.t16"); checkCmdOnDriver(cpr); txnMgr4.acquireLocks(driver.getPlan(), ctx, "Hillary"); HiveTxnManager txnMgr5 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr5); cpr = driver.compileAndRespond("select * from db2.t14"); checkCmdOnDriver(cpr); txnMgr5.acquireLocks(driver.getPlan(), ctx, "Obama"); @@ -799,6 +820,7 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "db2", "t14", null, locks); // SHOW LOCKS t14 + swapTxnManager(txnMgr); cpr = driver.run("use db1"); checkCmdOnDriver(cpr); locks = getLocksWithFilterOptions(txnMgr, null, "t14", null); @@ -847,14 +869,13 @@ public class TestDbTxnManager2 { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART")); - txnMgr.openTxn(ctx, "Nicholas"); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas"); - HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr.commitTxn(); - txnMgr2.openTxn(ctx, "Alexandra"); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); - txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas"); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "Alexandra"); txnMgr2.commitTxn(); } private void dropTable(String[] tabs) throws Exception { @@ -899,16 
+920,17 @@ public class TestDbTxnManager2 { "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)")); - HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - long txnId = txnMgr.openTxn(ctx, "Known"); - long txnId2 = txnMgr2.openTxn(ctx, "Unknown"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + long txnId = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + long txnId2 = txnMgr2.getCurrentTxnId(); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false); locks = getLocks(txnMgr2);//should not matter which txnMgr is used here Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -933,7 +955,7 @@ public class TestDbTxnManager2 { Assert.assertTrue("Didn't get exception", expectedException != null); Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); Assert.assertEquals("Exception msg didn't match", - "Aborting [txnid:3,3] due to a write conflict on default/TAB_PART/p=blah committed by [txnid:2,3] u/u", + "Aborting [txnid:"+txnId2+","+txnId2+"] due to a write conflict on default/TAB_PART/p=blah committed by [txnid:"+txnId+","+txnId2+"] u/u", expectedException.getCause().getMessage()); } /** @@ -1067,7 +1089,7 @@ public class TestDbTxnManager2 { Assert.assertEquals("Unexpected lock count", 1, locks.size()); 
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn(ctx, "Horton"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton"); Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); @@ -1080,6 +1102,7 @@ public class TestDbTxnManager2 { locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks); + txnMgr.commitTxn(); TestTxnCommands2.runHouseKeeperService(new AcidWriteSetService(), conf); Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); } @@ -1097,40 +1120,40 @@ public class TestDbTxnManager2 { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab2 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - + swapTxnManager(txnMgr2); //test with predicates such that partition pruning works - txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'")); + long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks); //now start concurrent txn - txnMgr.openTxn(ctx, "T3"); + swapTxnManager(txnMgr); checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'")); + long idTxnUpdate2 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, 
locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks); - //this simulates the completion of txnid:2 + //this simulates the completion of txnid:idTxnUpdate1 AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr2.commitTxn();//txnid:2 - + txnMgr2.commitTxn();//txnid:idTxnUpdate1 locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks); - //completion of txnid:3 + //completion of txnid:idTxnUpdate2 adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2", Collections.singletonList("p=one")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr.commitTxn();//txnid:3 + txnMgr.commitTxn();//txnid:idTxnUpdate2 //now both txns concurrently updated TAB2 but different partitions. 
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), @@ -1147,8 +1170,9 @@ public class TestDbTxnManager2 { "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4 - txnMgr2.openTxn(ctx, "T5"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); + long idTxnUpdate3 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5"); locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1156,8 +1180,9 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn(ctx, "T6"); + swapTxnManager(txnMgr); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2")); + long idTxnUpdate4 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 4, locks.size()); @@ -1166,24 +1191,24 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=one", locks); - //this simulates the completion of txnid:5 + //this simulates the completion of txnid:idTxnUpdate3 adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=one")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr2.commitTxn();//txnid:5 + txnMgr2.commitTxn();//txnid:idTxnUpdate3 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) locks = getLocks(txnMgr); 
Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); - //completion of txnid:6 + //completion of txnid:idTxnUpdate4 adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr.commitTxn();//txnid:6 + txnMgr.commitTxn();//txnid:idTxnUpdate4 Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); @@ -1202,10 +1227,11 @@ public class TestDbTxnManager2 { CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 + checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn(ctx, "T2"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); + long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1213,8 +1239,9 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn(ctx, "T3"); + swapTxnManager(txnMgr); 
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'")); + long idTxnUpdate2 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -1222,23 +1249,23 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); - //this simulates the completion of txnid:2 + //this simulates the completion of txnid:idTxnUpdate1 AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=one")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr2.commitTxn();//txnid:2 + txnMgr2.commitTxn();//txnid:idTxnUpdate1 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); - //completion of txnid:3 + //completion of txnid:idTxnUpdate2 adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr.commitTxn();//txnid:3 + txnMgr.commitTxn();//txnid:idTxnUpdate2 Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); @@ -1256,10 +1283,11 @@ public class TestDbTxnManager2 { CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 
buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 + checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn(ctx, "T2"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); + long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1267,8 +1295,9 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn(ctx, "T3"); + swapTxnManager(txnMgr); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); + long idTxnDelete1 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -1276,30 +1305,30 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); - //this simulates the completion of txnid:2 + //this simulates the completion of txnid:idTxnUpdate1 AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=one")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr2.commitTxn();//txnid:2 + txnMgr2.commitTxn();//txnid:idTxnUpdate1 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) 
locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); - //completion of txnid:3 + //completion of txnid:idTxnUpdate2 adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.DELETE); txnHandler.addDynamicPartitions(adp); - txnMgr.commitTxn();//txnid:3 + txnMgr.commitTxn();//txnid:idTxnUpdate2 Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), - 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'")); + 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (idTxnUpdate1 - 1) + " and ctc_table='tab1'")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), - 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1' and ctc_partition='p=one'")); + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + idTxnUpdate1 + " and ctc_table='tab1' and ctc_partition='p=one'")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), - 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=3 and ctc_table='tab1' and ctc_partition='p=two'")); + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + idTxnDelete1 + " and ctc_table='tab1' and ctc_partition='p=two'")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); 
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), @@ -1318,7 +1347,7 @@ public class TestDbTxnManager2 { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn(ctx, "T2"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1327,7 +1356,7 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn(ctx, "T3"); + swapTxnManager(txnMgr); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); @@ -1336,32 +1365,32 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); - //this simulates the completion of txnid:2 + //this simulates the completion of "Update tab2" txn AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr2.commitTxn();//txnid:2 + txnMgr2.commitTxn();//"Update tab2" ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); - //completion of txnid:3 + //completion of "delete from tab1" txn adp 
= new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.DELETE); txnHandler.addDynamicPartitions(adp); LockException exception = null; try { - txnMgr.commitTxn();//txnid:3 + txnMgr.commitTxn();//"delete from tab1" } catch(LockException e) { exception = e; } Assert.assertNotEquals("Expected exception", null, exception); Assert.assertEquals("Exception msg doesn't match", - "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3] d/u", + "Aborting [txnid:5,5] due to a write conflict on default/tab1/p=two committed by [txnid:4,5] d/u", exception.getCause().getMessage()); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), @@ -1378,19 +1407,22 @@ public class TestDbTxnManager2 { CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 + checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn(ctx, "T2"); - checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2")); + swapTxnManager(txnMgr2); + checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));//start "delete from tab1" txn + long txnIdDelete = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); 
- //now start concurrent txn - txnMgr.openTxn(ctx, "T3"); + //now start concurrent "select * from tab1" txn + swapTxnManager(txnMgr); + checkCmdOnDriver(driver.run("start transaction"));//start explicit txn so that txnMgr knows it checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'")); + long txnIdSelect = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); @@ -1402,12 +1434,12 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); - //this simulates the completion of txnid:2 + //this simulates the completion of "delete from tab1" txn AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.DELETE); txnHandler.addDynamicPartitions(adp); - txnMgr2.commitTxn();//txnid:2 + txnMgr2.commitTxn();//"delete from tab1" txn ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(4).getLockid());//retest WAITING locks (both have same ext id) locks = getLocks(txnMgr); @@ -1415,21 +1447,21 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); - //completion of txnid:3 + //completion of txnid:txnIdSelect adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.DELETE); txnHandler.addDynamicPartitions(adp); - txnMgr.commitTxn();//txnid:3 + 
txnMgr.commitTxn();//"select * from tab1" txn Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=2")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete)); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=3")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect)); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=2")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete)); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=3")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect)); Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); } @@ -1448,7 
+1480,7 @@ public class TestDbTxnManager2 { 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS")); //only expect transactional components to be in COMPLETED_TXN_COMPONENTS Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), - 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'")); + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=6 and ctc_table='tab1'")); } /** @@ -1465,17 +1497,17 @@ public class TestDbTxnManager2 { cpr = driver.run("create table if not exists tab_not_acid (a int, b int, p string)"); checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab_not_acid values(1,1,'one'),(2,2,'two')")); - checkCmdOnDriver(driver.run("insert into tab1 partition(p) values(3,3,'one'),(4,4,'two')"));//txinid:1 + checkCmdOnDriver(driver.run("insert into tab1 partition(p) values(3,3,'one'),(4,4,'two')"));//txinid:8 //writing both acid and non-acid resources in the same txn //tab1 write is a dynamic partition insert - checkCmdOnDriver(driver.run("from tab_not_acid insert into tab1 partition(p)(a,b,p) select a,b,p insert into tab_not_acid(a,b) select a,b where p='two'"));//txnid:2 + checkCmdOnDriver(driver.run("from tab_not_acid insert into tab1 partition(p)(a,b,p) select a,b,p insert into tab_not_acid(a,b) select a,b where p='two'"));//txnid:9 Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS")); //only expect transactional components to be in COMPLETED_TXN_COMPONENTS Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), - 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2")); + 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=9")); 
Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), - 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1'")); + 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=9 and ctc_table='tab1'")); } //todo: Concurrent insert/update of same partition - should pass @@ -1546,13 +1578,13 @@ public class TestDbTxnManager2 { "(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)")); - long txnId1 = txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " + "when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1 + long txnId1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); - List locks = getLocks(txnMgr); + List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 5, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); @@ -1562,13 +1594,14 @@ public class TestDbTxnManager2 { //start concurrent txn DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - long txnId2 = txnMgr2.openTxn(ctx, "T2"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " + "when matched and t.a=" + (cc ? 
5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1 + long txnId2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false); - locks = getLocks(txnMgr2); + locks = getLocks(); Assert.assertEquals("Unexpected lock count", 10, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); @@ -1639,7 +1672,7 @@ public class TestDbTxnManager2 { //re-check locks which were in Waiting state - should now be Acquired ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId); - locks = getLocks(txnMgr2); + locks = getLocks(); Assert.assertEquals("Unexpected lock count", 5, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source2", null, locks); @@ -1693,9 +1726,9 @@ public class TestDbTxnManager2 { } if(cc) { Assert.assertNotNull("didn't get exception", expectedException); - Assert.assertEquals("Transaction manager has aborted the transaction txnid:3. Reason: " + - "Aborting [txnid:3,3] due to a write conflict on default/target/p=1/q=3 " + - "committed by [txnid:2,3] u/u", expectedException.getMessage()); + Assert.assertEquals("Transaction manager has aborted the transaction txnid:11. 
Reason: " + + "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=3 " + + "committed by [txnid:10,11] u/u", expectedException.getMessage()); Assert.assertEquals( "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), @@ -1753,14 +1786,13 @@ public class TestDbTxnManager2 { "stored as orc TBLPROPERTIES ('transactional'='true')")); checkCmdOnDriver(driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)")); checkCmdOnDriver(driver.run("create table source (a int, b int)")); - - long txnid1 = txnMgr.openTxn(ctx, "T1"); if(causeConflict) { checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1")); } else { checkCmdOnDriver(driver.compileAndRespond("insert into target values(9,10),(11,12)")); } + long txnid1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + @@ -1774,14 +1806,15 @@ public class TestDbTxnManager2 { LockState.ACQUIRED, "default", "target", null, locks); DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); //start a 2nd (overlapping) txn - long txnid2 = txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s " + "on t.a=s.a " + "when matched then delete " + "when not matched then insert values(s.a,s.b)")); + long txnid2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); - locks = getLocks(txnMgr); + locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks); @@ -1801,7 +1834,7 @@ public class TestDbTxnManager2 { //re-check locks which were in Waiting state - should now be Acquired 
((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId); - locks = getLocks(txnMgr); + locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks); @@ -1830,7 +1863,7 @@ public class TestDbTxnManager2 { Assert.assertTrue("Didn't get exception", expectedException != null); Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); Assert.assertEquals("Exception msg didn't match", - "Aborting [txnid:3,3] due to a write conflict on default/target committed by [txnid:2,3] d/u", + "Aborting [txnid:7,7] due to a write conflict on default/target committed by [txnid:6,7] d/u", expectedException.getCause().getMessage()); } else { @@ -1922,21 +1955,22 @@ public class TestDbTxnManager2 { checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)")); checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int)")); - long txnId1 = txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1")); + long txnId1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); - List locks = getLocks(txnMgr); + List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); //start a 2nd (overlapping) txn - long txnid2 = txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("merge into target using source " + "on target.p=source.p1 and target.a=source.a1 " 
+ "when matched then update set b = 11 " + "when not matched then insert values(a1,b1,p1,q1)")); + long txnid2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 7, locks.size()); @@ -1982,7 +2016,7 @@ public class TestDbTxnManager2 { //re-check locks which were in Waiting state - should now be Acquired ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId); - locks = getLocks(txnMgr); + locks = getLocks(); Assert.assertEquals("Unexpected lock count", 5, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); @@ -2031,7 +2065,7 @@ public class TestDbTxnManager2 { Assert.assertTrue("Didn't get exception", expectedException != null); Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); Assert.assertEquals("Exception msg didn't match", - "Aborting [txnid:3,3] due to a write conflict on default/target/p=1/q=2 committed by [txnid:2,3] u/u", + "Aborting [txnid:7,7] due to a write conflict on default/target/p=1/q=2 committed by [txnid:6,7] u/u", expectedException.getCause().getMessage()); } else { @@ -2046,6 +2080,12 @@ public class TestDbTxnManager2 { TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2)); } } + + /** + * This test is mostly obsolete. The logic in the Driver.java no longer acquires any locks for + * "show tables". Keeping the test for now in case we change that logic. 
+ * @throws Exception + */ @Test public void testShowTablesLock() throws Exception { dropTable(new String[] {"T, T2"}); @@ -2061,6 +2101,8 @@ public class TestDbTxnManager2 { checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "t", null, locks); DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + txnMgr2.openTxn(ctx, "Fidler"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("show tables")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fidler"); locks = getLocks(); @@ -2068,17 +2110,17 @@ public class TestDbTxnManager2 { checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "t", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", null, null, locks); txnMgr.commitTxn(); - txnMgr2.releaseLocks(txnMgr2.getLockManager().getLocks(false, false)); + txnMgr2.rollbackTxn(); Assert.assertEquals("Lock remained", 0, getLocks().size()); Assert.assertEquals("Lock remained", 0, getLocks(txnMgr2).size()); + swapTxnManager(txnMgr); cpr = driver.run( "create table if not exists T2 (a int, b int) partitioned by (p int) clustered by (a) " + "into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')"); checkCmdOnDriver(cpr); - txnid1 = txnMgr.openTxn(ctx, "Fifer"); checkCmdOnDriver(driver.compileAndRespond("insert into T2 partition(p=1) values(1,3)")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); locks = getLocks(); @@ -2086,6 +2128,8 @@ public class TestDbTxnManager2 { checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "t2", "p=1", locks); txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + txnMgr2.openTxn(ctx, "Fidler"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("show tables")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fidler", false); locks = getLocks(); @@ -2093,7 +2137,7 @@ public class TestDbTxnManager2 { checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "t2", 
"p=1", locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", null, null, locks); txnMgr.commitTxn(); - txnMgr2.releaseLocks(txnMgr2.getLockManager().getLocks(false, false)); + txnMgr2.commitTxn(); Assert.assertEquals("Lock remained", 0, getLocks().size()); Assert.assertEquals("Lock remained", 0, getLocks(txnMgr2).size()); } http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index f75a1be..ec3e6ab 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; @@ -29,6 +30,8 @@ import org.apache.hadoop.hive.metastore.api.LockLevel; import org.apache.hadoop.hive.metastore.api.LockRequest; import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; @@ -223,10 +226,10 @@ public class TestInitiator extends CompactorTest { LockResponse res = txnHandler.lock(req); txnHandler.abortTxn(new 
AbortTxnRequest(txnid)); - for (int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50; i++) { - txnid = openTxn(); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - } + conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50); + OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest( + TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50, "user", "hostname")); + txnHandler.abortTxns(new AbortTxnsRequest(resp.getTxn_ids())); GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize()); http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/queries/clientpositive/row__id.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/row__id.q b/ql/src/test/queries/clientpositive/row__id.q index a24219b..d9cb7b0 100644 --- a/ql/src/test/queries/clientpositive/row__id.q +++ b/ql/src/test/queries/clientpositive/row__id.q @@ -16,7 +16,7 @@ select tid from (select row__id.transactionid as tid from hello_acid) sub order select tid from (select row__id.transactionid as tid from hello_acid) sub order by tid; explain -select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 1; +select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 3; -select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 1; +select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 3; http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/results/clientpositive/acid_table_stats.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/acid_table_stats.q.out b/ql/src/test/results/clientpositive/acid_table_stats.q.out index 1bf0a98..195278a 100644 --- 
a/ql/src/test/results/clientpositive/acid_table_stats.q.out +++ b/ql/src/test/results/clientpositive/acid_table_stats.q.out @@ -98,7 +98,7 @@ Partition Parameters: numFiles 2 numRows 0 rawDataSize 0 - totalSize 3837 + totalSize 3852 #### A masked pattern was here #### # Storage Information @@ -136,9 +136,9 @@ STAGE PLANS: Map Operator Tree: TableScan alias: acid - Statistics: Num rows: 1 Data size: 3837 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 3852 Basic stats: PARTIAL Column stats: NONE Select Operator - Statistics: Num rows: 1 Data size: 3837 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 3852 Basic stats: PARTIAL Column stats: NONE Group By Operator aggregations: count() mode: hash @@ -215,7 +215,7 @@ Partition Parameters: numFiles 2 numRows 1000 rawDataSize 208000 - totalSize 3837 + totalSize 3852 #### A masked pattern was here #### # Storage Information @@ -264,7 +264,7 @@ Partition Parameters: numFiles 2 numRows 1000 rawDataSize 208000 - totalSize 3837 + totalSize 3852 #### A masked pattern was here #### # Storage Information @@ -391,7 +391,7 @@ Partition Parameters: numFiles 4 numRows 1000 rawDataSize 208000 - totalSize 7689 + totalSize 7718 #### A masked pattern was here #### # Storage Information @@ -440,7 +440,7 @@ Partition Parameters: numFiles 4 numRows 2000 rawDataSize 416000 - totalSize 7689 + totalSize 7718 #### A masked pattern was here #### # Storage Information http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/results/clientpositive/autoColumnStats_4.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/autoColumnStats_4.q.out b/ql/src/test/results/clientpositive/autoColumnStats_4.q.out index 3ae2f20..101cc63 100644 --- a/ql/src/test/results/clientpositive/autoColumnStats_4.q.out +++ b/ql/src/test/results/clientpositive/autoColumnStats_4.q.out @@ -201,7 +201,7 @@ Table Parameters: numFiles 2 
numRows 0 rawDataSize 0 - totalSize 1714 + totalSize 1724 transactional true #### A masked pattern was here #### @@ -244,7 +244,7 @@ Table Parameters: numFiles 4 numRows 0 rawDataSize 0 - totalSize 2719 + totalSize 2763 transactional true #### A masked pattern was here #### http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out b/ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out index 24db44f..fa8417b 100644 --- a/ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out +++ b/ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out @@ -308,7 +308,7 @@ Table Parameters: numFiles 1 numRows 0 rawDataSize 0 - totalSize 1508 + totalSize 1512 transactional true #### A masked pattern was here #### @@ -336,9 +336,9 @@ STAGE PLANS: Map Operator Tree: TableScan alias: acid_ivot - Statistics: Num rows: 1 Data size: 1508 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 1512 Basic stats: COMPLETE Column stats: COMPLETE Select Operator - Statistics: Num rows: 1 Data size: 1508 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 1512 Basic stats: COMPLETE Column stats: COMPLETE Group By Operator aggregations: count() mode: hash @@ -430,7 +430,7 @@ Table Parameters: numFiles 2 numRows 0 rawDataSize 0 - totalSize 3016 + totalSize 3024 transactional true #### A masked pattern was here #### @@ -458,9 +458,9 @@ STAGE PLANS: Map Operator Tree: TableScan alias: acid_ivot - Statistics: Num rows: 1 Data size: 3016 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 3024 Basic stats: COMPLETE Column stats: COMPLETE Select Operator - Statistics: Num rows: 1 Data size: 3016 Basic stats: 
COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 3024 Basic stats: COMPLETE Column stats: COMPLETE Group By Operator aggregations: count() mode: hash @@ -538,7 +538,7 @@ Table Parameters: numFiles 3 numRows 0 rawDataSize 0 - totalSize 380253 + totalSize 380261 transactional true #### A masked pattern was here #### @@ -566,9 +566,9 @@ STAGE PLANS: Map Operator Tree: TableScan alias: acid_ivot - Statistics: Num rows: 1 Data size: 380253 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 380261 Basic stats: COMPLETE Column stats: COMPLETE Select Operator - Statistics: Num rows: 1 Data size: 380253 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 380261 Basic stats: COMPLETE Column stats: COMPLETE Group By Operator aggregations: count() mode: hash http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out b/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out index d05bf64..357ae7b 100644 --- a/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out +++ b/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out @@ -43,7 +43,7 @@ STAGE PLANS: alias: acidtbldefault filterExpr: (a = 1) (type: boolean) buckets included: [1,] of 16 - Statistics: Num rows: 7972 Data size: 31889 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 7972 Data size: 31888 Basic stats: COMPLETE Column stats: NONE GatherStats: false Filter Operator isSamplingPred: false @@ -100,7 +100,7 @@ STAGE PLANS: serialization.ddl struct acidtbldefault { i32 a} serialization.format 1 serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde - totalSize 31889 + totalSize 31888 transactional true transactional_properties default #### A masked pattern was here #### @@ 
-123,7 +123,7 @@ STAGE PLANS: serialization.ddl struct acidtbldefault { i32 a} serialization.format 1 serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde - totalSize 31889 + totalSize 31888 transactional true transactional_properties default #### A masked pattern was here #### http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/results/clientpositive/row__id.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/row__id.q.out b/ql/src/test/results/clientpositive/row__id.q.out index 2289883..43c9b60 100644 --- a/ql/src/test/results/clientpositive/row__id.q.out +++ b/ql/src/test/results/clientpositive/row__id.q.out @@ -56,23 +56,23 @@ STAGE PLANS: Map Operator Tree: TableScan alias: hello_acid - Statistics: Num rows: 1 Data size: 2902 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE Select Operator expressions: ROW__ID.transactionid (type: bigint) outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 2902 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE Reduce Output Operator key expressions: _col0 (type: bigint) sort order: + - Statistics: Num rows: 1 Data size: 2902 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey0 (type: bigint) outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 2902 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 1 Data size: 2902 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE table: input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat @@ -98,14 +98,14 @@ POSTHOOK: Input: default@hello_acid@load_date=2016-03-01 POSTHOOK: Input: default@hello_acid@load_date=2016-03-02 POSTHOOK: Input: default@hello_acid@load_date=2016-03-03 #### A masked pattern was here #### -1 -2 3 +4 +5 PREHOOK: query: explain -select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 1 +select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 3 PREHOOK: type: QUERY POSTHOOK: query: explain -select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 1 +select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 3 POSTHOOK: type: QUERY STAGE DEPENDENCIES: Stage-1 is a root stage @@ -117,17 +117,17 @@ STAGE PLANS: Map Operator Tree: TableScan alias: hello_acid - Statistics: Num rows: 1 Data size: 2902 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE Filter Operator - predicate: (ROW__ID.transactionid = 1) (type: boolean) - Statistics: Num rows: 1 Data size: 2902 Basic stats: COMPLETE Column stats: NONE + predicate: (ROW__ID.transactionid = 3) (type: boolean) + Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: ROW__ID.transactionid (type: bigint) outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 2902 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 1 Data size: 2902 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat @@ -139,18 +139,18 @@ STAGE PLANS: Processor Tree: ListSink -PREHOOK: query: select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 1 +PREHOOK: query: select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 3 PREHOOK: type: QUERY PREHOOK: Input: default@hello_acid PREHOOK: Input: default@hello_acid@load_date=2016-03-01 PREHOOK: Input: default@hello_acid@load_date=2016-03-02 PREHOOK: Input: default@hello_acid@load_date=2016-03-03 #### A masked pattern was here #### -POSTHOOK: query: select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 1 +POSTHOOK: query: select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 3 POSTHOOK: type: QUERY POSTHOOK: Input: default@hello_acid POSTHOOK: Input: default@hello_acid@load_date=2016-03-01 POSTHOOK: Input: default@hello_acid@load_date=2016-03-02 POSTHOOK: Input: default@hello_acid@load_date=2016-03-03 #### A masked pattern was here #### -1 +3