hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject [09/11] hive git commit: HIVE-17561 Move TxnStore and implementations to standalone metastore (Alan Gates, reviewed by Eugene Koifman)
Date Fri, 06 Oct 2017 16:57:49 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
deleted file mode 100644
index f77900d..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ /dev/null
@@ -1,3667 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.commons.dbcp.ConnectionFactory;
-import org.apache.commons.dbcp.DriverManagerConnectionFactory;
-import org.apache.commons.dbcp.PoolableConnectionFactory;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.hive.common.ServerUtils;
-import org.apache.hadoop.hive.common.classification.InterfaceAudience;
-import org.apache.hadoop.hive.common.classification.InterfaceStability;
-import org.apache.hadoop.hive.common.classification.RetrySemantics;
-import org.apache.hadoop.hive.metastore.DatabaseProduct;
-import org.apache.hadoop.hive.metastore.HouseKeeperService;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.datasource.BoneCPDataSourceProvider;
-import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
-import org.apache.hadoop.hive.metastore.datasource.HikariCPDataSourceProvider;
-import org.apache.hadoop.hive.metastore.metrics.Metrics;
-import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
-import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.commons.dbcp.PoolingDataSource;
-
-import org.apache.commons.pool.impl.GenericObjectPool;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.StringableMap;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConfUtil;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.util.StringUtils;
-
-import javax.sql.DataSource;
-
-import java.io.PrintWriter;
-import java.nio.ByteBuffer;
-import java.sql.*;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-
-/**
- * A handler to answer transaction related calls that come into the metastore
- * server.
- *
- * Note on log messages:  Please include txnid:X and lockid info using
- * {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)}
- * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages.
- * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated,
- * so keeping the format consistent makes grep'ing the logs much easier.
- *
- * Note on HIVE_LOCKS.hl_last_heartbeat.
- * For locks that are part of a transaction, we set this to 0 (we would rather set it to NULL,
- * but currently the DB schema has this NOT NULL) and only update/read the heartbeat from the
- * corresponding transaction in TXNS.
- *
- * In general there can be multiple metastores where this logic can execute, thus the DB is
- * used to ensure proper mutexing of operations.
- * Select ... For Update (or equivalent: either MsSql with(updlock) or actual Update stmt) is
- * used to properly sequence operations.  Most notably:
- * 1. various sequence IDs are generated with aid of this mutex
- * 2. ensuring that each (Hive) Transaction state is transitioned atomically.  Transaction state
- *  includes its actual state (Open, Aborted) as well as its lock list/component list.  Thus all
- *  per transaction ops either start by update/delete of the relevant TXNS row or do S4U
- *  (Select ... For Update) on that row.
- *  This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks.
- * 3. checkLock() - this is mutexed entirely since we must ensure that while we check if some lock
- *  can be granted, no other (strictly speaking "earlier") lock can change state.
- *
- * The exception to this is Derby, which doesn't support proper S4U.  Derby is always running embedded
- * (this is the only supported configuration for Derby)
- * in the same JVM as HiveMetaStoreHandler, thus we use a JVM-wide lock to properly sequence the operations.
- *
- * {@link #derbyLock}
-
- * If we ever decide to run a remote Derby server, according to
- * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be
- * serialized, so that would also work, though it has not been tested.
- *
- * General design note:
- * It's imperative that any operation on a txn (e.g. commit), ensure (atomically) that this txn is
- * still valid and active.  In the code this is usually achieved at the same time the txn record
- * is locked for some operation.
- * 
- * Note on retry logic:
- * Metastore has retry logic in both {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient}
- * and {@link org.apache.hadoop.hive.metastore.RetryingHMSHandler}.  The retry logic there is very
- * generic and is not aware whether the operations are idempotent or not.  (This is separate from
- * retry logic here in TxnHandler which can/does retry DB errors intelligently).  The worst case is
- * when an op here issues a successful commit against the RDBMS but the calling stack doesn't
- * receive the ack and retries.  (If an op fails before commit, it's trivially idempotent)
- * Thus the ops here need to be made idempotent as much as possible or
- * the metastore call stack should have logic not to retry.  There are {@link RetrySemantics}
- * annotations to document the behavior.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
-
-  static final protected char INITIATED_STATE = 'i';
-  static final protected char WORKING_STATE = 'w';
-  static final protected char READY_FOR_CLEANING = 'r';
-  static final char FAILED_STATE = 'f';
-  static final char SUCCEEDED_STATE = 's';
-  static final char ATTEMPTED_STATE = 'a';
-
-  // Compactor types
-  static final protected char MAJOR_TYPE = 'a';
-  static final protected char MINOR_TYPE = 'i';
-
-  // Transaction states
-  static final protected char TXN_ABORTED = 'a';
-  static final protected char TXN_OPEN = 'o';
-  //todo: make these like the OpertaionType enum below and remove the above char constants
-  enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN}
-
-  // Lock states
-  static final protected char LOCK_ACQUIRED = 'a';
-  static final protected char LOCK_WAITING = 'w';
-
-  // Lock types
-  static final protected char LOCK_EXCLUSIVE = 'e';
-  static final protected char LOCK_SHARED = 'r';
-  static final protected char LOCK_SEMI_SHARED = 'w';
-
-  static final private int ALLOWED_REPEATED_DEADLOCKS = 10;
-  static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
-
-  static private DataSource connPool;
-  private static DataSource connPoolMutex;
-  static private boolean doRetryOnConnPool = false;
-  
-  /**
-   * Kind of SQL operation, each paired with the single-character code ({@code sqlConst})
-   * that represents it in SQL text built by this class.
-   */
-  private enum OpertaionType {
-    SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d');
-
-    /** One-character code for this operation. */
-    private final char sqlConst;
-
-    OpertaionType(char sqlConst) {
-      this.sqlConst = sqlConst;
-    }
-
-    /** @return the one-character code as a String */
-    public String toString() {
-      return String.valueOf(sqlConst);
-    }
-
-    /**
-     * Maps a one-character code back to its constant.
-     * @throws IllegalArgumentException if no constant uses {@code sqlConst}
-     */
-    public static OpertaionType fromString(char sqlConst) {
-      for (OpertaionType candidate : values()) {
-        if (candidate.sqlConst == sqlConst) {
-          return candidate;
-        }
-      }
-      throw new IllegalArgumentException(quoteChar(sqlConst));
-    }
-
-    /**
-     * Maps a {@link DataOperationType} to the constant of the same name.
-     * @throws IllegalArgumentException for any value other than SELECT/INSERT/UPDATE/DELETE
-     */
-    public static OpertaionType fromDataOperationType(DataOperationType dop) {
-      final OpertaionType mapped;
-      switch (dop) {
-        case SELECT:
-          mapped = OpertaionType.SELECT;
-          break;
-        case INSERT:
-          mapped = OpertaionType.INSERT;
-          break;
-        case UPDATE:
-          mapped = OpertaionType.UPDATE;
-          break;
-        case DELETE:
-          mapped = OpertaionType.DELETE;
-          break;
-        default:
-          throw new IllegalArgumentException("Unexpected value: " + dop);
-      }
-      return mapped;
-    }
-  }
-
-  // Maximum number of open transactions that's allowed
-  private static volatile int maxOpenTxns = 0;
-  // Whether number of open transactions reaches the threshold
-  private static volatile boolean tooManyOpenTxns = false;
-  // The AcidHouseKeeperService for counting open transactions
-  private static volatile HouseKeeperService openTxnsCounter = null;
-
-  /**
-   * Number of consecutive deadlocks we have seen
-   */
-  private int deadlockCnt;
-  private long deadlockRetryInterval;
-  protected HiveConf conf;
-  private static DatabaseProduct dbProduct;
-  private static SQLGenerator sqlGenerator;
-
-  // (End user) Transaction timeout, in milliseconds.
-  private long timeout;
-
-  private String identifierQuoteString; // quotes to use for quoting tables, where necessary
-  private long retryInterval;
-  private int retryLimit;
-  private int retryNum;
-  // Current number of open txns
-  private AtomicInteger numOpenTxns;
-
-  /**
-   * Derby specific concurrency control
-   */
-  private static final ReentrantLock derbyLock = new ReentrantLock(true);
-  /**
-   * must be static since even in UT there may be > 1 instance of TxnHandler
-   * (e.g. via Compactor services)
-   */
-  private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
-  private static final String hostname = ServerUtils.hostname();
-
-  // Private methods should never catch SQLException and then throw MetaException.  The public
-  // methods depend on SQLException coming back so they can detect and handle deadlocks.  Private
-  // methods should only throw MetaException when they explicitly know there's a logic error and
-  // they want to throw past the public methods.
-  //
-  // All public methods that write to the database have to check for deadlocks when a SQLException
-  // comes back and handle it if they see one.  This has to be done with the connection pooling
-  // in mind.  To do this they should call checkRetryable() AFTER rolling back the db transaction,
-  // and then they should catch RetryException and call themselves recursively. See commitTxn for an example.
-
-  /**
-   * Creates an unconfigured handler; the constructor itself does nothing.
-   * {@link #setConf(HiveConf)} must be called before any other method is used.
-   */
-  public TxnHandler() {
-  }
-
-  /**
-   * This is logically part of the c'tor and must be called prior to any other method.
-   * It is not physically part of the c'tor due to the use of reflection.
-   *
-   * The first call in a JVM (i.e. while the static {@code connPool} is still null) creates the two
-   * static JDBC connection pools, calls {@code determineDatabaseProduct} and builds the shared
-   * {@link SQLGenerator}; that work is done while holding the {@code TxnHandler.class} monitor.
-   * Every call then reads the per-instance timeout/retry settings from {@code conf}.  Note that
-   * the static {@code maxOpenTxns} is also overwritten on every call.
-   *
-   * @param conf configuration to read settings from; also stored in {@link #conf}
-   * @throws RuntimeException wrapping the SQLException if the one-time pool setup fails
-   */
-  public void setConf(HiveConf conf) {
-    this.conf = conf;
-
-    checkQFileTestHack();
-
-    synchronized (TxnHandler.class) {
-      if (connPool == null) {
-        //only do this once per JVM; useful for support
-        LOG.info(HiveConfUtil.dumpConfig(conf).toString());
-
-        Connection dbConn = null;
-        // Set up the JDBC connection pool
-        try {
-          int maxPoolSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_MAX_CONNECTIONS);
-          long getConnectionTimeoutMs = 30000;
-          connPool = setupJdbcConnectionPool(conf, maxPoolSize, getConnectionTimeoutMs);
-          /*the mutex pools should ideally be somewhat larger since some operations require 1
-           connection from each pool and we want to avoid taking a connection from primary pool
-           and then blocking because mutex pool is empty.  There is only 1 thread in any HMS trying
-           to mutex on each MUTEX_KEY except MUTEX_KEY.CheckLock.  The CheckLock operation gets a
-           connection from connPool first, then connPoolMutex.  All others, go in the opposite
-           order (not very elegant...).  So number of connection requests for connPoolMutex cannot
-           exceed (size of connPool + MUTEX_KEY.values().length - 1).*/
-          connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + MUTEX_KEY.values().length, getConnectionTimeoutMs);
-          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-          determineDatabaseProduct(dbConn);
-          sqlGenerator = new SQLGenerator(dbProduct, conf);
-        } catch (SQLException e) {
-          String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage();
-          LOG.error(msg);
-          throw new RuntimeException(e);
-        } finally {
-          closeDbConn(dbConn);
-        }
-      }
-    }
-
-    numOpenTxns = Metrics.getOrCreateGauge(MetricsConstants.NUM_OPEN_TXNS);
-
-    timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
-    buildJumpTable();
-    retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL,
-        TimeUnit.MILLISECONDS);
-    retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
-    // deadlock retries wait a tenth of the regular retry interval
-    deadlockRetryInterval = retryInterval / 10;
-    maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS);
-  }
-  /**
-   * Returns the transaction high water mark ({@code ntxn_next - 1} read from NEXT_TXN_ID)
-   * together with one {@link TxnInfo} for every TXNS row whose txn_id is at or below it.
-   * Only the open and aborted states are expected in TXNS; any other state is an error.
-   * The DB transaction is always rolled back since nothing is written.  If a RetryException
-   * reaches the outer catch (presumably raised by checkRetryable), the method calls itself again.
-   *
-   * @return the high water mark and the open/aborted transactions found
-   * @throws MetaException if NEXT_TXN_ID has no (or a null) row, an unexpected transaction state
-   *         is read, or a SQLException is not retried
-   */
-  @Override
-  @RetrySemantics.ReadOnly
-  public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
-    try {
-      // We need to figure out the current transaction number and the list of
-      // open transactions.  To avoid needing a transaction on the underlying
-      // database we'll look at the current transaction number first.  If it
-      // subsequently shows up in the open list that's ok.
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        /**
-         * This method can run at READ_COMMITTED as long as
-         * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic.
-         * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with
-         * adding corresponding entries into TXNS.  The reason is that any txnid below HWM
-         * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed.
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "select ntxn_next - 1 from NEXT_TXN_ID";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_txn_id");
-        }
-        long hwm = rs.getLong(1);
-        if (rs.wasNull()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, null record found in next_txn_id");
-        }
-        close(rs);
-        List<TxnInfo> txnInfos = new ArrayList<TxnInfo>();
-        //need the WHERE clause below to ensure consistent results with READ_COMMITTED
-        s = "select txn_id, txn_state, txn_user, txn_host, txn_started, txn_last_heartbeat from " +
-            "TXNS where txn_id <= " + hwm;
-        LOG.debug("Going to execute query<" + s + ">");
-        rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          // txn_state is stored as a one-character string; map it to the Thrift enum
-          char c = rs.getString(2).charAt(0);
-          TxnState state;
-          switch (c) {
-            case TXN_ABORTED:
-              state = TxnState.ABORTED;
-              break;
-
-            case TXN_OPEN:
-              state = TxnState.OPEN;
-              break;
-
-            default:
-              throw new MetaException("Unexpected transaction state " + c +
-                " found in txns table");
-          }
-          TxnInfo txnInfo = new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4));
-          txnInfo.setStartedTime(rs.getLong(5));
-          txnInfo.setLastHeartbeatTime(rs.getLong(6));
-          txnInfos.add(txnInfo);
-        }
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        return new GetOpenTxnsInfoResponse(hwm, txnInfos);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "getOpenTxnsInfo");
-        throw new MetaException("Unable to select from transaction database: " + getMessage(e)
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return getOpenTxnsInfo();
-    }
-  }
-  /**
-   * Returns the transaction high water mark ({@code ntxn_next - 1} read from NEXT_TXN_ID), the
-   * ids of all TXNS rows at or below it (ordered by txn_id), and a bit set marking which of
-   * those ids are aborted.  Bit {@code i} of the bit set refers to the i-th id in the returned
-   * list.  The smallest id in the open state, if there is one, is reported via
-   * {@code setMin_open_txn}.  The DB transaction is always rolled back since nothing is written.
-   * If a RetryException reaches the outer catch (presumably raised by checkRetryable), the
-   * method calls itself again.
-   *
-   * @return the high water mark, the open/aborted txn ids and the aborted-bits buffer
-   * @throws MetaException if NEXT_TXN_ID has no (or a null) row, or a SQLException is not retried
-   */
-  @Override
-  @RetrySemantics.ReadOnly
-  public GetOpenTxnsResponse getOpenTxns() throws MetaException {
-    try {
-      // We need to figure out the current transaction number and the list of
-      // open transactions.  To avoid needing a transaction on the underlying
-      // database we'll look at the current transaction number first.  If it
-      // subsequently shows up in the open list that's ok.
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        /**
-         * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "select ntxn_next - 1 from NEXT_TXN_ID";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_txn_id");
-        }
-        long hwm = rs.getLong(1);
-        if (rs.wasNull()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, null record found in next_txn_id");
-        }
-        close(rs);
-        List<Long> openList = new ArrayList<Long>();
-        //need the WHERE clause below to ensure consistent results with READ_COMMITTED
-        s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm + " order by txn_id";
-        LOG.debug("Going to execute query<" + s + ">");
-        rs = stmt.executeQuery(s);
-        long minOpenTxn = Long.MAX_VALUE;
-        BitSet abortedBits = new BitSet();
-        while (rs.next()) {
-          long txnId = rs.getLong(1);
-          openList.add(txnId);
-          char c = rs.getString(2).charAt(0);
-          if(c == TXN_OPEN) {
-            minOpenTxn = Math.min(minOpenTxn, txnId);
-          } else if (c == TXN_ABORTED) {
-            // the id was just appended, so its position in openList is size() - 1
-            abortedBits.set(openList.size() - 1);
-          }
-        }
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray());
-        GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer);
-        // Long.MAX_VALUE is the "no open txn seen" sentinel; leave min_open_txn unset in that case
-        if(minOpenTxn < Long.MAX_VALUE) {
-          otr.setMin_open_txn(minOpenTxn);
-        }
-        return otr;
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "getOpenTxns");
-        throw new MetaException("Unable to select from transaction database, "
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return getOpenTxns();
-    }
-  }
-
-  /**
-   * Instantiates the given {@link HouseKeeperService} implementation through its no-arg
-   * constructor, stores it in {@link #openTxnsCounter} and starts it.
-   * This is best effort: any failure is logged and not propagated to the caller.
-   *
-   * @param conf configuration passed to the service's {@code start} method
-   * @param c    the HouseKeeperService implementation class to instantiate
-   */
-  private static void startHouseKeeperService(HiveConf conf, Class c){
-    try {
-      openTxnsCounter = (HouseKeeperService)c.newInstance();
-      openTxnsCounter.start(conf);
-    } catch (Exception ex) {
-      // If instantiation itself failed, openTxnsCounter is still null, so it must not be
-      // dereferenced unconditionally here; the class name is taken from c instead.
-      HouseKeeperService service = openTxnsCounter;
-      String description = service == null ? c.getName() : service.getServiceDescription();
-      // SLF4J logs a trailing Throwable that has no matching placeholder with its stack trace.
-      LOG.error("Failed to start {}.  The system will not handle {}.  Root Cause: ",
-          c.getName(), description, ex);
-    }
-  }
-
-  /**
-   * Opens a batch of new transactions and returns their ids.
-   *
-   * Steps, in order:
-   * 1. On first use, starts the AcidOpenTxnsCounterService (guarded by the TxnHandler.class
-   *    monitor).
-   * 2. Rejects the request while the open-transaction limit is considered reached: the
-   *    {@code tooManyOpenTxns} flag is raised once {@code numOpenTxns >= maxOpenTxns} and is only
-   *    cleared again when the count drops below 90% of {@code maxOpenTxns}.
-   * 3. Caps the requested number of txns at HIVE_TXN_MAX_OPEN_BATCH.
-   * 4. Under {@code lockInternal()}, reads {@code ntxn_next} from NEXT_TXN_ID with a for-update
-   *    clause, advances it by the batch size, inserts one TXNS row per new id (state open,
-   *    started time and last heartbeat both set to the value of {@code getDbTime}) and commits.
-   *
-   * Retry-by-caller note:
-   * Worst case, it will leave an open txn which will timeout.
-   *
-   * @param rqst request carrying the number of txns wanted plus the user and hostname to record
-   * @return the newly allocated transaction ids, {@code [ntxn_next, ntxn_next + numTxns)}
-   * @throws MetaException if the counter service class cannot be found, the open-transaction
-   *         limit is reached, NEXT_TXN_ID has no row, or a SQLException is not retried
-   */
-  @Override
-  @RetrySemantics.Idempotent
-  public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
-    if (openTxnsCounter == null) {
-      synchronized (TxnHandler.class) {
-        try {
-          if (openTxnsCounter == null) {
-            startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService"));
-          }
-        } catch (ClassNotFoundException e) {
-          throw new MetaException(e.getMessage());
-        }
-      }
-    }
-
-    if (!tooManyOpenTxns && numOpenTxns.get() >= maxOpenTxns) {
-      tooManyOpenTxns = true;
-    }
-    if (tooManyOpenTxns) {
-      if (numOpenTxns.get() < maxOpenTxns * 0.9) {
-        tooManyOpenTxns = false;
-      } else {
-        LOG.warn("Maximum allowed number of open transactions (" + maxOpenTxns + ") has been " +
-            "reached. Current number of open transactions: " + numOpenTxns);
-        throw new MetaException("Maximum allowed number of open transactions has been reached. " +
-            "See hive.max.open.txns.");
-      }
-    }
-
-    int numTxns = rqst.getNum_txns();
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        lockInternal();
-        /**
-         * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
-         * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic.
-         * Also, advancing the counter must work when multiple metastores are running.
-         * SELECT ... FOR UPDATE is used to prevent
-         * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID.
-         *
-         * In the current design, there can be several metastore instances running in a given Warehouse.
-         * This makes ideas like reserving a range of IDs to save trips to DB impossible.  For example,
-         * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
-         * Now the same client will start another transaction, except it ends up on MS2 and may get
-         * transaction ID 400 and update the same row.  Now the merge that happens to materialize the snapshot
-         * on read will think the version of the row from transaction ID 500 is the latest one.
-         *
-         * Longer term we can consider running Active-Passive MS (at least with respect to ACID operations).
-         * This set could support a write-through cache for added performance.
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        // Make sure the user has not requested an insane amount of txns.
-        int maxTxns = HiveConf.getIntVar(conf,
-          HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
-        if (numTxns > maxTxns) numTxns = maxTxns;
-
-        stmt = dbConn.createStatement();
-        String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID");
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction database not properly " +
-            "configured, can't find next transaction id.");
-        }
-        long first = rs.getLong(1);
-        s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-
-        long now = getDbTime(dbConn);
-        List<Long> txnIds = new ArrayList<Long>(numTxns);
-
-        List<String> rows = new ArrayList<>();
-        for (long i = first; i < first + numTxns; i++) {
-          txnIds.add(i);
-          rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + "," + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()));
-        }
-        List<String> queries = sqlGenerator.createInsertValuesStmt(
-          "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host)", rows);
-        for (String q : queries) {
-          LOG.debug("Going to execute update <" + q + ">");
-          stmt.execute(q);
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        return new OpenTxnsResponse(txnIds);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database "
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      return openTxns(rqst);
-    }
-  }
-  /**
-   * Aborts a single transaction.
-   *
-   * Runs under {@code lockInternal()}.  If the internal
-   * {@code abortTxns(dbConn, [txnid], true)} call does not report exactly one aborted txn, the
-   * current state of the txn is looked up: an already aborted txn makes this call a logged
-   * no-op (which is what makes the operation idempotent); any other state is handed to
-   * {@code raiseTxnUnexpectedState}.  If a RetryException reaches the outer catch, the method
-   * calls itself again.
-   *
-   * @param rqst request carrying the id of the transaction to abort
-   * @throws NoSuchTxnException declared for callers; presumably raised by
-   *         raiseTxnUnexpectedState when the txn does not exist - confirm there
-   * @throws TxnAbortedException declared for callers; not thrown directly in this method
-   * @throws MetaException if a SQLException is not retried
-   */
-  @Override
-  @RetrySemantics.Idempotent
-  public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException {
-    long txnid = rqst.getTxnid();
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
-          stmt = dbConn.createStatement();
-          TxnStatus status = findTxnState(txnid,stmt);
-          if(status == TxnStatus.ABORTED) {
-            // Nothing to do; most likely a retry of an abort that already succeeded.
-            LOG.info("abortTxn(" + JavaUtils.txnIdToString(txnid) +
-              ") requested but it is already " + TxnStatus.ABORTED);
-            return;
-          }
-          raiseTxnUnexpectedState(status, txnid);
-        }
-
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "abortTxn(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database "
-          + StringUtils.stringifyException(e));
-      } finally {
-        // runs on the early return above as well, so the connection and lock are always released
-        close(null, stmt, dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      abortTxn(rqst);
-    }
-  }
-  /**
-   * Aborts a list of transactions in one DB transaction.
-   *
-   * Delegates to the internal {@code abortTxns(dbConn, txnids, false)} and commits.  If fewer
-   * txns were aborted than requested, this is only logged as a warning; the call still succeeds.
-   * If a RetryException reaches the outer catch, the method calls itself again.
-   *
-   * NOTE(review): unlike {@link #abortTxn(AbortTxnRequest)}, this method does not call
-   * lockInternal()/unlockInternal() - confirm that this is intentional.
-   *
-   * @param rqst request carrying the ids of the transactions to abort
-   * @throws NoSuchTxnException declared for callers; not thrown directly in this method
-   * @throws MetaException if a SQLException is not retried
-   */
-  @Override
-  @RetrySemantics.Idempotent
-  public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException {
-    List<Long> txnids = rqst.getTxn_ids();
-    try {
-      Connection dbConn = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        int numAborted = abortTxns(dbConn, txnids, false);
-        if (numAborted != txnids.size()) {
-          LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " +
-              txnids.size() + " transactions. It's possible that the other " +
-              (txnids.size() - numAborted) +
-              " transactions have been aborted or committed, or the transaction ids are invalid.");
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "abortTxns(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database "
-            + StringUtils.stringifyException(e));
-      } finally {
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      abortTxns(rqst);
-    }
-  }
-
-  /**
-   * Concurrency/isolation notes:
-   * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)}
-   * operations using select4update on NEXT_TXN_ID.  Also, mutexes on TXNX table for specific txnid:X
-   * see more notes below.
-   * In order to prevent lost updates, we need to determine if any 2 transactions overlap.  Each txn
-   * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence
-   * so that we can compare commit time of txn T with start time of txn S.  This sequence can be thought of
-   * as a logical time counter.  If S.commitTime < T.startTime, T and S do NOT overlap.
-   *
-   * Motivating example:
-   * Suppose we have multi-statment transactions T and S both of which are attempting x = x + 1
-   * In order to prevent lost update problem, the the non-overlapping txns must lock in the snapshot
-   * that they read appropriately.  In particular, if txns do not overlap, then one follows the other
-   * (assumig they write the same entity), and thus the 2nd must see changes of the 1st.  We ensure
-   * this by locking in snapshot after 
-   * {@link #openTxns(OpenTxnRequest)} call is made (see {@link org.apache.hadoop.hive.ql.Driver#acquireLocksAndOpenTxn()})
-   * and mutexing openTxn() with commit().  In other words, once a S.commit() starts we must ensure
-   * that txn T which will be considered a later txn, locks in a snapshot that includes the result
-   * of S's commit (assuming no other txns).
-   * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions
-   * were running in parallel).  If T and S both locked in the same snapshot (for example commit of
-   * txnid:2, which is possible if commitTxn() and openTxnx() is not mutexed)
-   * 'x' would be updated to the same value by both, i.e. lost update. 
-   */
-  @Override
-  @RetrySemantics.Idempotent("No-op if already committed")
-  public void commitTxn(CommitTxnRequest rqst)
-    throws NoSuchTxnException, TxnAbortedException,  MetaException {
-    long txnid = rqst.getTxnid();
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet lockHandle = null;
-      ResultSet commitIdRs = null, rs;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        /**
-         * Outline of this method: lock the TXNS row of this txn, run write-write conflict
-         * detection if the txn recorded any update/delete in TXN_COMPONENTS, then copy its
-         * TXN_COMPONENTS rows to COMPLETED_TXN_COMPONENTS and delete its TXN_COMPONENTS,
-         * HIVE_LOCKS and TXNS rows, committing all of that as a single DB transaction.
-         *
-         * Runs at READ_COMMITTED with S4U (select for update) on TXNS row for "txnid".  S4U ensures
-         * that no other operation can change this txn (such as acquiring locks).  lock() and
-         * commitTxn() should not normally run concurrently (for the same txn) but could due to bugs
-         * in the client, which could then corrupt internal transaction manager state.
-         * Also competes with abortTxn().
-         */
-        lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
-        if (lockHandle == null) {
-          //if here, txn was not found (in expected state)
-          TxnStatus actualTxnStatus = findTxnState(txnid, stmt);
-          if(actualTxnStatus == TxnStatus.COMMITTED) {
-            /**
-             * This makes the operation idempotent
-             * (assume that this is most likely due to retry logic)
-             */
-            LOG.info("Nth commitTxn(" + JavaUtils.txnIdToString(txnid) + ") msg");
-            return;
-          }
-          raiseTxnUnexpectedState(actualTxnStatus, txnid);
-          shouldNeverHappen(txnid); // NOTE(review): presumably only reached if the call above did not throw - confirm
-          //dbConn is rolled back in finally{}
-        }
-        String conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" +
-          quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")";
-        rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix)); // probe: did this txn update/delete anything?
-        if (rs.next()) {
-          close(rs);
-          //if here it means currently committing txn performed update/delete and we should check WW conflict
-          /**
-           * This S4U will mutex with other commitTxn() and openTxns().
-           * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial
-           * Note: it's possible to have several txns have the same commit id.  Suppose 3 txns start
-           * at the same time and no new txns start until all 3 commit.
-           * We could've incremented the sequence for commitId as well but it doesn't add anything functionally.
-           */
-          commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID"));
-          if (!commitIdRs.next()) {
-            throw new IllegalStateException("No rows found in NEXT_TXN_ID");
-          }
-          long commitId = commitIdRs.getLong(1);
-          Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); // lets the WRITE_SET insert below be undone if a conflict is found
-          /**
-           * "select distinct" is used below because
-           * 1. once we get to multi-statement txns, we only care to record that something was updated once
-           * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by caller it may create
-           *  duplicate entries in TXN_COMPONENTS
-           * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct
-           * even if it includes all of its columns
-           */
-          int numCompsWritten = stmt.executeUpdate(
-            "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" +
-            " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix);
-          /**
-           * see if there are any overlapping txns that wrote the same element, i.e. have a conflict
-           * Since entire commit operation is mutexed wrt other start/commit ops,
-           * committed.ws_commit_id <= current.ws_commit_id for all txns
-           * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap
-           * For example, [17,20] is committed, [6,80] is being committed right now - these overlap
-           * [17,20] committed and [21,21] committing now - these do not overlap.
-           * [17,18] committed and [18,19] committing now - these overlap  (here 18 started while 17 was still running)
-           */
-          rs = stmt.executeQuery
-            (sqlGenerator.addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
-              "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id, " +
-              "cur.ws_operation_type cur_op, committed.ws_operation_type committed_op " +
-              "from WRITE_SET committed INNER JOIN WRITE_SET cur " +
-              "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
-              //For partitioned table we always track writes at partition level (never at table)
-              //and for non partitioned - always at table level, thus the same table should never
-              //have entries with partition key and w/o
-              "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " +
-              "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid
-              // with txnid, though any decent DB should infer this
-              " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
-              // part of this commitTxn() op
-              " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
-              //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all
-              " and (committed.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) +
-              " OR cur.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + ")"));
-          if (rs.next()) {
-            //found a conflict
-            String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";
-            StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4));
-            String partitionName = rs.getString(5);
-            if (partitionName != null) {
-              resource.append('/').append(partitionName);
-            }
-            String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
-              " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8);
-            close(rs);
-            //remove WRITE_SET info for current txn since it's about to abort
-            dbConn.rollback(undoWriteSetForCurrentTxn);
-            LOG.info(msg);
-            //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this
-            if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
-              throw new IllegalStateException(msg + " FAILED!");
-            }
-            dbConn.commit(); // persists the abort before the conflict is reported to the caller
-            close(null, stmt, dbConn); // finally{} below calls close(...) on stmt/dbConn again
-            throw new TxnAbortedException(msg);
-          } else {
-            //no conflicting operations, proceed with the rest of commit sequence
-            //NOTE(review): rs is not explicitly closed on this path (nor in the outer else below);
-            //presumably it is released when stmt is closed in finally{} - confirm
-          }
-        }
-        else {
-          /**
-           * current txn didn't update/delete anything (may have inserted), so just proceed with commit
-           *
-           * We only care about commit id for write txns, so for RO (when supported) txns we don't
-           * have to mutex on NEXT_TXN_ID.
-           * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's
-           * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn.
-           * If RO < W, then there is no reads-from relationship.
-           */
-        }
-        // Copy the records from txn_components into completed_txn_components so that the compactor
-        // knows where to look to compact; the txn_components rows are deleted further below.
-        String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " +
-          "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
-        LOG.debug("Going to execute insert <" + s + ">");
-        int modCount = 0;
-        if ((modCount = stmt.executeUpdate(s)) < 1) {
-          //this can be reasonable for an empty txn START/COMMIT or read-only txn
-          //also an IUD with DP that didn't match any rows.
-          LOG.info("Expected to move at least one record from txn_components to " +
-            "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
-        }
-        s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        modCount = stmt.executeUpdate(s); // the counts assigned from here on are never read
-        s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        modCount = stmt.executeUpdate(s);
-        s = "delete from TXNS where txn_id = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        modCount = stmt.executeUpdate(s);
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "commitTxn(" + rqst + ")"); // presumably the source of RetryException caught below - confirm
-        throw new MetaException("Unable to update transaction database "
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(commitIdRs);
-        close(lockHandle, stmt, dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      commitTxn(rqst); // re-run the whole operation; by now finally{} above has released all resources
-    }
-  }
-  /**
-   * Deletes rows from WRITE_SET whose commit id is below the id of the oldest open txn (or, if
-   * no txn is open, at or below the highest allocated txn id).  Failures are logged, rolled
-   * back and otherwise ignored, i.e. no {@link SQLException} escapes from this method.
-   */
-  @Override
-  @RetrySemantics.SafeToRetry
-  public void performWriteSetGC() {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      stmt = dbConn.createStatement();
-      rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
-      if(!rs.next()) {
-        throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted");
-      }
-      long highestAllocatedTxnId = rs.getLong(1);
-      close(rs);
-      rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN));
-      if(!rs.next()) {
-        throw new IllegalStateException("Scalar query returned no rows?!?!!");
-      }
-      long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark
-      long lowestOpenTxnId = rs.getLong(1);
-      if(rs.wasNull()) {
-        //if here then there are no Open txns and  highestAllocatedTxnId must be
-        //resolved (i.e. committed or aborted), either way
-        //there are no open txns with id <= highestAllocatedTxnId
-        //the +1 is there because "delete ..." below has < (which is correct for the case when
-        //there is an open txn)
-        //Concurrency: even if new txn starts (or starts + commits) it is still true that
-        //there are no currently open txns that overlap with any committed txn with
-        //commitId <= commitHighWaterMark (as set on next line).  So plain READ_COMMITTED is enough.
-        commitHighWaterMark = highestAllocatedTxnId + 1;
-      }
-      else {
-        commitHighWaterMark = lowestOpenTxnId;
-      }
-      int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark);
-      LOG.info("Deleted " + delCnt + " obsolete rows from WRITE_SET");
-      dbConn.commit();
-    } catch (SQLException ex) {
-      LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
-      //don't hand a connection with a half-done transaction back; same as other methods here
-      LOG.debug("Going to rollback");
-      rollbackDBConn(dbConn);
-    }
-    finally {
-      close(rs, stmt, dbConn);
-    }
-  }
-  /**
-   * Enqueues the requested locks and then immediately checks whether they can be acquired.
-   *
-   * As much as possible (i.e. in absence of retries) we want both operations to be done on the same
-   * connection (but separate transactions).  This avoids some flakiness in BONECP where if you
-   * perform an operation on 1 connection and immediately get another from the pool, the 2nd one
-   * doesn't see results of the first.
-   *
-   * Retry-by-caller note: If the call to lock is from a transaction, then in the worst case
-   * there will be a duplicate set of locks but both sets will belong to the same txn so they
-   * will not conflict with each other.  For locks w/o txn context (i.e. read-only query), this
-   * may lead to deadlock (at least a long wait).  (e.g. 1st call creates locks in {@code LOCK_WAITING}
-   * mode and response gets lost.  Then {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient}
-   * retries, and enqueues another set of locks in LOCK_WAITING.  The 2nd LockResponse is delivered
-   * to the DbLockManager, which will keep doing {@link #checkLock(CheckLockRequest)} until the 1st
-   * set of locks times out.)
-   *
-   * @param rqst the locks to acquire
-   * @return result of checking the freshly enqueued locks
-   * @throws MetaException on DB errors, or if the lock that was just enqueued cannot be found
-   */
-  @RetrySemantics.CannotRetry
-  public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException {
-    ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst);
-    try {
-      return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid());
-    }
-    catch(NoSuchLockException e) {
-      // This should never happen, as we just added the lock id
-      MetaException me = new MetaException("Couldn't find a lock we just created! " + e.getMessage());
-      // keep the original exception (and its stack trace) attached for diagnostics
-      me.initCause(e);
-      throw me;
-    }
-  }
-  /**
-   * Immutable pair of the connection the locks were enqueued on and the external lock id
-   * that was allocated for them.
-   */
-  private static final class ConnectionLockIdPair {
-    private final long extLockId;
-    private final Connection dbConn;
-    private ConnectionLockIdPair(Connection connection, long lockId) {
-      extLockId = lockId;
-      dbConn = connection;
-    }
-  }
-
-  /**
-   * Issues a "select ... for update" on the TXNS row of {@code txnId}.
-   *
-   * By definition select for update is divorced from update, i.e. you executeQuery() to read
-   * and then executeUpdate().  An alternative would be to actually update the row in TXNS to the
-   * value it already has, thus forcing the db to hold a write lock for the rest of the transaction.
-   *
-   * The ResultSet is returned only so that the caller keeps a reference to it for the intended
-   * lock scope and it is not GC'd, which could (unlikely) cause the lock to be released.
-   * @param txnState the state this txn is expected to be in.  may be null
-   * @return null if no row was found
-   * @throws SQLException
-   * @throws MetaException
-   */
-  private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
-    StringBuilder sql = new StringBuilder("select TXN_STATE from TXNS where TXN_ID = ").append(txnId);
-    if (txnState != null) {
-      //only match the row if the txn is in the expected state
-      sql.append(" AND TXN_STATE=").append(quoteChar(txnState));
-    }
-    ResultSet lockedRow = stmt.executeQuery(sqlGenerator.addForUpdateClause(sql.toString()));
-    if (!lockedRow.next()) {
-      close(lockedRow);
-      return null;
-    }
-    return lockedRow;
-  }
-
-  /**
-   * This enters locks into the queue in {@link #LOCK_WAITING} mode and, when the request is made
-   * in a txn context, records the affected components in TXN_COMPONENTS.
-   * On success the connection is left open and returned together with the newly allocated
-   * external lock id; on failure or retry the connection is closed here.
-   *
-   * Isolation Level Notes:
-   * 1. We use S4U (with read_committed) to generate the next (ext) lock id.  This serializes
-   * any 2 {@code enqueueLockWithRetry()} calls.
-   * 2. We use S4U on the relevant TXNS row to block any concurrent abort/commit/etc operations
-   * @see #checkLockWithRetry(Connection, long, long)
-   */
-  private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException {
-    boolean success = false;
-    Connection dbConn = null;
-    try {
-      Statement stmt = null;
-      ResultSet rs = null;
-      ResultSet lockHandle = null;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        long txnid = rqst.getTxnid();
-        stmt = dbConn.createStatement();
-        if (isValidTxn(txnid)) {
-          //this also ensures that txn is still there in expected state
-          lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
-          if(lockHandle == null) {
-            ensureValidTxn(dbConn, txnid, stmt);
-            shouldNeverHappen(txnid);
-          }
-        }
-        /** Get the next lock id.
-         * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race.
-         * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running.  1st one generates nl_next=7,
-         * 2nd nl_next=8.  Then 8 goes first to insert into HIVE_LOCKS and acquires the locks.  Then 7 unblocks,
-         * and adds its W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)}
-         * doesn't block on locks acquired later than one it's checking*/
-        String s = sqlGenerator.addForUpdateClause("select nl_next from NEXT_LOCK_ID");
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_lock_id");
-        }
-        long extLockId = rs.getLong(1);
-        s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-
-        if (txnid > 0) {
-          List<String> txnComponentRows = new ArrayList<>();
-          // For each component in this lock request,
-          // add an entry to the txn_components table
-          for (LockComponent lc : rqst.getComponent()) {
-            if(lc.isSetIsAcid() && !lc.isIsAcid()) {
-              //we don't prevent using non-acid resources in a txn but we do lock them
-              continue;
-            }
-            boolean updateTxnComponents;
-            if(!lc.isSetOperationType()) {
-              //request came from old version of the client
-              updateTxnComponents = true;//this matches old behavior
-            }
-            else {
-              switch (lc.getOperationType()) {
-                case INSERT:
-                case UPDATE:
-                case DELETE:
-                  if(!lc.isSetIsDynamicPartitionWrite()) {
-                    //must be old client talking, i.e. we don't know if it's DP so be conservative
-                    updateTxnComponents = true;
-                  }
-                  else {
-                    /**
-                     * we know this is part of DP operation and so we'll get
-                     * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
-                     * of partitions actually changed.
-                     */
-                    updateTxnComponents = !lc.isIsDynamicPartitionWrite();
-                  }
-                  break;
-                case SELECT:
-                  updateTxnComponents = false;
-                  break;
-                case NO_TXN:
-                  /*this constant is a bit of a misnomer since we now always have a txn context.  It
-                   just means the operation is such that we don't care what tables/partitions it
-                   affected as it doesn't trigger a compaction or conflict detection.  A better name
-                   would be NON_TRANSACTIONAL.*/
-                  updateTxnComponents = false;
-                  break;
-                default:
-                  //since we have an open transaction, only the values above are expected
-                  throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
-                    + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid));
-              }
-            }
-            if(!updateTxnComponents) {
-              continue;
-            }
-            //build the SQL literals with the same helpers as the HIVE_LOCKS insert below
-            //rather than concatenating quote characters by hand
-            txnComponentRows.add(txnid + ", " + quoteString(lc.getDbname()) + ", " +
-              valueOrNullLiteral(lc.getTablename()) + ", " +
-              valueOrNullLiteral(lc.getPartitionname()) + "," +
-              quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString()));
-          }
-          List<String> txnComponentInserts = sqlGenerator.createInsertValuesStmt(
-            "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", txnComponentRows);
-          for(String query : txnComponentInserts) {
-            LOG.debug("Going to execute update <" + query + ">");
-            stmt.executeUpdate(query);
-          }
-        }
-
-        List<String> lockRows = new ArrayList<>();
-        long intLockId = 0;
-        for (LockComponent lc : rqst.getComponent()) {
-          if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
-            (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEZ_TEST))) {
-            //old version of thrift client should have (lc.isSetOperationType() == false) but they do not
-            //If you add a default value to a variable, isSet() for that variable is true regardless of the where the
-            //message was created (for object variables.  It works correctly for boolean vars, e.g. LockComponent.isAcid).
-            //in test mode, upgrades are not tested, so client version and server version of thrift always matches so
-            //we see UNSET here it means something didn't set the appropriate value.
-            throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
-              + lc + " agentInfo=" + rqst.getAgentInfo());
-          }
-          intLockId++;
-          String dbName = lc.getDbname();
-          String tblName = lc.getTablename();
-          String partName = lc.getPartitionname();
-          LockType lockType = lc.getType();
-          final char lockChar;
-          switch (lockType) {
-            case EXCLUSIVE:
-              lockChar = LOCK_EXCLUSIVE;
-              break;
-            case SHARED_READ:
-              lockChar = LOCK_SHARED;
-              break;
-            case SHARED_WRITE:
-              lockChar = LOCK_SEMI_SHARED;
-              break;
-            default:
-              //fail fast instead of persisting a lock row with a meaningless type character
-              throw new IllegalStateException("Unexpected LockType: " + lockType + " for component "
-                + lc + " agentInfo=" + rqst.getAgentInfo());
-          }
-          long now = getDbTime(dbConn);
-          lockRows.add(extLockId + ", " + intLockId + "," + txnid + ", " +
-            quoteString(dbName) + ", " +
-            valueOrNullLiteral(tblName) + ", " +
-            valueOrNullLiteral(partName) + ", " +
-            quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " +
-            //for locks associated with a txn, we always heartbeat txn and timeout based on that
-            (isValidTxn(txnid) ? 0 : now) + ", " +
-            valueOrNullLiteral(rqst.getUser()) + ", " +
-            valueOrNullLiteral(rqst.getHostname()) + ", " +
-            valueOrNullLiteral(rqst.getAgentInfo()));
-        }
-        List<String> lockInserts = sqlGenerator.createInsertValuesStmt(
-          "HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, " +
-            "hl_table, hl_partition,hl_lock_state, hl_lock_type, " +
-            "hl_last_heartbeat, hl_user, hl_host, hl_agent_info)", lockRows);
-        for(String query : lockInserts) {
-          LOG.debug("Going to execute update <" + query + ">");
-          stmt.executeUpdate(query);
-        }
-        dbConn.commit();
-        success = true;
-        return new ConnectionLockIdPair(dbConn, extLockId);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "enqueueLockWithRetry(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        close(lockHandle);
-        close(rs, stmt, null);
-        if (!success) {
-          /* This needs to return a "live" connection to be used by operation that follows it.
-          Thus it only closes Connection on failure/retry. */
-          closeDbConn(dbConn);
-        }
-        unlockInternal();
-      }
-    }
-    catch(RetryException e) {
-      return enqueueLockWithRetry(rqst);
-    }
-  }
-  /**
-   * Runs {@link #checkLock(java.sql.Connection, long)} for the given external lock id, starting
-   * over whenever a {@code RetryException} is raised.  The connection passed in is always closed
-   * by the time this returns; a retry therefore obtains a fresh one.
-   */
-  private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId)
-    throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException {
-    while (true) {
-      try {
-        try {
-          lockInternal();
-          if(dbConn.isClosed()) {
-            //should only get here if retrying this op
-            dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-          }
-          return checkLock(dbConn, extLockId);
-        } catch (SQLException sqlEx) {
-          LOG.debug("Going to rollback");
-          rollbackDBConn(dbConn);
-          checkRetryable(dbConn, sqlEx, "checkLockWithRetry(" + extLockId + "," + txnId + ")");
-          throw new MetaException("Unable to update transaction database " +
-            StringUtils.stringifyException(sqlEx));
-        } finally {
-          unlockInternal();
-          closeDbConn(dbConn);
-        }
-      }
-      catch(RetryException retrySignal) {
-        //resources of the failed attempt are already released by finally{}; try again
-        continue;
-      }
-    }
-  }
-  /**
-   * Heartbeats the lock (or the txn it belongs to) and then checks whether the lock can be acquired.
-   *
-   * Why doesn't this get a txnid as parameter?  The caller should either know the txnid or know there isn't one.
-   * Either way getTxnIdFromLockId() will not be needed.  This would be a Thrift change.
-   *
-   * Also, when lock acquisition returns WAITING, it's retried every 15 seconds (best case, see DbLockManager.backoff(),
-   * in practice more often)
-   * which means this is heartbeating way more often than hive.txn.timeout and creating extra load on DB.
-   *
-   * The clients that operate in blocking mode, can't heartbeat a lock until the lock is acquired.
-   * We should make CheckLockRequest include timestamp or last request to skip unnecessary heartbeats. Thrift change.
-   *
-   * {@link #checkLock(java.sql.Connection, long)}  must run at SERIALIZABLE (make sure some lock we are checking
-   * against doesn't move from W to A in another txn) but this method can heartbeat in
-   * separate txn at READ_COMMITTED.
-   *
-   * Retry-by-caller note:
-   * Retryable because {@link #checkLock(Connection, long)} is
-   */
-  @Override
-  @RetrySemantics.SafeToRetry
-  public LockResponse checkLock(CheckLockRequest rqst)
-    throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
-    try {
-      Connection conn = null;
-      long lockId = rqst.getLockid();
-      try {
-        lockInternal();
-        conn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        // Look up the lock info first (hopefully in the cache); a missing lock is an error.
-        LockInfo lockInfo = getTxnIdFromLockId(conn, lockId);
-        if (lockInfo == null) {
-          throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(lockId));
-        }
-        // Heartbeat the lock itself when it has no txn, otherwise heartbeat its txn.
-        if (lockInfo.txnId <= 0) {
-          heartbeatLock(conn, lockId);
-        }
-        else {
-          heartbeatTxn(conn, lockInfo.txnId);
-        }
-        //todo: strictly speaking there is a bug here.  heartbeat*() commits but both heartbeat and
-        //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retried
-        //extra heartbeat is logically harmless, but ...
-        return checkLock(conn, lockId);
-      } catch (SQLException sqlEx) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(conn);
-        checkRetryable(conn, sqlEx, "checkLock(" + rqst + " )");
-        throw new MetaException("Unable to update transaction database " +
-          JavaUtils.lockIdToString(lockId) + " " + StringUtils.stringifyException(sqlEx));
-      } finally {
-        closeDbConn(conn);
-        unlockInternal();
-      }
-    } catch (RetryException retrySignal) {
-      return checkLock(rqst);
-    }
-  }
-
-  /**
-   * Removes the locks with the given external lock id, provided they are either not associated
-   * with a txn or are still in {@code LOCK_WAITING} state.
-   *
-   * This would have been made simpler if all locks were associated with a txn.  Then only txn needs to
-   * be heartbeated, committed, etc.  no need for client to track individual locks.
-   * When removing locks not associated with txn this potentially conflicts with
-   * heartbeat/performTimeout which are update/delete of HIVE_LOCKS thus will be locked as needed by db.
-   * since this only removes from HIVE_LOCKS at worst some lock acquire is delayed
-   */
-  @RetrySemantics.Idempotent
-  public void unlock(UnlockRequest rqst)
-    throws NoSuchLockException, TxnOpenException, MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      long extLockId = rqst.getLockid();
-      try {
-        /**
-         * This method is logically like commit for read-only auto commit queries.
-         * READ_COMMITTED since this only has 1 delete statement and no new entries with the
-         * same hl_lock_ext_id can be added, i.e. all rows with a given hl_lock_ext_id are
-         * created in a single atomic operation.
-         * Theoretically, this competes with {@link #lock(org.apache.hadoop.hive.metastore.api.LockRequest)}
-         * but hl_lock_ext_id is not known until that method returns.
-         * Also competes with {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)}
-         * but using SERIALIZABLE doesn't materially change the interaction.
-         * If "delete" stmt misses, additional logic is best effort to produce meaningful error msg.
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        //hl_txnid <> 0 means it's associated with a transaction;
-        //(hl_txnid <> 0 AND hl_lock_state = LOCK_WAITING) is for multi-statement txns where
-        //some query attempted to lock (thus LOCK_WAITING state) but is giving up due to timeout for example
-        String deleteSql = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND (hl_txnid = 0 OR" +
-          " (hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "'))";
-        LOG.debug("Going to execute update <" + deleteSql + ">");
-        if (stmt.executeUpdate(deleteSql) >= 1) {
-          //normal case: something was deleted
-          LOG.debug("Going to commit");
-          dbConn.commit();
-          return;
-        }
-        //nothing deleted: figure out why, to either bail quietly or report a meaningful error
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        LockInfo lockInfo = getTxnIdFromLockId(dbConn, extLockId);
-        if (lockInfo == null) {
-          //didn't find any lock with extLockId but at ReadCommitted there is a possibility that
-          //it existed when above delete ran but it didn't have the expected state.
-          LOG.info("No lock in " + LOCK_WAITING + " mode found for unlock(" +
-            JavaUtils.lockIdToString(rqst.getLockid()) + ")");
-          //bail here to make the operation idempotent
-          return;
-        }
-        if (lockInfo.txnId != 0) {
-          //if a lock is associated with a txn we can only "unlock" it if it's in WAITING state
-          // which really means that the caller wants to give up waiting for the lock
-          String txnLockMsg = "Unlocking locks associated with transaction not permitted.  " + lockInfo;
-          LOG.error(txnLockMsg);
-          throw new TxnOpenException(txnLockMsg);
-        }
-        //txnId == 0: we didn't see this lock when running DELETE stmt above but now it showed up
-        //so "should never happen" happened...
-        String unexpectedMsg = "Found lock in unexpected state " + lockInfo;
-        LOG.error(unexpectedMsg);
-        throw new MetaException(unexpectedMsg);
-      } catch (SQLException sqlEx) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, sqlEx, "unlock(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database " +
-          JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(sqlEx));
-      } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException retrySignal) {
-      unlock(rqst);
-    }
-  }
-
-  /**
-   * A {@link LockInfo} that also remembers the response element it was created from;
-   * used to sort entries in {@link org.apache.hadoop.hive.metastore.api.ShowLocksResponse}
-   */
-  private static class LockInfoExt extends LockInfo {
-    private final ShowLocksResponseElement e;
-    LockInfoExt(ShowLocksResponseElement element) {
-      super(element);
-      e = element;
-    }
-  }
-  @RetrySemantics.ReadOnly
-  public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
-    try {
-      Connection dbConn = null;
-      ShowLocksResponse rsp = new ShowLocksResponse();
-      List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
-      List<LockInfoExt> sortedList = new ArrayList<LockInfoExt>();
-      Statement stmt = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
-        String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
-          "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," +
-          "hl_blockedby_ext_id, hl_blockedby_int_id, hl_agent_info from HIVE_LOCKS";
-
-        // Some filters may have been specified in the SHOW LOCKS statement. Add them to the query.
-        String dbName = rqst.getDbname();
-        String tableName = rqst.getTablename();
-        String partName = rqst.getPartname();
-
-        StringBuilder filter = new StringBuilder();
-        if (dbName != null && !dbName.isEmpty()) {
-          filter.append("hl_db=").append(quoteString(dbName));
-        }
-        if (tableName != null && !tableName.isEmpty()) {
-          if (filter.length() > 0) {
-            filter.append(" and ");
-          }
-          filter.append("hl_table=").append(quoteString(tableName));
-        }
-        if (partName != null && !partName.isEmpty()) {
-          if (filter.length() > 0) {
-            filter.append(" and ");
-          }
-          filter.append("hl_partition=").append(quoteString(partName));
-        }
-        String whereClause = filter.toString();
-
-        if (!whereClause.isEmpty()) {
-          s = s + " where " + whereClause;
-        }
-
-        LOG.debug("Doing to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          ShowLocksResponseElement e = new ShowLocksResponseElement();
-          e.setLockid(rs.getLong(1));
-          long txnid = rs.getLong(2);
-          if (!rs.wasNull()) e.setTxnid(txnid);
-          e.setDbname(rs.getString(3));
-          e.setTablename(rs.getString(4));
-          String partition = rs.getString(5);
-          if (partition != null) e.setPartname(partition);
-          switch (rs.getString(6).charAt(0)) {
-            case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break;
-            case LOCK_WAITING: e.setState(LockState.WAITING); break;
-            default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0));
-          }
-          switch (rs.getString(7).charAt(0)) {
-            case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break;
-            case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break;
-            case LOCK_SHARED: e.setType(LockType.SHARED_READ); break;
-            default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0));
-          }
-          e.setLastheartbeat(rs.getLong(8));
-          long acquiredAt = rs.getLong(9);
-          if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
-          e.setUser(rs.getString(10));
-          e.setHostname(rs.getString(11));
-          e.setLockIdInternal(rs.getLong(12));
-          long id = rs.getLong(13);
-          if(!rs.wasNull()) {
-            e.setBlockedByExtId(id);
-          }
-          id = rs.getLong(14);
-          if(!rs.wasNull()) {
-            e.setBlockedByIntId(id);
-          }
-          e.setAgentInfo(rs.getString(15));
-          sortedList.add(new LockInfoExt(e));
-        }
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-      } catch (SQLException e) {
-        checkRetryable(dbConn, e, "showLocks(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-      }
-      //this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined
-      //by checkLock() - makes diagnostics easier.
-      Collections.sort(sortedList, new LockInfoComparator());
-      for(LockInfoExt lockInfoExt : sortedList) {
-        elems.add(lockInfoExt.e);
-      }
-      rsp.setLocks(elems);
-      return rsp;
-    } catch (RetryException e) {
-      return showLocks(rqst);
-    }
-  }
-
-  /**
-   * Heartbeats the lock and the transaction named in the request, in that order, on a single
-   * connection.
-   * <p>
-   * {@code ids} should only have txnid or lockid but not both, ideally.
-   * Currently DBTxnManager.heartbeat() enforces this.
-   */
-  @Override
-  @RetrySemantics.SafeToRetry
-  public void heartbeat(HeartbeatRequest ids)
-    throws NoSuchTxnException,  NoSuchLockException, TxnAbortedException, MetaException {
-    try {
-      Connection dbConn = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        heartbeatLock(dbConn, ids.getLockid());
-        heartbeatTxn(dbConn, ids.getTxnid());
-      } catch (SQLException sqlFailure) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        // Raises RetryException when the failure is deemed retryable; handled below.
-        checkRetryable(dbConn, sqlFailure, "heartbeat(" + ids + ")");
-        String detail = StringUtils.stringifyException(sqlFailure);
-        throw new MetaException("Unable to select from transaction database " + detail);
-      } finally {
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException retry) {
-      // Start over with a fresh connection.
-      heartbeat(ids);
-    }
-  }
-  /**
-   * Heartbeats every transaction id in the inclusive range {@code [rqst.getMin(), rqst.getMax()]}.
-   * <p>
-   * A fast path first tries to update all ids at once, restricted to open transactions. If the
-   * number of updated rows equals the number of ids requested, the update is committed and the
-   * response is returned with empty sets. Otherwise the fast-path update is rolled back and each
-   * id is heartbeated individually so that missing and aborted transactions can be reported.
-   *
-   * @param rqst the range of transaction ids to heartbeat
-   * @return response whose "nosuch" and "aborted" sets hold the ids that could not be heartbeated
-   * @throws MetaException on a database error
-   */
-  @Override
-  @RetrySemantics.SafeToRetry
-  public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
-    throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse();
-      // The response keeps references to these sets, so ids added later show up in rsp.
-      Set<Long> nosuch = new HashSet<Long>();
-      Set<Long> aborted = new HashSet<Long>();
-      rsp.setNosuch(nosuch);
-      rsp.setAborted(aborted);
-      try {
-        /**
-         * READ_COMMITTED is sufficient since {@link #heartbeatTxn(java.sql.Connection, long)}
-         * only has 1 update statement in it and
-         * we only update existing txns, i.e. nothing can add additional txns that this operation
-         * would care about (which would have required SERIALIZABLE)
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        /*do fast path first (in 1 statement) if doesn't work, rollback and do the long version*/
-        stmt = dbConn.createStatement();
-        List<String> queries = new ArrayList<>();
-        // NOTE(review): the cast narrows a long difference to int; assumes the range is small and
-        // min <= max - confirm against callers.
-        int numTxnsToHeartbeat = (int) (rqst.getMax() - rqst.getMin() + 1);
-        List<Long> txnIds = new ArrayList<>(numTxnsToHeartbeat);
-        for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) {
-          txnIds.add(txn);
-        }
-        // May produce more than one statement; presumably the id list is split to keep each
-        // IN clause within database limits - TODO confirm in TxnUtils.
-        TxnUtils.buildQueryWithINClause(conf, queries,
-          new StringBuilder("update TXNS set txn_last_heartbeat = " + getDbTime(dbConn) +
-            " where txn_state = " + quoteChar(TXN_OPEN) + " and "),
-          new StringBuilder(""), txnIds, "txn_id", true, false);
-        int updateCnt = 0;
-        for (String query : queries) {
-          LOG.debug("Going to execute update <" + query + ">");
-          updateCnt += stmt.executeUpdate(query);
-        }
-        if (updateCnt == numTxnsToHeartbeat) {
-          //fast pass worked, i.e. all txns we were asked to heartbeat were Open as expected
-          dbConn.commit();
-          return rsp;
-        }
-        //if here, do the slow path so that we can return info txns which were not in expected state
-        dbConn.rollback();
-        for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) {
-          try {
-            // NOTE(review): no commit follows this loop; presumably heartbeatTxn commits its own
-            // update - confirm.
-            heartbeatTxn(dbConn, txn);
-          } catch (NoSuchTxnException e) {
-            nosuch.add(txn);
-          } catch (TxnAbortedException e) {
-            aborted.add(txn);
-          }
-        }
-        return rsp;
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "heartbeatTxnRange(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        close(null, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return heartbeatTxnRange(rqst);
-    }
-  }
-
-  /**
-   * Hands out the next compaction queue id and advances the stored counter by one.
-   * <p>
-   * The current value is read from NEXT_COMPACTION_QUEUE_ID with the "for update" clause added
-   * by {@code sqlGenerator}, then the row is updated on the same statement. Nothing is
-   * committed here.
-   *
-   * @param stmt statement used for both the select and the update
-   * @return the id that was stored before the increment
-   * @throws IllegalStateException if NEXT_COMPACTION_QUEUE_ID holds no row
-   * @throws SQLException if either statement fails
-   */
-  long generateCompactionQueueId(Statement stmt) throws SQLException, MetaException {
-    String selectSql =
-        sqlGenerator.addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID");
-    LOG.debug("going to execute query <" + selectSql + ">");
-    try (ResultSet counterRow = stmt.executeQuery(selectSql)) {
-      if (counterRow.next()) {
-        long nextId = counterRow.getLong(1);
-        String updateSql = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (nextId + 1);
-        LOG.debug("Going to execute update <" + updateSql + ">");
-        stmt.executeUpdate(updateSql);
-        return nextId;
-      }
-      throw new IllegalStateException("Transaction tables not properly initiated, "
-          + "no record found in next_compaction_queue_id");
-    }
-  }
-  /**
-   * Puts a compaction request in COMPACTION_QUEUE unless one is already queued.
-   * <p>
-   * If a row for the same database, table and partition already exists in the initiated or
-   * working state, that row's id and state are returned instead and nothing is inserted.
-   *
-   * @param rqst names the database, table and optional partition to compact, the compaction
-   *             type, and optionally table properties and a run-as user
-   * @return the id and state of the new entry, or of the entry that was already queued
-   * @throws MetaException if the compaction type is neither MAJOR nor MINOR, or on a
-   *                       database error
-   */
-  @Override
-  @RetrySemantics.Idempotent
-  public CompactionResponse compact(CompactionRequest rqst) throws MetaException {
-    // Put a compaction request in the queue.
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      TxnStore.MutexAPI.LockHandle handle = null;
-      try {
-        lockInternal();
-        /**
-         * MUTEX_KEY.CompactionScheduler lock ensures that there is only 1 entry in
-         * Initiated/Working state for any resource.  This ensures that we don't run concurrent
-         * compactions for any resource.
-         */
-        handle = getMutexAPI().acquireLock(MUTEX_KEY.CompactionScheduler.name());
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
-        // The id is reserved before the duplicate check below.
-        long id = generateCompactionQueueId(stmt);
-
-        // Look for an entry for the same resource that is already initiated or being worked on.
-        StringBuilder sb = new StringBuilder("select cq_id, cq_state from COMPACTION_QUEUE where").
-          append(" cq_state IN(").append(quoteChar(INITIATED_STATE)).
-            append(",").append(quoteChar(WORKING_STATE)).
-          append(") AND cq_database=").append(quoteString(rqst.getDbname())).
-          append(" AND cq_table=").append(quoteString(rqst.getTablename())).append(" AND ");
-        if(rqst.getPartitionname() == null) {
-          sb.append("cq_partition is null");
-        }
-        else {
-          sb.append("cq_partition=").append(quoteString(rqst.getPartitionname()));
-        }
-
-        LOG.debug("Going to execute query <" + sb.toString() + ">");
-        ResultSet rs = stmt.executeQuery(sb.toString());
-        if(rs.next()) {
-          long enqueuedId = rs.getLong(1);
-          String state = compactorStateToResponse(rs.getString(2).charAt(0));
-          LOG.info("Ignoring request to compact " + rqst.getDbname() + "/" + rqst.getTablename() +
-            "/" + rqst.getPartitionname() + " since it is already " + quoteString(state) +
-            " with id=" + enqueuedId);
-          // NOTE(review): this path returns without an explicit commit or rollback, so whether
-          // the id reserved above is kept depends on what closeDbConn does - confirm.
-          return new CompactionResponse(enqueuedId, state, false);
-        }
-        close(rs);
-        // Build the insert; the column list and the value list must add the optional
-        // partition, table-properties and run-as entries in the same order.
-        StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
-          "cq_table, ");
-        String partName = rqst.getPartitionname();
-        if (partName != null) buf.append("cq_partition, ");
-        buf.append("cq_state, cq_type");
-        if (rqst.getProperties() != null) {
-          buf.append(", cq_tblproperties");
-        }
-        if (rqst.getRunas() != null) buf.append(", cq_run_as");
-        buf.append(") values (");
-        buf.append(id);
-        buf.append(", '");
-        buf.append(rqst.getDbname());
-        buf.append("', '");
-        buf.append(rqst.getTablename());
-        buf.append("', '");
-        if (partName != null) {
-          buf.append(partName);
-          buf.append("', '");
-        }
-        buf.append(INITIATED_STATE);
-        buf.append("', '");
-        switch (rqst.getType()) {
-          case MAJOR:
-            buf.append(MAJOR_TYPE);
-            break;
-
-          case MINOR:
-            buf.append(MINOR_TYPE);
-            break;
-
-          default:
-            LOG.debug("Going to rollback");
-            dbConn.rollback();
-            throw new MetaException("Unexpected compaction type " + rqst.getType().toString());
-        }
-        if (rqst.getProperties() != null) {
-          buf.append("', '");
-          buf.append(new StringableMap(rqst.getProperties()).toString());
-        }
-        if (rqst.getRunas() != null) {
-          buf.append("', '");
-          buf.append(rqst.getRunas());
-        }
-        buf.append("')");
-        String s = buf.toString();
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        return new CompactionResponse(id, INITIATED_RESPONSE, true);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "compact(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-        // The mutex is released only after the connection has been closed.
-        if(handle != null) {
-          handle.releaseLocks();
-        }
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      return compact(rqst);
-    }
-  }
-
-  private static String compactorStateToResponse(char s) {
-    switch (s) {
-      case INITIATED_STATE: return INITIATED_RESPONSE;
-      case WORKING_STATE: return WORKING_RESPONSE;
-      case READY_FOR_CLEANING: return CLEANING_RESPONSE;
-      case FAILED_STATE: return FAILED_RESPONSE;
-      case SUCCEEDED_STATE: return SUCCEEDED_RESPONSE;
-      case ATTEMPTED_STATE: return ATTEMPTED_RESPONSE;
-      default:
-        return Character.toString(s);
-    }
-  }
-  @RetrySemantics.ReadOnly
-  public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException {
-    ShowCompactResponse response = new ShowCompactResponse(new ArrayList<ShowCompactResponseElement>());
-    Connection dbConn = null;
-    Statement stmt = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
-          //-1 because 'null' literal doesn't work for all DBs...
-          "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
-          "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
-          "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
-        //what I want is order by cc_end desc, cc_start asc (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013)
-        //to sort so that currently running jobs are at the end of the list (bottom of screen)
-        //and currently running ones are in sorted by start time
-        //w/o order by likely currently running compactions will be first (LHS of Union)
-        LOG.debug("Going to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          ShowCompactResponseElement e = new ShowCompactResponseElement();
-          e.setDbname(rs.getString(1));
-          e.setTablename(rs.getString(2));
-          e.setPartitionname(rs.getString(3));
-          e.setState(compactorStateToResponse(rs.getString(4).charAt(0)));
-          switch (rs.getString(5).charAt(0)) {
-            case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
-            case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
-            default:
-              //do nothing to handle RU/D if we add another status
-          }
-          e.setWorkerid(rs.getString(6));
-          long start = rs.getLong(7);
-          if(!rs.wasNull()) {
-            e.setStart(start);
-          }
-          long endTime = rs.getLong(8);
-          if(endTime != -1) {
-            e.setEndTime(endTime);
-          }
-          e.setRunAs(rs.getString(9));
-          e.setHadoopJobId(rs.getString(10));
-          e.setId(rs.getLong(11));
-          response.addToCompacts(e);
-        }
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "showCompact(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-      }
-      return response;
-    } catch (RetryException e) {
-      return showCompact(rqst);
-    }
-  }
-
-  /**
-   * Always throws; marks a code path that is not expected to be reachable for the given
-   * transaction.
-   *
-   * @param txnid transaction id included in the exception message
-   * @throws RuntimeException always
-   */
-  private static void shouldNeverHappen(long txnid) {
-    String txnText = JavaUtils.txnIdToString(txnid);
-    throw new RuntimeException("This should never happen: " + txnText);
-  }
-  /**
-   * Always throws; marks a code path that is not expected to be reachable for the given
-   * transaction and lock.
-   *
-   * @param txnid transaction id included in the exception message
-   * @param extLockId external lock id included in the exception message
-   * @param intLockId internal lock id included in the exception message
-   * @throws RuntimeException always
-   */
-  private static void shouldNeverHappen(long txnid, long extLockId, long intLockId) {
-    StringBuilder message = new StringBuilder("This should never happen: ");
-    message.append(JavaUtils.txnIdToString(txnid)).append(" ");
-    message.append(JavaUtils.lockIdToString(extLockId)).append(" ");
-    message.append(intLockId);
-    throw new RuntimeException(message.toString());
-  }
-
-  /**
-   * Records in TXN_COMPONENTS the partitions named in the request, one row per partition, for
-   * the request's transaction.
-   * <p>
-   * The transaction record is locked in the open state first; if that fails the transaction is
-   * validated, which is expected to raise the appropriate exception.
-   * <p>
-   * Retry-by-caller note:
-   * This may be retried after dbConn.commit.  At worst, it will create duplicate entries in
-   * TXN_COMPONENTS which won't affect anything.  See more comments in {@link #commitTxn(CommitTxnRequest)}
-   *
-   * @param rqst transaction id, database, table, partition names and optional operation type
-   * @throws NoSuchTxnException declared for the transaction validation step
-   * @throws TxnAbortedException declared for the transaction validation step
-   * @throws MetaException on a database error
-   */
-  @Override
-  @RetrySemantics.SafeToRetry
-  public void addDynamicPartitions(AddDynamicPartitions rqst)
-      throws NoSuchTxnException,  TxnAbortedException, MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet lockHandle = null;
-    try {
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN);
-        if(lockHandle == null) {
-          //ensures txn is still there and in expected state
-          ensureValidTxn(dbConn, rqst.getTxnid(), stmt);
-          shouldNeverHappen(rqst.getTxnid());
-        }
-        //for RU this may be null so we should default it to 'u' which is most restrictive
-        OpertaionType ot = OpertaionType.UPDATE;
-        if(rqst.isSetOperationType()) {
-          ot = OpertaionType.fromDataOperationType(rqst.getOperationType());
-        }
-        // One value tuple per partition, in the column order of the insert below.
-        List<String> rows = new ArrayList<>();
-        for (String partName : rqst.getPartitionnames()) {
-          rows.add(rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
-            "," + quoteString(partName) + "," + quoteChar(ot.sqlConst));
-        }
-        //record partitions that were written to
-        List<String> queries = sqlGenerator.createInsertValuesStmt(
-          "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows);
-        for(String query : queries) {
-          LOG.debug("Going to execute update <" + query + ">");
-          stmt.executeUpdate(query);
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "addDynamicPartitions(" + rqst + ")");
-        throw new MetaException("Unable to insert into from transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        close(lockHandle, stmt, dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      addDynamicPartitions(rqst);
-    }
-  }
-
-  /**
-   * Clean up corresponding records in metastore tables when corresponding object is dropped,
-   * specifically: TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS
-   * Retry-by-caller note: this is only idempotent assuming it's only called by dropTable/Db/etc
-   * operations.
-   * <p>
-   * All delete statements are built first, then executed on one connection and committed
-   * together. A SQL failure whose message contains "does not exist" is logged and ignored.
-   *
-   * @param type kind of object that was dropped: DATABASE, TABLE or PARTITION
-   * @param db the dropped database; read only when {@code type} is DATABASE
-   * @param table the dropped table, or the table owning the dropped partitions; read when
-   *              {@code type} is TABLE or PARTITION
-   * @param partitionIterator the dropped partitions; read only when {@code type} is PARTITION
-   * @throws MetaException if {@code type} is not supported or the cleanup fails
-   */
-  @Override
-  @RetrySemantics.Idempotent
-  public void cleanupRecords(HiveObjectType type, Database db, Table table,
-                             Iterator<Partition> partitionIterator) throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-
-      try {
-        String dbName;
-        String tblName;
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        List<String> queries = new ArrayList<String>();
-        StringBuilder buff = new StringBuilder();
-
-        switch (type) {
-          case DATABASE:
-            dbName = db.getName();
-
-            buff.append("delete from TXN_COMPONENTS where tc_database='");
-            buff.append(dbName);
-            buff.append("'");
-            queries.add(buff.toString());
-
-            buff.setLength(0);
-            buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
-            buff.append(dbName);
-            buff.append("'");
-            queries.add(buff.toString());
-
-            buff.setLength(0);
-            buff.append("delete from COMPACTION_QUEUE where cq_database='");
-            buff.append(dbName);
-            buff.append("'");
-            queries.add(buff.toString());
-
-            buff.setLength(0);
-            buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
-            buff.append(dbName);
-            buff.append("'");
-            queries.add(buff.toString());
-
-            break;
-          case TABLE:
-            dbName = table.getDbName();
-            tblName = table.getTableName();
-
-            buff.append("delete from TXN_COMPONENTS where tc_database='");
-            buff.append(dbName);
-            buff.append("' and tc_table='");
-            buff.append(tblName);
-            buff.append("'");
-            queries.add(buff.toString());
-
-            buff.setLength(0);
-            buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
-            buff.append(dbName);
-            buff.append("' and ctc_table='");
-            buff.append(tblName);
-            buff.append("'");
-            queries.add(buff.toString());
-
-            buff.setLength(0);
-            buff.append("delete from COMPACTION_QUEUE where cq_database='");
-            buff.append(dbName);
-            buff.append("' and cq_table='");
-            buff.append(tblName);
-            buff.append("'");
-            queries.add(buff.toString());
-
-            buff.setLength(0);
-            buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
-            buff.append(dbName);
-            buff.append("' and cc_table='");
-            buff.append(tblName);
-            buff.append("'");
-            queries.add(buff.toString());
-
-            break;
-          case PARTITION:
-            dbName = table.getDbName();
-            tblName = table.getTableName();
-            List<FieldSchema> partCols = table.getPartitionKeys();  // partition columns
-            List<String> partVals;                                  // partition values
-            String partName;
-
-            while (partitionIterator.hasNext()) {
-              Partition p = partitionIterator.next();
-              partVals = p.getValues();
-              partName = Warehouse.makePartName(partCols, partVals);
-
-              // Start every partition with an empty buffer; otherwise the first delete of this
-              // iteration would be appended to the last delete of the previous one.
-              buff.setLength(0);
-              buff.append("delete from TXN_COMPONENTS where tc_database='");
-              buff.append(dbName);
-              buff.append("' and tc_table='");
-              buff.append(tblName);
-              buff.append("' and tc_partition='");
-              buff.append(partName);
-              buff.append("'");
-              queries.add(buff.toString());
-
-              buff.setLength(0);
-              buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
-              buff.append(dbName);
-              buff.append("' and ctc_table='");
-              buff.append(tblName);
-              buff.append("' and ctc_partition='");
-              buff.append(partName);
-              buff.append("'");
-              queries.add(buff.toString());
-
-              buff.setLength(0);
-              buff.append("delete from COMPACTION_QUEUE where cq_database='");
-              buff.append(dbName);
-              buff.append("' and cq_table='");
-              buff.append(tblName);
-              buff.append("' and cq_partition='");
-              buff.append(partName);
-              buff.append("'");
-              queries.add(buff.toString());
-
-              buff.setLength(0);
-              buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
-              buff.append(dbName);
-              buff.append("' and cc_table='");
-              buff.append(tblName);
-              buff.append("' and cc_partition='");
-              buff.append(partName);
-              buff.append("'");
-              queries.add(buff.toString());
-            }
-
-            break;
-          default:
-            throw new MetaException("Invalid object type for cleanup: " + type);
-        }
-
-        for (String query : queries) {
-          LOG.debug("Going to execute update <" + query + ">");
-          stmt.executeUpdate(query);
-        }
-
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "cleanupRecords");
-        if (e.getMessage().contains("does not exist")) {
-          LOG.warn("Cannot perform cleanup since metastore table does not exist");
-        } else {
-          throw new MetaException("Unable to clean up " + StringUtils.stringifyException(e));
-        }
-      } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      // NOTE(review): on a retry partitionIterator may already have been consumed by the failed
-      // attempt, in which case the PARTITION case would build no statements - confirm.
-      cleanupRecords(type, db, table, partitionIterator);
-    }
-  }
-
-  /**
-   * For testing only, do not use.
-   */
-  @VisibleForTesting
-  public int numLocksInLockTable() throws SQLException, MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      stmt = dbConn.createStatement();
-      String s = "select count(*) from HIVE_LOCKS";
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
-      rs.next();
-      int rc = rs.getInt(1);
-      // Necessary to clean up the transaction in the db.
-      dbConn.rollback

<TRUNCATED>

Mime
View raw message