Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0F6D5200B62 for ; Fri, 29 Jul 2016 05:58:48 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0E11D160A94; Fri, 29 Jul 2016 03:58:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2CFDB160A85 for ; Fri, 29 Jul 2016 05:58:47 +0200 (CEST) Received: (qmail 71280 invoked by uid 500); 29 Jul 2016 03:58:46 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 71271 invoked by uid 99); 29 Jul 2016 03:58:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Jul 2016 03:58:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 093A3E0100; Fri, 29 Jul 2016 03:58:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: liyu@apache.org To: commits@hbase.apache.org Message-Id: <9ffdd199926146488c59803590838409@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-15931 Add log for long-running tasks in AsyncProcess Date: Fri, 29 Jul 2016 03:58:46 +0000 (UTC) archived-at: Fri, 29 Jul 2016 03:58:48 -0000 Repository: hbase Updated Branches: refs/heads/branch-1.2 a82ca4a82 -> 9db09fc93 HBASE-15931 Add log for long-running tasks in AsyncProcess Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9db09fc9 Tree: 
http://git-wip-us.apache.org/repos/asf/hbase/tree/9db09fc9 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9db09fc9 Branch: refs/heads/branch-1.2 Commit: 9db09fc933bfa81d52b3dcfe08618bd1195d49f6 Parents: a82ca4a Author: Yu Li Authored: Thu Jun 2 12:00:42 2016 +0800 Committer: Yu Li Committed: Fri Jul 29 11:54:26 2016 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 58 +++++++++++++++++--- .../hbase/client/BufferedMutatorImpl.java | 3 +- .../hadoop/hbase/client/TestAsyncProcess.java | 5 +- 3 files changed, 54 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9db09fc9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index c9961d0..d72f0e7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -114,6 +114,17 @@ class AsyncProcess { public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9; /** + * Configuration to decide whether to log details for batch error + */ + public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details"; + + private final int thresholdToLogUndoneTaskDetails; + private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = + "hbase.client.threshold.log.details"; + private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; + private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2; + + /** * The context used to wait for results from one submit call. 
* 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), * then errors and failed operations in this object will reflect global errors. @@ -318,6 +329,10 @@ class AsyncProcess { this.rpcCallerFactory = rpcCaller; this.rpcFactory = rpcFactory; + + this.thresholdToLogUndoneTaskDetails = + conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS, + DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS); } /** @@ -375,7 +390,7 @@ class AsyncProcess { List locationErrorRows = null; do { // Wait until there is at least one slot for a new task. - waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1); + waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString()); // Remember the previous decisions about regions or region servers we put in the // final multi. @@ -1706,18 +1721,19 @@ class AsyncProcess { @VisibleForTesting /** Waits until all outstanding tasks are done. Used in tests. */ void waitUntilDone() throws InterruptedIOException { - waitForMaximumCurrentTasks(0); + waitForMaximumCurrentTasks(0, null); } /** Wait until the async does not have more than max tasks in progress. 
*/ - private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException { - waitForMaximumCurrentTasks(max, tasksInProgress, id); + private void waitForMaximumCurrentTasks(int max, String tableName) + throws InterruptedIOException { + waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName); } // Break out this method so testable @VisibleForTesting - static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id) - throws InterruptedIOException { + void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id, + String tableName) throws InterruptedIOException { long lastLog = EnvironmentEdgeManager.currentTime(); long currentInProgress, oldInProgress = Long.MAX_VALUE; while ((currentInProgress = tasksInProgress.get()) > max) { @@ -1726,7 +1742,11 @@ class AsyncProcess { if (now > lastLog + 10000) { lastLog = now; LOG.info("#" + id + ", waiting for some tasks to finish. Expected max=" - + max + ", tasksInProgress=" + currentInProgress); + + max + ", tasksInProgress=" + currentInProgress + + " hasError=" + hasError() + (tableName == null ? 
"" : ", tableName=" + tableName)); + if (currentInProgress <= thresholdToLogUndoneTaskDetails) { + logDetailsOfUndoneTasks(currentInProgress); + } } } oldInProgress = currentInProgress; @@ -1743,6 +1763,25 @@ class AsyncProcess { } } + private void logDetailsOfUndoneTasks(long taskInProgress) { + ArrayList servers = new ArrayList(); + for (Map.Entry entry : taskCounterPerServer.entrySet()) { + if (entry.getValue().get() > 0) { + servers.add(entry.getKey()); + } + } + LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers); + if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) { + ArrayList regions = new ArrayList(); + for (Map.Entry entry : taskCounterPerRegion.entrySet()) { + if (entry.getValue().get() > 0) { + regions.add(Bytes.toString(entry.getKey())); + } + } + LOG.info("Regions against which left over task(s) are processed: " + regions); + } + } + /** * Only used w/useGlobalErrors ctor argument, for HTable backward compat. * @return Whether there were any errors in any request since the last time @@ -1758,12 +1797,13 @@ class AsyncProcess { * failed operations themselves. * @param failedRows an optional list into which the rows that failed since the last time * {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved. + * @param tableName name of the table * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)} * was called, or AP was created. 
*/ public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset( - List failedRows) throws InterruptedIOException { - waitForMaximumCurrentTasks(0); + List failedRows, String tableName) throws InterruptedIOException { + waitForMaximumCurrentTasks(0, tableName); if (!globalErrors.hasErrors()) { return null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/9db09fc9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 6220cd6..273f2e4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -237,7 +237,8 @@ public class BufferedMutatorImpl implements BufferedMutator { while (!buffer.isEmpty()) { ap.submit(tableName, buffer, true, null, false); } - RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null); + RetriesExhaustedWithDetailsException error = + ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString()); if (error != null) { if (listener == null) { throw error; http://git-wip-us.apache.org/repos/asf/hbase/blob/9db09fc9/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 949c65b..c67c16d 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -1105,16 +1105,17 @@ public class TestAsyncProcess { } @Test 
- public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException { + public void testWaitForMaximumCurrentTasks() throws Exception { final AtomicLong tasks = new AtomicLong(0); final AtomicInteger max = new AtomicInteger(0); final CyclicBarrier barrier = new CyclicBarrier(2); + final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf); Runnable runnable = new Runnable() { @Override public void run() { try { barrier.await(); - AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1); + ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null); } catch (InterruptedIOException e) { Assert.fail(e.getMessage()); } catch (InterruptedException e) {