Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 83FB218C0D for ; Sat, 16 Jan 2016 20:31:41 +0000 (UTC) Received: (qmail 67511 invoked by uid 500); 16 Jan 2016 20:31:41 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 67462 invoked by uid 500); 16 Jan 2016 20:31:41 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 67448 invoked by uid 99); 16 Jan 2016 20:31:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 16 Jan 2016 20:31:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F0E22E0484; Sat, 16 Jan 2016 20:31:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ekoifman@apache.org To: commits@hive.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-12366 Refactor Heartbeater logic for transaction (Wei Zheng via Eugene Koifman) Date: Sat, 16 Jan 2016 20:31:40 +0000 (UTC) Repository: hive Updated Branches: refs/heads/master f3ea7773b -> aa0f8e062 HIVE-12366 Refactor Heartbeater logic for transaction (Wei Zheng via Eugene Koifman) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aa0f8e06 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aa0f8e06 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aa0f8e06 Branch: refs/heads/master Commit: aa0f8e062827245b05c353d02537e51b9957bf36 Parents: f3ea777 Author: Eugene Koifman Authored: Sat Jan 16 12:31:29 
2016 -0800 Committer: Eugene Koifman Committed: Sat Jan 16 12:31:29 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 + .../java/org/apache/hadoop/hive/ql/Driver.java | 4 +- .../apache/hadoop/hive/ql/exec/Heartbeater.java | 90 ------------ .../hadoop/hive/ql/exec/mr/ExecDriver.java | 2 +- .../hive/ql/exec/mr/HadoopJobExecHelper.java | 16 +-- .../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 22 +-- .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 2 +- .../hadoop/hive/ql/io/merge/MergeFileTask.java | 2 +- .../ql/io/rcfile/stats/PartialScanTask.java | 2 +- .../io/rcfile/truncate/ColumnTruncateTask.java | 2 +- .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 140 +++++++++++++++++++ .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 9 ++ .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 10 ++ .../apache/hadoop/hive/ql/TestTxnCommands.java | 2 +- .../hive/ql/lockmgr/TestDbTxnManager.java | 91 +++++++++--- 15 files changed, 244 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1e8c34b..2c25cae 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -173,6 +173,7 @@ public class HiveConf extends Configuration { HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, HiveConf.ConfVars.HIVE_TXN_MANAGER, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, + HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE, HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED, @@ 
-1508,6 +1509,8 @@ public class HiveConf extends Configuration { "no transactions."), HIVE_TXN_TIMEOUT("hive.txn.timeout", "300s", new TimeValidator(TimeUnit.SECONDS), "time after which transactions are declared aborted if the client has not sent a heartbeat."), + HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE("hive.txn.heartbeat.threadpool.size", 5, "The number of " + + "threads to use for heartbeating. For Hive CLI, 1 is enough. For HiveServer2, we need a few"), TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT("hive.txn.manager.dump.lock.state.on.acquire.timeout", false, "Set this to true so that when attempt to acquire a lock on resource times out, the current state" + " of the lock manager is dumped to log file. This is for debugging. See also " + http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 9bff08f..020f037 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1026,9 +1026,7 @@ public class Driver implements CommandProcessor { if (ctx != null && ctx.getHiveLocks() != null) { hiveLocks.addAll(ctx.getHiveLocks()); } - if (!hiveLocks.isEmpty()) { - txnMgr.getLockManager().releaseLocks(hiveLocks); - } + txnMgr.releaseLocks(hiveLocks); } hiveLocks.clear(); if (ctx != null) { http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java deleted file mode 100644 index ff64563..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; -import org.apache.hadoop.hive.ql.lockmgr.LockException; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -/** - * Class to handle heartbeats for MR and Tez tasks. - */ -public class Heartbeater { - private long lastHeartbeat = 0; - private long heartbeatInterval = 0; - private boolean dontHeartbeat = false; - private HiveTxnManager txnMgr; - private Configuration conf; - - static final private Logger LOG = LoggerFactory.getLogger(Heartbeater.class.getName()); - - /** - * - * @param txnMgr transaction manager for this operation - * @param conf Configuration for this operation - */ - public Heartbeater(HiveTxnManager txnMgr, Configuration conf) { - this.txnMgr = txnMgr; - this.conf = conf; - } - - /** - * Send a heartbeat to the metastore for locks and transactions. 
- * @throws IOException - */ - public void heartbeat() throws IOException { - if (dontHeartbeat) return; - - if (txnMgr == null) { - LOG.debug("txnMgr null, not heartbeating"); - dontHeartbeat = true; - return; - } - - if (heartbeatInterval == 0) { - // Multiply the heartbeat interval by 1000 to convert to milliseconds, - // but divide by 2 to give us a safety factor. - heartbeatInterval = HiveConf.getTimeVar( - conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2; - if (heartbeatInterval == 0) { - LOG.warn(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set, heartbeats won't be sent"); - dontHeartbeat = true; - LOG.debug("heartbeat interval 0, not heartbeating"); - return; - } - } - long now = System.currentTimeMillis(); - if (now - lastHeartbeat > heartbeatInterval) { - try { - LOG.debug("heartbeating"); - txnMgr.heartbeat(); - } catch (LockException e) { - LOG.warn("Failed trying to heartbeat " + e.getMessage()); - throw new IOException(e); - } - lastHeartbeat = now; - } - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index ab7fd93..472e8ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -429,7 +429,7 @@ public class ExecDriver extends Task implements Serializable, Hadoop // Finally SUBMIT the JOB! 
rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc, ctx.getHiveTxnManager()); + returnVal = jobExecHelper.progress(rj, jc); success = (returnVal == 0); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index 5f35630..1b296b9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -33,13 +33,11 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.MapRedStats; -import org.apache.hadoop.hive.ql.exec.Heartbeater; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskHandle; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; @@ -230,14 +228,12 @@ public class HadoopJobExecHelper { int numReduce = -1; List clientStatPublishers = getClientStatPublishers(); final boolean localMode = ShimLoader.getHadoopShims().isLocalMode(job); - Heartbeater heartbeater = new Heartbeater(th.getTxnManager(), job); while (!rj.isComplete()) { try { Thread.sleep(pullInterval); } catch (InterruptedException e) { } - heartbeater.heartbeat(); if (initializing && rj.getJobState() == JobStatus.PREP) { // No reason to poll untill the job is 
initialized @@ -451,7 +447,6 @@ public class HadoopJobExecHelper { private static class ExecDriverTaskHandle extends TaskHandle { JobClient jc; RunningJob rj; - HiveTxnManager txnMgr; JobClient getJobClient() { return jc; @@ -461,14 +456,9 @@ public class HadoopJobExecHelper { return rj; } - HiveTxnManager getTxnManager() { - return txnMgr; - } - - public ExecDriverTaskHandle(JobClient jc, RunningJob rj, HiveTxnManager txnMgr) { + public ExecDriverTaskHandle(JobClient jc, RunningJob rj) { this.jc = jc; this.rj = rj; - this.txnMgr = txnMgr; } public void setRunningJob(RunningJob job) { @@ -522,7 +512,7 @@ public class HadoopJobExecHelper { } - public int progress(RunningJob rj, JobClient jc, HiveTxnManager txnMgr) throws IOException { + public int progress(RunningJob rj, JobClient jc) throws IOException { jobId = rj.getID(); int returnVal = 0; @@ -543,7 +533,7 @@ public class HadoopJobExecHelper { runningJobs.add(rj); - ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj, txnMgr); + ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj); jobInfo(rj); MapRedStats mapRedStats = progress(th); http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index e81b73d..479bc93 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -39,11 +39,9 @@ import java.util.TreeSet; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; -import org.apache.hadoop.hive.ql.exec.Heartbeater; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; -import 
org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.session.SessionState; @@ -63,21 +61,6 @@ import org.fusesource.jansi.Ansi; import com.google.common.base.Preconditions; -import java.io.IOException; -import java.io.PrintStream; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - /** * TezJobMonitor keeps track of a tez job while it's being executed. It will * print status to the console and retrieve final status of the job after @@ -220,11 +203,10 @@ public class TezJobMonitor { * monitorExecution handles status printing, failures during execution and final status retrieval. * * @param dagClient client that was used to kick off the job - * @param txnMgr transaction manager for this operation * @param conf configuration file for this operation * @return int 0 - success, 1 - killed, 2 - failed */ - public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf, + public int monitorExecution(final DAGClient dagClient, HiveConf conf, DAG dag) throws InterruptedException { long monitorStartTime = System.currentTimeMillis(); DAGStatus status = null; @@ -238,7 +220,6 @@ public class TezJobMonitor { DAGStatus.State lastState = null; String lastReport = null; Set opts = new HashSet(); - Heartbeater heartbeater = new Heartbeater(txnMgr, conf); long startTime = 0; boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || Utilities.isPerfOrAboveLogging(conf); @@ -255,7 +236,6 @@ public class TezJobMonitor { status = dagClient.getDAGStatus(opts, checkInterval); Map progressMap = status.getVertexProgress(); 
DAGStatus.State state = status.getState(); - heartbeater.heartbeat(); if (state != lastState || state == RUNNING) { lastState = state; http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 88fba58..3cb7439 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -191,7 +191,7 @@ public class TezTask extends Task { // finally monitor will print progress until the job is done TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap()); - rc = monitor.monitorExecution(dagClient, ctx.getHiveTxnManager(), conf, dag); + rc = monitor.monitorExecution(dagClient, conf, dag); if (rc != 0) { this.setException(new HiveException(monitor.getDiagnostics())); } http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java index 7453145..e23a969 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java @@ -149,7 +149,7 @@ public class MergeFileTask extends Task implements Serializable, // Finally SUBMIT the JOB! 
rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc, null); + returnVal = jobExecHelper.progress(rj, jc); success = (returnVal == 0); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java index 188e9a6..829a9f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java @@ -221,7 +221,7 @@ public class PartialScanTask extends Task implements // Finally SUBMIT the JOB! rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc, null); + returnVal = jobExecHelper.progress(rj, jc); success = (returnVal == 0); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java index 08e3d80..34c067a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java @@ -185,7 +185,7 @@ public class ColumnTruncateTask extends Task implements Seri // Finally SUBMIT the JOB! 
rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc, null); + returnVal = jobExecHelper.progress(rj, jc); success = (returnVal == 0); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 552367c..3617699 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hive.ql.lockmgr; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.JavaUtils; @@ -39,6 +42,13 @@ import org.apache.thrift.TException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * An implementation of HiveTxnManager that stores the transactions in the @@ -65,7 +75,25 @@ public class DbTxnManager extends HiveTxnManagerImpl { */ private int statementId = -1; + // ExecutorService for sending heartbeat to metastore periodically. 
+ private static ScheduledExecutorService heartbeatExecutorService = null; + private ScheduledFuture heartbeatTask = null; + private Runnable shutdownRunner = null; + static final int SHUTDOWN_HOOK_PRIORITY = 0; + DbTxnManager() { + shutdownRunner = new Runnable() { + @Override + public void run() { + if (heartbeatExecutorService != null + && !heartbeatExecutorService.isShutdown() + && !heartbeatExecutorService.isTerminated()) { + LOG.info("Shutting down Heartbeater thread pool."); + heartbeatExecutorService.shutdown(); + } + } + }; + ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY); } @Override @@ -104,6 +132,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { @Override public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException { acquireLocks(plan, ctx, username, true); + startHeartbeat(); } /** @@ -245,6 +274,25 @@ public class DbTxnManager extends HiveTxnManagerImpl { ctx.setHiveLocks(locks); return lockState; } + /** + * This is for testing only. 
+ * @param delay time to delay for first heartbeat + * @return null if no locks were needed + */ + @VisibleForTesting + void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException { + acquireLocks(plan, ctx, username, true); + startHeartbeat(delay); + } + + + @Override + public void releaseLocks(List hiveLocks) throws LockException { + if (lockMgr != null) { + stopHeartbeat(); + lockMgr.releaseLocks(hiveLocks); + } + } @Override public void commitTxn() throws LockException { @@ -253,6 +301,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { } try { lockMgr.clearLocalLockRecords(); + stopHeartbeat(); LOG.debug("Committing txn " + JavaUtils.txnIdToString(txnId)); client.commitTxn(txnId); } catch (NoSuchTxnException e) { @@ -277,6 +326,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { } try { lockMgr.clearLocalLockRecords(); + stopHeartbeat(); LOG.debug("Rolling back " + JavaUtils.txnIdToString(txnId)); client.rollbackTxn(txnId); } catch (NoSuchTxnException e) { @@ -337,6 +387,31 @@ public class DbTxnManager extends HiveTxnManagerImpl { } } + private void startHeartbeat() throws LockException { + startHeartbeat(0); + } + + /** + * This is for testing only. Normally client should call {@link #startHeartbeat()} + * Make the heartbeater start before an initial delay period. 
+ * @param delay time to delay before first execution, in milliseconds + */ + void startHeartbeat(long delay) throws LockException { + long heartbeatInterval = getHeartbeatInterval(conf); + assert heartbeatInterval > 0; + heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate( + new Heartbeater(this), delay, heartbeatInterval, TimeUnit.MILLISECONDS); + LOG.info("Started " + Heartbeater.class.getName() + " with delay/interval = " + + 0 + "/" + heartbeatInterval + " " + TimeUnit.MILLISECONDS); + } + + private void stopHeartbeat() { + if (heartbeatTask != null && !heartbeatTask.isCancelled() && !heartbeatTask.isDone()) { + heartbeatTask.cancel(true); + heartbeatTask = null; + } + } + @Override public ValidTxnList getValidTxns() throws LockException { init(); @@ -366,6 +441,10 @@ public class DbTxnManager extends HiveTxnManagerImpl { @Override protected void destruct() { try { + stopHeartbeat(); + if (shutdownRunner != null) { + ShutdownHookManager.removeShutdownHook(shutdownRunner); + } if (isTxnOpen()) rollbackTxn(); if (lockMgr != null) lockMgr.close(); } catch (Exception e) { @@ -384,6 +463,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { try { Hive db = Hive.get(conf); client = db.getMSC(); + initHeartbeatExecutorService(); } catch (MetaException e) { throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e); } catch (HiveException e) { @@ -391,6 +471,26 @@ public class DbTxnManager extends HiveTxnManagerImpl { } } } + + private synchronized void initHeartbeatExecutorService() { + if (heartbeatExecutorService != null + && !heartbeatExecutorService.isShutdown() + && !heartbeatExecutorService.isTerminated()) { + return; + } + + int threadPoolSize = conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE); + heartbeatExecutorService = + Executors.newScheduledThreadPool(threadPoolSize, new ThreadFactory() { + private final AtomicInteger threadCounter = new AtomicInteger(); + @Override + public Thread newThread(Runnable r) 
{ + return new Thread(r, "Heartbeater-" + threadCounter.getAndIncrement()); + } + }); + ((ScheduledThreadPoolExecutor) heartbeatExecutorService).setRemoveOnCancelPolicy(true); + } + @Override public boolean isTxnOpen() { return txnId > 0; @@ -403,4 +503,44 @@ public class DbTxnManager extends HiveTxnManagerImpl { public int getStatementId() { return statementId; } + + public static long getHeartbeatInterval(Configuration conf) throws LockException { + // Retrieve HIVE_TXN_TIMEOUT in MILLISECONDS (it's defined as SECONDS), + // then divide it by 2 to give us a safety factor. + long interval = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2; + if (interval == 0) { + throw new LockException(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set," + + " heartbeats won't be sent"); + } + return interval; + } + + /** + * Heartbeater thread + */ + public static class Heartbeater implements Runnable { + private HiveTxnManager txnMgr; + + /** + * + * @param txnMgr transaction manager for this operation + */ + public Heartbeater(HiveTxnManager txnMgr) { + this.txnMgr = txnMgr; + } + + /** + * Send a heartbeat to the metastore for locks and transactions. 
+ */ + @Override + public void run() { + try { + LOG.debug("Heartbeating..."); + txnMgr.heartbeat(); + } catch (LockException e) { + LOG.error("Failed trying to heartbeat " + e.getMessage()); + } + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 2d30198..036fc24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -181,6 +181,15 @@ class DummyTxnManager extends HiveTxnManagerImpl { } @Override + public void releaseLocks(List hiveLocks) throws LockException { + // If there's no lock manager, it essentially means we didn't acquire locks in the first place, + // thus no need to release locks + if (lockMgr != null) { + lockMgr.releaseLocks(hiveLocks); + } + } + + @Override public void commitTxn() throws LockException { // No-op } http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 2bfc732..cb97d29 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.plan.LockTableDesc; import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc; import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; +import java.util.List; + /** * An interface that allows Hive to manage transactions. 
All classes * implementing this should extend {@link HiveTxnManagerImpl} rather than @@ -67,6 +69,14 @@ public interface HiveTxnManager { void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException; /** + * Release specified locks. + * Transaction aware TxnManagers, which has {@code supportsAcid() == true}, + * will track locks internally and ignore this parameter + * @param hiveLocks The list of locks to be released. + */ + void releaseLocks(List hiveLocks) throws LockException; + + /** * Commit the current transaction. This will release all locks obtained in * {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, * org.apache.hadoop.hive.ql.Context, java.lang.String)}. http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 68190c2..b20ce28 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -414,7 +414,7 @@ public class TestTxnCommands { runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 5"); //make sure currently running txn is considered aborted by housekeeper hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS); - hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS); + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 2, TimeUnit.MILLISECONDS); AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService(); //this will abort the txn houseKeeperService.start(hiveConf); http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ---------------------------------------------------------------------- diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index f82b85a..88b379c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -33,9 +33,6 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService; -import org.apache.log4j.Level; -import org.slf4j.LoggerFactory; -import static org.hamcrest.CoreMatchers.is; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -202,6 +199,7 @@ public class TestDbTxnManager { addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); txnMgr.openTxn("NicholasII"); + Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); runReaper(); LockException exception = null; try { @@ -212,8 +210,10 @@ public class TestDbTxnManager { } Assert.assertNotNull("Expected exception1", exception); Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); + exception = null; txnMgr.openTxn("AlexanderIII"); + Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); runReaper(); try { txnMgr.rollbackTxn(); @@ -223,20 +223,6 @@ public class TestDbTxnManager { } Assert.assertNotNull("Expected exception2", exception); Assert.assertEquals("Wrong Exception2", ErrorMsg.TXN_NO_SUCH_TRANSACTION, exception.getCanonicalErrorMsg()); - exception = null; - txnMgr.openTxn("PeterI"); - txnMgr.acquireLocks(qp, ctx, "PeterI"); - List locks = ctx.getHiveLocks(); - Assert.assertThat("Unexpected lock count", locks.size(), is(1)); - runReaper(); - try { - txnMgr.heartbeat(); - } - catch(LockException ex) { - exception = ex; - } - 
Assert.assertNotNull("Expected exception3", exception); - Assert.assertEquals("Wrong Exception3", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); } @Test @@ -247,12 +233,12 @@ public class TestDbTxnManager { expireLocks(txnMgr, 0); //create a few read locks, all on the same resource for(int i = 0; i < 5; i++) { - txnMgr.acquireLocks(qp, ctx, "PeterI" + i); + ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat } expireLocks(txnMgr, 5); //create a lot of locks for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) { - txnMgr.acquireLocks(qp, ctx, "PeterI" + i); + ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat } expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17); } @@ -260,6 +246,7 @@ public class TestDbTxnManager { DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager(); ShowLocksResponse resp = lockManager.getLocks(); Assert.assertEquals("Wrong number of locks before expire", numLocksBefore, resp.getLocks().size()); + Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); runReaper(); resp = lockManager.getLocks(); Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size()); @@ -381,6 +368,70 @@ public class TestDbTxnManager { Assert.assertTrue(sawException); } + @Test + public void testLockAcquisitionAndRelease() throws Exception { + addTableInput(); + QueryPlan qp = new MockQueryPlan(this); + txnMgr.acquireLocks(qp, ctx, "fred"); + List locks = ctx.getHiveLocks(); + Assert.assertEquals(1, locks.size()); + txnMgr.releaseLocks(locks); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testHeartbeater() throws Exception { + Assert.assertTrue(txnMgr instanceof DbTxnManager); + + addTableInput(); + LockException exception = null; + QueryPlan qp = new MockQueryPlan(this); + + // Case 1: If there's no delay for the 
heartbeat, txn should be able to commit + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(qp, ctx, "fred"); // heartbeat started.. + runReaper(); + try { + txnMgr.commitTxn(); + } catch (LockException e) { + exception = e; + } + Assert.assertNull("Txn commit should be successful", exception); + exception = null; + + // Case 2: If there's a delay for the heartbeat, but the delay is within the reaper's tolerance, + // then txn should be able to commit + txnMgr.openTxn("tom"); + // Start the heartbeat after a delay, which is shorter than the HIVE_TXN_TIMEOUT + ((DbTxnManager) txnMgr).acquireLocksWithHeartbeatDelay(qp, ctx, "tom", + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2); + runReaper(); + try { + txnMgr.commitTxn(); + } catch (LockException e) { + exception = e; + } + Assert.assertNull("Txn commit should also be successful", exception); + exception = null; + + // Case 3: If there's a delay for the heartbeat, and the delay is long enough to trigger the reaper, + // then the txn will time out and be aborted. + // Here we just don't send the heartbeat at all - an infinite delay.
+ txnMgr.openTxn("jerry"); + // Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT + ((DbTxnManager) txnMgr).acquireLocks(qp, ctx, "jerry", true); + Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); + runReaper(); + try { + txnMgr.commitTxn(); + } catch (LockException e) { + exception = e; + } + Assert.assertNotNull("Txn should have been aborted", exception); + Assert.assertEquals(ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); + } + @Before public void setUp() throws Exception { TxnDbUtil.prepDb(); @@ -391,7 +442,7 @@ public class TestDbTxnManager { readEntities = new HashSet(); writeEntities = new HashSet(); conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS); - conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS); + conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS); houseKeeperService = new AcidHouseKeeperService(); }