hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject hbase git commit: HBASE-15811 Batch Get after batch Put does not fetch all Cells We were not waiting on all executors in a batch to complete. The test for no-more-executors was damaged by the 0.99/0.98.4 fix "HBASE-11403 Fix race conditions around Object#notify"
Date Sat, 14 May 2016 01:01:04 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 535e76729 -> 4e9ee7347


HBASE-15811 Batch Get after batch Put does not fetch all Cells. We were not waiting on all
executors in a batch to complete. The test for no-more-executors was damaged by the 0.99/0.98.4
fix "HBASE-11403 Fix race conditions around Object#notify".

It added this in AsyncProcess#waitForMaximumCurrentTasks:

synchronized (this.tasksInProgress) {
+          if (tasksInProgress.get() != oldInProgress) break;
           this.tasksInProgress.wait(100);

which added a break out of our waiting loop on any change in the
count of tasks; it seems that what was wanted instead was to
avoid the wait if there was movement in the count of completed
tasks.

Refactors waitForMaximumCurrentTasks into a static overload so it is
testable. Adds a test that we indeed wait on the specified parameter.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4e9ee734
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4e9ee734
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4e9ee734

Branch: refs/heads/branch-1.1
Commit: 4e9ee734734741d9d662ba76bec0eb29e96221b5
Parents: 535e767
Author: stack <stack@apache.org>
Authored: Thu May 12 14:37:29 2016 -0700
Committer: stack <stack@apache.org>
Committed: Fri May 13 18:00:52 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 18 ++++--
 .../hadoop/hbase/client/TestAsyncProcess.java   | 67 ++++++++++++++++++++
 2 files changed, 80 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4e9ee734/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 49552b0..263c7fc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -1574,7 +1574,7 @@ class AsyncProcess {
         synchronized (actionsInProgress) {
           if (actionsInProgress.get() == 0) break;
           if (!hasWait) {
-            actionsInProgress.wait(100);
+            actionsInProgress.wait(10);
           } else {
             long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
             TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
@@ -1640,9 +1640,16 @@ class AsyncProcess {
 
   /** Wait until the async does not have more than max tasks in progress. */
   private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
+    waitForMaximumCurrentTasks(max, tasksInProgress, id);
+  }
+
+  // Break out this method so testable
+  @VisibleForTesting
+  static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id)
+  throws InterruptedIOException {
     long lastLog = EnvironmentEdgeManager.currentTime();
     long currentInProgress, oldInProgress = Long.MAX_VALUE;
-    while ((currentInProgress = this.tasksInProgress.get()) > max) {
+    while ((currentInProgress = tasksInProgress.get()) > max) {
       if (oldInProgress != currentInProgress) { // Wait for in progress to change.
         long now = EnvironmentEdgeManager.currentTime();
         if (now > lastLog + 10000) {
@@ -1653,9 +1660,10 @@ class AsyncProcess {
       }
       oldInProgress = currentInProgress;
       try {
-        synchronized (this.tasksInProgress) {
-          if (tasksInProgress.get() != oldInProgress) break;
-          this.tasksInProgress.wait(100);
+        synchronized (tasksInProgress) {
+          if (tasksInProgress.get() == oldInProgress) {
+            tasksInProgress.wait(10);
+          }
         }
       } catch (InterruptedException e) {
         throw new InterruptedIOException("#" + id + ", interrupted." +

http://git-wip-us.apache.org/repos/asf/hbase/blob/4e9ee734/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index cbd3ffc..796b9be 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.Timeout;
 import org.mockito.Mockito;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -53,6 +54,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -65,6 +68,31 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+
 @Category(MediumTests.class)
 public class TestAsyncProcess {
   private static final TableName DUMMY_TABLE =
@@ -1094,4 +1122,43 @@ public class TestAsyncProcess {
     Assert.assertTrue(puts.isEmpty());
   }
 
+  @Test
+  public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException {
+    final AtomicLong tasks = new AtomicLong(0);
+    final AtomicInteger max = new AtomicInteger(0);
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          barrier.await();
+          AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1);
+        } catch (InterruptedIOException e) {
+          Assert.fail(e.getMessage());
+        } catch (InterruptedException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        } catch (BrokenBarrierException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+    };
+    // First test that our runnable thread only exits when tasks is zero.
+    Thread t = new Thread(runnable);
+    t.start();
+    barrier.await();
+    t.join();
+    // Now assert we stay running if max == zero and tasks is > 0.
+    barrier.reset();
+    tasks.set(1000000);
+    t = new Thread(runnable);
+    t.start();
+    barrier.await();
+    while (tasks.get() > 0) {
+      assertTrue(t.isAlive());
+      tasks.set(tasks.get() - 1);
+    }
+    t.join();
+  }
 }


Mime
View raw message