Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6AFD5200D8E for ; Thu, 28 Dec 2017 01:54:10 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 6950D160C3D; Thu, 28 Dec 2017 00:54:10 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 133FF160C3A for ; Thu, 28 Dec 2017 01:54:08 +0100 (CET) Received: (qmail 12437 invoked by uid 500); 28 Dec 2017 00:54:06 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 12249 invoked by uid 99); 28 Dec 2017 00:54:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Dec 2017 00:54:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7B910F17D9; Thu, 28 Dec 2017 00:54:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Date: Thu, 28 Dec 2017 00:54:20 -0000 Message-Id: <75ae1f08196a4151980c957bff5cb8d9@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/26] hbase git commit: HBASE-19536 Client side changes for moving peer modification from zk watcher to procedure archived-at: Thu, 28 Dec 2017 00:54:10 -0000 HBASE-19536 Client side changes for moving peer modification from zk watcher to procedure Signed-off-by: zhangduo Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: 
http://git-wip-us.apache.org/repos/asf/hbase/commit/d77c7449 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d77c7449 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d77c7449 Branch: refs/heads/HBASE-19397 Commit: d77c7449aa34aa24cfe0333050899381b230dc0d Parents: 4567906 Author: Guanghao Zhang Authored: Tue Dec 19 15:50:57 2017 +0800 Committer: zhangduo Committed: Thu Dec 28 08:51:59 2017 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Admin.java | 87 ++++++++++- .../apache/hadoop/hbase/client/HBaseAdmin.java | 149 ++++++++++++++----- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 82 +++++----- 3 files changed, 238 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d77c7449/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index ff2722e..cf8e198 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -2463,7 +2463,7 @@ public interface Admin extends Abortable, Closeable { /** * Add a new replication peer for replicating data to slave cluster. * @param peerId a short name that identifies the peer - * @param peerConfig configuration for the replication slave cluster + * @param peerConfig configuration for the replication peer * @throws IOException if a remote or network exception occurs */ default void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) @@ -2474,7 +2474,7 @@ public interface Admin extends Abortable, Closeable { /** * Add a new replication peer for replicating data to slave cluster. 
* @param peerId a short name that identifies the peer - * @param peerConfig configuration for the replication slave cluster + * @param peerConfig configuration for the replication peer * @param enabled peer state, true if ENABLED and false if DISABLED * @throws IOException if a remote or network exception occurs */ @@ -2482,6 +2482,37 @@ public interface Admin extends Abortable, Closeable { throws IOException; /** + * Add a new replication peer but does not block and wait for it. + *

+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @param peerConfig configuration for the replication peer + * @return the result of the async operation + * @throws IOException if a remote or network exception occurs + */ + default Future addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig) + throws IOException { + return addReplicationPeerAsync(peerId, peerConfig, true); + } + + /** + * Add a new replication peer but does not block and wait for it. + *

+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @param peerConfig configuration for the replication peer + * @param enabled peer state, true if ENABLED and false if DISABLED + * @return the result of the async operation + * @throws IOException if a remote or network exception occurs + */ + Future addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig, + boolean enabled) throws IOException; + + /** * Remove a peer and stop the replication. * @param peerId a short name that identifies the peer * @throws IOException if a remote or network exception occurs @@ -2489,6 +2520,18 @@ public interface Admin extends Abortable, Closeable { void removeReplicationPeer(String peerId) throws IOException; /** + * Remove a replication peer but does not block and wait for it. + *

+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @return the result of the async operation + * @throws IOException if a remote or network exception occurs + */ + Future removeReplicationPeerAsync(String peerId) throws IOException; + + /** * Restart the replication stream to the specified peer. * @param peerId a short name that identifies the peer * @throws IOException if a remote or network exception occurs @@ -2496,6 +2539,18 @@ public interface Admin extends Abortable, Closeable { void enableReplicationPeer(String peerId) throws IOException; /** + * Enable a replication peer but does not block and wait for it. + *

+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @return the result of the async operation + * @throws IOException if a remote or network exception occurs + */ + Future enableReplicationPeerAsync(String peerId) throws IOException; + + /** * Stop the replication stream to the specified peer. * @param peerId a short name that identifies the peer * @throws IOException if a remote or network exception occurs @@ -2503,6 +2558,18 @@ public interface Admin extends Abortable, Closeable { void disableReplicationPeer(String peerId) throws IOException; /** + * Disable a replication peer but does not block and wait for it. + *

+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @return the result of the async operation + * @throws IOException if a remote or network exception occurs + */ + Future disableReplicationPeerAsync(String peerId) throws IOException; + + /** * Returns the configured ReplicationPeerConfig for the specified peer. * @param peerId a short name that identifies the peer * @return ReplicationPeerConfig for the peer @@ -2513,13 +2580,27 @@ public interface Admin extends Abortable, Closeable { /** * Update the peerConfig for the specified peer. * @param peerId a short name that identifies the peer - * @param peerConfig new config for the peer + * @param peerConfig new config for the replication peer * @throws IOException if a remote or network exception occurs */ void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws IOException; /** + * Update the peerConfig for the specified peer but does not block and wait for it. + *

+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @param peerConfig new config for the replication peer + * @return the result of the async operation + * @throws IOException IOException if a remote or network exception occurs + */ + Future updateReplicationPeerConfigAsync(String peerId, ReplicationPeerConfig peerConfig) + throws IOException; + + /** * Append the replicable table column family config from the specified peer. * @param id a short that identifies the cluster * @param tableCfs A map from tableName to column family names http://git-wip-us.apache.org/repos/asf/hbase/blob/d77c7449/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 63310e6..b42b556 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -198,7 +199,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTa import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; /** @@ -3778,6 +3784,25 @@ public class HBaseAdmin implements Admin { } } + @InterfaceAudience.Private + @InterfaceStability.Evolving + private static class ReplicationFuture extends ProcedureFuture { + private final String peerId; + private final Supplier getOperation; + + public ReplicationFuture(HBaseAdmin admin, String peerId, Long procId, + Supplier getOperation) { + super(admin, procId); + this.peerId = peerId; + this.getOperation = getOperation; + } + + @Override + public String 
toString() { + return "Operation: " + getOperation.get() + ", peerId: " + peerId; + } + } + @Override public List getSecurityCapabilities() throws IOException { try { @@ -3850,50 +3875,82 @@ public class HBaseAdmin implements Admin { @Override public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws IOException { - executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { - @Override - protected Void rpcCall() throws Exception { - master.addReplicationPeer(getRpcController(), - RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled)); - return null; - } - }); + get(addReplicationPeerAsync(peerId, peerConfig, enabled), this.syncWaitTimeout, + TimeUnit.MILLISECONDS); + } + + @Override + public Future addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig, + boolean enabled) throws IOException { + AddReplicationPeerResponse response = executeCallable( + new MasterCallable(getConnection(), getRpcControllerFactory()) { + @Override + protected AddReplicationPeerResponse rpcCall() throws Exception { + return master.addReplicationPeer(getRpcController(), + RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled)); + } + }); + return new ReplicationFuture(this, peerId, response.getProcId(), () -> "ADD_REPLICATION_PEER"); } @Override public void removeReplicationPeer(String peerId) throws IOException { - executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { - @Override - protected Void rpcCall() throws Exception { - master.removeReplicationPeer(getRpcController(), - RequestConverter.buildRemoveReplicationPeerRequest(peerId)); - return null; - } - }); + get(removeReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS); + } + + @Override + public Future removeReplicationPeerAsync(String peerId) throws IOException { + RemoveReplicationPeerResponse response = + executeCallable(new 
MasterCallable(getConnection(), + getRpcControllerFactory()) { + @Override + protected RemoveReplicationPeerResponse rpcCall() throws Exception { + return master.removeReplicationPeer(getRpcController(), + RequestConverter.buildRemoveReplicationPeerRequest(peerId)); + } + }); + return new ReplicationFuture(this, peerId, response.getProcId(), + () -> "REMOVE_REPLICATION_PEER"); } @Override public void enableReplicationPeer(final String peerId) throws IOException { - executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { - @Override - protected Void rpcCall() throws Exception { - master.enableReplicationPeer(getRpcController(), - RequestConverter.buildEnableReplicationPeerRequest(peerId)); - return null; - } - }); + get(enableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS); + } + + @Override + public Future enableReplicationPeerAsync(final String peerId) throws IOException { + EnableReplicationPeerResponse response = + executeCallable(new MasterCallable(getConnection(), + getRpcControllerFactory()) { + @Override + protected EnableReplicationPeerResponse rpcCall() throws Exception { + return master.enableReplicationPeer(getRpcController(), + RequestConverter.buildEnableReplicationPeerRequest(peerId)); + } + }); + return new ReplicationFuture(this, peerId, response.getProcId(), + () -> "ENABLE_REPLICATION_PEER"); } @Override public void disableReplicationPeer(final String peerId) throws IOException { - executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { - @Override - protected Void rpcCall() throws Exception { - master.disableReplicationPeer(getRpcController(), - RequestConverter.buildDisableReplicationPeerRequest(peerId)); - return null; - } - }); + get(disableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS); + } + + @Override + public Future disableReplicationPeerAsync(final String peerId) throws IOException { + DisableReplicationPeerResponse response = + 
executeCallable(new MasterCallable(getConnection(), + getRpcControllerFactory()) { + @Override + protected DisableReplicationPeerResponse rpcCall() throws Exception { + return master.disableReplicationPeer(getRpcController(), + RequestConverter.buildDisableReplicationPeerRequest(peerId)); + } + }); + return new ReplicationFuture(this, peerId, response.getProcId(), + () -> "DISABLE_REPLICATION_PEER"); } @Override @@ -3912,14 +3969,24 @@ public class HBaseAdmin implements Admin { @Override public void updateReplicationPeerConfig(final String peerId, final ReplicationPeerConfig peerConfig) throws IOException { - executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { - @Override - protected Void rpcCall() throws Exception { - master.updateReplicationPeerConfig(getRpcController(), - RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig)); - return null; - } - }); + get(updateReplicationPeerConfigAsync(peerId, peerConfig), this.syncWaitTimeout, + TimeUnit.MILLISECONDS); + } + + @Override + public Future updateReplicationPeerConfigAsync(final String peerId, + final ReplicationPeerConfig peerConfig) throws IOException { + UpdateReplicationPeerConfigResponse response = + executeCallable(new MasterCallable(getConnection(), + getRpcControllerFactory()) { + @Override + protected UpdateReplicationPeerConfigResponse rpcCall() throws Exception { + return master.updateReplicationPeerConfig(getRpcController(), + RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig)); + } + }); + return new ReplicationFuture(this, peerId, response.getProcId(), + () -> "UPDATE_REPLICATION_PEER_CONFIG"); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/d77c7449/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 37d5330..3bd38fd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -1522,47 +1523,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) { - return this - . newMasterCaller() - .action( - (controller, stub) -> this - . call(controller, stub, - RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), (s, - c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call(); + return this. procedureCall( + RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), + (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), + new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); } @Override public CompletableFuture removeReplicationPeer(String peerId) { - return this - . newMasterCaller() - .action( - (controller, stub) -> this - . call(controller, - stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId), - (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call(); + return this. 
procedureCall( + RequestConverter.buildRemoveReplicationPeerRequest(peerId), + (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), + new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); } @Override public CompletableFuture enableReplicationPeer(String peerId) { - return this - . newMasterCaller() - .action( - (controller, stub) -> this - . call(controller, - stub, RequestConverter.buildEnableReplicationPeerRequest(peerId), - (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call(); + return this. procedureCall( + RequestConverter.buildEnableReplicationPeerRequest(peerId), + (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), + new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); } @Override public CompletableFuture disableReplicationPeer(String peerId) { - return this - . newMasterCaller() - .action( - (controller, stub) -> this - . call( - controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s, - c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null)) - .call(); + return this. procedureCall( + RequestConverter.buildDisableReplicationPeerRequest(peerId), + (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), + new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); } @Override @@ -1581,13 +1569,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { public CompletableFuture updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) { return this - . newMasterCaller() - .action( - (controller, stub) -> this - . call( - controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, - peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), ( - resp) -> null)).call(); + . 
procedureCall( + RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), + (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), + (resp) -> resp.getProcId(), + new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); } @Override @@ -2546,6 +2532,30 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } + private class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { + private final String peerId; + private final Supplier getOperation; + + ReplicationProcedureBiConsumer(String peerId, Supplier getOperation) { + this.peerId = peerId; + this.getOperation = getOperation; + } + + String getDescription() { + return "Operation: " + getOperation.get() + ", peerId: " + peerId; + } + + @Override + void onFinished() { + LOG.info(getDescription() + " completed"); + } + + @Override + void onError(Throwable error) { + LOG.info(getDescription() + " failed with " + error.getMessage()); + } + } + private CompletableFuture waitProcedureResult(CompletableFuture procFuture) { CompletableFuture future = new CompletableFuture<>(); procFuture.whenComplete((procId, error) -> {