hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject hbase git commit: HBASE-15931 Add log for long-running tasks in AsyncProcess
Date Fri, 29 Jul 2016 04:03:13 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 1d9694206 -> d8a53cf5d


HBASE-15931 Add log for long-running tasks in AsyncProcess


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d8a53cf5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d8a53cf5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d8a53cf5

Branch: refs/heads/branch-1.1
Commit: d8a53cf5d89f42951d7bb839153db9ca8f35067e
Parents: 1d96942
Author: Yu Li <liyu@apache.org>
Authored: Thu Jun 2 12:00:42 2016 +0800
Committer: Yu Li <liyu@apache.org>
Committed: Fri Jul 29 11:59:18 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 58 +++++++++++++++++---
 .../hbase/client/BufferedMutatorImpl.java       |  3 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |  5 +-
 3 files changed, 54 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d8a53cf5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 263c7fc..a32dfee 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -112,6 +112,17 @@ class AsyncProcess {
   public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
 
   /**
+   * Configuration to decide whether to log details for batch error
+   */
+  public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
+
+  private final int thresholdToLogUndoneTaskDetails;
+  private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
+      "hbase.client.threshold.log.details";
+  private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
+  private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
+
+  /**
    * The context used to wait for results from one submit call.
    * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
    *    then errors and failed operations in this object will reflect global errors.
@@ -299,6 +310,10 @@ class AsyncProcess {
 
     this.rpcCallerFactory = rpcCaller;
     this.rpcFactory = rpcFactory;
+
+    this.thresholdToLogUndoneTaskDetails =
+        conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
+          DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
   }
 
   /**
@@ -352,7 +367,7 @@ class AsyncProcess {
     List<Integer> locationErrorRows = null;
     do {
       // Wait until there is at least one slot for a new task.
-      waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
+      waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
 
       // Remember the previous decisions about regions or region servers we put in the
       //  final multi.
@@ -1635,18 +1650,19 @@ class AsyncProcess {
   @VisibleForTesting
   /** Waits until all outstanding tasks are done. Used in tests. */
   void waitUntilDone() throws InterruptedIOException {
-    waitForMaximumCurrentTasks(0);
+    waitForMaximumCurrentTasks(0, null);
   }
 
   /** Wait until the async does not have more than max tasks in progress. */
-  private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
-    waitForMaximumCurrentTasks(max, tasksInProgress, id);
+  private void waitForMaximumCurrentTasks(int max, String tableName)
+      throws InterruptedIOException {
+    waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
   }
 
   // Break out this method so testable
   @VisibleForTesting
-  static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id)
-  throws InterruptedIOException {
+  void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
+      String tableName) throws InterruptedIOException {
     long lastLog = EnvironmentEdgeManager.currentTime();
     long currentInProgress, oldInProgress = Long.MAX_VALUE;
     while ((currentInProgress = tasksInProgress.get()) > max) {
@@ -1655,7 +1671,11 @@ class AsyncProcess {
         if (now > lastLog + 10000) {
           lastLog = now;
           LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
-              + max + ", tasksInProgress=" + currentInProgress);
+              + max + ", tasksInProgress=" + currentInProgress +
+              " hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName);
+          if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
+            logDetailsOfUndoneTasks(currentInProgress);
+          }
         }
       }
       oldInProgress = currentInProgress;
@@ -1672,6 +1692,25 @@ class AsyncProcess {
     }
   }
 
+  private void logDetailsOfUndoneTasks(long taskInProgress) {
+    ArrayList<ServerName> servers = new ArrayList<ServerName>();
+    for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
+      if (entry.getValue().get() > 0) {
+        servers.add(entry.getKey());
+      }
+    }
+    LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
+    if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
+      ArrayList<String> regions = new ArrayList<String>();
+      for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
+        if (entry.getValue().get() > 0) {
+          regions.add(Bytes.toString(entry.getKey()));
+        }
+      }
+      LOG.info("Regions against which left over task(s) are processed: " + regions);
+    }
+  }
+
   /**
    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
    * @return Whether there were any errors in any request since the last time
@@ -1687,12 +1726,13 @@ class AsyncProcess {
    * failed operations themselves.
    * @param failedRows an optional list into which the rows that failed since the last time
   *        {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved.
+   * @param tableName name of the table
    * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)}
    *          was called, or AP was created.
    */
   public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
-      List<Row> failedRows) throws InterruptedIOException {
-    waitForMaximumCurrentTasks(0);
+      List<Row> failedRows, String tableName) throws InterruptedIOException {
+    waitForMaximumCurrentTasks(0, tableName);
     if (!globalErrors.hasErrors()) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8a53cf5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index f9a68e2..1e25d0f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -205,7 +205,8 @@ public class BufferedMutatorImpl implements BufferedMutator {
         while (!writeAsyncBuffer.isEmpty()) {
           ap.submit(tableName, writeAsyncBuffer, true, null, false);
         }
-        RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
+        RetriesExhaustedWithDetailsException error =
+            ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
         if (error != null) {
           if (listener == null) {
             throw error;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8a53cf5/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 796b9be..0a8474a 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -1123,16 +1123,17 @@ public class TestAsyncProcess {
   }
 
   @Test
-  public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException {
+  public void testWaitForMaximumCurrentTasks() throws Exception {
     final AtomicLong tasks = new AtomicLong(0);
     final AtomicInteger max = new AtomicInteger(0);
     final CyclicBarrier barrier = new CyclicBarrier(2);
+    final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf);
     Runnable runnable = new Runnable() {
       @Override
       public void run() {
         try {
           barrier.await();
-          AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1);
+          ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null);
         } catch (InterruptedIOException e) {
           Assert.fail(e.getMessage());
         } catch (InterruptedException e) {


Mime
View raw message