Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2CC52200C2C for ; Tue, 24 Jan 2017 00:01:41 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2B865160B65; Mon, 23 Jan 2017 23:01:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5F9ED160B49 for ; Tue, 24 Jan 2017 00:01:39 +0100 (CET) Received: (qmail 12617 invoked by uid 500); 23 Jan 2017 23:01:37 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 10164 invoked by uid 99); 23 Jan 2017 23:01:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Jan 2017 23:01:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5459FF351B; Mon, 23 Jan 2017 23:01:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@hbase.apache.org Date: Mon, 23 Jan 2017 23:01:49 -0000 Message-Id: <6ecd30248d384d9993c41376f1664f88@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/50] [abbrv] hbase git commit: HBASE-17396 Add first async admin impl and implement balance methods archived-at: Mon, 23 Jan 2017 23:01:41 -0000 HBASE-17396 Add first async admin impl and implement balance methods Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb9ce2ce Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb9ce2ce Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb9ce2ce Branch: refs/heads/HBASE-16961 Commit: cb9ce2ceafb5467522b1b380956446e40b8250d5 Parents: 8f1d0a2 Author: Guanghao Zhang Authored: Thu Jan 19 10:15:12 2017 +0800 Committer: Guanghao Zhang Committed: Thu Jan 19 10:15:12 2017 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/client/AsyncAdmin.java | 64 +++++++ .../hadoop/hbase/client/AsyncConnection.java | 9 + .../hbase/client/AsyncConnectionImpl.java | 106 ++++++++++++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 144 ++++++++++++++++ .../AsyncMasterRequestRpcRetryingCaller.java | 73 ++++++++ .../hbase/client/AsyncRpcRetryingCaller.java | 151 +++++++++++++++++ .../client/AsyncRpcRetryingCallerFactory.java | 55 ++++++ .../AsyncSingleRequestRpcRetryingCaller.java | 169 ++++--------------- .../hadoop/hbase/client/TestAsyncAdmin.java | 87 ++++++++++ 9 files changed, 720 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java new file mode 100644 index 0000000..fadeebe --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * The asynchronous administrative API for HBase. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface AsyncAdmin { + + /** + * Turn the load balancer on or off. + * @param on + * @return Previous balancer value wrapped by a {@link CompletableFuture}. + */ + CompletableFuture setBalancerRunning(final boolean on) throws IOException; + + /** + * Invoke the balancer. Will run the balancer and if regions to move, it will go ahead and do the + * reassignments. Can NOT run for various reasons. Check logs. + * @return True if balancer ran, false otherwise. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture balancer() throws IOException; + + /** + * Invoke the balancer. Will run the balancer and if regions to move, it will go ahead and do the + * reassignments. If there is region in transition, force parameter of true would still run + * balancer. Can *not* run for other reasons. Check logs. + * @param force whether we should force balance even if there is region in transition. + * @return True if balancer ran, false otherwise. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture balancer(boolean force) throws IOException; + + /** + * Query the current state of the balancer. + * @return true if the balancer is enabled, false otherwise. + * The return value will be wrapped by a {@link CompletableFuture}. + */ + CompletableFuture isBalancerEnabled() throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java index 9f114ac..dbe32ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java @@ -96,4 +96,13 @@ public interface AsyncConnection extends Closeable { * @param pool the thread pool to use for executing callback */ AsyncTableBuilder getTableBuilder(TableName tableName, ExecutorService pool); + + /** + * Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned AsyncAdmin + * is not guaranteed to be thread-safe. A new instance should be created for each using thread. + * This is a lightweight operation. Pooling or caching of the returned AsyncAdmin is not + * recommended. + * @return an AsyncAdmin instance for cluster administration + */ + AsyncAdmin getAdmin(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index c58500a..bc6a3b2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -28,23 +28,32 @@ import io.netty.util.HashedWheelTimer; import java.io.IOException; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.Threads; @@ -88,6 +97,11 @@ class AsyncConnectionImpl implements AsyncConnection { private final ConcurrentMap rsStubs = new ConcurrentHashMap<>(); + private final AtomicReference masterStub = new AtomicReference<>(); + + private final AtomicReference> masterStubMakeFuture = + new AtomicReference<>(); + public AsyncConnectionImpl(Configuration conf, User user) { this.conf = conf; this.user = user; @@ -149,6 +163,93 @@ class AsyncConnectionImpl implements AsyncConnection { () -> createRegionServerStub(serverName)); } + private MasterService.Interface createMasterStub(ServerName serverName) throws IOException { + return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); + } + + private void makeMasterStub(CompletableFuture future) { + registry.getMasterAddress().whenComplete( + (sn, error) -> { + if (sn == null) { + String msg = "ZooKeeper available but no active master location found"; + LOG.info(msg); + this.masterStubMakeFuture.getAndSet(null).completeExceptionally( + new MasterNotRunningException(msg)); + return; + } + try { + MasterService.Interface stub = createMasterStub(sn); + HBaseRpcController controller = getRpcController(); + stub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(), + new RpcCallback() { + @Override + public void run(IsMasterRunningResponse resp) { + if (controller.failed() || resp == null + || (resp != null && !resp.getIsMasterRunning())) { + masterStubMakeFuture.getAndSet(null).completeExceptionally( + new MasterNotRunningException("Master connection is not running anymore")); + } else { + masterStub.set(stub); + masterStubMakeFuture.set(null); + future.complete(stub); + } + } + }); + } catch (IOException e) { + this.masterStubMakeFuture.getAndSet(null).completeExceptionally( + new IOException("Failed to create async master stub", e)); + } + }); + } + + CompletableFuture getMasterStub() { + MasterService.Interface masterStub = this.masterStub.get(); + + if (masterStub == null) { + for (;;) { + if (this.masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) { + CompletableFuture future = this.masterStubMakeFuture.get(); + makeMasterStub(future); + } else { + CompletableFuture future = this.masterStubMakeFuture.get(); + if (future != null) { + return future; + } + } + } + } + + for (;;) { + if (masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) { + CompletableFuture future = masterStubMakeFuture.get(); + HBaseRpcController controller = getRpcController(); + masterStub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(), + new RpcCallback() { + @Override + public void run(IsMasterRunningResponse resp) { + if (controller.failed() || resp == null + || (resp != null && !resp.getIsMasterRunning())) { + makeMasterStub(future); + } else { + future.complete(masterStub); + } + } + }); + } else { + CompletableFuture future = masterStubMakeFuture.get(); + if (future != null) { + return future; + } + } + } + } + + private HBaseRpcController getRpcController() { + HBaseRpcController controller = this.rpcControllerFactory.newController(); + controller.setCallTimeout((int) TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); + return controller; + } + @Override public AsyncTableBuilder getRawTableBuilder(TableName tableName) { return new AsyncTableBuilderBase(tableName, connConf) { @@ -171,4 +272,9 @@ class AsyncConnectionImpl implements AsyncConnection { } }; } + + @Override + public AsyncAdmin getAdmin() { + return new AsyncHBaseAdmin(this); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java new file mode 100644 index 0000000..1dd92e5 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; + +/** + * The implementation of AsyncAdmin. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class AsyncHBaseAdmin implements AsyncAdmin { + + private final AsyncConnectionImpl connection; + + private final long rpcTimeoutNs; + + private final long operationTimeoutNs; + + private final long pauseNs; + + private final int maxAttempts; + + private final int startLogErrorsCnt; + + AsyncHBaseAdmin(AsyncConnectionImpl connection) { + this.connection = connection; + this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs(); + this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs(); + this.pauseNs = connection.connConf.getPauseNs(); + this.maxAttempts = connection.connConf.getMaxRetries(); + this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt(); + } + + private MasterRequestCallerBuilder newCaller() { + return this.connection.callerFactory. masterRequest() + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .startLogErrorsCnt(startLogErrorsCnt); + } + + @FunctionalInterface + private interface RpcCall { + void call(MasterService.Interface stub, HBaseRpcController controller, REQ req, + RpcCallback done); + } + + @FunctionalInterface + private interface Converter { + D convert(S src) throws IOException; + } + + private CompletableFuture call(HBaseRpcController controller, + MasterService.Interface stub, PREQ preq, RpcCall rpcCall, + Converter respConverter) { + CompletableFuture future = new CompletableFuture<>(); + rpcCall.call(stub, controller, preq, new RpcCallback() { + + @Override + public void run(PRESP resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + future.complete(respConverter.convert(resp)); + } catch (IOException e) { + future.completeExceptionally(e); + } + } + } + }); + return future; + } + + @Override + public CompletableFuture setBalancerRunning(final boolean on) throws IOException { + return this + . newCaller() + .action( + (controller, stub) -> this + . call(controller, + stub, RequestConverter.buildSetBalancerRunningRequest(on, true), + (s, c, req, done) -> s.setBalancerRunning(c, req, done), + (resp) -> resp.getPrevBalanceValue())).call(); + } + + @Override + public CompletableFuture balancer() throws IOException { + return balancer(false); + } + + @Override + public CompletableFuture balancer(boolean force) throws IOException { + return this + . newCaller() + .action( + (controller, stub) -> this. call(controller, + stub, RequestConverter.buildBalanceRequest(force), + (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call(); + } + + @Override + public CompletableFuture isBalancerEnabled() throws IOException { + return this + . newCaller() + .action( + (controller, stub) -> this. call( + controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), + (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())) + .call(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java new file mode 100644 index 0000000..e2a3fee --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import io.netty.util.HashedWheelTimer; + +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; + +/** + * Retry caller for a request call to master. + */ +@InterfaceAudience.Private +public class AsyncMasterRequestRpcRetryingCaller extends AsyncRpcRetryingCaller { + + @FunctionalInterface + public interface Callable { + CompletableFuture call(HBaseRpcController controller, MasterService.Interface stub); + } + + private final Callable callable; + + public AsyncMasterRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + Callable callable, long pauseNs, int maxRetries, long operationTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { + super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); + this.callable = callable; + } + + @Override + protected void doCall() { + conn.getMasterStub().whenComplete((stub, error) -> { + if (error != null) { + onError(error, () -> "Get async master stub failed", err -> { + }); + return; + } + resetCallTimeout(); + callable.call(controller, stub).whenComplete((result, error2) -> { + if (error2 != null) { + onError(error2, () -> "Call to master failed", err -> { + }); + return; + } + future.complete(result); + }); + }); + } + + public CompletableFuture call() { + doCall(); + return future; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java new file mode 100644 index 0000000..d449db1 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -0,0 +1,151 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; +import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; +import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; +import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; +import io.netty.util.HashedWheelTimer; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +@InterfaceAudience.Private +public abstract class AsyncRpcRetryingCaller { + + private static final Log LOG = LogFactory.getLog(AsyncRpcRetryingCaller.class); + + private final HashedWheelTimer retryTimer; + + private final long startNs; + + private final long pauseNs; + + private int tries = 1; + + private final int maxAttempts; + + private final int startLogErrorsCnt; + + private final List exceptions; + + private final long rpcTimeoutNs; + + protected final long operationTimeoutNs; + + protected final AsyncConnectionImpl conn; + + protected final CompletableFuture future; + + protected final HBaseRpcController controller; + + public AsyncRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + long pauseNs, int maxAttempts, long operationTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { + this.retryTimer = retryTimer; + this.conn = conn; + this.pauseNs = pauseNs; + this.maxAttempts = maxAttempts; + this.operationTimeoutNs = operationTimeoutNs; + this.rpcTimeoutNs = rpcTimeoutNs; + this.startLogErrorsCnt = startLogErrorsCnt; + this.future = new CompletableFuture<>(); + this.controller = conn.rpcControllerFactory.newController(); + this.exceptions = new ArrayList<>(); + this.startNs = System.nanoTime(); + } + + private long elapsedMs() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); + } + + protected long remainingTimeNs() { + return operationTimeoutNs - (System.nanoTime() - startNs); + } + + protected void completeExceptionally() { + future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); + } + + protected void resetCallTimeout() { + long callTimeoutNs; + if (operationTimeoutNs > 0) { + callTimeoutNs = remainingTimeNs(); + if (callTimeoutNs <= 0) { + completeExceptionally(); + return; + } + callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs); + } else { + callTimeoutNs = rpcTimeoutNs; + } + resetController(controller, callTimeoutNs); + } + + protected void onError(Throwable error, Supplier errMsg, + Consumer updateCachedLocation) { + error = translateException(error); + if (tries > startLogErrorsCnt) { + LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts + + ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + + " ms, time elapsed = " + elapsedMs() + " ms", error); + } + RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext( + error, EnvironmentEdgeManager.currentTime(), ""); + exceptions.add(qt); + if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { + completeExceptionally(); + return; + } + long delayNs; + if (operationTimeoutNs > 0) { + long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; + if (maxDelayNs <= 0) { + completeExceptionally(); + return; + } + delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); + } else { + delayNs = getPauseTime(pauseNs, tries - 1); + } + updateCachedLocation.accept(error); + tries++; + retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS); + } + + protected abstract void doCall(); + + CompletableFuture call() { + doCall(); + return future; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 76b6a33..5df66cc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -369,4 +369,59 @@ class AsyncRpcRetryingCallerFactory { public BatchCallerBuilder batch() { return new BatchCallerBuilder(); } + + public class MasterRequestCallerBuilder extends BuilderBase { + private AsyncMasterRequestRpcRetryingCaller.Callable callable; + + private long operationTimeoutNs = -1L; + + private long rpcTimeoutNs = -1L; + + public MasterRequestCallerBuilder action(AsyncMasterRequestRpcRetryingCaller.Callable callable) { + this.callable = callable; + return this; + } + + public MasterRequestCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) { + this.operationTimeoutNs = unit.toNanos(operationTimeout); + return this; + } + + public MasterRequestCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + return this; + } + + public MasterRequestCallerBuilder pause(long pause, TimeUnit unit) { + this.pauseNs = unit.toNanos(pause); + return this; + } + + public MasterRequestCallerBuilder maxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + public MasterRequestCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { + this.startLogErrorsCnt = startLogErrorsCnt; + return this; + } + + public AsyncMasterRequestRpcRetryingCaller build() { + return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, checkNotNull(callable, + "action is null"), pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); + } + + /** + * Shortcut for {@code build().call()} + */ + public CompletableFuture call() { + return build().call(); + } + } + + public MasterRequestCallerBuilder masterRequest() { + return new MasterRequestCallerBuilder<>(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 4ce6a18..e1c06d7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -17,39 +17,23 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; -import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; -import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; -import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; - import io.netty.util.HashedWheelTimer; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.function.Supplier; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * Retry caller for a single request, such as get, put, delete, etc. */ @InterfaceAudience.Private -class AsyncSingleRequestRpcRetryingCaller { - - private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class); +class AsyncSingleRequestRpcRetryingCaller extends AsyncRpcRetryingCaller { @FunctionalInterface public interface Callable { @@ -57,10 +41,6 @@ class AsyncSingleRequestRpcRetryingCaller { ClientService.Interface stub); } - private final HashedWheelTimer retryTimer; - - private final AsyncConnectionImpl conn; - private final TableName tableName; private final byte[] row; @@ -69,131 +49,45 @@ class AsyncSingleRequestRpcRetryingCaller { private final Callable callable; - private final long pauseNs; - - private final int maxAttempts; - - private final long operationTimeoutNs; - - private final long rpcTimeoutNs; - - private final int startLogErrorsCnt; - - private final CompletableFuture future; - - private final HBaseRpcController controller; - - private final List exceptions; - - private final long startNs; - public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, TableName tableName, byte[] row, RegionLocateType locateType, Callable callable, long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { - this.retryTimer = retryTimer; - this.conn = conn; + super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); this.tableName = tableName; this.row = row; this.locateType = locateType; this.callable = callable; - this.pauseNs = pauseNs; - this.maxAttempts = maxAttempts; - this.operationTimeoutNs = operationTimeoutNs; - this.rpcTimeoutNs = rpcTimeoutNs; - this.startLogErrorsCnt = startLogErrorsCnt; - this.future = new CompletableFuture<>(); - this.controller = conn.rpcControllerFactory.newController(); - this.exceptions = new ArrayList<>(); - this.startNs = System.nanoTime(); - } - - private int tries = 1; - - private long elapsedMs() { - return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); - } - - private long remainingTimeNs() { - return operationTimeoutNs - (System.nanoTime() - startNs); - } - - private void completeExceptionally() { - future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); - } - - private void onError(Throwable error, Supplier errMsg, - Consumer updateCachedLocation) { - error = translateException(error); - if (tries > startLogErrorsCnt) { - LOG.warn(errMsg.get(), error); - } - RetriesExhaustedException.ThrowableWithExtraContext qt = - new RetriesExhaustedException.ThrowableWithExtraContext(error, - EnvironmentEdgeManager.currentTime(), ""); - exceptions.add(qt); - if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { - completeExceptionally(); - return; - } - long delayNs; - if (operationTimeoutNs > 0) { - long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; - if (maxDelayNs <= 0) { - completeExceptionally(); - return; - } - delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); - } else { - delayNs = getPauseTime(pauseNs, tries - 1); - } - updateCachedLocation.accept(error); - tries++; - retryTimer.newTimeout(t -> locateThenCall(), delayNs, TimeUnit.NANOSECONDS); } private void call(HRegionLocation loc) { - long callTimeoutNs; - if (operationTimeoutNs > 0) { - callTimeoutNs = remainingTimeNs(); - if (callTimeoutNs <= 0) { - completeExceptionally(); - return; - } - callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs); - } else { - callTimeoutNs = rpcTimeoutNs; - } ClientService.Interface stub; try { stub = conn.getRegionServerStub(loc.getServerName()); } catch (IOException e) { onError(e, () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) - + "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName - + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " - + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " - + elapsedMs() + " ms", + + "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed", err -> conn.getLocator().updateCachedLocation(loc, err)); return; } - resetController(controller, callTimeoutNs); - callable.call(controller, loc, stub).whenComplete((result, error) -> { - if (error != null) { - onError(error, - () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " - + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed, tries = " - + tries + ", maxAttempts = " + maxAttempts + ", timeout = " - + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " - + elapsedMs() + " ms", - err -> conn.getLocator().updateCachedLocation(loc, err)); - return; - } - future.complete(result); - }); + resetCallTimeout(); + callable.call(controller, loc, stub).whenComplete( + (result, error) -> { + if (error != null) { + onError(error, + () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " + + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed", + err -> conn.getLocator().updateCachedLocation(loc, err)); + return; + } + future.complete(result); + }); } - private void locateThenCall() { + @Override + protected void doCall() { long locateTimeoutNs; if (operationTimeoutNs > 0) { locateTimeoutNs = remainingTimeNs(); @@ -204,24 +98,23 @@ class AsyncSingleRequestRpcRetryingCaller { } else { locateTimeoutNs = -1L; } - conn.getLocator().getRegionLocation(tableName, row, locateType, locateTimeoutNs) - .whenComplete((loc, error) -> { - if (error != null) { - onError(error, - () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName - + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " - + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " - + elapsedMs() + " ms", - err -> { + conn.getLocator() + .getRegionLocation(tableName, row, locateType, locateTimeoutNs) + .whenComplete( + (loc, error) -> { + if (error != null) { + onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + + " failed", err -> { }); - return; - } - call(loc); - }); + return; + } + call(loc); + }); } + @Override public CompletableFuture call() { - locateThenCall(); + doCall(); return future; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/cb9ce2ce/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java new file mode 100644 index 0000000..9beae1f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Class to test AsyncAdmin. + */ +@Category({LargeTests.class, ClientTests.class}) +public class TestAsyncAdmin { + + private static final Log LOG = LogFactory.getLog(TestAdmin1.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static AsyncConnection ASYNC_CONN; + private AsyncAdmin admin; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10); + TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 3); + TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 1000); + TEST_UTIL.startMiniCluster(1); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + IOUtils.closeQuietly(ASYNC_CONN); + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + this.admin = ASYNC_CONN.getAdmin(); + } + + @Test(timeout = 30000) + public void testBalancer() throws Exception { + boolean initialState = admin.isBalancerEnabled().get(); + + // Start the balancer, wait for it. + boolean prevState = admin.setBalancerRunning(!initialState).get(); + + // The previous state should be the original state we observed + assertEquals(initialState, prevState); + + // Current state should be opposite of the original + assertEquals(!initialState, admin.isBalancerEnabled().get()); + + // Reset it back to what it was + prevState = admin.setBalancerRunning(initialState).get(); + + // The previous state should be the opposite of the initial state + assertEquals(!initialState, prevState); + // Current state should be the original state again + assertEquals(initialState, admin.isBalancerEnabled().get()); + } +}