hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject hbase git commit: HBASE-12012. Improve cancellation for the scan RPCs (patch from master; had to resolve some conflicts)
Date Tue, 23 Dec 2014 00:45:53 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 fd232cc3b -> 949982fc6


HBASE-12012. Improve cancellation for the scan RPCs (patch from master; had to resolve some conflicts)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/949982fc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/949982fc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/949982fc

Branch: refs/heads/branch-1
Commit: 949982fc6e7d1805ef1c1a6997aa8d89ccbcf01f
Parents: fd232cc
Author: Devaraj Das <ddas@apache.org>
Authored: Mon Dec 22 16:45:41 2014 -0800
Committer: Devaraj Das <ddas@apache.org>
Committed: Mon Dec 22 16:45:41 2014 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/Cancellable.java |  31 ++++
 .../hadoop/hbase/client/ClientScanner.java      |   7 +
 .../hadoop/hbase/client/ClientSmallScanner.java |   7 +-
 .../client/ResultBoundedCompletionService.java  | 165 +++++++++++++++++++
 .../RpcRetryingCallerWithReadReplicas.java      | 152 ++---------------
 .../hadoop/hbase/client/ScannerCallable.java    |   7 +-
 .../client/ScannerCallableWithReplicas.java     | 105 +++++++++---
 .../hadoop/hbase/client/TestReplicasClient.java |  48 ++++++
 8 files changed, 356 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/949982fc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cancellable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cancellable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cancellable.java
new file mode 100644
index 0000000..43011e9
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cancellable.java
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This should be implemented by the Get/Scan implementations that
+ * talk to replica regions. When an RPC response is received from one
+ * of the replicas, the RPCs to the other replicas are cancelled.
+ */
+@InterfaceAudience.Private
+interface Cancellable {
+  public void cancel();
+  public boolean isCancelled();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/949982fc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 9d59242..2ac5652 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implements the scanner interface for the HBase client.
  * If there are multiple regions in a table, this scanner will iterate
@@ -276,6 +278,11 @@ public class ClientScanner extends AbstractClientScanner {
       return true;
     }
 
+  @VisibleForTesting
+  boolean isAnyRPCcancelled() {
+    return callable.isAnyRPCcancelled();
+  }
+
   static Result[] call(Scan scan, ScannerCallableWithReplicas callable,
       RpcRetryingCaller<Result[]> caller, int scannerTimeout)
       throws IOException, RuntimeException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/949982fc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index 478ba76..9c370b9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -169,7 +168,7 @@ public class ClientSmallScanner extends ClientScanner {
       ScanRequest request = RequestConverter.buildScanRequest(getLocation()
           .getRegionInfo().getRegionName(), getScan(), getCaching(), true);
       ScanResponse response = null;
-      PayloadCarryingRpcController controller = controllerFactory.newController();
+      controller = controllerFactory.newController();
       try {
         controller.setPriority(getTableName());
         controller.setCallTimeout(timeout);
@@ -183,8 +182,8 @@ public class ClientSmallScanner extends ClientScanner {
 
     @Override
     public ScannerCallable getScannerCallableForReplica(int id) {
-      return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(), scanMetrics,
-        controllerFactory, getCaching(), id);
+      return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(),
+          scanMetrics, controllerFactory, getCaching(), id);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/949982fc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
new file mode 100644
index 0000000..1dab776
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
@@ -0,0 +1,165 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.htrace.Trace;
+
+/**
+ * A completion service for the RpcRetryingCallerFactory.
+ * Keeps the list of the futures, and allows to cancel them all.
+ * This means as well that it can be used for a small set of tasks only.
+ * <br>Implementation is not Thread safe.
+ */
+@InterfaceAudience.Private
+public class ResultBoundedCompletionService<V> {
+  private final RpcRetryingCallerFactory retryingCallerFactory;
+  private final Executor executor;
+  private final QueueingFuture<V>[] tasks; // all the tasks
+  private volatile QueueingFuture<V> completed = null;
+  
+  class QueueingFuture<T> implements RunnableFuture<T> {
+    private final RetryingCallable<T> future;
+    private T result = null;
+    private ExecutionException exeEx = null;
+    private volatile boolean cancelled;
+    private final int callTimeout;
+    private final RpcRetryingCaller<T> retryingCaller;
+    private boolean resultObtained = false;
+
+
+    public QueueingFuture(RetryingCallable<T> future, int callTimeout) {
+      this.future = future;
+      this.callTimeout = callTimeout;
+      this.retryingCaller = retryingCallerFactory.<T>newCaller();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void run() {
+      try {
+        if (!cancelled) {
+          result =
+              this.retryingCaller.callWithRetries(future, callTimeout);
+          resultObtained = true;
+        }
+      } catch (Throwable t) {
+        exeEx = new ExecutionException(t);
+      } finally {
+        if (!cancelled && completed == null) {
+          completed = (QueueingFuture<V>) QueueingFuture.this;
+          synchronized (tasks) {
+            tasks.notify();
+          }
+        }
+      }
+    }
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      if (resultObtained || exeEx != null) return false;
+      retryingCaller.cancel();
+      if (future instanceof Cancellable) ((Cancellable)future).cancel();
+      cancelled = true;
+      return true;
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return cancelled;
+    }
+
+    @Override
+    public boolean isDone() {
+      return resultObtained || exeEx != null;
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+      try {
+        return get(1000, TimeUnit.DAYS);
+      } catch (TimeoutException e) {
+        throw new RuntimeException("You did wait for 1000 days here?", e);
+      }
+    }
+
+    @Override
+    public T get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      synchronized (tasks) {
+        if (resultObtained) {
+          return result;
+        }
+        if (exeEx != null) {
+          throw exeEx;
+        }
+        unit.timedWait(tasks, timeout);
+      }
+      if (resultObtained) {
+        return result;
+      }
+      if (exeEx != null) {
+        throw exeEx;
+      }
+
+      throw new TimeoutException("timeout=" + timeout + ", " + unit);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public ResultBoundedCompletionService(
+      RpcRetryingCallerFactory retryingCallerFactory, Executor executor,
+      int maxTasks) {
+    this.retryingCallerFactory = retryingCallerFactory;
+    this.executor = executor;
+    this.tasks = new QueueingFuture[maxTasks];
+  }
+
+
+  public void submit(RetryingCallable<V> task, int callTimeout, int id) {
+    QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout);
+    executor.execute(Trace.wrap(newFuture));
+    tasks[id] = newFuture;
+  }
+
+  public QueueingFuture<V> take() throws InterruptedException {
+    synchronized (tasks) {
+      while (completed == null) tasks.wait();
+    }
+    return completed;
+  }
+
+  public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+    synchronized (tasks) {
+      if (completed == null) unit.timedWait(tasks, timeout);
+    }
+    return completed;
+  }
+
+  public void cancelAll() {
+    for (QueueingFuture<V> future : tasks) {
+      if (future != null) future.cancel(true);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/949982fc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 8d937aa..50fdc5b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -21,7 +21,6 @@
 package org.apache.hadoop.hbase.client;
 
 
-import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,7 +40,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 import com.google.protobuf.ServiceException;
 
-import org.htrace.Trace;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -100,7 +98,7 @@ public class RpcRetryingCallerWithReadReplicas {
    * - we need to stop retrying when the call is completed
    * - we can be interrupted
    */
-  class ReplicaRegionServerCallable extends RegionServerCallable<Result> {
+  class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable {
     final int id;
     private final PayloadCarryingRpcController controller;
 
@@ -113,7 +111,8 @@ public class RpcRetryingCallerWithReadReplicas {
       controller.setPriority(tableName);
     }
 
-    public void startCancel() {
+    @Override
+    public void cancel() {
       controller.startCancel();
     }
 
@@ -170,6 +169,11 @@ public class RpcRetryingCallerWithReadReplicas {
         throw ProtobufUtil.getRemoteException(se);
       }
     }
+
+    @Override
+    public boolean isCancelled() {
+      return controller.isCanceled();
+    }
   }
 
   /**
@@ -195,7 +199,8 @@ public class RpcRetryingCallerWithReadReplicas {
 
     RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
         : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
-    ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size());
+    ResultBoundedCompletionService<Result> cs =
+        new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size());
 
     if(isTargetReplicaSpecified) {
       addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
@@ -274,12 +279,12 @@ public class RpcRetryingCallerWithReadReplicas {
    * @param min - the id of the first replica, inclusive
    * @param max - the id of the last replica, inclusive.
    */
-  private void addCallsForReplica(ResultBoundedCompletionService cs,
+  private void addCallsForReplica(ResultBoundedCompletionService<Result> cs,
                                  RegionLocations rl, int min, int max) {
     for (int id = min; id <= max; id++) {
       HRegionLocation hrl = rl.getRegionLocation(id);
       ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
-      cs.submit(callOnReplica, callTimeout);
+      cs.submit(callOnReplica, callTimeout, id);
     }
   }
 
@@ -309,137 +314,4 @@ public class RpcRetryingCallerWithReadReplicas {
 
     return rl;
   }
-
-
-  /**
-   * A completion service for the RpcRetryingCallerFactory.
-   * Keeps the list of the futures, and allows to cancel them all.
-   * This means as well that it can be used for a small set of tasks only.
-   * <br>Implementation is not Thread safe.
-   */
-  public class ResultBoundedCompletionService {
-    private final Executor executor;
-    private final QueueingFuture[] tasks; // all the tasks
-    private volatile QueueingFuture completed = null;
-
-    class QueueingFuture implements RunnableFuture<Result> {
-      private final ReplicaRegionServerCallable future;
-      private Result result = null;
-      private ExecutionException exeEx = null;
-      private volatile boolean canceled;
-      private final int callTimeout;
-      private final RpcRetryingCaller<Result> retryingCaller;
-
-
-      public QueueingFuture(ReplicaRegionServerCallable future, int callTimeout) {
-        this.future = future;
-        this.callTimeout = callTimeout;
-        this.retryingCaller = rpcRetryingCallerFactory.<Result>newCaller();
-      }
-
-      @Override
-      public void run() {
-        try {
-          if (!canceled) {
-            result =
-                rpcRetryingCallerFactory.<Result>newCaller().callWithRetries(future, callTimeout);
-          }
-        } catch (Throwable t) {
-          exeEx = new ExecutionException(t);
-        } finally {
-          if (!canceled && completed == null) {
-            completed = QueueingFuture.this;
-            synchronized (tasks) {
-              tasks.notify();
-            }
-          }
-        }
-      }
-
-      @Override
-      public boolean cancel(boolean mayInterruptIfRunning) {
-        if (result != null || exeEx != null) return false;
-        retryingCaller.cancel();
-        future.startCancel();
-        canceled = true;
-        return true;
-      }
-
-      @Override
-      public boolean isCancelled() {
-        return canceled;
-      }
-
-      @Override
-      public boolean isDone() {
-        return result != null || exeEx != null;
-      }
-
-      @Override
-      public Result get() throws InterruptedException, ExecutionException {
-        try {
-          return get(1000, TimeUnit.DAYS);
-        } catch (TimeoutException e) {
-          throw new RuntimeException("You did wait for 1000 days here?", e);
-        }
-      }
-
-      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE",
-          justification="Is this an issue?")
-      @Override
-      public Result get(long timeout, TimeUnit unit)
-          throws InterruptedException, ExecutionException, TimeoutException {
-        synchronized (tasks) {
-          if (result != null) {
-            return result;
-          }
-          if (exeEx != null) {
-            throw exeEx;
-          }
-          unit.timedWait(tasks, timeout);
-        }
-        // Findbugs says this null check is redundant.  Will result be set across the wait above?
-        if (result != null) {
-          return result;
-        }
-        if (exeEx != null) {
-          throw exeEx;
-        }
-
-        throw new TimeoutException("timeout=" + timeout + ", " + unit);
-      }
-    }
-
-    public ResultBoundedCompletionService(Executor executor, int maxTasks) {
-      this.executor = executor;
-      this.tasks = new QueueingFuture[maxTasks];
-    }
-
-
-    public void submit(ReplicaRegionServerCallable task, int callTimeout) {
-      QueueingFuture newFuture = new QueueingFuture(task, callTimeout);
-      executor.execute(Trace.wrap(newFuture));
-      tasks[task.id] = newFuture;
-    }
-
-    public QueueingFuture take() throws InterruptedException {
-      synchronized (tasks) {
-        while (completed == null) tasks.wait();
-      }
-      return completed;
-    }
-
-    public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException {
-      synchronized (tasks) {
-        if (completed == null) unit.timedWait(tasks, timeout);
-      }
-      return completed;
-    }
-
-    public void cancelAll() {
-      for (QueueingFuture future : tasks) {
-        if (future != null) future.cancel(true);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/949982fc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 5ecc363..2fb5966 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -89,6 +89,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   protected boolean isRegionServerRemote = true;
   private long nextCallSeq = 0;
   protected RpcControllerFactory controllerFactory;
+  protected PayloadCarryingRpcController controller;
 
   /**
    * @param connection which connection
@@ -124,6 +125,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     this.controllerFactory = rpcControllerFactory;
   }
 
+  PayloadCarryingRpcController getController() {
+    return controller;
+  }
+
   /**
    * @param reload force reload of server location
    * @throws IOException
@@ -192,7 +197,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
           incRPCcallsMetrics();
           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
           ScanResponse response = null;
-          PayloadCarryingRpcController controller = controllerFactory.newController();
+          controller = controllerFactory.newController();
           controller.setPriority(getTableName());
           controller.setCallTimeout(callTimeout);
           try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/949982fc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 440cddf..8b6fc5c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -39,8 +39,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.BoundedCompletionService;
 import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.annotations.VisibleForTesting;
 /**
  * This class has the logic for handling scanners for regions with and without replicas.
  * 1. A scan is attempted on the default (primary) region
@@ -69,8 +70,9 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
   private Configuration conf;
   private int scannerTimeout;
   private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
+  private boolean someRPCcancelled = false; //required for testing purposes only
 
-  public ScannerCallableWithReplicas (TableName tableName, ClusterConnection cConnection,
+  public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
       ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
       int retries, int scannerTimeout, int caching, Configuration conf,
       RpcRetryingCaller<Result []> caller) {
@@ -134,8 +136,10 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
 
     // allocate a boundedcompletion pool of some multiple of number of replicas.
     // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
-    BoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
-        new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5);
+    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
+        new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
+            new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool,
+            rl.size() * 5);
 
     List<ExecutionException> exceptions = null;
     int submitted = 0, completed = 0;
@@ -192,7 +196,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     } finally {
       // We get there because we were interrupted or because one or more of the
       // calls succeeded or failed. In all case, we stop all our tasks.
-      cs.cancelAll(true);
+      cs.cancelAll();
     }
 
     if (exceptions != null && !exceptions.isEmpty()) {
@@ -226,8 +230,14 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
         // want to wait for the "close" to happen yet. The "wait" will happen when
         // the table is closed (when the awaitTermination of the underlying pool is called)
         s.setClose();
-        RetryingRPC r = new RetryingRPC(s);
-        pool.submit(r);
+        final RetryingRPC r = new RetryingRPC(s);
+        pool.submit(new Callable<Void>(){
+          @Override
+          public Void call() throws Exception {
+            r.call(scannerTimeout);
+            return null;
+          }
+        });
       }
       // now clear outstandingCallables since we scheduled a close for all the contained scanners
       outstandingCallables.clear();
@@ -244,16 +254,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
   }
 
   private int addCallsForCurrentReplica(
-      BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
+      ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
     outstandingCallables.add(currentScannerCallable);
-    cs.submit(retryingOnReplica);
+    cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
     return 1;
   }
 
   private int addCallsForOtherReplicas(
-      BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl, int min,
-      int max) {
+      ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
+      int min, int max) {
     if (scan.getConsistency() == Consistency.STRONG) {
       return 0; // not scheduling on other replicas for strong consistency
     }
@@ -267,32 +277,85 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       }
       outstandingCallables.add(s);
       RetryingRPC retryingOnReplica = new RetryingRPC(s);
-      cs.submit(retryingOnReplica);
+      cs.submit(retryingOnReplica, scannerTimeout, id);
     }
     return max - min + 1;
   }
 
-  class RetryingRPC implements Callable<Pair<Result[], ScannerCallable>> {
+  @VisibleForTesting
+  boolean isAnyRPCcancelled() {
+    return someRPCcancelled;
+  }
+
+  class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
     final ScannerCallable callable;
+    RpcRetryingCaller<Result[]> caller;
+    private volatile boolean cancelled = false;
 
     RetryingRPC(ScannerCallable callable) {
       this.callable = callable;
-    }
-
-    @Override
-    public Pair<Result[], ScannerCallable> call() throws IOException {
       // For the Consistency.STRONG (default case), we reuse the caller
       // to keep compatibility with what is done in the past
       // For the Consistency.TIMELINE case, we can't reuse the caller
       // since we could be making parallel RPCs (caller.callWithRetries is synchronized
       // and we can't invoke it multiple times at the same time)
-      RpcRetryingCaller<Result[]> caller = ScannerCallableWithReplicas.this.caller;
+      this.caller = ScannerCallableWithReplicas.this.caller;
       if (scan.getConsistency() == Consistency.TIMELINE) {
-        caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
+        this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
             <Result[]>newCaller();
       }
-      Result[] res = caller.callWithRetries(callable, scannerTimeout);
-      return new Pair<Result[], ScannerCallable>(res, callable);
+    }
+
+    @Override
+    public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
+      // since the retries is done within the ResultBoundedCompletionService,
+      // we don't invoke callWithRetries here
+      if (cancelled) {
+        return null;
+      }
+      Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
+      return new Pair<Result[], ScannerCallable>(res, this.callable);
+    }
+
+    @Override
+    public void prepare(boolean reload) throws IOException {
+      if (cancelled) return;
+
+      if (Thread.interrupted()) {
+        throw new InterruptedIOException();
+      }
+
+      callable.prepare(reload);
+    }
+
+    @Override
+    public void throwable(Throwable t, boolean retrying) {
+      callable.throwable(t, retrying);
+    }
+
+    @Override
+    public String getExceptionMessageAdditionalDetail() {
+      return callable.getExceptionMessageAdditionalDetail();
+    }
+
+    @Override
+    public long sleep(long pause, int tries) {
+      return callable.sleep(pause, tries);
+    }
+
+    @Override
+    public void cancel() {
+      cancelled = true;
+      caller.cancel();
+      if (callable.getController() != null) {
+        callable.getController().startCancel();
+      }
+      someRPCcancelled = true;
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return cancelled;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/949982fc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 9697caa..b56fe3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -544,6 +544,54 @@ public class TestReplicasClient {
     runMultipleScansOfOneType(true, false);
   }
 
+  @Test
+  public void testCancelOfScan() throws Exception {
+    openRegion(hriSecondary);
+    int NUMROWS = 100;
+    try {
+      for (int i = 0; i < NUMROWS; i++) {
+        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
+        Put p = new Put(b1);
+        p.add(f, b1, b1);
+        table.put(p);
+      }
+      LOG.debug("PUT done");
+      int caching = 20;
+      byte[] start;
+      start = Bytes.toBytes("testUseRegionWithReplica" + 0);
+
+      flushRegion(hriPrimary);
+      LOG.info("flush done");
+      Thread.sleep(1000 + REFRESH_PERIOD * 2);
+
+      // now make some 'next' calls slow
+      SlowMeCopro.slowDownNext.set(true);
+      SlowMeCopro.countOfNext.set(0);
+      SlowMeCopro.sleepTime.set(5000);
+
+      Scan scan = new Scan(start);
+      scan.setCaching(caching);
+      scan.setConsistency(Consistency.TIMELINE);
+      ResultScanner scanner = table.getScanner(scan);
+      Iterator<Result> iter = scanner.iterator();
+      iter.next();
+      Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled());
+      SlowMeCopro.slowDownNext.set(false);
+      SlowMeCopro.countOfNext.set(0);
+    } finally {
+      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.sleepTime.set(0);
+      SlowMeCopro.slowDownNext.set(false);
+      SlowMeCopro.countOfNext.set(0);
+      for (int i = 0; i < NUMROWS; i++) {
+        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
+        Delete d = new Delete(b1);
+        table.delete(d);
+      }
+      closeRegion(hriSecondary);
+    }
+  }
+
   private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
     openRegion(hriSecondary);
     int NUMROWS = 100;


Mime
View raw message