From commits-return-82834-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Fri Feb 1 08:02:40 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 778231807A1 for ; Fri, 1 Feb 2019 09:02:39 +0100 (CET) Received: (qmail 76074 invoked by uid 500); 1 Feb 2019 08:02:38 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 75743 invoked by uid 99); 1 Feb 2019 08:02:37 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Feb 2019 08:02:37 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id D4FBF85785; Fri, 1 Feb 2019 08:02:36 +0000 (UTC) Date: Fri, 01 Feb 2019 08:02:40 +0000 To: "commits@hbase.apache.org" Subject: [hbase] 04/08: HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: zhangduo@apache.org In-Reply-To: <154900815658.13751.17726520964395203950@gitbox.apache.org> References: <154900815658.13751.17726520964395203950@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: hbase X-Git-Refname: refs/heads/HBASE-21512 X-Git-Reftype: branch X-Git-Rev: b1ac31b82a1277a04bc15759bef06fd99213c243 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20190201080236.D4FBF85785@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-21512 in repository https://gitbox.apache.org/repos/asf/hbase.git commit b1ac31b82a1277a04bc15759bef06fd99213c243 Author: zhangduo AuthorDate: Tue Jan 1 21:27:14 2019 +0800 HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint --- .../hbase/client/AsyncRegionServerAdmin.java | 14 +++++--- .../hbase/protobuf/ReplicationProtbufUtil.java | 35 ++++++++++--------- .../HBaseInterClusterReplicationEndpoint.java | 31 +++++++++-------- .../regionserver/ReplicationSinkManager.java | 40 ++++++++-------------- .../hbase/replication/SyncReplicationTestBase.java | 12 +++---- .../regionserver/TestReplicationSinkManager.java | 21 +++++------- 6 files changed, 74 insertions(+), 79 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java index 9accd89..b9141a9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.yetus.audience.InterfaceAudience; @@ -94,9 +95,9 @@ public class AsyncRegionServerAdmin { void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback done); } - private CompletableFuture call(RpcCall rpcCall) { + private CompletableFuture call(RpcCall rpcCall, CellScanner cellScanner) { CompletableFuture future = new CompletableFuture<>(); - HBaseRpcController controller = conn.rpcControllerFactory.newController(); + HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner); try { rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback() { @@ -115,6 +116,10 @@ public class AsyncRegionServerAdmin { return future; } + private CompletableFuture call(RpcCall rpcCall) { + return call(rpcCall, null); + } + public CompletableFuture getRegionInfo(GetRegionInfoRequest request) { return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done)); } @@ -154,8 +159,9 @@ public class AsyncRegionServerAdmin { } public CompletableFuture replicateWALEntry( - ReplicateWALEntryRequest request) { - return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done)); + ReplicateWALEntryRequest request, CellScanner cellScanner) { + return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done), + cellScanner); } public CompletableFuture replay(ReplicateWALEntryRequest request) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index c1b3911..74fad26 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -20,51 +20,54 @@ package org.apache.hadoop.hbase.protobuf; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutionException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.io.SizedCellScanner; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @InterfaceAudience.Private public class ReplicationProtbufUtil { + /** - * A helper to replicate a list of WAL entries using admin protocol. - * @param admin Admin service + * A helper to replicate a list of WAL entries using region server admin + * @param admin the region server admin * @param entries Array of WAL entries to be replicated * @param replicationClusterId Id which will uniquely identify source cluster FS client * configurations in the replication configuration directory * @param sourceBaseNamespaceDir Path to source cluster base namespace directory * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory - * @throws java.io.IOException */ - public static void replicateWALEntry(final AdminService.BlockingInterface admin, - final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir, - Path sourceHFileArchiveDir) throws IOException { - Pair p = - buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir, - sourceHFileArchiveDir); - HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond()); + public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries, + String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) + throws IOException { + Pair p = buildReplicateWALEntryRequest( + entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir); try { - admin.replicateWALEntry(controller, p.getFirst()); - } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) { - throw ProtobufUtil.getServiceException(e); + admin.replicateWALEntry(p.getFirst(), p.getSecond()).get(); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + Throwables.propagateIfPossible(cause, IOException.class); + throw new IOException(e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 7db53aa..0359096 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -39,7 +39,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; - import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -48,13 +47,16 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -65,8 +67,6 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; - /** * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} * implementation for replicating to another HBase cluster. @@ -85,8 +85,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2; - private ClusterConnection conn; - private Configuration localConf; + private AsyncClusterConnection conn; private Configuration conf; // How long should we sleep for each retry private long sleepForRetries; @@ -117,7 +116,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi public void init(Context context) throws IOException { super.init(context); this.conf = HBaseConfiguration.create(ctx.getConfiguration()); - this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration()); decorateConf(); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", @@ -132,12 +130,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. - this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf); + this.conn = + ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent()); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.metrics = context.getMetrics(); // ReplicationQueueInfo parses the peerId out of the znode for us - this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf); + this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf); // per sink thread pool this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT); @@ -284,9 +283,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } private void reconnectToPeerCluster() { - ClusterConnection connection = null; + AsyncClusterConnection connection = null; try { - connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf); + connection = + ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent()); } catch (IOException ioe) { LOG.warn("Failed to create connection for peer cluster", ioe); } @@ -367,7 +367,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } continue; } - if (this.conn == null || this.conn.isClosed()) { + if (this.conn == null) { reconnectToPeerCluster(); } try { @@ -480,10 +480,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi entriesHashCode, entries.size(), size, replicationClusterId); } sinkPeer = replicationSinkMgr.getReplicationSink(); - BlockingInterface rrs = sinkPeer.getRegionServer(); + AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer(); try { - ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + ReplicationProtbufUtil.replicateWALEntry(rsAdmin, + entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir, + hfileArchiveDir); LOG.trace("Completed replicating batch {}", entriesHashCode); } catch (IOException e) { LOG.trace("Failed replicating batch {}", entriesHashCode, e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java index 3cd7884..21b07ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java @@ -21,11 +21,11 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -35,8 +35,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; - /** * Maintains a collection of peers to replicate to, and randomly selects a * single peer to replicate to per set of data to replicate. Also handles @@ -61,9 +59,7 @@ public class ReplicationSinkManager { static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f; - private final Connection conn; - - private final String peerClusterId; + private final AsyncClusterConnection conn; private final HBaseReplicationEndpoint endpoint; @@ -77,8 +73,6 @@ public class ReplicationSinkManager { // replication sinks is refreshed private final int badSinkThreshold; - private final Random random; - // A timestamp of the last time the list of replication peers changed private long lastUpdateToPeers; @@ -88,26 +82,22 @@ public class ReplicationSinkManager { /** * Instantiate for a single replication peer cluster. * @param conn connection to the peer cluster - * @param peerClusterId identifier of the peer cluster * @param endpoint replication endpoint for inter cluster replication * @param conf HBase configuration, used for determining replication source ratio and bad peer * threshold */ - public ReplicationSinkManager(ClusterConnection conn, String peerClusterId, - HBaseReplicationEndpoint endpoint, Configuration conf) { + public ReplicationSinkManager(AsyncClusterConnection conn, HBaseReplicationEndpoint endpoint, + Configuration conf) { this.conn = conn; - this.peerClusterId = peerClusterId; this.endpoint = endpoint; this.badReportCounts = Maps.newHashMap(); this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO); - this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold", - DEFAULT_BAD_SINK_THRESHOLD); - this.random = new Random(); + this.badSinkThreshold = + conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD); } /** * Get a randomly-chosen replication sink to replicate to. - * * @return a replication sink to replicate to */ public synchronized SinkPeer getReplicationSink() throws IOException { @@ -119,8 +109,8 @@ public class ReplicationSinkManager { if (sinks.isEmpty()) { throw new IOException("No replication sinks are available"); } - ServerName serverName = sinks.get(random.nextInt(sinks.size())); - return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName)); + ServerName serverName = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size())); + return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName)); } /** @@ -160,7 +150,7 @@ public class ReplicationSinkManager { */ public synchronized void chooseSinks() { List slaveAddresses = endpoint.getRegionServers(); - Collections.shuffle(slaveAddresses, random); + Collections.shuffle(slaveAddresses, ThreadLocalRandom.current()); int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); sinks = slaveAddresses.subList(0, numSinks); lastUpdateToPeers = System.currentTimeMillis(); @@ -182,9 +172,9 @@ public class ReplicationSinkManager { */ public static class SinkPeer { private ServerName serverName; - private AdminService.BlockingInterface regionServer; + private AsyncRegionServerAdmin regionServer; - public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) { + public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) { this.serverName = serverName; this.regionServer = regionServer; } @@ -193,10 +183,8 @@ public class ReplicationSinkManager { return serverName; } - public AdminService.BlockingInterface getRegionServer() { + public AsyncRegionServerAdmin getRegionServer() { return regionServer; } - } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index f373590..e0d112d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -250,19 +250,19 @@ public class SyncReplicationTestBase { protected final void verifyReplicationRequestRejection(HBaseTestingUtility utility, boolean expectedRejection) throws Exception { HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME); - ClusterConnection connection = regionServer.getClusterConnection(); + AsyncClusterConnection connection = regionServer.getAsyncClusterConnection(); Entry[] entries = new Entry[10]; for (int i = 0; i < entries.length; i++) { entries[i] = new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit()); } if (!expectedRejection) { - ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()), - entries, null, null, null); + ReplicationProtbufUtil.replicateWALEntry( + connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null); } else { try { - ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()), - entries, null, null, null); + ReplicationProtbufUtil.replicateWALEntry( + connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null); fail("Should throw IOException when sync-replication state is in A or DA"); } catch (DoNotRetryIOException e) { assertTrue(e.getMessage().contains("Reject to apply to sink cluster")); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java index 39dabb4..60afd40 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java @@ -25,7 +25,8 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -37,8 +38,6 @@ import org.junit.experimental.categories.Category; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; - @Category({ReplicationTests.class, SmallTests.class}) public class TestReplicationSinkManager { @@ -46,16 +45,14 @@ public class TestReplicationSinkManager { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationSinkManager.class); - private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID"; - private HBaseReplicationEndpoint replicationEndpoint; private ReplicationSinkManager sinkManager; @Before public void setUp() { replicationEndpoint = mock(HBaseReplicationEndpoint.class); - sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class), - PEER_CLUSTER_ID, replicationEndpoint, new Configuration()); + sinkManager = new ReplicationSinkManager(mock(AsyncClusterConnection.class), + replicationEndpoint, new Configuration()); } @Test @@ -100,7 +97,7 @@ public class TestReplicationSinkManager { // Sanity check assertEquals(1, sinkManager.getNumSinks()); - SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); + SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); sinkManager.reportBadSink(sinkPeer); @@ -131,7 +128,7 @@ public class TestReplicationSinkManager { ServerName serverName = sinkManager.getSinksForTesting().get(0); - SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); + SinkPeer sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class)); sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { @@ -147,7 +144,7 @@ public class TestReplicationSinkManager { // serverName = sinkManager.getSinksForTesting().get(0); - sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); + sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class)); for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) { sinkManager.reportBadSink(sinkPeer); } @@ -188,8 +185,8 @@ public class TestReplicationSinkManager { ServerName serverNameA = sinkList.get(0); ServerName serverNameB = sinkList.get(1); - SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); - SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class)); + SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class)); + SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class)); for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { sinkManager.reportBadSink(sinkPeerA);