hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [16/50] [abbrv] hbase git commit: HBASE-17396 Add first async admin impl and implement balance methods
Date Mon, 23 Jan 2017 23:01:49 GMT
HBASE-17396 Add first async admin impl and implement balance methods


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb9ce2ce
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb9ce2ce
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb9ce2ce

Branch: refs/heads/HBASE-16961
Commit: cb9ce2ceafb5467522b1b380956446e40b8250d5
Parents: 8f1d0a2
Author: Guanghao Zhang <zghao@apache.org>
Authored: Thu Jan 19 10:15:12 2017 +0800
Committer: Guanghao Zhang <zghao@apache.org>
Committed: Thu Jan 19 10:15:12 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/AsyncAdmin.java  |  64 +++++++
 .../hadoop/hbase/client/AsyncConnection.java    |   9 +
 .../hbase/client/AsyncConnectionImpl.java       | 106 ++++++++++++
 .../hadoop/hbase/client/AsyncHBaseAdmin.java    | 144 ++++++++++++++++
 .../AsyncMasterRequestRpcRetryingCaller.java    |  73 ++++++++
 .../hbase/client/AsyncRpcRetryingCaller.java    | 151 +++++++++++++++++
 .../client/AsyncRpcRetryingCallerFactory.java   |  55 ++++++
 .../AsyncSingleRequestRpcRetryingCaller.java    | 169 ++++---------------
 .../hadoop/hbase/client/TestAsyncAdmin.java     |  87 ++++++++++
 9 files changed, 720 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
new file mode 100644
index 0000000..fadeebe
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ *  The asynchronous administrative API for HBase.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface AsyncAdmin {
+
+  /**
+   * Turn the load balancer on or off.
+   * @param on
+   * @return Previous balancer value wrapped by a {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> setBalancerRunning(final boolean on) throws IOException;
+
+  /**
+   * Invoke the balancer. Will run the balancer and if regions to move, it will go ahead
and do the
+   * reassignments. Can NOT run for various reasons. Check logs.
+   * @return True if balancer ran, false otherwise. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> balancer() throws IOException;
+
+  /**
+   * Invoke the balancer. Will run the balancer and if regions to move, it will go ahead
and do the
+   * reassignments. If there is region in transition, force parameter of true would still
run
+   * balancer. Can *not* run for other reasons. Check logs.
+   * @param force whether we should force balance even if there is region in transition.
+   * @return True if balancer ran, false otherwise. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> balancer(boolean force) throws IOException;
+
+  /**
+   * Query the current state of the balancer.
+   * @return true if the balancer is enabled, false otherwise.
+   *         The return value will be wrapped by a {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> isBalancerEnabled() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 9f114ac..dbe32ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -96,4 +96,13 @@ public interface AsyncConnection extends Closeable {
    * @param pool the thread pool to use for executing callback
    */
   AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService
pool);
+
+  /**
+   * Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned AsyncAdmin
+   * is not guaranteed to be thread-safe. A new instance should be created for each using
thread.
+   * This is a lightweight operation. Pooling or caching of the returned AsyncAdmin is not
+   * recommended.
+   * @return an AsyncAdmin instance for cluster administration
+   */
+  AsyncAdmin getAdmin();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index c58500a..bc6a3b2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -28,23 +28,32 @@ import io.netty.util.HashedWheelTimer;
 
 import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
 import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 
@@ -88,6 +97,11 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
 
+  private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
+
+  private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture
=
+      new AtomicReference<>();
+
   public AsyncConnectionImpl(Configuration conf, User user) {
     this.conf = conf;
     this.user = user;
@@ -149,6 +163,93 @@ class AsyncConnectionImpl implements AsyncConnection {
       () -> createRegionServerStub(serverName));
   }
 
+  private MasterService.Interface createMasterStub(ServerName serverName) throws IOException
{
+    return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
+  }
+
+  private void makeMasterStub(CompletableFuture<MasterService.Interface> future) {
+    registry.getMasterAddress().whenComplete(
+      (sn, error) -> {
+        if (sn == null) {
+          String msg = "ZooKeeper available but no active master location found";
+          LOG.info(msg);
+          this.masterStubMakeFuture.getAndSet(null).completeExceptionally(
+            new MasterNotRunningException(msg));
+          return;
+        }
+        try {
+          MasterService.Interface stub = createMasterStub(sn);
+          HBaseRpcController controller = getRpcController();
+          stub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(),
+            new RpcCallback<IsMasterRunningResponse>() {
+              @Override
+              public void run(IsMasterRunningResponse resp) {
+                if (controller.failed() || resp == null
+                    || (resp != null && !resp.getIsMasterRunning())) {
+                  masterStubMakeFuture.getAndSet(null).completeExceptionally(
+                    new MasterNotRunningException("Master connection is not running anymore"));
+                } else {
+                  masterStub.set(stub);
+                  masterStubMakeFuture.set(null);
+                  future.complete(stub);
+                }
+              }
+            });
+        } catch (IOException e) {
+          this.masterStubMakeFuture.getAndSet(null).completeExceptionally(
+            new IOException("Failed to create async master stub", e));
+        }
+      });
+  }
+
+  CompletableFuture<MasterService.Interface> getMasterStub() {
+    MasterService.Interface masterStub = this.masterStub.get();
+
+    if (masterStub == null) {
+      for (;;) {
+        if (this.masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>()))
{
+          CompletableFuture<MasterService.Interface> future = this.masterStubMakeFuture.get();
+          makeMasterStub(future);
+        } else {
+          CompletableFuture<MasterService.Interface> future = this.masterStubMakeFuture.get();
+          if (future != null) {
+            return future;
+          }
+        }
+      }
+    }
+
+    for (;;) {
+      if (masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) {
+        CompletableFuture<MasterService.Interface> future = masterStubMakeFuture.get();
+        HBaseRpcController controller = getRpcController();
+        masterStub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(),
+          new RpcCallback<IsMasterRunningResponse>() {
+            @Override
+            public void run(IsMasterRunningResponse resp) {
+              if (controller.failed() || resp == null
+                  || (resp != null && !resp.getIsMasterRunning())) {
+                makeMasterStub(future);
+              } else {
+                future.complete(masterStub);
+              }
+            }
+          });
+      } else {
+        CompletableFuture<MasterService.Interface> future = masterStubMakeFuture.get();
+        if (future != null) {
+          return future;
+        }
+      }
+    }
+  }
+
+  private HBaseRpcController getRpcController() {
+    HBaseRpcController controller = this.rpcControllerFactory.newController();
+    controller.setCallTimeout((int) TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
+    return controller;
+  }
+
   @Override
   public AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName) {
     return new AsyncTableBuilderBase<RawAsyncTable>(tableName, connConf) {
@@ -171,4 +272,9 @@ class AsyncConnectionImpl implements AsyncConnection {
       }
     };
   }
+
+  @Override
+  public AsyncAdmin getAdmin() {
+    return new AsyncHBaseAdmin(this);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
new file mode 100644
index 0000000..1dd92e5
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
+
+/**
+ * The implementation of AsyncAdmin.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class AsyncHBaseAdmin implements AsyncAdmin {
+
+  private final AsyncConnectionImpl connection;
+
+  private final long rpcTimeoutNs;
+
+  private final long operationTimeoutNs;
+
+  private final long pauseNs;
+
+  private final int maxAttempts;
+
+  private final int startLogErrorsCnt;
+
+  AsyncHBaseAdmin(AsyncConnectionImpl connection) {
+    this.connection = connection;
+    this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
+    this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
+    this.pauseNs = connection.connConf.getPauseNs();
+    this.maxAttempts = connection.connConf.getMaxRetries();
+    this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
+  }
+
+  private <T> MasterRequestCallerBuilder<T> newCaller() {
+    return this.connection.callerFactory.<T> masterRequest()
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+        .startLogErrorsCnt(startLogErrorsCnt);
+  }
+
+  @FunctionalInterface
+  private interface RpcCall<RESP, REQ> {
+    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
+        RpcCallback<RESP> done);
+  }
+
+  @FunctionalInterface
+  private interface Converter<D, S> {
+    D convert(S src) throws IOException;
+  }
+
+  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController
controller,
+      MasterService.Interface stub, PREQ preq, RpcCall<PRESP, PREQ> rpcCall,
+      Converter<RESP, PRESP> respConverter) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
+
+      @Override
+      public void run(PRESP resp) {
+        if (controller.failed()) {
+          future.completeExceptionally(controller.getFailed());
+        } else {
+          try {
+            future.complete(respConverter.convert(resp));
+          } catch (IOException e) {
+            future.completeExceptionally(e);
+          }
+        }
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Boolean> setBalancerRunning(final boolean on) throws IOException
{
+    return this
+        .<Boolean> newCaller()
+        .action(
+          (controller, stub) -> this
+              .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
+                stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
+                (s, c, req, done) -> s.setBalancerRunning(c, req, done),
+                (resp) -> resp.getPrevBalanceValue())).call();
+  }
+
+  @Override
+  public CompletableFuture<Boolean> balancer() throws IOException {
+    return balancer(false);
+  }
+
+  @Override
+  public CompletableFuture<Boolean> balancer(boolean force) throws IOException {
+    return this
+        .<Boolean> newCaller()
+        .action(
+          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean>
call(controller,
+            stub, RequestConverter.buildBalanceRequest(force),
+            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
+  }
+
+  @Override
+  public CompletableFuture<Boolean> isBalancerEnabled() throws IOException {
+    return this
+        .<Boolean> newCaller()
+        .action(
+          (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
Boolean> call(
+            controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
+            (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
+        .call();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
new file mode 100644
index 0000000..e2a3fee
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+
+/**
+ * Retry caller for a request call to master.
+ */
+@InterfaceAudience.Private
+public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T>
{
+
+  @FunctionalInterface
+  public interface Callable<T> {
+    CompletableFuture<T> call(HBaseRpcController controller, MasterService.Interface
stub);
+  }
+
+  private final Callable<T> callable;
+
+  public AsyncMasterRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl
conn,
+      Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt) {
+    super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
+        startLogErrorsCnt);
+    this.callable = callable;
+  }
+
+  @Override
+  protected void doCall() {
+    conn.getMasterStub().whenComplete((stub, error) -> {
+      if (error != null) {
+        onError(error, () -> "Get async master stub failed", err -> {
+        });
+        return;
+      }
+      resetCallTimeout();
+      callable.call(controller, stub).whenComplete((result, error2) -> {
+        if (error2 != null) {
+          onError(error2, () -> "Call to master failed", err -> {
+          });
+          return;
+        }
+        future.complete(result);
+      });
+    });
+  }
+
+  public CompletableFuture<T> call() {
+    doCall();
+    return future;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
new file mode 100644
index 0000000..d449db1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
+import io.netty.util.HashedWheelTimer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+@InterfaceAudience.Private
+public abstract class AsyncRpcRetryingCaller<T> {
+
+  private static final Log LOG = LogFactory.getLog(AsyncRpcRetryingCaller.class);
+
+  private final HashedWheelTimer retryTimer;
+
+  private final long startNs;
+
+  private final long pauseNs;
+
+  private int tries = 1;
+
+  private final int maxAttempts;
+
+  private final int startLogErrorsCnt;
+
+  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
+
+  private final long rpcTimeoutNs;
+
+  protected final long operationTimeoutNs;
+
+  protected final AsyncConnectionImpl conn;
+
+  protected final CompletableFuture<T> future;
+
+  protected final HBaseRpcController controller;
+
+  public AsyncRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+      long pauseNs, int maxAttempts, long operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt) {
+    this.retryTimer = retryTimer;
+    this.conn = conn;
+    this.pauseNs = pauseNs;
+    this.maxAttempts = maxAttempts;
+    this.operationTimeoutNs = operationTimeoutNs;
+    this.rpcTimeoutNs = rpcTimeoutNs;
+    this.startLogErrorsCnt = startLogErrorsCnt;
+    this.future = new CompletableFuture<>();
+    this.controller = conn.rpcControllerFactory.newController();
+    this.exceptions = new ArrayList<>();
+    this.startNs = System.nanoTime();
+  }
+
+  private long elapsedMs() {
+    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
+  }
+
+  protected long remainingTimeNs() {
+    return operationTimeoutNs - (System.nanoTime() - startNs);
+  }
+
+  protected void completeExceptionally() {
+    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
+  }
+
+  protected void resetCallTimeout() {
+    long callTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      callTimeoutNs = remainingTimeNs();
+      if (callTimeoutNs <= 0) {
+        completeExceptionally();
+        return;
+      }
+      callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs);
+    } else {
+      callTimeoutNs = rpcTimeoutNs;
+    }
+    resetController(controller, callTimeoutNs);
+  }
+
+  protected void onError(Throwable error, Supplier<String> errMsg,
+      Consumer<Throwable> updateCachedLocation) {
+    error = translateException(error);
+    if (tries > startLogErrorsCnt) {
+      LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts
+          + ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs)
+          + " ms, time elapsed = " + elapsedMs() + " ms", error);
+    }
+    RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(
+        error, EnvironmentEdgeManager.currentTime(), "");
+    exceptions.add(qt);
+    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+      completeExceptionally();
+      return;
+    }
+    long delayNs;
+    if (operationTimeoutNs > 0) {
+      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
+      if (maxDelayNs <= 0) {
+        completeExceptionally();
+        return;
+      }
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+    } else {
+      delayNs = getPauseTime(pauseNs, tries - 1);
+    }
+    updateCachedLocation.accept(error);
+    tries++;
+    retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
+  }
+
+  protected abstract void doCall();
+
+  CompletableFuture<T> call() {
+    doCall();
+    return future;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 76b6a33..5df66cc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -369,4 +369,59 @@ class AsyncRpcRetryingCallerFactory {
   public BatchCallerBuilder batch() {
     return new BatchCallerBuilder();
   }
+
+  public class MasterRequestCallerBuilder<T> extends BuilderBase {
+    private AsyncMasterRequestRpcRetryingCaller.Callable<T> callable;
+
+    private long operationTimeoutNs = -1L;
+
+    private long rpcTimeoutNs = -1L;
+
+    public MasterRequestCallerBuilder<T> action(AsyncMasterRequestRpcRetryingCaller.Callable<T>
callable) {
+      this.callable = callable;
+      return this;
+    }
+
+    public MasterRequestCallerBuilder<T> operationTimeout(long operationTimeout, TimeUnit
unit) {
+      this.operationTimeoutNs = unit.toNanos(operationTimeout);
+      return this;
+    }
+
+    public MasterRequestCallerBuilder<T> rpcTimeout(long rpcTimeout, TimeUnit unit)
{
+      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+      return this;
+    }
+
+    public MasterRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
+      this.pauseNs = unit.toNanos(pause);
+      return this;
+    }
+
+    public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
+      return this;
+    }
+
+    public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
+      this.startLogErrorsCnt = startLogErrorsCnt;
+      return this;
+    }
+
+    public AsyncMasterRequestRpcRetryingCaller<T> build() {
+      return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, checkNotNull(callable,
+        "action is null"), pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
+          startLogErrorsCnt);
+    }
+
+    /**
+     * Shortcut for {@code build().call()}
+     */
+    public CompletableFuture<T> call() {
+      return build().call();
+    }
+  }
+
+  public <T> MasterRequestCallerBuilder<T> masterRequest() {
+    return new MasterRequestCallerBuilder<>();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 4ce6a18..e1c06d7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -17,39 +17,23 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
-
 import io.netty.util.HashedWheelTimer;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * Retry caller for a single request, such as get, put, delete, etc.
  */
 @InterfaceAudience.Private
-class AsyncSingleRequestRpcRetryingCaller<T> {
-
-  private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class);
+class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T>
{
 
   @FunctionalInterface
   public interface Callable<T> {
@@ -57,10 +41,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
         ClientService.Interface stub);
   }
 
-  private final HashedWheelTimer retryTimer;
-
-  private final AsyncConnectionImpl conn;
-
   private final TableName tableName;
 
   private final byte[] row;
@@ -69,131 +49,45 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
 
   private final Callable<T> callable;
 
-  private final long pauseNs;
-
-  private final int maxAttempts;
-
-  private final long operationTimeoutNs;
-
-  private final long rpcTimeoutNs;
-
-  private final int startLogErrorsCnt;
-
-  private final CompletableFuture<T> future;
-
-  private final HBaseRpcController controller;
-
-  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
-
-  private final long startNs;
-
   public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl
conn,
       TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable,
       long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
       int startLogErrorsCnt) {
-    this.retryTimer = retryTimer;
-    this.conn = conn;
+    super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
+        startLogErrorsCnt);
     this.tableName = tableName;
     this.row = row;
     this.locateType = locateType;
     this.callable = callable;
-    this.pauseNs = pauseNs;
-    this.maxAttempts = maxAttempts;
-    this.operationTimeoutNs = operationTimeoutNs;
-    this.rpcTimeoutNs = rpcTimeoutNs;
-    this.startLogErrorsCnt = startLogErrorsCnt;
-    this.future = new CompletableFuture<>();
-    this.controller = conn.rpcControllerFactory.newController();
-    this.exceptions = new ArrayList<>();
-    this.startNs = System.nanoTime();
-  }
-
-  private int tries = 1;
-
-  private long elapsedMs() {
-    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
-  }
-
-  private long remainingTimeNs() {
-    return operationTimeoutNs - (System.nanoTime() - startNs);
-  }
-
-  private void completeExceptionally() {
-    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
-  }
-
-  private void onError(Throwable error, Supplier<String> errMsg,
-      Consumer<Throwable> updateCachedLocation) {
-    error = translateException(error);
-    if (tries > startLogErrorsCnt) {
-      LOG.warn(errMsg.get(), error);
-    }
-    RetriesExhaustedException.ThrowableWithExtraContext qt =
-        new RetriesExhaustedException.ThrowableWithExtraContext(error,
-            EnvironmentEdgeManager.currentTime(), "");
-    exceptions.add(qt);
-    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
-      completeExceptionally();
-      return;
-    }
-    long delayNs;
-    if (operationTimeoutNs > 0) {
-      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
-      if (maxDelayNs <= 0) {
-        completeExceptionally();
-        return;
-      }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
-    } else {
-      delayNs = getPauseTime(pauseNs, tries - 1);
-    }
-    updateCachedLocation.accept(error);
-    tries++;
-    retryTimer.newTimeout(t -> locateThenCall(), delayNs, TimeUnit.NANOSECONDS);
   }
 
   private void call(HRegionLocation loc) {
-    long callTimeoutNs;
-    if (operationTimeoutNs > 0) {
-      callTimeoutNs = remainingTimeNs();
-      if (callTimeoutNs <= 0) {
-        completeExceptionally();
-        return;
-      }
-      callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs);
-    } else {
-      callTimeoutNs = rpcTimeoutNs;
-    }
     ClientService.Interface stub;
     try {
       stub = conn.getRegionServerStub(loc.getServerName());
     } catch (IOException e) {
       onError(e,
         () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row)
-            + "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName
-            + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout
= "
-            + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
-            + elapsedMs() + " ms",
+            + "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed",
         err -> conn.getLocator().updateCachedLocation(loc, err));
       return;
     }
-    resetController(controller, callTimeoutNs);
-    callable.call(controller, loc, stub).whenComplete((result, error) -> {
-      if (error != null) {
-        onError(error,
-          () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row)
+ "' in "
-              + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed, tries
= "
-              + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
-              + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed =
"
-              + elapsedMs() + " ms",
-          err -> conn.getLocator().updateCachedLocation(loc, err));
-        return;
-      }
-      future.complete(result);
-    });
+    resetCallTimeout();
+    callable.call(controller, loc, stub).whenComplete(
+      (result, error) -> {
+        if (error != null) {
+          onError(error,
+            () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row)
+ "' in "
+                + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed",
+            err -> conn.getLocator().updateCachedLocation(loc, err));
+          return;
+        }
+        future.complete(result);
+      });
   }
 
-  private void locateThenCall() {
+  @Override
+  protected void doCall() {
     long locateTimeoutNs;
     if (operationTimeoutNs > 0) {
       locateTimeoutNs = remainingTimeNs();
@@ -204,24 +98,23 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     } else {
       locateTimeoutNs = -1L;
     }
-    conn.getLocator().getRegionLocation(tableName, row, locateType, locateTimeoutNs)
-        .whenComplete((loc, error) -> {
-          if (error != null) {
-            onError(error,
-              () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName
-                  + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout
= "
-                  + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed
= "
-                  + elapsedMs() + " ms",
-              err -> {
+    conn.getLocator()
+        .getRegionLocation(tableName, row, locateType, locateTimeoutNs)
+        .whenComplete(
+          (loc, error) -> {
+            if (error != null) {
+              onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " +
tableName
+                  + " failed", err -> {
               });
-            return;
-          }
-          call(loc);
-        });
+              return;
+            }
+            call(loc);
+          });
   }
 
+  @Override
   public CompletableFuture<T> call() {
-    locateThenCall();
+    doCall();
     return future;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
new file mode 100644
index 0000000..9beae1f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Class to test AsyncAdmin.
+ */
+@Category({LargeTests.class, ClientTests.class})
+public class TestAsyncAdmin {
+
+  private static final Log LOG = LogFactory.getLog(TestAdmin1.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static AsyncConnection ASYNC_CONN;
+  private AsyncAdmin admin;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 3);
+    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 1000);
+    TEST_UTIL.startMiniCluster(1);
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    IOUtils.closeQuietly(ASYNC_CONN);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    this.admin = ASYNC_CONN.getAdmin();
+  }
+
+  @Test(timeout = 30000)
+  public void testBalancer() throws Exception {
+    boolean initialState = admin.isBalancerEnabled().get();
+
+    // Start the balancer, wait for it.
+    boolean prevState = admin.setBalancerRunning(!initialState).get();
+
+    // The previous state should be the original state we observed
+    assertEquals(initialState, prevState);
+
+    // Current state should be opposite of the original
+    assertEquals(!initialState, admin.isBalancerEnabled().get());
+
+    // Reset it back to what it was
+    prevState = admin.setBalancerRunning(initialState).get();
+
+    // The previous state should be the opposite of the initial state
+    assertEquals(!initialState, prevState);
+    // Current state should be the original state again
+    assertEquals(initialState, admin.isBalancerEnabled().get());
+  }
+}


Mime
View raw message