hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-16172 Unify the retry logic in ScannerCallableWithReplicas and RpcRetryingCallerWithReadReplicas
Date Mon, 18 Jul 2016 13:54:49 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 cfc22ec1e -> 630a1a41d


HBASE-16172 Unify the retry logic in ScannerCallableWithReplicas and RpcRetryingCallerWithReadReplicas


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/630a1a41
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/630a1a41
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/630a1a41

Branch: refs/heads/branch-1
Commit: 630a1a41df5f95a06f17c5eeeaa41433955aeab8
Parents: cfc22ec
Author: tedyu <yuzhihong@gmail.com>
Authored: Mon Jul 18 06:54:09 2016 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Mon Jul 18 06:54:09 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/client/HTable.java  |  2 +-
 .../client/RpcRetryingCallerWithReadReplicas.java    | 15 ++++++++++-----
 .../hbase/client/ScannerCallableWithReplicas.java    |  5 ++++-
 3 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/630a1a41/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index efa03c6..4cd11d0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -871,7 +871,7 @@ public class HTable implements HTableInterface, RegionLocator {
       connConfiguration.getRetriesNumber(),
       operationTimeout,
       connConfiguration.getPrimaryCallTimeoutMicroSecond());
-    return callable.call();
+    return callable.call(operationTimeout);
   }
 
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/630a1a41/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 80c187c..aefa3bc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -47,10 +47,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -194,7 +192,7 @@ public class RpcRetryingCallerWithReadReplicas {
    * Globally, the number of retries, timeout and so on still applies, but it's per replica,
    * not global. We continue until all retries are done, or all timeouts are exceeded.
    */
-  public synchronized Result call()
+  public Result call(int operationTimeout)
       throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
     boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
 
@@ -227,10 +225,17 @@ public class RpcRetryingCallerWithReadReplicas {
 
     try {
       try {
-        Future<Result> f = cs.take();
-        return f.get();
+        long start = EnvironmentEdgeManager.currentTime();
+        Future<Result> f = cs.poll(operationTimeout, TimeUnit.MILLISECONDS);
+        long duration = EnvironmentEdgeManager.currentTime() - start;
+        if (f == null) {
+          throw new RetriesExhaustedException("timed out after " + duration + " ms");
+        }
+        return f.get(operationTimeout - duration, TimeUnit.MILLISECONDS);
       } catch (ExecutionException e) {
         throwEnrichedException(e, retries);
+      } catch (TimeoutException te) {
+        throw new RetriesExhaustedException("timed out after " + operationTimeout + " ms");
       }
     } catch (CancellationException e) {
       throw new InterruptedIOException();

http://git-wip-us.apache.org/repos/asf/hbase/blob/630a1a41/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 4d5bb0f..4e8647f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -195,9 +196,11 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]>
{
     addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
 
     try {
+      long start = EnvironmentEdgeManager.currentTime();
       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
+      long duration = EnvironmentEdgeManager.currentTime() - start;
       if (f != null) {
-        Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
+        Pair<Result[], ScannerCallable> r = f.get(timeout - duration, TimeUnit.MILLISECONDS);
         if (r != null && r.getSecond() != null) {
           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
         }


Mime
View raw message