hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-16515 AsyncProcess has incorrect count of tasks if the backoff policy is enabled (ChiaPing Tsai)
Date Tue, 30 Aug 2016 15:02:01 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 9cb0094bd -> 0f92e943a


HBASE-16515 AsyncProcess has incorrect count of tasks if the backoff policy is enabled (ChiaPing
Tsai)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0f92e943
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0f92e943
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0f92e943

Branch: refs/heads/master
Commit: 0f92e943aca497d60358b0ce32ec690a04e4fd85
Parents: 9cb0094
Author: tedyu <yuzhihong@gmail.com>
Authored: Tue Aug 30 08:01:49 2016 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Tue Aug 30 08:01:49 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  6 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   | 79 +++++++++++++++-----
 2 files changed, 66 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0f92e943/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 045885f..5bb0f58 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -1066,7 +1066,6 @@ class AsyncProcess {
       for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet())
{
         ServerName server = e.getKey();
         MultiAction<Row> multiAction = e.getValue();
-        incTaskCounters(multiAction.getRegions(), server);
         Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server,
multiAction,
             numAttempt);
         // make sure we correctly count the number of runnables before we try to reuse the
send
@@ -1114,6 +1113,7 @@ class AsyncProcess {
         if (connection.getConnectionMetrics() != null) {
           connection.getConnectionMetrics().incrNormalRunners();
         }
+        incTaskCounters(multiAction.getRegions(), server);
         SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server,
           new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress));
         return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable));
@@ -1136,6 +1136,7 @@ class AsyncProcess {
 
       List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
       for (DelayingRunner runner : actions.values()) {
+        incTaskCounters(runner.getActions().getRegions(), server);
         String traceText = "AsyncProcess.sendMultiAction";
         Runnable runnable = addSingleServerRequestHeapSize(server,
           new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress));
@@ -1757,7 +1758,8 @@ class AsyncProcess {
     }
   }
 
-  private void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult>
results) {
+  @VisibleForTesting
+  protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult>
results) {
     boolean metrics = AsyncProcess.this.connection.getConnectionMetrics() != null;
     boolean stats = AsyncProcess.this.connection.getStatisticsTracker() != null;
     if (!stats && !metrics) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f92e943/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 516f2cf..00f5232 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -82,24 +82,10 @@ import org.junit.rules.TestRule;
 import org.mockito.Mockito;
 import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker;
 import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -225,6 +211,11 @@ public class TestAsyncProcess {
       return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
     }
 
+
+    @Override
+    protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult>
results) {
+      // Do nothing, to avoid an NPE when testing the ClientBackoffPolicy.
+    }
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
         CancellableRegionServerCallable callable) {
@@ -295,7 +286,21 @@ public class TestAsyncProcess {
       return new CallerWithFailure(ioe);
     }
   }
-
+  /**
+   * Make the backoff time always different on each call.
+   */
+  static class MyClientBackoffPolicy implements ClientBackoffPolicy {
+    private final Map<ServerName, AtomicInteger> count = new HashMap<>();
+    @Override
+    public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats)
{
+      AtomicInteger inc = count.get(serverName);
+      if (inc == null) {
+        inc = new AtomicInteger(0);
+        count.put(serverName, inc);
+      }
+      return inc.getAndIncrement();
+    }
+  }
   class MyAsyncProcessWithReplicas extends MyAsyncProcess {
     private Set<byte[]> failures = new TreeSet<byte[]>(new Bytes.ByteArrayComparator());
     private long primarySleepMs = 0, replicaSleepMs = 0;
@@ -836,6 +841,46 @@ public class TestAsyncProcess {
   }
 
   @Test
+  public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException
{
+    ClusterConnection hc = createHConnection();
+    MyAsyncProcess ap = new MyAsyncProcess(hc, conf, false);
+    testTaskCount(ap);
+  }
+
+  @Test
+  public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException
{
+    Configuration copyConf = new Configuration(conf);
+    copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
+    MyClientBackoffPolicy bp = new MyClientBackoffPolicy();
+    ClusterConnection hc = createHConnection();
+    Mockito.when(hc.getConfiguration()).thenReturn(copyConf);
+    Mockito.when(hc.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf));
+    Mockito.when(hc.getBackoffPolicy()).thenReturn(bp);
+    MyAsyncProcess ap = new MyAsyncProcess(hc, copyConf, false);
+    testTaskCount(ap);
+  }
+
+  private void testTaskCount(AsyncProcess ap) throws InterruptedIOException, InterruptedException
{
+    List<Put> puts = new ArrayList<>();
+    for (int i = 0; i != 3; ++i) {
+      puts.add(createPut(1, true));
+      puts.add(createPut(2, true));
+      puts.add(createPut(3, true));
+    }
+    ap.submit(DUMMY_TABLE, puts, true, null, false);
+    ap.waitUntilDone();
+    // Give it more time in case the task count is incorrect.
+    TimeUnit.SECONDS.sleep(1);
+    assertEquals(0, ap.tasksInProgress.get());
+    for (AtomicInteger count : ap.taskCounterPerRegion.values()) {
+      assertEquals(0, count.get());
+    }
+    for (AtomicInteger count : ap.taskCounterPerServer.values()) {
+      assertEquals(0, count.get());
+    }
+  }
+
+  @Test
   public void testMaxTask() throws Exception {
     final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
 


Mime
View raw message