hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zg...@apache.org
Subject hbase git commit: HBASE-18342 Add coprocessor service support for async admin
Date Fri, 14 Jul 2017 03:56:17 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 246d42297 -> 06a0bfc3b


HBASE-18342 Add coprocessor service support for async admin


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/06a0bfc3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/06a0bfc3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/06a0bfc3

Branch: refs/heads/branch-2
Commit: 06a0bfc3baadf1fd2dba6a53dd8d22cc8109ea4c
Parents: 246d422
Author: Guanghao Zhang <zghao@apache.org>
Authored: Mon Jul 10 09:25:47 2017 +0800
Committer: Guanghao Zhang <zghao@apache.org>
Committed: Fri Jul 14 11:55:32 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/AsyncAdmin.java  |  49 ++++++
 .../hadoop/hbase/client/AsyncHBaseAdmin.java    |  16 ++
 .../client/AsyncRpcRetryingCallerFactory.java   |  63 ++++++-
 .../AsyncServerRequestRpcRetryingCaller.java    |  79 +++++++++
 .../client/MasterCoprocessorRpcChannelImpl.java |  86 ++++++++++
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java |  52 +++++-
 .../RegionServerCoprocessorRpcChannelImpl.java  |  86 ++++++++++
 .../TestAsyncCoprocessorEndpoint.java           | 167 +++++++++++++++++++
 8 files changed, 596 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/06a0bfc3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 7d904b3..1adf353 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.hbase.ClusterStatus;
@@ -36,12 +37,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.procedure2.LockInfo;
 import org.apache.hadoop.hbase.quotas.QuotaFilter;
 import org.apache.hadoop.hbase.quotas.QuotaSettings;
+import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
 import org.apache.hadoop.hbase.client.security.SecurityCapability;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.util.Pair;
 
+import com.google.protobuf.RpcChannel;
+
 /**
  * The asynchronous administrative API for HBase.
  * <p>
@@ -1060,4 +1064,49 @@ public interface AsyncAdmin {
    *         {@link CompletableFuture}
    */
   CompletableFuture<Integer> runCatalogJanitor();
+
+  /**
+   * Execute the given coprocessor call on the master.
+   * <p>
+   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
+   * one line lambda expression, like:
+   *
+   * <pre>
+   * <code>
+   * channel -> xxxService.newStub(channel)
+   * </code>
+   * </pre>
+   * @param stubMaker a delegation to the actual {@code newStub} call.
+   * @param callable a delegation to the actual protobuf rpc call. See the comment of
+   *          {@link CoprocessorCallable} for more details.
+   * @param <S> the type of the asynchronous stub
+   * @param <R> the type of the return value
+   * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
+   * @see CoprocessorCallable
+   */
+  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable);
+
+  /**
+   * Execute the given coprocessor call on the given region server.
+   * <p>
+   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
+   * one line lambda expression, like:
+   *
+   * <pre>
+   * <code>
+   * channel -> xxxService.newStub(channel)
+   * </code>
+   * </pre>
+   * @param stubMaker a delegation to the actual {@code newStub} call.
+   * @param callable a delegation to the actual protobuf rpc call. See the comment of
+   *          {@link CoprocessorCallable} for more details.
+   * @param serverName the given region server
+   * @param <S> the type of the asynchronous stub
+   * @param <R> the type of the return value
+   * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
+   * @see CoprocessorCallable
+   */
+  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+    CoprocessorCallable<S, R> callable, ServerName serverName);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/06a0bfc3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 8e5a28c..ed7ac4d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
 import org.apache.hadoop.hbase.client.security.SecurityCapability;
 import org.apache.hadoop.hbase.procedure2.LockInfo;
@@ -45,6 +47,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.util.Pair;
 
+import com.google.protobuf.RpcChannel;
+
 /**
  * The implementation of AsyncAdmin.
  */
@@ -617,4 +621,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   public CompletableFuture<Integer> runCatalogJanitor() {
     return wrap(rawAdmin.runCatalogJanitor());
   }
+
+  @Override
+  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable) {
+    return wrap(rawAdmin.coprocessorService(stubMaker, callable));
+  }
+
+  @Override
+  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, ServerName serverName) {
+    return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverName));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/06a0bfc3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 270f265..0ee3b52 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -378,7 +378,7 @@ class AsyncRpcRetryingCallerFactory {
     return new MasterRequestCallerBuilder<>();
   }
 
-  public class AdminRequestCallerBuilder<T> extends BuilderBase{
+  public class AdminRequestCallerBuilder<T> extends BuilderBase {
     // TODO: maybe we can reuse AdminRequestCallerBuild, MasterRequestCallerBuild etc.
 
     private AsyncAdminRequestRetryingCaller.Callable<T> callable;
@@ -438,4 +438,65 @@ class AsyncRpcRetryingCallerFactory {
   public <T> AdminRequestCallerBuilder<T> adminRequest(){
     return new AdminRequestCallerBuilder<>();
   }
+
+  public class ServerRequestCallerBuilder<T> extends BuilderBase {
+
+    private AsyncServerRequestRpcRetryingCaller.Callable<T> callable;
+
+    private long operationTimeoutNs = -1L;
+
+    private long rpcTimeoutNs = -1L;
+
+    private ServerName serverName;
+
+    public ServerRequestCallerBuilder<T> action(
+        AsyncServerRequestRpcRetryingCaller.Callable<T> callable) {
+      this.callable = callable;
+      return this;
+    }
+
+    public ServerRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit unit) {
+      this.operationTimeoutNs = unit.toNanos(operationTimeout);
+      return this;
+    }
+
+    public ServerRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit) {
+      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+      return this;
+    }
+
+    public ServerRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
+      this.pauseNs = unit.toNanos(pause);
+      return this;
+    }
+
+    public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
+      return this;
+    }
+
+    public ServerRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
+      this.startLogErrorsCnt = startLogErrorsCnt;
+      return this;
+    }
+
+    public ServerRequestCallerBuilder<T> serverName(ServerName serverName) {
+      this.serverName = serverName;
+      return this;
+    }
+
+    public AsyncServerRequestRpcRetryingCaller<T> build() {
+      return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
+          operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, serverName, checkNotNull(callable,
+            "action is null"));
+    }
+
+    public CompletableFuture<T> call() {
+      return build().call();
+    }
+  }
+
+  public <T> ServerRequestCallerBuilder<T> serverRequest() {
+    return new ServerRequestCallerBuilder<>();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/06a0bfc3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
new file mode 100644
index 0000000..72241ea
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+
+/**
+ * Retry caller for a request call to region server.
+ * Now only used for coprocessor call to region server.
+ */
+@InterfaceAudience.Private
+public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
+
+  @FunctionalInterface
+  public interface Callable<T> {
+    CompletableFuture<T> call(HBaseRpcController controller, ClientService.Interface stub);
+  }
+
+  private final Callable<T> callable;
+  private ServerName serverName;
+
+  public AsyncServerRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+      long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
+      int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
+    super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
+        startLogErrorsCnt);
+    this.serverName = serverName;
+    this.callable = callable;
+  }
+
+  @Override
+  protected void doCall() {
+    ClientService.Interface stub;
+    try {
+      stub = this.conn.getRegionServerStub(serverName);
+    } catch (IOException e) {
+      onError(e, () -> "Get async admin stub to " + serverName + " failed", err -> {
+      });
+      return;
+    }
+    resetCallTimeout();
+    callable.call(controller, stub).whenComplete((result, error) -> {
+      if (error != null) {
+        onError(error, () -> "Call to admin stub failed", err -> {
+        });
+        return;
+      }
+      future.complete(result);
+    });
+  }
+
+  CompletableFuture<T> call() {
+    doCall();
+    return future;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/06a0bfc3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java
new file mode 100644
index 0000000..fcea508
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+
+/**
+ * The implementation of a master based coprocessor rpc channel.
+ */
+@InterfaceAudience.Private
+class MasterCoprocessorRpcChannelImpl implements RpcChannel {
+
+  MasterRequestCallerBuilder<Message> callerBuilder;
+
+  MasterCoprocessorRpcChannelImpl(MasterRequestCallerBuilder<Message> callerBuilder) {
+    this.callerBuilder = callerBuilder;
+  }
+
+  private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
+      Message responsePrototype, HBaseRpcController controller, MasterService.Interface stub) {
+    CompletableFuture<Message> future = new CompletableFuture<>();
+    CoprocessorServiceRequest csr =
+        CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
+    stub.execMasterService(
+      controller,
+      csr,
+      new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
+
+        @Override
+        public void run(CoprocessorServiceResponse resp) {
+          if (controller.failed()) {
+            future.completeExceptionally(controller.getFailed());
+          } else {
+            try {
+              future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
+            } catch (IOException e) {
+              future.completeExceptionally(e);
+            }
+          }
+        }
+      });
+    return future;
+  }
+
+  @Override
+  public void callMethod(MethodDescriptor method, RpcController controller, Message request,
+      Message responsePrototype, RpcCallback<Message> done) {
+    callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
+        .whenComplete(((r, e) -> {
+          if (e != null) {
+            ((ClientCoprocessorRpcController) controller).setFailed(e);
+          }
+          done.run(r);
+        }));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/06a0bfc3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 0271a50..a87f195 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -34,10 +34,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcChannel;
 
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
@@ -69,6 +72,8 @@ import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
 import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
@@ -239,7 +244,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.*;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
@@ -2838,4 +2843,49 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
             (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
         .call();
   }
+
+  @Override
+  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable) {
+    MasterCoprocessorRpcChannelImpl channel =
+        new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
+    S stub = stubMaker.apply(channel);
+    CompletableFuture<R> future = new CompletableFuture<>();
+    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
+    callable.call(stub, controller, resp -> {
+      if (controller.failed()) {
+        future.completeExceptionally(controller.getFailed());
+      } else {
+        future.complete(resp);
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, ServerName serverName) {
+    RegionServerCoprocessorRpcChannelImpl channel =
+        new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
+          serverName));
+    S stub = stubMaker.apply(channel);
+    CompletableFuture<R> future = new CompletableFuture<>();
+    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
+    callable.call(stub, controller, resp -> {
+      if (controller.failed()) {
+        future.completeExceptionally(controller.getFailed());
+      } else {
+        future.complete(resp);
+      }
+    });
+    return future;
+  }
+
+  private <T> ServerRequestCallerBuilder<T> newServerCaller() {
+    return this.connection.callerFactory.<T> serverRequest()
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+        .startLogErrorsCnt(startLogErrorsCnt);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/06a0bfc3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java
new file mode 100644
index 0000000..610eb60
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+
+/**
+ * The implementation of a region server based coprocessor rpc channel.
+ */
+@InterfaceAudience.Private
+public class RegionServerCoprocessorRpcChannelImpl implements RpcChannel {
+
+  ServerRequestCallerBuilder<Message> callerBuilder;
+
+  RegionServerCoprocessorRpcChannelImpl(ServerRequestCallerBuilder<Message> callerBuilder) {
+    this.callerBuilder = callerBuilder;
+  }
+
+  private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
+      Message responsePrototype, HBaseRpcController controller, ClientService.Interface stub) {
+    CompletableFuture<Message> future = new CompletableFuture<>();
+    CoprocessorServiceRequest csr =
+        CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
+    stub.execRegionServerService(
+      controller,
+      csr,
+      new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
+
+        @Override
+        public void run(CoprocessorServiceResponse resp) {
+          if (controller.failed()) {
+            future.completeExceptionally(controller.getFailed());
+          } else {
+            try {
+              future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
+            } catch (IOException e) {
+              future.completeExceptionally(e);
+            }
+          }
+        }
+      });
+    return future;
+  }
+
+  @Override
+  public void callMethod(MethodDescriptor method, RpcController controller, Message request,
+      Message responsePrototype, RpcCallback<Message> done) {
+    callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
+        .whenComplete(((r, e) -> {
+          if (e != null) {
+            ((ClientCoprocessorRpcController) controller).setFailed(e);
+          }
+          done.run(r);
+        }));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/06a0bfc3/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java
new file mode 100644
index 0000000..16fb03c
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorEndpoint.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TestAsyncAdminBase;
+import org.apache.hadoop.hbase.coprocessor.TestRegionServerCoprocessorEndpoint.DummyRegionServerEndpoint;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+@RunWith(Parameterized.class)
+@Category({ ClientTests.class, MediumTests.class })
+public class TestAsyncCoprocessorEndpoint extends TestAsyncAdminBase {
+
+  private static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt");
+  private static final String DUMMY_VALUE = "val";
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+      ProtobufCoprocessorService.class.getName());
+    TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+      DummyRegionServerEndpoint.class.getName());
+    TEST_UTIL.startMiniCluster(2);
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+  }
+
+  @Test
+  public void testMasterCoprocessorService() throws Exception {
+    TestProtos.EchoRequestProto request =
+        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+    TestProtos.EchoResponseProto response =
+        admin
+            .<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EchoResponseProto> coprocessorService(
+              TestRpcServiceProtos.TestProtobufRpcProto::newStub,
+              (s, c, done) -> s.echo(c, request, done)).get();
+    assertEquals("hello", response.getMessage());
+  }
+
+  @Test
+  public void testMasterCoprocessorError() throws Exception {
+    TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.getDefaultInstance();
+    try {
+      admin
+          .<TestRpcServiceProtos.TestProtobufRpcProto.Stub, TestProtos.EmptyResponseProto> coprocessorService(
+            TestRpcServiceProtos.TestProtobufRpcProto::newStub,
+            (s, c, done) -> s.error(c, emptyRequest, done)).get();
+      fail("Should have thrown an exception");
+    } catch (Exception e) {
+    }
+  }
+
+  @Test
+  public void testRegionServerCoprocessorService() throws Exception {
+    final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+    DummyRegionServerEndpointProtos.DummyRequest request =
+        DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
+    DummyRegionServerEndpointProtos.DummyResponse response =
+        admin
+            .<DummyRegionServerEndpointProtos.DummyService.Stub, DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
+              DummyRegionServerEndpointProtos.DummyService::newStub,
+              (s, c, done) -> s.dummyCall(c, request, done), serverName).get();
+    assertEquals(DUMMY_VALUE, response.getValue());
+  }
+
+  @Test
+  public void testRegionServerCoprocessorServiceError() throws Exception {
+    final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+    DummyRegionServerEndpointProtos.DummyRequest request =
+        DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance();
+    try {
+      admin
+          .<DummyRegionServerEndpointProtos.DummyService.Stub, DummyRegionServerEndpointProtos.DummyResponse> coprocessorService(
+            DummyRegionServerEndpointProtos.DummyService::newStub,
+            (s, c, done) -> s.dummyThrow(c, request, done), serverName).get();
+      fail("Should have thrown an exception");
+    } catch (Exception e) {
+      assertTrue(e.getCause() instanceof RetriesExhaustedException);
+      assertTrue(e.getCause().getMessage().contains(WHAT_TO_THROW.getClass().getName().trim()));
+    }
+  }
+
+  static class DummyRegionServerEndpoint extends DummyService implements Coprocessor, SingletonCoprocessorService {
+
+    public DummyRegionServerEndpoint() {
+    }
+
+    @Override
+    public Service getService() {
+      return this;
+    }
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+    }
+
+    @Override
+    public void dummyCall(RpcController controller, DummyRequest request,
+        RpcCallback<DummyResponse> callback) {
+      callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build());
+    }
+
+    @Override
+    public void dummyThrow(RpcController controller,
+        DummyRequest request,
+        RpcCallback<DummyResponse> done) {
+      CoprocessorRpcUtils.setControllerException(controller, WHAT_TO_THROW);
+    }
+  }
+}


Mime
View raw message