hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [06/26] hbase git commit: HBASE-14812 Fix ResultBoundedCompletionService deadlock
Date Thu, 19 Nov 2015 07:33:40 GMT
HBASE-14812 Fix ResultBoundedCompletionService deadlock


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d6fdf92f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d6fdf92f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d6fdf92f

Branch: refs/heads/hbase-12439
Commit: d6fdf92f9e5f4eaaf9300dce7f1f23adf228949c
Parents: ca10484
Author: Elliott Clark <eclark@apache.org>
Authored: Fri Nov 13 18:28:12 2015 -0800
Committer: Elliott Clark <eclark@apache.org>
Committed: Tue Nov 17 14:41:35 2015 -0800

----------------------------------------------------------------------
 .../client/ResultBoundedCompletionService.java  | 26 +++++++++++++-------
 .../client/ScannerCallableWithReplicas.java     | 22 ++++++++++++-----
 2 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d6fdf92f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
index eacbe2d..9b32e93 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
@@ -39,12 +39,13 @@ public class ResultBoundedCompletionService<V> {
   private final Executor executor;
   private final QueueingFuture<V>[] tasks; // all the tasks
   private volatile QueueingFuture<V> completed = null;
+  private volatile boolean cancelled = false;
   
   class QueueingFuture<T> implements RunnableFuture<T> {
     private final RetryingCallable<T> future;
     private T result = null;
     private ExecutionException exeEx = null;
-    private volatile boolean cancelled;
+    private volatile boolean cancelled = false;
     private final int callTimeout;
     private final RpcRetryingCaller<T> retryingCaller;
     private boolean resultObtained = false;
@@ -61,18 +62,21 @@ public class ResultBoundedCompletionService<V> {
     public void run() {
       try {
         if (!cancelled) {
-          result =
-              this.retryingCaller.callWithRetries(future, callTimeout);
+          result = this.retryingCaller.callWithRetries(future, callTimeout);
           resultObtained = true;
         }
       } catch (Throwable t) {
         exeEx = new ExecutionException(t);
       } finally {
-        if (!cancelled && completed == null) {
-          completed = (QueueingFuture<V>) QueueingFuture.this;
-          synchronized (tasks) {
-            tasks.notify();
+        synchronized (tasks) {
+          // If this wasn't canceled then store the result.
+          if (!cancelled && completed == null) {
+            completed = (QueueingFuture<V>) QueueingFuture.this;
           }
+
+          // Notify just in case there was someone waiting and this was canceled.
+          // That shouldn't happen but better safe than sorry.
+          tasks.notify();
         }
       }
     }
@@ -145,19 +149,23 @@ public class ResultBoundedCompletionService<V> {
 
   public QueueingFuture<V> take() throws InterruptedException {
     synchronized (tasks) {
-      while (completed == null) tasks.wait();
+      while (completed == null && !cancelled) tasks.wait();
     }
     return completed;
   }
 
   public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException
{
     synchronized (tasks) {
-      if (completed == null) unit.timedWait(tasks, timeout);
+      if (completed == null && !cancelled) unit.timedWait(tasks, timeout);
     }
     return completed;
   }
 
   public void cancelAll() {
+    // Grab the lock on tasks so that cancelled is visible everywhere
+    synchronized (tasks) {
+      cancelled = true;
+    }
     for (QueueingFuture<V> future : tasks) {
       if (future != null) future.cancel(true);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d6fdf92f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 586db8c..97d7d41 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -163,12 +164,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     replicaSwitched.set(false);
     // submit call for the primary replica.
     addCallsForCurrentReplica(cs, rl);
+
     try {
       // wait for the timeout to see whether the primary responds back
       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
           TimeUnit.MICROSECONDS); // Yes, microseconds
       if (f != null) {
-        Pair<Result[], ScannerCallable> r = f.get();
+        Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
         if (r != null && r.getSecond() != null) {
           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
         }
@@ -180,23 +182,31 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       throw new InterruptedIOException(e.getMessage());
     } catch (InterruptedException e) {
       throw new InterruptedIOException(e.getMessage());
+    } catch (TimeoutException e) {
+      throw new InterruptedIOException(e.getMessage());
     }
+
     // submit call for the all of the secondaries at once
     // TODO: this may be an overkill for large region replication
     addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
+
     try {
-      Future<Pair<Result[], ScannerCallable>> f = cs.take();
-      Pair<Result[], ScannerCallable> r = f.get();
-      if (r != null && r.getSecond() != null) {
-        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
+      Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
+      if (f != null) {
+        Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
+        if (r != null && r.getSecond() != null) {
+          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
+        }
+        return r == null ? null : r.getFirst(); // great we got an answer
       }
-      return r == null ? null : r.getFirst(); // great we got an answer
     } catch (ExecutionException e) {
       RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
     } catch (CancellationException e) {
       throw new InterruptedIOException(e.getMessage());
     } catch (InterruptedException e) {
       throw new InterruptedIOException(e.getMessage());
+    } catch (TimeoutException e) {
+      throw new InterruptedIOException(e.getMessage());
     } finally {
       // We get there because we were interrupted or because one or more of the
       // calls succeeded or failed. In all case, we stop all our tasks.


Mime
View raw message