hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject hbase git commit: HBASE-15811 Batch Get after batch Put does not fetch all Cells We were not waiting on all executors in a batch to complete. The test for no-more-executors was damaged by the 0.99/0.98.4 fix "HBASE-11403 Fix race conditions around Object
Date Sat, 14 May 2016 00:53:52 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 92f415976 -> 77f511fce


HBASE-15811 Batch Get after batch Put does not fetch all Cells. We were not waiting on all
executors in a batch to complete. The test for no-more-executors was damaged by the 0.99/0.98.4
fix "HBASE-11403 Fix race conditions around Object#notify".

It added this in AsyncProcess#waitForMaximumCurrentTasks:

synchronized (this.tasksInProgress) {
+          if (tasksInProgress.get() != oldInProgress) break;
           this.tasksInProgress.wait(100);

which added a break out of our waiting loop on any change in the
count of tasks; it seems that what was wanted instead was to
skip the wait if there had been movement in the count of completed
tasks.

Refactors waitForMaximumCurrentTasks so it is testable. Adds a
test that we indeed wait on the specified parameter.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/77f511fc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/77f511fc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/77f511fc

Branch: refs/heads/branch-1
Commit: 77f511fcebcc391ed92e5dfc5998b7006afed2c9
Parents: 92f4159
Author: stack <stack@apache.org>
Authored: Thu May 12 14:37:29 2016 -0700
Committer: stack <stack@apache.org>
Committed: Fri May 13 17:53:37 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 18 ++--
 .../hadoop/hbase/client/TestAsyncProcess.java   | 95 ++++++++++++++------
 2 files changed, 82 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/77f511fc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index b2b6be5..f9199a8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -1679,7 +1679,7 @@ class AsyncProcess {
         synchronized (actionsInProgress) {
           if (actionsInProgress.get() == 0) break;
           if (!hasWait) {
-            actionsInProgress.wait(100);
+            actionsInProgress.wait(10);
           } else {
             long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
             TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
@@ -1770,9 +1770,16 @@ class AsyncProcess {
 
   /** Wait until the async does not have more than max tasks in progress. */
   private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
+    waitForMaximumCurrentTasks(max, tasksInProgress, id);
+  }
+
+  // Break out this method so testable
+  @VisibleForTesting
+  static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id)
+  throws InterruptedIOException {
     long lastLog = EnvironmentEdgeManager.currentTime();
     long currentInProgress, oldInProgress = Long.MAX_VALUE;
-    while ((currentInProgress = this.tasksInProgress.get()) > max) {
+    while ((currentInProgress = tasksInProgress.get()) > max) {
       if (oldInProgress != currentInProgress) { // Wait for in progress to change.
         long now = EnvironmentEdgeManager.currentTime();
         if (now > lastLog + 10000) {
@@ -1783,9 +1790,10 @@ class AsyncProcess {
       }
       oldInProgress = currentInProgress;
       try {
-        synchronized (this.tasksInProgress) {
-          if (tasksInProgress.get() != oldInProgress) break;
-          this.tasksInProgress.wait(100);
+        synchronized (tasksInProgress) {
+          if (tasksInProgress.get() == oldInProgress) {
+            tasksInProgress.wait(10);
+          }
         }
       } catch (InterruptedException e) {
         throw new InterruptedIOException("#" + id + ", interrupted." +

http://git-wip-us.apache.org/repos/asf/hbase/blob/77f511fc/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index b588ce8..9809c28 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -20,31 +20,7 @@
 package org.apache.hadoop.hbase.client;
 
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CallQueueTooBigException;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
-import org.mockito.Mockito;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -56,6 +32,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -68,6 +46,32 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+
 @Category(MediumTests.class)
 public class TestAsyncProcess {
   private final static Log LOG = LogFactory.getLog(TestAsyncProcess.class);
@@ -1134,4 +1138,43 @@ public class TestAsyncProcess {
     Assert.assertTrue(puts.isEmpty());
   }
 
-}
+  @Test
+  public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException {
+    final AtomicLong tasks = new AtomicLong(0);
+    final AtomicInteger max = new AtomicInteger(0);
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          barrier.await();
+          AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1);
+        } catch (InterruptedIOException e) {
+          Assert.fail(e.getMessage());
+        } catch (InterruptedException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        } catch (BrokenBarrierException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+    };
+    // First test that our runnable thread only exits when tasks is zero.
+    Thread t = new Thread(runnable);
+    t.start();
+    barrier.await();
+    t.join();
+    // Now assert we stay running if max == zero and tasks is > 0.
+    barrier.reset();
+    tasks.set(1000000);
+    t = new Thread(runnable);
+    t.start();
+    barrier.await();
+    while (tasks.get() > 0) {
+      assertTrue(t.isAlive());
+      tasks.set(tasks.get() - 1);
+    }
+    t.join();
+  }
+}
\ No newline at end of file


Mime
View raw message