hbase-commits mailing list archives

From nkey...@apache.org
Subject git commit: HBASE-11564 Improve cancellation management in the rpc layer
Date Thu, 24 Jul 2014 15:49:06 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 040936845 -> d8562052a


HBASE-11564 Improve cancellation management in the rpc layer


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d8562052
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d8562052
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d8562052

Branch: refs/heads/branch-1
Commit: d8562052a4f5c956a514becf6439442763387e86
Parents: 0409368
Author: Nicolas Liochon <nkeywal@gmail.com>
Authored: Thu Jul 24 17:49:01 2014 +0200
Committer: Nicolas Liochon <nkeywal@gmail.com>
Committed: Thu Jul 24 17:49:01 2014 +0200

----------------------------------------------------------------------
 .../hadoop/hbase/client/RpcRetryingCaller.java  |  25 +-
 .../RpcRetryingCallerWithReadReplicas.java      | 264 +++++++++++++------
 .../client/ScannerCallableWithReplicas.java     |   2 +-
 .../org/apache/hadoop/hbase/ipc/RpcClient.java  | 100 ++++---
 .../hbase/ipc/TimeLimitedRpcController.java     |  19 +-
 .../apache/hadoop/hbase/util/ExceptionUtil.java |   3 +-
 .../hbase/IntegrationTestRegionReplicaPerf.java |   2 +-
 .../apache/hadoop/hbase/ipc/BufferChain.java    |   3 +-
 .../hadoop/hbase/procedure/Subprocedure.java    |   2 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java |   2 +-
 .../org/apache/hadoop/hbase/ipc/TestIPC.java    |  10 +-
 11 files changed, 300 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d8562052/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index 7ab772e..286b6fe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -25,12 +25,12 @@ import java.lang.reflect.UndeclaredThrowableException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.ipc.RemoteException;
@@ -60,6 +60,7 @@ public class RpcRetryingCaller<T> {
 
   private final long pause;
   private final int retries;
+  private final AtomicBoolean cancelled = new AtomicBoolean(false);
 
   public RpcRetryingCaller(long pause, int retries) {
     this.pause = pause;
@@ -70,6 +71,7 @@ public class RpcRetryingCaller<T> {
     if (callTimeout <= 0) {
       return 0;
     } else {
+      if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE;
       int remainingTime = (int) (callTimeout -
           (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime));
       if (remainingTime < MIN_RPC_TIMEOUT) {
@@ -82,6 +84,13 @@ public class RpcRetryingCaller<T> {
     }
   }
 
+  public void cancel(){
+    cancelled.set(true);
+    synchronized (cancelled){
+      cancelled.notifyAll();
+    }
+  }
+
   /**
    * Retries if invocation fails.
    * @param callTimeout Timeout for this call
@@ -103,9 +112,11 @@ public class RpcRetryingCaller<T> {
       } catch (Throwable t) {
         ExceptionUtil.rethrowIfInterrupt(t);
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" +
-              (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + "ms", t);
+          LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
+              (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + " ms ago, "
+              + "cancelled=" + cancelled.get(), t);
         }
+
         // translateException throws exception when should not retry: i.e. when request is bad.
         t = translateException(t);
         callable.throwable(t, retries != 1);
@@ -130,7 +141,13 @@ public class RpcRetryingCaller<T> {
         }
       }
       try {
-        Thread.sleep(expectedSleep);
+        if (expectedSleep > 0) {
+          synchronized (cancelled) {
+            if (cancelled.get()) return null;
+            cancelled.wait(expectedSleep);
+          }
+        }
+        if (cancelled.get()) return null;
       } catch (InterruptedException e) {
         throw new InterruptedIOException("Interrupted after " + tries + " tries  on " + retries);
       }
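
The hunk above replaces a bare Thread.sleep() between retries with a wait on the new cancelled flag, so cancel() can wake a caller that is sitting out its back-off pause. Below is a minimal standalone sketch of that wait/notify pattern; the class and method names are illustrative, not HBase API.

import java.util.concurrent.atomic.AtomicBoolean;

public class CancellableSleeper {
  private final AtomicBoolean cancelled = new AtomicBoolean(false);

  /** Wakes up any thread currently pausing in sleepUnlessCancelled(). */
  public void cancel() {
    cancelled.set(true);
    synchronized (cancelled) {
      cancelled.notifyAll();
    }
  }

  /**
   * Pauses up to sleepMs, returning early if cancel() is called.
   * @return true if the pause completed, false if it was cancelled.
   */
  public boolean sleepUnlessCancelled(long sleepMs) throws InterruptedException {
    if (sleepMs > 0) {
      synchronized (cancelled) {
        if (cancelled.get()) {
          return false;
        }
        cancelled.wait(sleepMs); // cancel()'s notifyAll() ends this wait early
      }
    }
    return !cancelled.get();
  }
}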

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8562052/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index c0c75be..1c733b6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -21,18 +21,7 @@
 package org.apache.hadoop.hbase.client;
 
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -47,10 +36,20 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.util.BoundedCompletionService;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Caller that goes to replica if the primary region does no answer within a configurable
@@ -60,6 +59,7 @@ import com.google.protobuf.ServiceException;
  */
 public class RpcRetryingCallerWithReadReplicas {
   static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
+
   protected final ExecutorService pool;
   protected final ClusterConnection cConnection;
   protected final Configuration conf;
@@ -69,12 +69,13 @@ public class RpcRetryingCallerWithReadReplicas {
   private final int callTimeout;
   private final int retries;
   private final RpcControllerFactory rpcControllerFactory;
+  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
 
   public RpcRetryingCallerWithReadReplicas(
       RpcControllerFactory rpcControllerFactory, TableName tableName,
-                                           ClusterConnection cConnection, final Get get,
-                                           ExecutorService pool, int retries, int callTimeout,
-                                           int timeBeforeReplicas) {
+      ClusterConnection cConnection, final Get get,
+      ExecutorService pool, int retries, int callTimeout,
+      int timeBeforeReplicas) {
     this.rpcControllerFactory = rpcControllerFactory;
     this.tableName = tableName;
     this.cConnection = cConnection;
@@ -84,6 +85,7 @@ public class RpcRetryingCallerWithReadReplicas {
     this.retries = retries;
     this.callTimeout = callTimeout;
     this.timeBeforeReplicas = timeBeforeReplicas;
+    this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
   }
 
   /**
@@ -94,12 +96,19 @@ public class RpcRetryingCallerWithReadReplicas {
    */
   class ReplicaRegionServerCallable extends RegionServerCallable<Result> {
     final int id;
+    private final PayloadCarryingRpcController controller;
 
     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
       super(RpcRetryingCallerWithReadReplicas.this.cConnection,
           RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
       this.id = id;
       this.location = location;
+      this.controller = rpcControllerFactory.newController();
+      controller.setPriority(tableName);
+    }
+
+    public void startCancel() {
+      controller.startCancel();
     }
 
     /**
@@ -109,6 +118,8 @@ public class RpcRetryingCallerWithReadReplicas {
      */
     @Override
     public void prepare(final boolean reload) throws IOException {
+      if (controller.isCanceled()) return;
+
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
       }
@@ -125,13 +136,14 @@ public class RpcRetryingCallerWithReadReplicas {
       }
 
       ServerName dest = location.getServerName();
-      assert dest != null;
 
       setStub(cConnection.getClient(dest));
     }
 
     @Override
     public Result call(int callTimeout) throws Exception {
+      if (controller.isCanceled()) return null;
+
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
       }
@@ -140,12 +152,13 @@ public class RpcRetryingCallerWithReadReplicas {
 
       ClientProtos.GetRequest request =
           RequestConverter.buildGetRequest(reg, get);
-      PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-      controller.setPriority(tableName);
       controller.setCallTimeout(callTimeout);
+
       try {
         ClientProtos.GetResponse response = getStub().get(controller, request);
-        if (response == null) return null;
+        if (response == null) {
+          return null;
+        }
         return ProtobufUtil.toResult(response.getResult());
       } catch (ServiceException se) {
         throw ProtobufUtil.getRemoteException(se);
@@ -154,23 +167,6 @@ public class RpcRetryingCallerWithReadReplicas {
   }
 
   /**
-   * Adapter to put the HBase retrying caller into a Java callable.
-   */
-  class RetryingRPC implements Callable<Result> {
-    final RetryingCallable<Result> callable;
-
-    RetryingRPC(RetryingCallable<Result> callable) {
-      this.callable = callable;
-    }
-
-    @Override
-    public Result call() throws IOException {
-      return new RpcRetryingCallerFactory(conf).<Result>newCaller().
-          callWithRetries(callable, callTimeout);
-    }
-  }
-
-  /**
    * Algo:
    * - we put the query into the execution pool.
    * - after x ms, if we don't have a result, we add the queries for the secondary replicas
@@ -191,12 +187,9 @@ public class RpcRetryingCallerWithReadReplicas {
       throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
     RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID,
         cConnection, tableName, get.getRow());
-    BoundedCompletionService<Result> cs = new BoundedCompletionService<Result>(pool, rl.size());
+    ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size());
 
-    List<ExecutionException> exceptions = null;
-    int submitted = 0, completed = 0;
-    // submit call for the primary replica.
-    submitted += addCallsForReplica(cs, rl, 0, 0);
+    addCallsForReplica(cs, rl, 0, 0);
     try {
       // wait for the timeout to see whether the primary responds back
      Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
@@ -204,11 +197,7 @@ public class RpcRetryingCallerWithReadReplicas {
         return f.get(); //great we got a response
       }
     } catch (ExecutionException e) {
-      // the primary call failed with RetriesExhaustedException or DoNotRetryIOException
-      // but the secondaries might still succeed. Continue on the replica RPCs.
-      exceptions = new ArrayList<ExecutionException>(rl.size());
-      exceptions.add(e);
-      completed++;
+      throwEnrichedException(e, retries);
     } catch (CancellationException e) {
       throw new InterruptedIOException();
     } catch (InterruptedException e) {
@@ -216,20 +205,13 @@ public class RpcRetryingCallerWithReadReplicas {
     }
 
     // submit call for the all of the secondaries at once
-    // TODO: this may be an overkill for large region replication
-    submitted += addCallsForReplica(cs, rl, 1, rl.size() - 1);
+    addCallsForReplica(cs, rl, 1, rl.size() - 1);
     try {
-      while (completed < submitted) {
-        try {
-          Future<Result> f = cs.take();
-          return f.get(); // great we got an answer
-        } catch (ExecutionException e) {
-          // if not cancel or interrupt, wait until all RPC's are done
-          // one of the tasks failed. Save the exception for later.
-          if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
-          exceptions.add(e);
-          completed++;
-        }
+      try {
+        Future<Result> f = cs.take();
+        return f.get();
+      } catch (ExecutionException e) {
+        throwEnrichedException(e, retries);
       }
     } catch (CancellationException e) {
       throw new InterruptedIOException();
@@ -238,12 +220,9 @@ public class RpcRetryingCallerWithReadReplicas {
     } finally {
       // We get there because we were interrupted or because one or more of the
       // calls succeeded or failed. In all case, we stop all our tasks.
-      cs.cancelAll(true);
+      cs.cancelAll();
     }
 
-    if (exceptions != null && !exceptions.isEmpty()) {
-      throwEnrichedException(exceptions.get(0), retries, toString()); // just rethrow the first exception for now.
-    }
     return null; // unreachable
   }
 
@@ -251,7 +230,7 @@ public class RpcRetryingCallerWithReadReplicas {
    * Extract the real exception from the ExecutionException, and throws what makes more
    * sense.
    */
-  static void throwEnrichedException(ExecutionException e, int retries, String str)
+  static void throwEnrichedException(ExecutionException e, int retries)
       throws RetriesExhaustedException, DoNotRetryIOException {
     Throwable t = e.getCause();
     assert t != null; // That's what ExecutionException is about: holding an exception
@@ -266,7 +245,7 @@ public class RpcRetryingCallerWithReadReplicas {
 
     RetriesExhaustedException.ThrowableWithExtraContext qt =
         new RetriesExhaustedException.ThrowableWithExtraContext(t,
-            EnvironmentEdgeManager.currentTimeMillis(), str);
+            EnvironmentEdgeManager.currentTimeMillis(), null);
 
     List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
         Collections.singletonList(qt);
@@ -277,26 +256,24 @@ public class RpcRetryingCallerWithReadReplicas {
   /**
    * Creates the calls and submit them
    *
-   * @param cs         - the completion service to use for submitting
-   * @param rl         - the region locations
-   * @param min        - the id of the first replica, inclusive
-   * @param max        - the id of the last replica, inclusive.
-   * @return the number of submitted calls
+   * @param cs  - the completion service to use for submitting
+   * @param rl  - the region locations
+   * @param min - the id of the first replica, inclusive
+   * @param max - the id of the last replica, inclusive.
    */
-  private int addCallsForReplica(BoundedCompletionService<Result> cs,
-                                  RegionLocations rl, int min, int max) {
+  private void addCallsForReplica(ResultBoundedCompletionService cs,
+                                 RegionLocations rl, int min, int max) {
     for (int id = min; id <= max; id++) {
       HRegionLocation hrl = rl.getRegionLocation(id);
       ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
-      RetryingRPC retryingOnReplica = new RetryingRPC(callOnReplica);
-      cs.submit(retryingOnReplica);
+      cs.submit(callOnReplica, callTimeout);
     }
-    return max - min + 1;
   }
 
   static RegionLocations getRegionLocations(boolean useCache, int replicaId,
-      ClusterConnection cConnection, TableName tableName, byte[] row)
+                 ClusterConnection cConnection, TableName tableName, byte[] row)
       throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
+
     RegionLocations rl;
     try {
       rl = cConnection.locateRegion(tableName, row, useCache, true, replicaId);
@@ -315,4 +292,135 @@ public class RpcRetryingCallerWithReadReplicas {
 
     return rl;
   }
+
+
+  /**
+   * A completion service for the RpcRetryingCallerFactory.
+   * Keeps the list of the futures, and allows to cancel them all.
+   * This means as well that it can be used for a small set of tasks only.
+   * <br>Implementation is not Thread safe.
+   */
+  public class ResultBoundedCompletionService {
+    private final Executor executor;
+    private final QueueingFuture[] tasks; // all the tasks
+    private volatile QueueingFuture completed = null;
+
+    class QueueingFuture implements RunnableFuture<Result> {
+      private final ReplicaRegionServerCallable future;
+      private Result result = null;
+      private ExecutionException exeEx = null;
+      private volatile boolean canceled;
+      private final int callTimeout;
+      private final RpcRetryingCaller<Result> retryingCaller;
+
+
+      public QueueingFuture(ReplicaRegionServerCallable future, int callTimeout) {
+        this.future = future;
+        this.callTimeout = callTimeout;
+        this.retryingCaller = rpcRetryingCallerFactory.<Result>newCaller();
+      }
+
+      @Override
+      public void run() {
+        try {
+          if (!canceled) {
+            result =
+                rpcRetryingCallerFactory.<Result>newCaller().callWithRetries(future, callTimeout);
+          }
+        } catch (Throwable t) {
+          exeEx = new ExecutionException(t);
+        } finally {
+          if (!canceled && completed == null) {
+            completed = QueueingFuture.this;
+            synchronized (tasks) {
+              tasks.notify();
+            }
+          }
+        }
+      }
+
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        if (result != null || exeEx != null) return false;
+        retryingCaller.cancel();
+        future.startCancel();
+        canceled = true;
+        return true;
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return canceled;
+      }
+
+      @Override
+      public boolean isDone() {
+        return result != null || exeEx != null;
+      }
+
+      @Override
+      public Result get() throws InterruptedException, ExecutionException {
+        try {
+          return get(1000, TimeUnit.DAYS);
+        } catch (TimeoutException e) {
+          throw new RuntimeException("You did wait for 1000 days here?", e);
+        }
+      }
+
+      @Override
+      public Result get(long timeout, TimeUnit unit)
+          throws InterruptedException, ExecutionException, TimeoutException {
+        synchronized (tasks) {
+          if (result != null) {
+            return result;
+          }
+          if (exeEx != null) {
+            throw exeEx;
+          }
+          unit.timedWait(tasks, timeout);
+        }
+
+        if (result != null) {
+          return result;
+        }
+        if (exeEx != null) {
+          throw exeEx;
+        }
+
+        throw new TimeoutException();
+      }
+    }
+
+    public ResultBoundedCompletionService(Executor executor, int maxTasks) {
+      this.executor = executor;
+      this.tasks = new QueueingFuture[maxTasks];
+    }
+
+
+    public void submit(ReplicaRegionServerCallable task, int callTimeout) {
+      QueueingFuture newFuture = new QueueingFuture(task, callTimeout);
+      executor.execute(newFuture);
+      tasks[task.id] = newFuture;
+    }
+
+    public QueueingFuture take() throws InterruptedException {
+      synchronized (tasks) {
+        if (completed == null) tasks.wait();
+      }
+      return completed;
+    }
+
+    public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException {
+      synchronized (tasks) {
+        if (completed == null) unit.timedWait(tasks, timeout);
+      }
+      return completed;
+    }
+
+    public void cancelAll() {
+      for (QueueingFuture future : tasks) {
+        if (future != null) future.cancel(true);
+      }
+    }
+  }
 }
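
The new ResultBoundedCompletionService above implements the primary-then-replica flow described in the "Algo" comment: submit the primary get, give it a short head start, then fan out to the secondary replicas and cancel whatever is still running once one call wins. The sketch below shows that flow using the JDK's ExecutorCompletionService for brevity; the real class keeps its own QueueingFuture array precisely so cancel() can reach the RpcRetryingCaller and the RPC controller, which a plain Future cannot do. All names here are illustrative.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HedgedReadSketch<R> {
  public R call(ExecutorService pool, Callable<R> primary, List<Callable<R>> secondaries,
      long primaryTimeoutMicros) throws Exception {
    CompletionService<R> cs = new ExecutorCompletionService<R>(pool);
    List<Future<R>> futures = new ArrayList<Future<R>>();
    futures.add(cs.submit(primary));
    try {
      // Give the primary a head start before fanning out to the replicas.
      Future<R> first = cs.poll(primaryTimeoutMicros, TimeUnit.MICROSECONDS);
      if (first != null) {
        return first.get();
      }
      for (Callable<R> c : secondaries) {
        futures.add(cs.submit(c));
      }
      return cs.take().get(); // first replica (or the primary) to answer wins
    } finally {
      for (Future<R> f : futures) {
        f.cancel(true); // stop the losers; a no-op for calls already completed
      }
    }
  }
}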

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8562052/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 4c99e01..709ae62 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -198,7 +198,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
 
     if (exceptions != null && !exceptions.isEmpty()) {
       RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0),
-          retries, toString()); // just rethrow the first exception for now.
+          retries); // just rethrow the first exception for now.
     }
     return null; // unreachable
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8562052/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 620fb18..5959da3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -24,6 +24,7 @@ import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.Message.Builder;
+import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import com.google.protobuf.TextFormat;
@@ -115,13 +116,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class RpcClient {
  // The LOG key is intentionally not from this package to avoid ipc logging at DEBUG (all under
   // o.a.h.hbase is set to DEBUG as default).
-  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcClient");
+  public static final Log LOG = LogFactory.getLog(RpcClient.class);
   protected final PoolMap<ConnectionId, Connection> connections;
 
   protected final AtomicInteger callIdCnt = new AtomicInteger();
   protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final protected Configuration conf;
-  protected final int minIdleTimeBeforeClose; // if the connection is iddle for more than this
+  protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
                                                // time (in ms), it will be closed at any moment.
   final protected int maxRetries; //the max. no. of retries for socket connections
   final protected long failureSleep; // Time to sleep before retry on failure.
@@ -168,7 +169,7 @@ public class RpcClient {
       "hbase.ipc.client.fallback-to-simple-auth-allowed";
   public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
 
-  public static final String ALLOWS_INTERRUPTS = "hbase.ipc.client.allowsInterrupt";
+  public static final String SPECIFIC_WRITE_THREAD = "hbase.ipc.client.specificThreadForWriting";
 
   /**
    * A class to manage a list of servers that failed recently.
@@ -426,7 +427,10 @@ public class RpcClient {
       public CallFuture sendCall(Call call, int priority, Span span)
           throws InterruptedException, IOException {
         CallFuture cts = new CallFuture(call, priority, span);
-        callsToWrite.add(cts);
+        if (!callsToWrite.offer(cts)) {
+          throw new IOException("Can't add the call " + call.id +
+              " to the write queue. callsToWrite.size()=" + callsToWrite.size());
+        }
        checkIsOpen(); // We check after the put, to be sure that the call we added won't stay
                        //  in the list while the cleanup was already done.
         return cts;
@@ -448,7 +452,11 @@ public class RpcClient {
 
       public void remove(CallFuture cts){
         callsToWrite.remove(cts);
+
+        // By removing the call from the expected call list, we make the list smaller, but
+        //  it means as well that we don't know how many calls we cancelled.
         calls.remove(cts.call.id);
+        cts.call.callComplete();
       }
 
       /**
@@ -464,7 +472,7 @@ public class RpcClient {
             markClosed(new InterruptedIOException());
           }
 
-          if (cts == null || cts == CallFuture.DEATH_PILL){
+          if (cts == null || cts == CallFuture.DEATH_PILL) {
             assert shouldCloseConnection.get();
             break;
           }
@@ -580,7 +588,7 @@ public class RpcClient {
         + ticket.getUserName())));
       this.setDaemon(true);
 
-      if (conf.getBoolean(ALLOWS_INTERRUPTS, false)) {
+      if (conf.getBoolean(SPECIFIC_WRITE_THREAD, false)) {
         callSender = new CallSender(getName(), conf);
         callSender.start();
       } else {
@@ -608,6 +616,7 @@ public class RpcClient {
     }
 
 
+
     protected synchronized void setupConnection() throws IOException {
       short ioFailures = 0;
       short timeoutFailures = 0;
@@ -717,7 +726,7 @@ public class RpcClient {
      * it is idle too long, it is marked as to be closed,
      * or the client is marked as not running.
      *
-     * Return true if it is time to read a response; false otherwise.
+     * @return true if it is time to read a response; false otherwise.
      */
     protected synchronized boolean waitForWork() throws InterruptedException {
       // beware of the concurrent access to the calls list: we can add calls, but as well
@@ -743,13 +752,18 @@ public class RpcClient {
         return true;
       }
 
-      // Connection is idle.
-      // We expect the number of calls to be zero here, but actually someone can
-      //  adds a call at the any moment, as there is no synchronization between this task
-      //  and adding new calls. It's not a big issue, but it will get an exception.
-      markClosed(new IOException(
-          "idle connection closed with " + calls.size() + " pending request(s)"));
+      if (EnvironmentEdgeManager.currentTimeMillis() >= waitUntil) {
+        // Connection is idle.
+        // We expect the number of calls to be zero here, but actually someone can
+        //  adds a call at the any moment, as there is no synchronization between this task
+        //  and adding new calls. It's not a big issue, but it will get an exception.
+        markClosed(new IOException(
+            "idle connection closed with " + calls.size() + " pending request(s)"));
+        return false;
+      }
 
+      // We can get here if we received a notification that there is some work to do but
+      //  the work was cancelled. As we're not idle we continue to wait.
       return false;
     }
 
@@ -767,15 +781,19 @@ public class RpcClient {
         while (waitForWork()) { // Wait here for work - read or close connection
           readResponse();
         }
+      } catch (InterruptedException t) {
+        LOG.debug(getName() + ": interrupted while waiting for call responses");
+        markClosed(ExceptionUtil.asInterrupt(t));
       } catch (Throwable t) {
-        LOG.debug(getName() + ": unexpected exception receiving call responses", t);
-        markClosed(new IOException("Unexpected exception receiving call responses", t));
+        LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
+        markClosed(new IOException("Unexpected throwable while waiting call responses", t));
       }
 
       close();
 
-      if (LOG.isDebugEnabled())
+      if (LOG.isDebugEnabled()) {
         LOG.debug(getName() + ": stopped, connections " + connections.size());
+      }
     }
 
     private synchronized void disposeSasl() {
@@ -1146,8 +1164,10 @@ public class RpcClient {
           // this connection.
           int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
           int whatIsLeftToRead = totalSize - readSoFar;
-          LOG.debug("Unknown callId: " + id + ", skipping over this response of " +
-              whatIsLeftToRead + " bytes");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Unknown callId: " + id + ", skipping over this response of " +
+                whatIsLeftToRead + " bytes");
+          }
           IOUtils.skipFully(in, whatIsLeftToRead);
         }
         if (responseHeader.hasException()) {
@@ -1188,16 +1208,10 @@ public class RpcClient {
         }
       } finally {
         cleanupCalls(false);
-        if (expectedCall && !call.done) {
-          LOG.warn("Coding error: code should be true for callId=" + call.id +
-              ", server=" + getRemoteAddress() +
-              ", shouldCloseConnection=" + shouldCloseConnection.get());
-        }
       }
     }
 
     /**
-     * @param e
      * @return True if the exception is a fatal connection exception.
      */
     private boolean isFatalConnectionException(final ExceptionResponse e) {
@@ -1225,7 +1239,7 @@ public class RpcClient {
 
       if (shouldCloseConnection.compareAndSet(false, true)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(getName() + ": marking at should close, reason =" + e.getMessage());
+          LOG.debug(getName() + ": marking at should close, reason: " + e.getMessage());
         }
         if (callSender != null) {
           callSender.close();
@@ -1447,10 +1461,12 @@ public class RpcClient {
     }
   }
 
-  Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
+  Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
+                                  MethodDescriptor md, Message param, CellScanner cells,
       Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout)
   throws InterruptedException, IOException {
-    return call(md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS);
+    return
+        call(pcrc, md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS);
   }
 
   /** Make a call, passing <code>param</code>, to the IPC server running at
@@ -1465,16 +1481,34 @@ public class RpcClient {
    * @throws InterruptedException
    * @throws IOException
    */
-  Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
+  Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
+                                  Message param, CellScanner cells,
       Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority)
       throws IOException, InterruptedException {
-    Call call = new Call(md, param, cells, returnType, callTimeout);
-    Connection connection = getConnection(ticket, call, addr, this.codec, this.compressor);
+    final Call call = new Call(md, param, cells, returnType, callTimeout);
+
+    final Connection connection = getConnection(ticket, call, addr, this.codec, this.compressor);
 
-    CallFuture cts = null;
-    if (connection.callSender != null){
+    final CallFuture cts;
+    if (connection.callSender != null) {
       cts = connection.callSender.sendCall(call, priority, Trace.currentSpan());
+      if (pcrc != null) {
+        pcrc.notifyOnCancel(new RpcCallback<Object>() {
+          @Override
+          public void run(Object parameter) {
+            connection.callSender.remove(cts);
+            call.callComplete();
+          }
+        });
+        if (pcrc.isCanceled()) {
+          // To finish if the call was cancelled before we set the notification (race condition)
+          call.callComplete();
+          return new Pair<Message, CellScanner>(call.response, call.cells);
+        }
+      }
+
     } else {
+      cts = null;
       connection.tracedWriteRequest(call, priority, Trace.currentSpan());
     }
 
@@ -1663,7 +1697,7 @@ public class RpcClient {
     }
     Pair<Message, CellScanner> val;
     try {
-      val = call(md, param, cells, returnType, ticket, isa, callTimeout,
+      val = call(pcrc, md, param, cells, returnType, ticket, isa, callTimeout,
         pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
       if (pcrc != null) {
        // Shove the results into controller so can be carried across the proxy/pb service void.
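
The notifyOnCancel/isCanceled sequence added to call() above closes a race: if startCancel() runs on another thread before the callback is registered, the callback never fires, so the caller re-checks isCanceled() right after registering and performs the cleanup itself. Here is a self-contained sketch of that register-then-re-check pattern, with a stand-in controller rather than the HBase PayloadCarryingRpcController.

import java.util.concurrent.atomic.AtomicReference;

public class CancellationRaceSketch {
  static class Controller {
    private volatile boolean cancelled = false;
    private final AtomicReference<Runnable> cb = new AtomicReference<Runnable>();

    void notifyOnCancel(Runnable callback) { cb.set(callback); }
    boolean isCanceled() { return cancelled; }

    void startCancel() {
      cancelled = true;
      Runnable r = cb.get();
      if (r != null) {
        r.run();
      }
    }
  }

  static void sendCall(Controller controller, Runnable cleanup) {
    // 1) register the callback first...
    controller.notifyOnCancel(cleanup);
    // 2) ...then re-check: if startCancel() ran before the registration above, its
    //    callback lookup may have seen null, so we clean up ourselves. The cleanup
    //    can therefore run twice in the race window and must be idempotent.
    if (controller.isCanceled()) {
      cleanup.run();
    }
  }
}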

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8562052/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
index 5117388..ec98a5f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
@@ -22,12 +22,18 @@ package org.apache.hadoop.hbase.ipc;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
 public class TimeLimitedRpcController implements RpcController {
 
   /**
    * The time, in ms before the call should expire.
    */
-  protected Integer callTimeout;
+  protected volatile Integer callTimeout;
+  protected volatile boolean cancelled = false;
+  protected final AtomicReference<RpcCallback<Object>> cancellationCb =
+      new AtomicReference<RpcCallback<Object>>(null);
 
   public Integer getCallTimeout() {
     return callTimeout;
@@ -53,12 +59,12 @@ public class TimeLimitedRpcController implements RpcController {
 
   @Override
   public boolean isCanceled() {
-    throw new UnsupportedOperationException();
+    return cancelled;
   }
 
   @Override
-  public void notifyOnCancel(RpcCallback<Object> arg0) {
-    throw new UnsupportedOperationException();
+  public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
+    this.cancellationCb.set(cancellationCb);
   }
 
   @Override
@@ -73,6 +79,9 @@ public class TimeLimitedRpcController implements RpcController {
 
   @Override
   public void startCancel() {
-    throw new UnsupportedOperationException();
+    cancelled = true;
+    if (cancellationCb.get() != null) {
+      cancellationCb.get().run(null);
+    }
   }
 }
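
A short usage sketch of the behaviour TimeLimitedRpcController gains above: isCanceled() now reports the flag, notifyOnCancel() registers a callback, and startCancel() flips the flag and fires that callback. This assumes the class's default constructor; the println output is only for illustration.

import com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;

public class ControllerCancelDemo {
  public static void main(String[] args) {
    TimeLimitedRpcController controller = new TimeLimitedRpcController();

    controller.notifyOnCancel(new RpcCallback<Object>() {
      @Override
      public void run(Object parameter) {
        System.out.println("call cancelled, cleaning up");
      }
    });

    System.out.println("canceled before: " + controller.isCanceled()); // false
    controller.startCancel();                                          // fires the callback
    System.out.println("canceled after: " + controller.isCanceled());  // true
  }
}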

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8562052/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
index 35bb259..d56055a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
@@ -59,7 +59,8 @@ public class ExceptionUtil {
     if (t instanceof InterruptedIOException) return (InterruptedIOException) t;
 
     if (t instanceof InterruptedException || t instanceof ClosedByInterruptException) {
-      InterruptedIOException iie = new InterruptedIOException();
+      InterruptedIOException iie =
+          new InterruptedIOException("Origin: " + t.getClass().getSimpleName());
       iie.initCause(t);
       return iie;
     }
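
The only change above is that the wrapped exception now records the origin class in its message. For context, a tiny sketch of what ExceptionUtil.asInterrupt() does with an InterruptedException after this patch; the printed text is indicative, not an exact contract.

import java.io.InterruptedIOException;
import org.apache.hadoop.hbase.util.ExceptionUtil;

public class AsInterruptDemo {
  public static void main(String[] args) {
    InterruptedIOException iie = ExceptionUtil.asInterrupt(new InterruptedException("stop"));
    System.out.println(iie.getMessage());            // e.g. "Origin: InterruptedException"
    System.out.println(iie.getCause().getMessage()); // "stop"
  }
}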

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8562052/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
index 8ea27bf..8471cb4 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
@@ -149,7 +149,7 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
       conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);
 
     // enable client-side settings
-    conf.setBoolean(RpcClient.ALLOWS_INTERRUPTS, true);
+    conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true);
     // TODO: expose these settings to CLI override
     conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
     conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8562052/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
index 8b0b568..940dbca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
  */
 @InterfaceAudience.Private
 class BufferChain {
-  private static final ByteBuffer [] FOR_TOARRAY_TYPE = new ByteBuffer[0];
   private final ByteBuffer[] buffers;
   private int remaining = 0;
   private int bufferOffset = 0;
@@ -44,7 +43,7 @@ class BufferChain {
       bbs.add(b);
       this.remaining += b.remaining();
     }
-    this.buffers = bbs.toArray(FOR_TOARRAY_TYPE);
+    this.buffers = bbs.toArray(new ByteBuffer[bbs.size()]);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8562052/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
index fc234f6..9ef5d23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
@@ -143,7 +143,7 @@ abstract public class Subprocedure implements Callable<Void> {
    *
   * This would normally be executed by the ProcedureMemeber when a acquire message comes from the
   * coordinator.  Rpcs are used to spend message back to the coordinator after different phases
-   * are executed.  Any exceptions caught during the execution (except for InterrupedException) get
+   * are executed.  Any exceptions caught during the execution (except for InterruptedException) get
    * converted and propagated to coordinator via {@link ProcedureMemberRpcs#sendMemberAborted(
    * Subprocedure, ForeignException)}.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8562052/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index e0f1dfb..54232b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -343,7 +343,7 @@ public class TestHCM {
     c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
     c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 0); // don't wait between retries.
     c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
-    c2.setBoolean(RpcClient.ALLOWS_INTERRUPTS, allowsInterrupt);
+    c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
 
     final HTable table = new HTable(c2, tableName.getBytes());
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8562052/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index 128a91a..138103e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -190,7 +190,7 @@ public class TestIPC {
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       final String message = "hello";
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
-      Pair<Message, CellScanner> r = client.call(md, param, null,
+      Pair<Message, CellScanner> r = client.call(null, md, param, null,
         md.getOutputType().toProto(), User.getCurrent(), address, 0);
       assertTrue(r.getSecond() == null);
       // Silly assertion that the message is in the returned pb.
@@ -229,7 +229,7 @@ public class TestIPC {
       InetSocketAddress address = rpcServer.getListenerAddress();
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-      Pair<Message, CellScanner> r = client.call(md, param, CellUtil.createCellScanner(cells),
+      Pair<Message, CellScanner> r = client.call(null, md, param, CellUtil.createCellScanner(cells),
         md.getOutputType().toProto(), User.getCurrent(), address, 0);
       int index = 0;
       while (r.getSecond().advance()) {
@@ -263,7 +263,7 @@ public class TestIPC {
       InetSocketAddress address = rpcServer.getListenerAddress();
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-      client.call(md, param, null, null, User.getCurrent(), address, 0);
+      client.call(null, md, param, null, null, User.getCurrent(), address, 0);
       fail("Expected an exception to have been thrown!");
     } catch (Exception e) {
       LOG.info("Caught expected exception: " + e.toString());
@@ -287,7 +287,7 @@ public class TestIPC {
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
       for (int i = 0; i < 10; i++) {
-        client.call(md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)),
+        client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)),
            md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0);
       }
       verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
@@ -342,7 +342,7 @@ public class TestIPC {
         }
         CellScanner cellScanner = CellUtil.createCellScanner(cells);
         Pair<Message, CellScanner> response =
-          client.call(md, builder.build(), cellScanner, param, user, address, 0);
+            client.call(null, md, builder.build(), cellScanner, param, user, address, 0);
         /*
         int count = 0;
         while (p.getSecond().advance()) {

