From: ekoifman@apache.org
To: commits@hive.apache.org
Date: Fri, 22 Jul 2016 16:56:51 -0000
Subject: [1/2] hive git commit: HIVE-14292 ACID table creation fails on mysql with MySQLIntegrityConstraintViolationException (Eugene Koifman, reviewed by Wei Zheng)

Repository: hive
Updated Branches:
  refs/heads/branch-2.1 e96994b10 -> 0efafc0df

http://git-wip-us.apache.org/repos/asf/hive/blob/0efafc0d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java.orig
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java.orig b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java.orig
deleted file mode 100644
index bc818e0..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java.orig
+++ /dev/null
@@ -1,3233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.Service;
-import com.jolbox.bonecp.BoneCPConfig;
-import com.jolbox.bonecp.BoneCPDataSource;
-import org.apache.commons.dbcp.ConnectionFactory;
-import org.apache.commons.dbcp.DriverManagerConnectionFactory;
-import org.apache.commons.dbcp.PoolableConnectionFactory;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.hive.common.ServerUtils;
-import org.apache.hadoop.hive.common.classification.InterfaceAudience;
-import org.apache.hadoop.hive.common.classification.InterfaceStability;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.commons.dbcp.PoolingDataSource;
-
-import org.apache.commons.pool.ObjectPool;
-import org.apache.commons.pool.impl.GenericObjectPool;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.util.StringUtils;
-
-import javax.sql.DataSource;
-
-import java.io.IOException;
-import java.sql.*;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-
-/**
- * A handler to answer transaction related calls that come into the metastore
- * server.
- *
- * Note on log messages: Please include txnid:X and lockid info using
- * {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)}
- * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages.
- * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated,
- * so keeping the format consistent makes grep'ing the logs much easier.
- *
- * Note on HIVE_LOCKS.hl_last_heartbeat.
- * For locks that are part of a transaction, we set this to 0 (we would rather set it to NULL, but
- * currently the DB schema has this NOT NULL) and only update/read the heartbeat from the
- * corresponding transaction in TXNS.
- *
- * In general there can be multiple metastores where this logic can execute, thus the DB is
- * used to ensure proper mutexing of operations.
- * Select ... For Update (or equivalent: either MsSql with(updlock) or an actual Update stmt) is
- * used to properly sequence operations.  Most notably:
- * 1. various sequence IDs are generated with the aid of this mutex
- * 2. ensuring that each (Hive) Transaction state is transitioned atomically.  Transaction state
- * includes its actual state (Open, Aborted) as well as its lock list/component list.  Thus all
- * per transaction ops either start by update/delete of the relevant TXNS row or do S4U on that row.
- * This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks.
- * 3. checkLock() - this is mutexed entirely since we must ensure that while we check if some lock
- * can be granted, no other (strictly speaking "earlier") lock can change state.
- *
- * The exception to this is Derby, which doesn't support proper S4U.  Derby always runs embedded
- * (this is the only supported configuration for Derby)
- * in the same JVM as HiveMetaStoreHandler, thus we use a JVM wide lock to properly sequence the operations.
- *
- * {@link #derbyLock}
- *
- * If we ever decide to run a remote Derby server, according to
- * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be
- * serialized, so that would also work, though it has not been tested.
- *
- * General design note:
- * It's imperative that any operation on a txn (e.g. commit) ensure (atomically) that this txn is
- * still valid and active.  In the code this is usually achieved at the same time the txn record
- * is locked for some operation.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
-
-  static final protected char INITIATED_STATE = 'i';
-  static final protected char WORKING_STATE = 'w';
-  static final protected char READY_FOR_CLEANING = 'r';
-  static final char FAILED_STATE = 'f';
-  static final char SUCCEEDED_STATE = 's';
-  static final char ATTEMPTED_STATE = 'a';
-
-  // Compactor types
-  static final protected char MAJOR_TYPE = 'a';
-  static final protected char MINOR_TYPE = 'i';
-
-  // Transaction states
-  static final protected char TXN_ABORTED = 'a';
-  static final protected char TXN_OPEN = 'o';
-
-  // Lock states
-  static final protected char LOCK_ACQUIRED = 'a';
-  static final protected char LOCK_WAITING = 'w';
-
-  // Lock types
-  static final protected char LOCK_EXCLUSIVE = 'e';
-  static final protected char LOCK_SHARED = 'r';
-  static final protected char LOCK_SEMI_SHARED = 'w';
-
-  static final private int ALLOWED_REPEATED_DEADLOCKS = 10;
-  static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
-
-  static private DataSource connPool;
-  static private boolean doRetryOnConnPool = false;
-
-  private enum OpertaionType {
-    INSERT('i'), UPDATE('u'), DELETE('d');
-    private final char sqlConst;
-    OpertaionType(char sqlConst) {
-      this.sqlConst = sqlConst;
-    }
-    public String toString() {
-      return Character.toString(sqlConst);
-    }
-    public static OpertaionType fromString(char sqlConst) {
-      switch (sqlConst) {
-        case 'i':
-          return INSERT;
-        case 'u':
-          return UPDATE;
-        case 'd':
-          return DELETE;
-        default:
-          throw new IllegalArgumentException(quoteChar(sqlConst));
-      }
-    }
-    //we should instead just pass in OpertaionType from the client (HIVE-13622)
-    @Deprecated
-    public static OpertaionType fromLockType(LockType lockType) {
-      switch (lockType) {
-        case SHARED_READ:
-          return INSERT;
-        case SHARED_WRITE:
-          return UPDATE;
-        default:
-          throw new IllegalArgumentException("Unexpected lock type: " + lockType);
-      }
-    }
-  }
-
-  /**
-   * Number of consecutive deadlocks we have seen
-   */
-  private int deadlockCnt;
-  private long deadlockRetryInterval;
-  protected HiveConf conf;
-  protected DatabaseProduct dbProduct;
-
-  // (End user) Transaction timeout, in milliseconds.
-  private long timeout;
-
-  private String identifierQuoteString; // quotes to use for quoting tables, where necessary
-  private long retryInterval;
-  private int retryLimit;
-  private int retryNum;
-  /**
-   * Derby specific concurrency control
-   */
-  private static final ReentrantLock derbyLock = new ReentrantLock(true);
-  /**
-   * must be static since even in UT there may be > 1 instance of TxnHandler
-   * (e.g. via Compactor services)
-   */
-  private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
-  private static final String hostname = ServerUtils.hostname();
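A minimal sketch of the S4U (SELECT ... FOR UPDATE) mutex pattern that the class
javadoc above describes, assuming a hypothetical KEY_TABLE with one row per mutex
key (the table name and the helper method are illustrative only, not part of this
file):

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    static void runUnderRowLock(Connection dbConn, Runnable criticalSection) throws SQLException {
      dbConn.setAutoCommit(false);
      dbConn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
      try (Statement stmt = dbConn.createStatement();
           // Blocks until any competing transaction that selected the same row
           // "for update" commits or rolls back, i.e. a DB-side mutex that works
           // across multiple metastore processes.
           ResultSet rs = stmt.executeQuery(
               "select mt_key from KEY_TABLE where mt_key = 'TheMutex' for update")) {
        if (rs.next()) {
          criticalSection.run();  // globally sequenced across metastores
        }
        dbConn.commit();          // releases the row lock
      } catch (SQLException e) {
        dbConn.rollback();
        throw e;
      }
    }

Because the lock lives in the database rather than in the JVM, this is exactly
what lets multiple metastore instances coordinate, and why Derby (no proper S4U)
needs the JVM-wide derbyLock fallback instead.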
-
-  // Private methods should never catch SQLException and then throw MetaException.  The public
-  // methods depend on SQLException coming back so they can detect and handle deadlocks.  Private
-  // methods should only throw MetaException when they explicitly know there's a logic error and
-  // they want to throw past the public methods.
-  //
-  // All public methods that write to the database have to check for deadlocks when a SQLException
-  // comes back and handle it if they see one.  This has to be done with the connection pooling
-  // in mind.  To do this they should call checkRetryable() AFTER rolling back the db transaction,
-  // and then they should catch RetryException and call themselves recursively.  See commitTxn for an example.
-
-  public TxnHandler() {
-  }
-
-  /**
-   * This is logically part of c'tor and must be called prior to any other method.
-   * Not physically part of c'tor due to use of reflection
-   */
-  public void setConf(HiveConf conf) {
-    this.conf = conf;
-
-    checkQFileTestHack();
-
-    Connection dbConn = null;
-    // Set up the JDBC connection pool
-    try {
-      setupJdbcConnectionPool(conf);
-      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      determineDatabaseProduct(dbConn);
-    } catch (SQLException e) {
-      String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage();
-      LOG.error(msg);
-      throw new RuntimeException(e);
-    }
-    finally {
-      closeDbConn(dbConn);
-    }
-
-    timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
-    buildJumpTable();
-    retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL,
-        TimeUnit.MILLISECONDS);
-    retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
-    deadlockRetryInterval = retryInterval / 10;
-  }
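The retry protocol described in the comment block above is easy to lose in the
long method bodies that follow.  Reduced to its skeleton (the method names are
the ones used throughout this file; the elided body is a placeholder):

    public void someWriteOperation() throws MetaException {
      try {
        Connection dbConn = null;
        try {
          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          // ... SQL work against the txn tables goes here ...
          dbConn.commit();
        } catch (SQLException e) {
          rollbackDBConn(dbConn);                          // 1. roll back first
          checkRetryable(dbConn, e, "someWriteOperation"); // 2. may throw RetryException
          throw new MetaException(StringUtils.stringifyException(e));
        } finally {
          closeDbConn(dbConn);
        }
      } catch (RetryException e) {
        someWriteOperation();                              // 3. retry by recursing
      }
    }

Note the ordering: checkRetryable() runs only after the rollback, so a retried
attempt always starts on a clean DB transaction from the pool.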
-
-  public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
-    try {
-      // We need to figure out the current transaction number and the list of
-      // open transactions.  To avoid needing a transaction on the underlying
-      // database we'll look at the current transaction number first.  If it
-      // subsequently shows up in the open list that's ok.
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        /**
-         * This method can run at READ_COMMITTED as long as
-         * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic.
-         * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with
-         * adding corresponding entries into TXNS.  The reason is that any txnid below HWM
-         * is either in TXNS, and thus considered open (Open/Aborted), or it's considered Committed.
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "select ntxn_next - 1 from NEXT_TXN_ID";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_txn_id");
-        }
-        long hwm = rs.getLong(1);
-        if (rs.wasNull()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, null record found in next_txn_id");
-        }
-        close(rs);
-        List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
-        //need the WHERE clause below to ensure consistent results with READ_COMMITTED
-        s = "select txn_id, txn_state, txn_user, txn_host from TXNS where txn_id <= " + hwm;
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          char c = rs.getString(2).charAt(0);
-          TxnState state;
-          switch (c) {
-            case TXN_ABORTED:
-              state = TxnState.ABORTED;
-              break;
-
-            case TXN_OPEN:
-              state = TxnState.OPEN;
-              break;
-
-            default:
-              throw new MetaException("Unexpected transaction state " + c +
-                " found in txns table");
-          }
-          txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)));
-        }
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        return new GetOpenTxnsInfoResponse(hwm, txnInfo);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "getOpenTxnsInfo");
-        throw new MetaException("Unable to select from transaction database: " + getMessage(e)
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return getOpenTxnsInfo();
-    }
-  }
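The invariant getOpenTxnsInfo() relies on can be restated as a one-line
predicate.  A sketch (hwm and the open/aborted set come from the response; the
helper name is illustrative):

    // Any txnid at or below the high-water mark is committed iff it is not in
    // the open/aborted set captured in the same snapshot.
    static boolean isCommitted(long txnId, long hwm, Set<Long> openOrAborted) {
      return txnId <= hwm && !openOrAborted.contains(txnId);
    }

This is why the method only needs the counter plus a WHERE txn_id <= hwm scan:
a txn that opens concurrently gets an id above hwm and is simply out of scope
for this snapshot.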
-
-  public GetOpenTxnsResponse getOpenTxns() throws MetaException {
-    try {
-      // We need to figure out the current transaction number and the list of
-      // open transactions.  To avoid needing a transaction on the underlying
-      // database we'll look at the current transaction number first.  If it
-      // subsequently shows up in the open list that's ok.
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        /**
-         * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "select ntxn_next - 1 from NEXT_TXN_ID";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_txn_id");
-        }
-        long hwm = rs.getLong(1);
-        if (rs.wasNull()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, null record found in next_txn_id");
-        }
-        close(rs);
-        Set<Long> openList = new HashSet<Long>();
-        //need the WHERE clause below to ensure consistent results with READ_COMMITTED
-        s = "select txn_id from TXNS where txn_id <= " + hwm;
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          openList.add(rs.getLong(1));
-        }
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        return new GetOpenTxnsResponse(hwm, openList);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "getOpenTxns");
-        throw new MetaException("Unable to select from transaction database, "
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return getOpenTxns();
-    }
-  }
-
-  public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
-    int numTxns = rqst.getNum_txns();
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        lockInternal();
-        /**
-         * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
-         * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic.
-         * Also, advancing the counter must work when multiple metastores are running.
-         * SELECT ... FOR UPDATE is used to prevent
-         * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID.
-         *
-         * In the current design, there can be several metastore instances running in a given Warehouse.
-         * This makes ideas like reserving a range of IDs to save trips to DB impossible.  For example,
-         * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
-         * Now the same client will start another transaction, except it ends up on MS2 and may get
-         * transaction ID 400 and update the same row.  Now the merge that happens to materialize the snapshot
-         * on read will think the version of the row from transaction ID 500 is the latest one.
-         *
-         * Longer term we can consider running Active-Passive MS (at least wrt ACID operations).  This
-         * set could support a write-through cache for added performance.
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        // Make sure the user has not requested an insane amount of txns.
-        int maxTxns = HiveConf.getIntVar(conf,
-          HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
-        if (numTxns > maxTxns) numTxns = maxTxns;
-
-        stmt = dbConn.createStatement();
-        String s = addForUpdateClause("select ntxn_next from NEXT_TXN_ID");
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction database not properly " +
-            "configured, can't find next transaction id.");
-        }
-        long first = rs.getLong(1);
-        s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-
-        long now = getDbTime(dbConn);
-        List<Long> txnIds = new ArrayList<Long>(numTxns);
-        ArrayList<String> queries = new ArrayList<String>();
-        String query;
-        String insertClause = "insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host) values ";
-        StringBuilder valuesClause = new StringBuilder();
-
-        for (long i = first; i < first + numTxns; i++) {
-          txnIds.add(i);
-
-          if (i > first &&
-              (i - first) % conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) {
-            // wrap up the current query, and start a new one
-            query = insertClause + valuesClause.toString();
-            queries.add(query);
-
-            valuesClause.setLength(0);
-            valuesClause.append("(").append(i).append(", 'o', ").append(now).append(", ").append(now)
-              .append(", '").append(rqst.getUser()).append("', '").append(rqst.getHostname())
-              .append("')");
-
-            continue;
-          }
-
-          if (i > first) {
-            valuesClause.append(", ");
-          }
-
-          valuesClause.append("(").append(i).append(", 'o', ").append(now).append(", ").append(now)
-            .append(", '").append(rqst.getUser()).append("', '").append(rqst.getHostname())
-            .append("')");
-        }
-
-        query = insertClause + valuesClause.toString();
-        queries.add(query);
-
-        for (String q : queries) {
-          LOG.debug("Going to execute update <" + q + ">");
-          stmt.execute(q);
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        return new OpenTxnsResponse(txnIds);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database "
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      return openTxns(rqst);
-    }
-  }
-
-  public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException {
-    long txnid = rqst.getTxnid();
-    try {
-      Connection dbConn = null;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-          throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid));
-        }
-
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "abortTxn(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database "
-          + StringUtils.stringifyException(e));
-      } finally {
-        closeDbConn(dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      abortTxn(rqst);
-    }
-  }
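openTxns() above chunks its multi-row insert so that no single statement carries
more than METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE value tuples.  The
batching logic, reduced to a standalone sketch (maxElements stands in for the
config value; rows are pre-rendered "(...)" value tuples):

    import java.util.ArrayList;
    import java.util.List;

    static List<String> buildBatchedInserts(String insertClause, List<String> rows, int maxElements) {
      List<String> queries = new ArrayList<String>();
      StringBuilder values = new StringBuilder();
      for (int i = 0; i < rows.size(); i++) {
        if (i > 0 && i % maxElements == 0) {
          queries.add(insertClause + values);  // wrap up the current statement
          values.setLength(0);
        } else if (i > 0) {
          values.append(", ");
        }
        values.append(rows.get(i));
      }
      queries.add(insertClause + values);      // final (possibly short) statement
      return queries;
    }

For example, buildBatchedInserts("insert into TXNS (...) values ", rows, 1000)
yields one INSERT per 1000 opened txns, which keeps each statement within what
the backing RDBMS will accept.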
-
-  public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException {
-    List<Long> txnids = rqst.getTxn_ids();
-    try {
-      Connection dbConn = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        int numAborted = abortTxns(dbConn, txnids, false);
-        if (numAborted != txnids.size()) {
-          LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " +
-            txnids.size() + " transactions. It's possible that the other " +
-            (txnids.size() - numAborted) +
-            " transactions have been aborted or committed, or the transaction ids are invalid.");
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "abortTxns(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database "
-          + StringUtils.stringifyException(e));
-      } finally {
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      abortTxns(rqst);
-    }
-  }
-
-  /**
-   * Concurrency/isolation notes:
-   * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)}
-   * operations using select4update on NEXT_TXN_ID.  Also, mutexes on the TXNS table for specific txnid:X;
-   * see more notes below.
-   * In order to prevent lost updates, we need to determine if any 2 transactions overlap.  Each txn
-   * is viewed as an interval [M,N].  M is the txnid and N is taken from the same NEXT_TXN_ID sequence
-   * so that we can compare the commit time of txn T with the start time of txn S.  This sequence can be
-   * thought of as a logical time counter.  If S.commitTime < T.startTime, T and S do NOT overlap.
-   *
-   * Motivating example:
-   * Suppose we have multi-statement transactions T and S, both of which are attempting x = x + 1.
-   * In order to prevent the lost update problem, the non-overlapping txns must lock in the snapshot
-   * that they read appropriately.  In particular, if txns do not overlap, then one follows the other
-   * (assuming they write the same entity), and thus the 2nd must see changes of the 1st.  We ensure
-   * this by locking in the snapshot after the
-   * {@link #openTxns(OpenTxnRequest)} call is made (see {@link org.apache.hadoop.hive.ql.Driver#acquireLocksAndOpenTxn()})
-   * and mutexing openTxn() with commit().  In other words, once S.commit() starts, we must ensure
-   * that txn T, which will be considered a later txn, locks in a snapshot that includes the result
-   * of S's commit (assuming no other txns).
-   * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions
-   * were running in parallel).  If T and S both locked in the same snapshot (for example commit of
-   * txnid:2, which is possible if commitTxn() and openTxns() are not mutexed),
-   * 'x' would be updated to the same value by both, i.e. a lost update.
-   */
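The interval rule in the javadoc above reduces to a single comparison, because
commits are mutexed and so committed.commitId <= current.commitId always holds
by the time the check runs.  A sketch (TxnInterval is an illustrative type, not
one defined in this file):

    static final class TxnInterval {
      final long txnId;     // M: from NEXT_TXN_ID when the txn opened
      final long commitId;  // N: from NEXT_TXN_ID when the txn committed
      TxnInterval(long txnId, long commitId) { this.txnId = txnId; this.commitId = commitId; }
    }

    // The committed txn finished before the current one started iff
    // committed.commitId < current.txnId; otherwise they overlap.
    static boolean overlaps(TxnInterval committed, TxnInterval current) {
      return current.txnId <= committed.commitId;
    }

    // Worked examples matching the comments in commitTxn() below:
    //   overlaps([17,20], [6,80])  -> true  (conflict possible)
    //   overlaps([17,20], [21,21]) -> false (21 started after 20 committed)
    //   overlaps([17,18], [18,19]) -> true  (18 started while 17 was running)

A conflict is then declared only when the overlapping pair also wrote the same
database/table/partition and at least one side was an update.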
-  public void commitTxn(CommitTxnRequest rqst)
-    throws NoSuchTxnException, TxnAbortedException, MetaException {
-    long txnid = rqst.getTxnid();
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet lockHandle = null;
-      ResultSet commitIdRs = null, rs;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        /**
-         * This S4U will mutex with other commitTxn() and openTxns().
-         * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial.
-         * Note: it's possible for several txns to have the same commit id.  Suppose 3 txns start
-         * at the same time and no new txns start until all 3 commit.
-         * We could've incremented the sequence for commitId as well, but it doesn't add anything functionally.
-         */
-        commitIdRs = stmt.executeQuery(addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID"));
-        if(!commitIdRs.next()) {
-          throw new IllegalStateException("No rows found in NEXT_TXN_ID");
-        }
-        long commitId = commitIdRs.getLong(1);
-        /**
-         * Runs at READ_COMMITTED with S4U on the TXNS row for "txnid".  S4U ensures that no other
-         * operation can change this txn (such as acquiring locks).  While lock() and commitTxn()
-         * should not normally run concurrently (for the same txn), they could due to bugs in the client,
-         * which could then corrupt internal transaction manager state.  Also competes with abortTxn().
-         */
-        lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
-        if(lockHandle == null) {
-          //this also ensures that txn is still there and in expected state (hasn't been timed out)
-          ensureValidTxn(dbConn, txnid, stmt);
-          shouldNeverHappen(txnid);
-        }
-        Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
-        int numCompsWritten = stmt.executeUpdate("insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" +
-          " select tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " +
-          "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")");
-        if(numCompsWritten == 0) {
-          /**
-           * current txn didn't update/delete anything (may have inserted), so just proceed with commit
-           *
-           * We only care about the commit id for write txns, so for RO (when supported) txns we don't
-           * have to mutex on NEXT_TXN_ID.
-           * Consider: if an RO txn is after a W txn, then RO's openTxns() will be mutexed with W's
-           * commitTxn() because both do S4U on NEXT_TXN_ID, and thus RO will see the result of the W txn.
-           * If RO < W, then there is no reads-from relationship.
-           */
-        }
-        else {
-          /**
-           * see if there are any overlapping txns that wrote the same element, i.e. have a conflict.
-           * Since the entire commit operation is mutexed wrt other start/commit ops,
-           * committed.ws_commit_id <= current.ws_commit_id for all txns,
-           * thus if committed.ws_commit_id < current.ws_txnid, the transactions do NOT overlap.
-           * For example, [17,20] is committed, [6,80] is being committed right now - these overlap;
-           * [17,20] committed and [21,21] committing now - these do not overlap;
-           * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running)
-           */
-          rs = stmt.executeQuery
-            (addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
-              "committed.ws_table, committed.ws_partition, cur.ws_commit_id " +
-              "from WRITE_SET committed INNER JOIN WRITE_SET cur " +
-              "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
-              //For partitioned table we always track writes at partition level (never at table)
-              //and for non partitioned - always at table level, thus the same table should never
-              //have entries with partition key and w/o
-              "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " +
-              "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid
-              // with txnid, though any decent DB should infer this
-              " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
-              // part of this commitTxn() op
-              " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
-              //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all
-              " and (committed.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) +
-              " OR cur.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + ")"));
-          if(rs.next()) {
-            //found a conflict
-            String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";
-            StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4));
-            String partitionName = rs.getString(5);
-            if(partitionName != null) {
-              resource.append('/').append(partitionName);
-            }
-            String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
-              " committed by " + committedTxn;
-            close(rs);
-            //remove WRITE_SET info for current txn since it's about to abort
-            dbConn.rollback(undoWriteSetForCurrentTxn);
-            LOG.info(msg);
-            //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this
-            if(abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
-              throw new IllegalStateException(msg + " FAILED!");
-            }
-            dbConn.commit();
-            close(null, stmt, dbConn);
-            throw new TxnAbortedException(msg);
-          }
-          else {
-            //no conflicting operations, proceed with the rest of commit sequence
-          }
-        }
-        // Move the record from txn_components into completed_txn_components so that the compactor
-        // knows where to look to compact.
-        String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " +
-          "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
-        LOG.debug("Going to execute insert <" + s + ">");
-        if (stmt.executeUpdate(s) < 1) {
-          //this can be reasonable for an empty txn START/COMMIT or read-only txn
" + JavaUtils.txnIdToString(txnid)); - } - s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - s = "delete from TXNS where txn_id = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "commitTxn(" + rqst + ")"); - throw new MetaException("Unable to update transaction database " - + StringUtils.stringifyException(e)); - } finally { - close(commitIdRs); - close(lockHandle, stmt, dbConn); - unlockInternal(); - } - } catch (RetryException e) { - commitTxn(rqst); - } - } - @Override - public void performWriteSetGC() { - Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID"); - if(!rs.next()) { - throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted"); - } - long highestAllocatedTxnId = rs.getLong(1); - close(rs); - rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN)); - if(!rs.next()) { - throw new IllegalStateException("Scalar query returned no rows?!?!!"); - } - long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark - long lowestOpenTxnId = rs.getLong(1); - if(rs.wasNull()) { - //if here then there are no Open txns and highestAllocatedTxnId must be - //resolved (i.e. committed or aborted), either way - //there are no open txns with id <= highestAllocatedTxnId - //the +1 is there because "delete ..." below has < (which is correct for the case when - //there is an open txn - //Concurrency: even if new txn starts (or starts + commits) it is still true that - //there are no currently open txns that overlap with any committed txn with - //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough. - commitHighWaterMark = highestAllocatedTxnId + 1; - } - else { - commitHighWaterMark = lowestOpenTxnId; - } - int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark); - LOG.info("Deleted " + delCnt + " obsolete rows from WRTIE_SET"); - dbConn.commit(); - } catch (SQLException ex) { - LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex); - } - finally { - close(rs, stmt, dbConn); - } - } - /** - * As much as possible (i.e. in absence of retries) we want both operations to be done on the same - * connection (but separate transactions). This avoid some flakiness in BONECP where if you - * perform an operation on 1 connection and immediately get another fron the pool, the 2nd one - * doesn't see results of the first. - */ - public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { - ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst); - try { - return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid()); - } - catch(NoSuchLockException e) { - // This should never happen, as we just added the lock id - throw new MetaException("Couldn't find a lock we just created! 
" + e.getMessage()); - } - } - private static final class ConnectionLockIdPair { - private final Connection dbConn; - private final long extLockId; - private ConnectionLockIdPair(Connection dbConn, long extLockId) { - this.dbConn = dbConn; - this.extLockId = extLockId; - } - } - - /** - * Note that by definition select for update is divorced from update, i.e. you executeQuery() to read - * and then executeUpdate(). One other alternative would be to actually update the row in TXNS but - * to the same value as before thus forcing db to acquire write lock for duration of the transaction. - * - * There is no real reason to return the ResultSet here other than to make sure the reference to it - * is retained for duration of intended lock scope and is not GC'd thus (unlikely) causing lock - * to be released. - * @param txnState the state this txn is expected to be in. may be null - * @return null if no row was found - * @throws SQLException - * @throws MetaException - */ - private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { - String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : ""); - ResultSet rs = stmt.executeQuery(addForUpdateClause(query)); - if(rs.next()) { - return rs; - } - close(rs); - return null; - } - - /** - * This enters locks into the queue in {@link #LOCK_WAITING} mode. - * - * Isolation Level Notes: - * 1. We use S4U (withe read_committed) to generate the next (ext) lock id. This serializes - * any 2 {@code enqueueLockWithRetry()} calls. - * 2. We use S4U on the relevant TXNS row to block any concurrent abort/commit/etc operations - * @see #checkLockWithRetry(Connection, long, long) - */ - private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { - boolean success = false; - Connection dbConn = null; - try { - Statement stmt = null; - ResultSet rs = null; - ResultSet lockHandle = null; - try { - lockInternal(); - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - long txnid = rqst.getTxnid(); - stmt = dbConn.createStatement(); - if (isValidTxn(txnid)) { - //this also ensures that txn is still there in expected state - lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); - if(lockHandle == null) { - ensureValidTxn(dbConn, txnid, stmt); - shouldNeverHappen(txnid); - } - } - /** Get the next lock id. - * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race. - * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7, - * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and aquires the locks. 
-         * Then 7 unblocks, and adds its W locks, but it won't see the locks from 8 since, to be 'fair',
-         * {@link #checkLock(java.sql.Connection, long)}
-         * doesn't block on locks acquired later than the one it's checking*/
-        String s = addForUpdateClause("select nl_next from NEXT_LOCK_ID");
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_lock_id");
-        }
-        long extLockId = rs.getLong(1);
-        s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-
-        if (txnid > 0) {
-          /**DBTxnManager#acquireLocks() knows if it's I/U/D (that's how it decides what lock to get),
-           * so if we add that to LockRequest we'll know that here.
-           * Should probably add it to LockComponent so that in the future we could allow 1 LockRequest
-           * to contain LockComponents for multiple operations.
-           * Deriving it from lock info doesn't distinguish between Update and Delete.
-           *
-           * QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc
-           * FileSinkDesc.table is ql.metadata.Table
-           * Table.tableSpec which is TableSpec, which has specType which is SpecType
-           * So maybe this can work to know that this is part of a dynamic partition insert, in which case
-           * we'll get an addDynamicPartitions() call and should not write TXN_COMPONENTS here.
-           * In any case, that's an optimization for now; it will be required when adding multi-stmt txns.
-           */
-          // For each component in this lock request,
-          // add an entry to the txn_components table
-          // This must be done before HIVE_LOCKS is accessed
-          for (LockComponent lc : rqst.getComponent()) {
-            String dbName = lc.getDbname();
-            String tblName = lc.getTablename();
-            String partName = lc.getPartitionname();
-            s = "insert into TXN_COMPONENTS " +
-              "(tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) " +
-              "values (" + txnid + ", '" + dbName + "', " +
-              (tblName == null ? "null" : "'" + tblName + "'") + ", " +
-              (partName == null ? "null" : "'" + partName + "'") + "," +
-              quoteString(OpertaionType.fromLockType(lc.getType()).toString()) + ")";
-            LOG.debug("Going to execute update <" + s + ">");
-            stmt.executeUpdate(s);
-          }
-        }
-
-        long intLockId = 0;
-        for (LockComponent lc : rqst.getComponent()) {
-          intLockId++;
-          String dbName = lc.getDbname();
-          String tblName = lc.getTablename();
-          String partName = lc.getPartitionname();
-          LockType lockType = lc.getType();
-          char lockChar = 'z';
-          switch (lockType) {
-            case EXCLUSIVE:
-              lockChar = LOCK_EXCLUSIVE;
-              break;
-            case SHARED_READ:
-              lockChar = LOCK_SHARED;
-              break;
-            case SHARED_WRITE:
-              lockChar = LOCK_SEMI_SHARED;
-              break;
-          }
-          long now = getDbTime(dbConn);
-          s = "insert into HIVE_LOCKS " +
-            " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " +
-            "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
-            " values (" + extLockId + ", " +
-            +intLockId + "," + txnid + ", '" +
-            dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'")
-            + ", " + (partName == null ? "null" : "'" + partName + "'") +
-            ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " +
-            //for locks associated with a txn, we always heartbeat txn and timeout based on that
-            (isValidTxn(txnid) ? 0 : now) + ", '" +
-            rqst.getUser() + "', '" + rqst.getHostname() + "')";
-          LOG.debug("Going to execute update <" + s + ">");
-          stmt.executeUpdate(s);
-        }
-        dbConn.commit();
-        success = true;
-        return new ConnectionLockIdPair(dbConn, extLockId);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "enqueueLockWithRetry(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        close(lockHandle);
-        close(rs, stmt, null);
-        if (!success) {
-          /* This needs to return a "live" connection to be used by the operation that follows it.
-             Thus it only closes the Connection on failure/retry. */
-          closeDbConn(dbConn);
-        }
-        unlockInternal();
-      }
-    }
-    catch(RetryException e) {
-      return enqueueLockWithRetry(rqst);
-    }
-  }
-  private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId)
-    throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException {
-    try {
-      try {
-        lockInternal();
-        if(dbConn.isClosed()) {
-          //should only get here if retrying this op
-          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        }
-        return checkLock(dbConn, extLockId);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "checkLockWithRetry(" + extLockId + "," + txnId + ")");
-        throw new MetaException("Unable to update transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        unlockInternal();
-        closeDbConn(dbConn);
-      }
-    }
-    catch(RetryException e) {
-      return checkLockWithRetry(dbConn, extLockId, txnId);
-    }
-  }
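The javadoc just below observes that a client waiting on a lock re-checks (and
therefore heartbeats) roughly every 15 seconds, far more often than
hive.txn.timeout requires.  A sketch of the cheaper client-side cadence it
hints at, where a heartbeat is sent only after a reasonable fraction of the
timeout has elapsed (the field and parameter names are illustrative, and the
heartbeat(txnid, lockid) call is assumed to be the Thrift client's):

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.thrift.TException;

    private long lastHeartbeatMs = 0;

    void maybeHeartbeat(IMetaStoreClient client, long txnId, long timeoutMs) throws TException {
      long now = System.currentTimeMillis();
      // Heartbeat at most once per half timeout instead of on every re-check.
      if (now - lastHeartbeatMs >= timeoutMs / 2) {
        client.heartbeat(txnId, 0);  // 0 = no lockid; the txn heartbeat covers its locks
        lastHeartbeatMs = now;
      }
    }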
-  /**
-   * Why doesn't this get a txnid as a parameter?  The caller should either know the txnid or know there isn't one.
-   * Either way getTxnIdFromLockId() will not be needed.  This would be a Thrift change.
-   *
-   * Also, when lock acquisition returns WAITING, it's retried every 15 seconds (best case, see DbLockManager.backoff(),
-   * in practice more often),
-   * which means this is heartbeating way more often than hive.txn.timeout and creating extra load on the DB.
-   *
-   * The clients that operate in blocking mode can't heartbeat a lock until the lock is acquired.
-   * We should make CheckLockRequest include a timestamp of the last request to skip unnecessary heartbeats.  Thrift change.
-   *
-   * {@link #checkLock(java.sql.Connection, long)} must run at SERIALIZABLE (make sure some lock we are checking
-   * against doesn't move from W to A in another txn) but this method can heartbeat in a
-   * separate txn at READ_COMMITTED.
-   */
-  public LockResponse checkLock(CheckLockRequest rqst)
-    throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
-    try {
-      Connection dbConn = null;
-      long extLockId = rqst.getLockid();
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        // Heartbeat on the lockid first, to assure that our lock is still valid.
-        // Then look up the lock info (hopefully in the cache).  If these locks
-        // are associated with a transaction then heartbeat on that as well.
-        LockInfo info = getTxnIdFromLockId(dbConn, extLockId);
-        if(info == null) {
-          throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
-        }
-        if (info.txnId > 0) {
-          heartbeatTxn(dbConn, info.txnId);
-        }
-        else {
-          heartbeatLock(dbConn, extLockId);
-        }
-        //todo: strictly speaking there is a bug here.  heartbeat*() commits, but both heartbeat and
-        //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retried;
-        //an extra heartbeat is logically harmless, but ...
-        return checkLock(dbConn, extLockId);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "checkLock(" + rqst + " )");
-        throw new MetaException("Unable to update transaction database " +
-          JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e));
-      } finally {
-        closeDbConn(dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      return checkLock(rqst);
-    }
-
-  }
-
-  /**
-   * This would have been made simpler if all locks were associated with a txn.  Then only the txn needs to
-   * be heartbeated, committed, etc.; no need for the client to track individual locks.
-   * When removing locks not associated with a txn, this potentially conflicts with
-   * heartbeat/performTimeout, which are update/delete of HIVE_LOCKS and thus will be locked as needed by the db;
-   * since this only removes from HIVE_LOCKS, at worst some lock acquire is delayed.
-   */
-  public void unlock(UnlockRequest rqst)
-    throws NoSuchLockException, TxnOpenException, MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      long extLockId = rqst.getLockid();
-      try {
-        /**
-         * This method is logically like commit for read-only auto commit queries.
-         * READ_COMMITTED since this only has 1 delete statement and no new entries with the
-         * same hl_lock_ext_id can be added, i.e. all rows with a given hl_lock_ext_id are
-         * created in a single atomic operation.
-         * Theoretically, this competes with {@link #lock(org.apache.hadoop.hive.metastore.api.LockRequest)}
-         * but hl_lock_ext_id is not known until that method returns.
-         * Also competes with {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)}
-         * but using SERIALIZABLE doesn't materially change the interaction.
-         * If the "delete" stmt misses, additional logic is best effort to produce a meaningful error msg.
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        //hl_txnid <> 0 means it's associated with a transaction
-        String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND (hl_txnid = 0 OR" +
-          " (hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "'))";
-        //(hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "') is for multi-statement txns where
-        //some query attempted to lock (thus LOCK_WAITING state) but is giving up due to timeout for example
-        LOG.debug("Going to execute update <" + s + ">");
-        int rc = stmt.executeUpdate(s);
-        if (rc < 1) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-          LockInfo info = getTxnIdFromLockId(dbConn, extLockId);
-          if(info == null) {
-            //didn't find any lock with extLockId, but at ReadCommitted there is a possibility that
-            //it existed when the above delete ran but it didn't have the expected state.
-            LOG.error("No lock in " + LOCK_WAITING + " mode found for unlock(" + rqst + ")");
-            throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
-          }
-          if(info.txnId != 0) {
-            String msg = "Unlocking locks associated with transaction not permitted.  " + info;
-            LOG.error(msg);
-            throw new TxnOpenException(msg);
-          }
-          if(info.txnId == 0) {
-            //we didn't see this lock when running the DELETE stmt above but now it showed up,
-            //so "should never happen" happened...
-            String msg = "Found lock in unexpected state " + info;
-            LOG.error(msg);
-            throw new MetaException(msg);
-          }
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "unlock(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database " +
-          JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e));
-      } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      unlock(rqst);
-    }
-  }
-
-  /**
-   * used to sort entries in {@link org.apache.hadoop.hive.metastore.api.ShowLocksResponse}
-   */
-  private static class LockInfoExt extends LockInfo {
-    private final ShowLocksResponseElement e;
-    LockInfoExt(ShowLocksResponseElement e) {
-      super(e);
-      this.e = e;
-    }
-  }
-  public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
-    try {
-      Connection dbConn = null;
-      ShowLocksResponse rsp = new ShowLocksResponse();
-      List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
-      List<LockInfoExt> sortedList = new ArrayList<LockInfoExt>();
-      Statement stmt = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
-        String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
-          "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," +
-          "hl_blockedby_ext_id, hl_blockedby_int_id from HIVE_LOCKS";
-
-        // Some filters may have been specified in the SHOW LOCKS statement.  Add them to the query.
-        String dbName = rqst.getDbname();
-        String tableName = rqst.getTablename();
-        String partName = rqst.getPartname();
-
-        StringBuilder filter = new StringBuilder();
-        if (dbName != null && !dbName.isEmpty()) {
-          filter.append("hl_db=").append(quoteString(dbName));
-        }
-        if (tableName != null && !tableName.isEmpty()) {
-          if (filter.length() > 0) {
-            filter.append(" and ");
-          }
-          filter.append("hl_table=").append(quoteString(tableName));
-        }
-        if (partName != null && !partName.isEmpty()) {
-          if (filter.length() > 0) {
-            filter.append(" and ");
-          }
-          filter.append("hl_partition=").append(quoteString(partName));
-        }
-        String whereClause = filter.toString();
-
-        if (!whereClause.isEmpty()) {
-          s = s + " where " + whereClause;
-        }
-
-        LOG.debug("Going to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          ShowLocksResponseElement e = new ShowLocksResponseElement();
-          e.setLockid(rs.getLong(1));
-          long txnid = rs.getLong(2);
-          if (!rs.wasNull()) e.setTxnid(txnid);
-          e.setDbname(rs.getString(3));
-          e.setTablename(rs.getString(4));
-          String partition = rs.getString(5);
-          if (partition != null) e.setPartname(partition);
-          switch (rs.getString(6).charAt(0)) {
-            case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break;
-            case LOCK_WAITING: e.setState(LockState.WAITING); break;
-            default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0));
-          }
-          switch (rs.getString(7).charAt(0)) {
-            case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break;
-            case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break;
-            case LOCK_SHARED: e.setType(LockType.SHARED_READ); break;
-            default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0));
-          }
-          e.setLastheartbeat(rs.getLong(8));
-          long acquiredAt = rs.getLong(9);
-          if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
-          e.setUser(rs.getString(10));
-          e.setHostname(rs.getString(11));
-          e.setLockIdInternal(rs.getLong(12));
-          long id = rs.getLong(13);
-          if(!rs.wasNull()) {
-            e.setBlockedByExtId(id);
-          }
-          id = rs.getLong(14);
-          if(!rs.wasNull()) {
-            e.setBlockedByIntId(id);
-          }
-          sortedList.add(new LockInfoExt(e));
-        }
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-      } catch (SQLException e) {
-        checkRetryable(dbConn, e, "showLocks(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-      }
-      //this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined
-      //by checkLock() - makes diagnostics easier.
-      Collections.sort(sortedList, new LockInfoComparator());
-      for(LockInfoExt lockInfoExt : sortedList) {
-        elems.add(lockInfoExt.e);
-      }
-      rsp.setLocks(elems);
-      return rsp;
-    } catch (RetryException e) {
-      return showLocks(rqst);
-    }
-  }
-
-  /**
-   * {@code ids} should only have txnid or lockid, but not both, ideally.
-   * Currently DBTxnManager.heartbeat() enforces this.
-   */
-  public void heartbeat(HeartbeatRequest ids)
-    throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
-    try {
-      Connection dbConn = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        heartbeatLock(dbConn, ids.getLockid());
-        heartbeatTxn(dbConn, ids.getTxnid());
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "heartbeat(" + ids + ")");
-        throw new MetaException("Unable to select from transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      heartbeat(ids);
-    }
-  }
-
-  public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
-    throws MetaException {
-    try {
-      Connection dbConn = null;
-      HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse();
-      Set<Long> nosuch = new HashSet<Long>();
-      Set<Long> aborted = new HashSet<Long>();
-      rsp.setNosuch(nosuch);
-      rsp.setAborted(aborted);
-      try {
-        /**
-         * READ_COMMITTED is sufficient since {@link #heartbeatTxn(java.sql.Connection, long)}
-         * only has 1 update statement in it and
-         * we only update existing txns, i.e. nothing can add additional txns that this operation
-         * would care about (which would have required SERIALIZABLE)
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) {
-          try {
-            //todo: do all updates in 1 SQL statement and check update count
-            //if update count is less than was requested, go into more expensive checks
-            //for each txn
-            heartbeatTxn(dbConn, txn);
-          } catch (NoSuchTxnException e) {
-            nosuch.add(txn);
-          } catch (TxnAbortedException e) {
-            aborted.add(txn);
-          }
-        }
-        return rsp;
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "heartbeatTxnRange(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      return heartbeatTxnRange(rqst);
-    }
-  }
-
-  long generateCompactionQueueId(Statement stmt) throws SQLException, MetaException {
-    // Get the id for the next entry in the queue
-    String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID");
-    LOG.debug("going to execute query <" + s + ">");
-    ResultSet rs = stmt.executeQuery(s);
-    if (!rs.next()) {
-      throw new IllegalStateException("Transaction tables not properly initiated, " +
-        "no record found in next_compaction_queue_id");
-    }
-    long id = rs.getLong(1);
-    s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1);
-    LOG.debug("Going to execute update <" + s + ">");
-    stmt.executeUpdate(s);
-    return id;
-  }
-  public long compact(CompactionRequest rqst) throws MetaException {
-    // Put a compaction request in the queue.
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
-        long id = generateCompactionQueueId(stmt);
-
-        StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
-          "cq_table, ");
-        String partName = rqst.getPartitionname();
-        if (partName != null) buf.append("cq_partition, ");
-        buf.append("cq_state, cq_type");
-        if (rqst.getRunas() != null) buf.append(", cq_run_as");
-        buf.append(") values (");
-        buf.append(id);
-        buf.append(", '");
-        buf.append(rqst.getDbname());
-        buf.append("', '");
-        buf.append(rqst.getTablename());
-        buf.append("', '");
-        if (partName != null) {
-          buf.append(partName);
-          buf.append("', '");
-        }
-        buf.append(INITIATED_STATE);
-        buf.append("', '");
-        switch (rqst.getType()) {
-          case MAJOR:
-            buf.append(MAJOR_TYPE);
-            break;
-
-          case MINOR:
-            buf.append(MINOR_TYPE);
-            break;
-
-          default:
-            LOG.debug("Going to rollback");
-            dbConn.rollback();
-            throw new MetaException("Unexpected compaction type " + rqst.getType().toString());
-        }
-        if (rqst.getRunas() != null) {
-          buf.append("', '");
-          buf.append(rqst.getRunas());
-        }
-        buf.append("')");
-        String s = buf.toString();
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        return id;
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "compact(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      return compact(rqst);
-    }
-  }
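For a concrete sense of what compact() builds, a request for a MINOR compaction
of one partition with no run-as user would produce an insert along these lines
(all values illustrative; cq_id comes from NEXT_COMPACTION_QUEUE_ID, 'i' is both
INITIATED_STATE and MINOR_TYPE per the constants at the top of the class):

    insert into COMPACTION_QUEUE
      (cq_id, cq_database, cq_table, cq_partition, cq_state, cq_type)
      values (42, 'default', 'acid_tbl', 'ds=2016-07-22', 'i', 'i')

The cq_partition and cq_run_as columns are appended only when the request
carries them, which is why the statement is assembled column by column.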
-
-  public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException {
-    ShowCompactResponse response = new ShowCompactResponse(new ArrayList<ShowCompactResponseElement>());
-    Connection dbConn = null;
-    Statement stmt = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
-          "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
-          "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
-          "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
-        //what I want is order by cc_end desc, cc_start asc (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013)
-        //to sort so that currently running jobs are at the end of the list (bottom of screen)
-        //and currently running ones are sorted by start time
-        //w/o order by, likely currently running compactions will be first (LHS of Union)
-        LOG.debug("Going to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          ShowCompactResponseElement e = new ShowCompactResponseElement();
-          e.setDbname(rs.getString(1));
-          e.setTablename(rs.getString(2));
-          e.setPartitionname(rs.getString(3));
-          switch (rs.getString(4).charAt(0)) {
-            case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break;
-            case WORKING_STATE: e.setState(WORKING_RESPONSE); break;
-            case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
-            case FAILED_STATE: e.setState(FAILED_RESPONSE); break;
-            case SUCCEEDED_STATE: e.setState(SUCCEEDED_RESPONSE); break;
-            case ATTEMPTED_STATE: e.setState(ATTEMPTED_RESPONSE); break;
-            default:
-              //do nothing to handle RU/D if we add another status
-          }
-          switch (rs.getString(5).charAt(0)) {
-            case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
-            case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
-            default:
-              //do nothing to handle RU/D if we add another status
-          }
-          e.setWorkerid(rs.getString(6));
-          e.setStart(rs.getLong(7));
-          long endTime = rs.getLong(8);
-          if(endTime != -1) {
-            e.setEndTime(endTime);
-          }
-          e.setRunAs(rs.getString(9));
-          e.setHadoopJobId(rs.getString(10));
-          long id = rs.getLong(11);//for debugging
-          response.addToCompacts(e);
-        }
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "showCompact(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-      }
-      return response;
-    } catch (RetryException e) {
-      return showCompact(rqst);
-    }
-  }
-
-  private static void shouldNeverHappen(long txnid) {
-    throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid));
-  }
-  private static void shouldNeverHappen(long txnid, long extLockId, long intLockId) {
-    throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " "
-      + JavaUtils.lockIdToString(extLockId) + " " + intLockId);
-  }
-
-  public void addDynamicPartitions(AddDynamicPartitions rqst)
-      throws NoSuchTxnException, TxnAbortedException, MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet lockHandle = null;
-    ResultSet rs = null;
-    try {
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN); - if(lockHandle == null) { - //ensures txn is still there and in expected state - ensureValidTxn(dbConn, rqst.getTxnid(), stmt); - shouldNeverHappen(rqst.getTxnid()); - } - //we should be able to get this from AddDynamicPartitions object longer term; in fact we'd have to - //for multi stmt txns if same table is written more than once per tx - // MoveTask knows if it's I/U/D - // MoveTask calls Hive.loadDynamicPartitions() which calls HiveMetaStoreClient.addDynamicPartitions() - // which ends up here so we'd need to add a field to AddDynamicPartitions. - String findOperationType = " tc_operation_type from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() - + " and tc_database=" + quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename()); - //do limit 1 on this; currently they will all have the same operations - rs = stmt.executeQuery(addLimitClause(1, findOperationType)); - if(!rs.next()) { - throw new IllegalStateException("Unable to determine tc_operation_type for " + JavaUtils.txnIdToString(rqst.getTxnid())); - } - OpertaionType ot = OpertaionType.fromString(rs.getString(1).charAt(0)); - - //what if a txn writes the same table > 1 time... let's go with this for now, but really - //need to not write this in the first place, i.e. make this delete not needed - //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS - String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" + - quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename()); - //we delete the entries made by enqueueLockWithRetry() since those are based on lock information which is - //much "wider" than necessary in a lot of cases. Here on the other hand, we know exactly which - //partitions have been written to. 
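      // (Aside: concretely, a dynamic-partition insert acquires its lock before
      // the partition list is known, so enqueueLockWithRetry() records a single
      // coarse TXN_COMPONENTS row; the delete below replaces it with one row per
      // partition actually written, keeping commit-time conflict detection precise.)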
w/o this, WRITE_SET would contain entries for partitions not actually - //written to - stmt.executeUpdate(deleteSql); - for (String partName : rqst.getPartitionnames()) { - String s = - "insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) values (" + - rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) + - "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + ")"; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - } - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "addDynamicPartitions(" + rqst + ")"); - throw new MetaException("Unable to insert into transaction database " + - StringUtils.stringifyException(e)); - } finally { - close(lockHandle, stmt, dbConn); - unlockInternal(); - } - } catch (RetryException e) { - addDynamicPartitions(rqst); - } - } - - /** - * Clean up corresponding records in metastore tables, specifically: - * TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS - */ - public void cleanupRecords(HiveObjectType type, Database db, Table table, - Iterator<Partition> partitionIterator) throws MetaException { - try { - Connection dbConn = null; - Statement stmt = null; - - try { - String dbName; - String tblName; - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - List<String> queries = new ArrayList<String>(); - StringBuilder buff = new StringBuilder(); - - switch (type) { - case DATABASE: - dbName = db.getName(); - - buff.append("delete from TXN_COMPONENTS where tc_database='"); - buff.append(dbName); - buff.append("'"); - queries.add(buff.toString()); - - buff.setLength(0); - buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); - buff.append(dbName); - buff.append("'"); - queries.add(buff.toString()); - - buff.setLength(0); - buff.append("delete from COMPACTION_QUEUE where cq_database='"); - buff.append(dbName); - buff.append("'"); - queries.add(buff.toString()); - - buff.setLength(0); - buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); - buff.append(dbName); - buff.append("'"); - queries.add(buff.toString()); - - break; - case TABLE: - dbName = table.getDbName(); - tblName = table.getTableName(); - - buff.append("delete from TXN_COMPONENTS where tc_database='"); - buff.append(dbName); - buff.append("' and tc_table='"); - buff.append(tblName); - buff.append("'"); - queries.add(buff.toString()); - - buff.setLength(0); - buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); - buff.append(dbName); - buff.append("' and ctc_table='"); - buff.append(tblName); - buff.append("'"); - queries.add(buff.toString()); - - buff.setLength(0); - buff.append("delete from COMPACTION_QUEUE where cq_database='"); - buff.append(dbName); - buff.append("' and cq_table='"); - buff.append(tblName); - buff.append("'"); - queries.add(buff.toString()); - - buff.setLength(0); - buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); - buff.append(dbName); - buff.append("' and cc_table='"); - buff.append(tblName); - buff.append("'"); - queries.add(buff.toString()); - - break; - case PARTITION: - dbName = table.getDbName(); - tblName = table.getTableName(); - List<FieldSchema> partCols = table.getPartitionKeys(); // partition columns - List<String> partVals; // partition values - String partName; - - while (partitionIterator.hasNext()) {
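        // (Aside: each partition gets the same four deletes as the TABLE case,
        // narrowed further by the *_partition column; partName below is the
        // key=value[/key=value...] form produced by Warehouse.makePartName().)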
- Partition p = partitionIterator.next(); - partVals = p.getValues(); - partName = Warehouse.makePartName(partCols, partVals); - - buff.append("delete from TXN_COMPONENTS where tc_database='"); - buff.append(dbName); - buff.append("' and tc_table='"); - buff.append(tblName); - buff.append("' and tc_partition='"); - buff.append(partName); - buff.append("'"); - queries.add(buff.toString()); - - buff.setLength(0); - buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); - buff.append(dbName); - buff.append("' and ctc_table='"); - buff.append(tblName); - buff.append("' and ctc_partition='"); - buff.append(partName); - buff.append("'"); - queries.add(buff.toString()); - - buff.setLength(0); - buff.append("delete from COMPACTION_QUEUE where cq_database='"); - buff.append(dbName); - buff.append("' and cq_table='"); - buff.append(tblName); - buff.append("' and cq_partition='"); - buff.append(partName); - buff.append("'"); - queries.add(buff.toString()); - - buff.setLength(0); - buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); - buff.append(dbName); - buff.append("' and cc_table='"); - buff.append(tblName); - buff.append("' and cc_partition='"); - buff.append(partName); - buff.append("'"); - queries.add(buff.toString()); - } - - break; - default: - throw new MetaException("Invalid object type for cleanup: " + type); - } - - for (String query : queries) { - LOG.debug("Going to execute update <" + query + ">"); - stmt.executeUpdate(query); - } - - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "cleanupRecords"); - if (e.getMessage().contains("does not exist")) { - LOG.warn("Cannot perform cleanup since metastore table does not exist"); - } else { - throw new MetaException("Unable to clean up " + StringUtils.stringifyException(e)); - } - } finally { - closeStmt(stmt); - closeDbConn(dbConn); - } - } catch (RetryException e) { - cleanupRecords(type, db, table, partitionIterator); - } - } - - /** - * For testing only, do not use. - */ - @VisibleForTesting - public int numLocksInLockTable() throws SQLException, MetaException { - Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - String s = "select count(*) from HIVE_LOCKS"; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - rs.next(); - int rc = rs.getInt(1); - // Necessary to clean up the transaction in the db. - dbConn.rollback(); - return rc; - } finally { - close(rs, stmt, dbConn); - } - } - - /** - * For testing only, do not use. - */ - public long setTimeout(long milliseconds) { - long previous_timeout = timeout; - timeout = milliseconds; - return previous_timeout; - } - - protected class RetryException extends Exception { - - } - - protected Connection getDbConn(int isolationLevel) throws SQLException { - int rc = doRetryOnConnPool ?
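      // (Aside: retry fetching a connection a few times when the pool is known
      // to be able to hand back broken connections; doRetryOnConnPool is set
      // where the pool is configured, earlier in this class. Otherwise fail on
      // the first SQLException.)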
10 : 1; - Connection dbConn = null; - while (true) { - try { - dbConn = connPool.getConnection(); - dbConn.setAutoCommit(false); - dbConn.setTransactionIsolation(isolationLevel); - return dbConn; - } catch (SQLException e) { - closeDbConn(dbConn); - if ((--rc) <= 0) throw e; - LOG.error("There is a problem with a connection from the pool, retrying (rc=" + rc + "): " + - getMessage(e), e); - } - } - } - - static void rollbackDBConn(Connection dbConn) { - try { - if (dbConn != null && !dbConn.isClosed()) dbConn.rollback(); - } catch (SQLException e) { - LOG.warn("Failed to rollback db connection " + getMessage(e)); - } - } - protected static void closeDbConn(Connection dbConn) { - try { - if (dbConn != null && !dbConn.isClosed()) { - dbConn.close(); - } - } catch (SQLException e) { - LOG.warn("Failed to close db connection " + getMessage(e)); - } - } - - /** - * Close statement instance. - * @param stmt statement instance. - */ - protected static void closeStmt(Statement stmt) { - try { - if (stmt != null && !stmt.isClosed()) stmt.close(); - } catch (SQLException e) { - LOG.warn("Failed to close statement " + getMessage(e)); - } - } - - /** - * Close the ResultSet. - * @param rs may be {@code null} - */ - static void close(ResultSet rs) { - try { - if (rs != null && !rs.isClosed()) { - rs.close(); - } - } - catch (SQLException ex) { - LOG.warn("Failed to close result set " + getMessage(ex)); - } - } - - /** - * Close all 3 JDBC artifacts in order: {@code rs stmt dbConn} - */ - static void close(ResultSet rs, Statement stmt, Connection dbConn) { - close(rs); - closeStmt(stmt); - closeDbConn(dbConn); - }
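Every public operation in this class wraps its JDBC work in the same two-layer retry skeleton that checkRetryable() below drives: roll back, classify the SQLException, and re-enter the whole method if the failure looked transient. Reduced to its bones as an illustration, with a hypothetical someTxnOp()/SomeRequest standing in for the real methods:

    // Skeleton of the retry idiom used by compact(), showCompact(), cleanupRecords(), etc.
    public long someTxnOp(SomeRequest rqst) throws MetaException {
      try {
        Connection dbConn = null;
        Statement stmt = null;
        try {
          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
          stmt = dbConn.createStatement();
          // ... real work goes here, ending in dbConn.commit() ...
          return 0;
        } catch (SQLException e) {
          rollbackDBConn(dbConn);
          // Throws RetryException for deadlocks/transient failures and
          // advances the retry counters; otherwise falls through.
          checkRetryable(dbConn, e, "someTxnOp(" + rqst + ")");
          throw new MetaException("Unable to update transaction database " +
              StringUtils.stringifyException(e));
        } finally {
          close(null, stmt, dbConn);
        }
      } catch (RetryException e) {
        return someTxnOp(rqst); // counters were advanced by checkRetryable()
      }
    }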
- /** - * Determine if an exception was such that it makes sense to retry. Unfortunately there is no standard way to do - * this, so we have to inspect the error messages and catch the telltale signs for each - * different database. This method will throw {@code RetryException} - * if the error is retry-able. - * @param conn database connection - * @param e exception that was thrown. - * @param caller name of the method calling this (and other info useful to log) - * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when the operation should be retried - */ - protected void checkRetryable(Connection conn, - SQLException e, - String caller) throws RetryException, MetaException { - - // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected() - // to test these changes. - // MySQL and MSSQL use 40001 as the state code for rollback. Postgres uses 40001 and 40P01. - // Oracle seems to return different SQLStates and messages each time, - // so I've tried to capture the different error messages (there appear to be fewer different - // error messages than SQL states). - // Derby and newer MySQL drivers use the new SQLTransactionRollbackException - boolean sendRetrySignal = false; - try { - if(dbProduct == null) { - throw new IllegalStateException("DB Type not determined yet."); - } - if (e instanceof SQLTransactionRollbackException || - ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES || - dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) || - (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) || - (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected") - || e.getMessage().contains("can't serialize access for this transaction")))) { - if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) { - long waitInterval = deadlockRetryInterval * deadlockCnt; - LOG.warn("Deadlock detected in " + caller + ". Will wait " + waitInterval + - "ms and try again up to " + (ALLOWED_REPEATED_DEADLOCKS - deadlockCnt + 1) + " times."); - // Pause for just a bit before retrying to avoid immediately jumping back into the deadlock. - try { - Thread.sleep(waitInterval); - } catch (InterruptedException ie) { - // NOP - } - sendRetrySignal = true; - } else { - LOG.error("Too many repeated deadlocks in " + caller + ", giving up."); - } - } else if (isRetryable(conf, e)) { - //in MSSQL this means Communication Link Failure - if (retryNum++ < retryLimit) { - LOG.warn("Retryable error detected in " + caller + ". Will wait " + retryInterval + - "ms and retry up to " + (retryLimit - retryNum + 1) + " times. Error: " + getMessage(e)); - try { - Thread.sleep(retryInterval); - } catch (InterruptedException ex) { - // NOP - } - sendRetrySignal = true; - } else { - LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e)); - } - } - else { - //make sure we know we saw an error that we don't recognize - LOG.info("Non-retryable error: " + getMessage(e)); - } - } - finally { - /*if this method ends with anything except a retry signal, the caller should fail the operation - and propagate the error up to its caller (Metastore client); thus must reset retry counters*/ - if(!sendRetrySignal) { - deadlockCnt = 0; - retryNum = 0; - } - } - if(sendRetrySignal) { - throw new RetryException(); - } - } - - /** - * Determine the current time, using the RDBMS as a source of truth - * @param conn database connection - * @return current time in milliseconds - * @throws org.apache.hadoop.hive.metastore.api.MetaException if the time cannot be determined - */ - protected long getDbTime(Connection conn) throws MetaException { - Statement stmt = null; - try { - stmt = conn.createStatement(); - String s; - switch (dbProduct) { - case DERBY: - s = "values current_timestamp"; - break; - - case MYSQL: - case POSTGRES: - case SQLSERVER: - s = "select current_timestamp"; - break; - - case ORACLE: - s = "select current_timestamp from dual"; - break; - - default: - String msg = "Unknown database product: " + dbProduct.toString(); - LOG.error(msg); - throw new MetaException(msg); - } - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) throw new MetaException("No results from date query"); - return rs.getTimestamp(1).getTime(); - } catch (SQLException e) { - String msg = "Unable to determine current time: " + e.getMessage(); - LOG.error(msg); - throw new MetaException(msg); - } finally { - closeStmt(stmt); - } - } -
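Using the RDBMS rather than System.currentTimeMillis() as the clock matters here because several metastores may run against the same tables, and the database is the only clock they all share. A hypothetical caller computing a heartbeat cutoff:

    // Illustrative fragment (not from the patch): stale-heartbeat cutoff
    // computed on the shared database clock.
    long now = getDbTime(dbConn);   // milliseconds, per the contract above
    long cutoff = now - timeout;    // anything heartbeated before this is stale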
- /** - * Determine the String that should be used to quote identifiers. - * @param conn Active connection - * @return the identifier quote string - * @throws SQLException - */ - protected String getIdentifierQuoteString(Connection conn) throws SQLException { - if (identifierQuoteString == null) { - identifierQuoteString = conn.getMetaData().getIdentifierQuoteString(); - } - return identifierQuoteString; - } - - protected enum DatabaseProduct { DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER } - - /** - * Determine the database product type - * @param conn database connection - * @return database product type - */ - private DatabaseProduct determineDatabaseProduct(Connection conn) { - if (dbProduct == null) { - try { - String s = conn.getMetaData().getDatabaseProductName(); - if (s == null) { - String msg = "getDatabaseProductName returns null, can't determine database product"; - LOG.error(msg); - throw new IllegalStateException(msg); - } else if (s.equals("Apache Derby")) { - dbProduct = DatabaseProduct.DERBY; - } else if (s.equals("Microsoft SQL Server")) { - dbProduct = DatabaseProduct.SQLSERVER; - } else if (s.equals("MySQL")) { - dbProduct = DatabaseProduct.MYSQL; - } else if (s.equals("Oracle")) { - dbProduct = DatabaseProduct.ORACLE; - } else if (s.equals("PostgreSQL")) { - dbProduct = DatabaseProduct.POSTGRES; - } else { - String msg = "Unrecognized database product name <" + s + ">"; - LOG.error(msg); - throw new IllegalStateException(msg); - } - - } catch (SQLException e) { - String msg = "Unable to get database product name: " + e.getMessage(); - LOG.error(msg); - throw new IllegalStateException(msg); - } - } - return dbProduct; - } - - private static class LockInfo { - private final long extLockId; - private final long intLockId; - //0 means there is no transaction, i.e. it's a select statement which is not part of an - //explicit transaction, or an IUD statement that is not writing to an ACID table - private final long txnId; - private final String db; - private final String table; - private final String partition; - private final LockState state; - private final LockType type; - - // Assumes the result set is set to a valid row - LockInfo(ResultSet rs) throws SQLException, MetaException { - extLockId = rs.getLong("hl_lock_ext_id"); // can't be null - intLockId = rs.getLong("hl_lock_int_id"); // can't be null - db = rs.getString("hl_db"); // can't be null - String t = rs.getString("hl_table"); - table = (rs.wasNull() ? null : t); - String p = rs.getString("hl_partition"); - partition = (rs.wasNull() ? null : p);
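      // (Aside: hl_table and hl_partition are nullable by design, a database-level
      // lock has neither and a table-level lock has no partition; wasNull() keeps
      // SQL NULL distinct from "" so isDbLock()/isTableLock() below can classify.)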
- switch (rs.getString("hl_lock_state").charAt(0)) { - case LOCK_WAITING: state = LockState.WAITING; break; - case LOCK_ACQUIRED: state = LockState.ACQUIRED; break; - default: - throw new MetaException("Unknown lock state " + rs.getString("hl_lock_state").charAt(0)); - } - switch (rs.getString("hl_lock_type").charAt(0)) { - case LOCK_EXCLUSIVE: type = LockType.EXCLUSIVE; break; - case LOCK_SHARED: type = LockType.SHARED_READ; break; - case LOCK_SEMI_SHARED: type = LockType.SHARED_WRITE; break; - default: - throw new MetaException("Unknown lock type " + rs.getString("hl_lock_type").charAt(0)); - } - txnId = rs.getLong("hl_txnid");//returns 0 if value is NULL - } - LockInfo(ShowLocksResponseElement e) { - extLockId = e.getLockid(); - intLockId = e.getLockIdInternal(); - txnId = e.getTxnid(); - db = e.getDbname(); - table = e.getTablename(); - partition = e.getPartname(); - state = e.getState(); - type = e.getType(); - } - - public boolean equals(Object other) { - if (!(other instanceof LockInfo)) return false; - LockInfo o = (LockInfo)other; - // Lock ids are unique across the system. - return extLockId == o.extLockId && intLockId == o.intLockId; - } - - @Override - public String toString() { - return JavaUtils.lockIdToString(extLockId) + " intLockId:" + - intLockId + " " + JavaUtils.txnIdToString(txnId) - + " db:" + db + " table:" + table + " partition:" + - partition + " state:" + (state == null ? "null" : state.toString()) - + " type:" + (type == null ? "null" : type.toString()); - } - private boolean isDbLock() { - return db != null && table == null && partition == null; - } - private boolean isTableLock() { - return db != null && table != null && partition == null; - } - } - - private static class LockInfoComparator implements Comparator<LockInfo> { - private static final LockTypeComparator lockTypeComparator = new LockTypeComparator(); - public boolean equals(Object other) { - return this == other; - } - - public int compare(LockInfo info1, LockInfo info2) { - // We sort by state (acquired vs waiting), then by LockType, then by id - if (info1.state == LockState.ACQUIRED && - info2.state != LockState.ACQUIRED) { - return -1; - } - if (info1.state != LockState.ACQUIRED && - info2.state == LockState.ACQUIRED) { - return 1; - } - - int sortByType = lockTypeComparator.compare(info1.type, info2.type); - if(sortByType != 0) { - return sortByType; - } - if (info1.extLockId < info2.extLockId) { - return -1; - } else if (info1.extLockId > info2.extLockId) { - return 1; - } else { - if (info1.intLockId < info2.intLockId) { - return -1; - } el