From commits-return-75013-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Thu Jun 28 05:23:58 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id F12E7180778 for ; Thu, 28 Jun 2018 05:23:56 +0200 (CEST) Received: (qmail 79840 invoked by uid 500); 28 Jun 2018 03:23:55 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 79091 invoked by uid 99); 28 Jun 2018 03:23:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Jun 2018 03:23:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D1353E1180; Thu, 28 Jun 2018 03:23:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Date: Thu, 28 Jun 2018 03:24:14 -0000 Message-Id: <21d27c0232b44dd28307c49f63729def@git.apache.org> In-Reply-To: <8c08ce3a686c43d7a973b12c1d82f982@git.apache.org> References: <8c08ce3a686c43d7a973b12c1d82f982@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/39] hbase git commit: HBASE-19782 Reject the replication request when peer is DA or A state HBASE-19782 Reject the replication request when peer is DA or A state Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3bf93a3d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3bf93a3d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3bf93a3d Branch: refs/heads/HBASE-19064 Commit: 3bf93a3db7d5166693a903559ad21bbda5ccc5b7 Parents: 8d6b95b Author: huzheng Authored: Fri Mar 2 18:05:29 2018 +0800 Committer: zhangduo Committed: Thu Jun 28 11:22:31 2018 +0800 ---------------------------------------------------------------------- .../hbase/protobuf/ReplicationProtbufUtil.java | 15 +++---- .../hadoop/hbase/regionserver/HRegion.java | 2 +- .../hbase/regionserver/HRegionServer.java | 5 +-- .../hbase/regionserver/RSRpcServices.java | 25 +++++++++-- .../RejectReplicationRequestStateChecker.java | 45 ++++++++++++++++++++ .../ReplaySyncReplicationWALCallable.java | 24 ++++++----- .../replication/regionserver/Replication.java | 2 +- .../regionserver/ReplicationSink.java | 16 +++---- .../SyncReplicationPeerInfoProvider.java | 11 ++--- .../SyncReplicationPeerInfoProviderImpl.java | 13 +++--- .../SyncReplicationPeerMappingManager.java | 5 +-- .../hbase/wal/SyncReplicationWALProvider.java | 7 +-- .../replication/SyncReplicationTestBase.java | 32 ++++++++++++++ .../replication/TestSyncReplicationActive.java | 13 +++++- .../regionserver/TestReplicationSink.java | 5 +-- .../regionserver/TestWALEntrySinkFilter.java | 3 +- .../wal/TestSyncReplicationWALProvider.java | 6 +-- 17 files changed, 169 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 157ad1b..c1b3911 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -16,7 +16,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.protobuf; @@ -24,25 +23,25 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.PrivateCellUtil; -import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.SizedCellScanner; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + @InterfaceAudience.Private public class ReplicationProtbufUtil { /** http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 48969d8..250b163 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1996,7 +1996,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private boolean shouldForbidMajorCompaction() { if (rsServices != null && rsServices.getReplicationSourceService() != null) { return rsServices.getReplicationSourceService().getSyncReplicationPeerInfoProvider() - .checkState(getRegionInfo(), ForbidMajorCompactionChecker.get()); + .checkState(getRegionInfo().getTable(), ForbidMajorCompactionChecker.get()); } return false; } http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 677ee05..1a71cb7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2481,10 +2481,9 @@ public class HRegionServer extends HasThread implements } /** - * @return Return the object that implements the replication - * sink executorService. + * @return Return the object that implements the replication sink executorService. */ - ReplicationSinkService getReplicationSinkService() { + public ReplicationSinkService getReplicationSinkService() { return replicationSinkHandler; } http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 5316ac5..bdb86d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -121,6 +121,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; @@ -2204,9 +2205,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private void checkShouldRejectReplicationRequest(List entries) throws IOException { + ReplicationSourceService replicationSource = regionServer.getReplicationSourceService(); + if (replicationSource == null || entries.isEmpty()) { + return; + } + // We can ensure that all entries are for one peer, so only need to check one entry's + // table name. if the table hit sync replication at peer side and the peer cluster + // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply + // those entries according to the design doc. + TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray()); + if (replicationSource.getSyncReplicationPeerInfoProvider().checkState(table, + RejectReplicationRequestStateChecker.get())) { + throw new DoNotRetryIOException( + "Reject to apply to sink cluster because sync replication state of sink cluster " + + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table); + } + } + /** * Replicate WAL entries on the region server. - * * @param controller the RPC controller * @param request the request * @throws ServiceException @@ -2220,7 +2238,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (regionServer.replicationSinkHandler != null) { requestCount.increment(); List entries = request.getEntryList(); - CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner(); + checkShouldRejectReplicationRequest(entries); + CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner(); regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(); regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner, request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), @@ -2435,7 +2454,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private boolean shouldRejectRequestsFromClient(HRegion region) { return regionServer.getReplicationSourceService().getSyncReplicationPeerInfoProvider() - .checkState(region.getRegionInfo(), RejectRequestsFromClientStateChecker.get()); + .checkState(region.getRegionInfo().getTable(), RejectRequestsFromClientStateChecker.get()); } private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException { http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java new file mode 100644 index 0000000..9ad0af2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.function.BiPredicate; + +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Check whether we need to reject the replication request from source cluster. + */ +@InterfaceAudience.Private +public class RejectReplicationRequestStateChecker + implements BiPredicate { + + private static final RejectReplicationRequestStateChecker INST = + new RejectReplicationRequestStateChecker(); + + @Override + public boolean test(SyncReplicationState state, SyncReplicationState newState) { + return state == SyncReplicationState.ACTIVE || state == SyncReplicationState.DOWNGRADE_ACTIVE + || newState == SyncReplicationState.ACTIVE + || newState == SyncReplicationState.DOWNGRADE_ACTIVE; + } + + public static RejectReplicationRequestStateChecker get() { + return INST; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java index c9c5ef6..3cf065c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java @@ -27,8 +27,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -46,6 +44,7 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter; /** @@ -81,14 +80,19 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { throw initError; } LOG.info("Received a replay sync replication wal {} event, peerId={}", wal, peerId); - try (Reader reader = getReader()) { - List entries = readWALEntries(reader); - while (!entries.isEmpty()) { - Pair pair = ReplicationProtbufUtil - .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()])); - HBaseRpcController controller = new HBaseRpcControllerImpl(pair.getSecond()); - rs.getRSRpcServices().replicateWALEntry(controller, pair.getFirst()); - entries = readWALEntries(reader); + if (rs.getReplicationSinkService() != null) { + try (Reader reader = getReader()) { + List entries = readWALEntries(reader); + while (!entries.isEmpty()) { + Pair pair = ReplicationProtbufUtil + .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()])); + ReplicateWALEntryRequest request = pair.getFirst(); + rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(), + pair.getSecond(), request.getReplicationClusterId(), + request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath()); + // Read next entries. + entries = readWALEntries(reader); + } } } return null; http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 2846d2c..2199415 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -275,7 +275,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer List oldSources = this.replicationManager.getOldSources(); for (ReplicationSourceInterface source : oldSources) { if (source instanceof ReplicationSource) { - sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); + sourceMetricsList.add(source.getSourceMetrics()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index eb09a3a..a334b16 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -93,9 +94,8 @@ public class ReplicationSink { /** * Create a sink for replication - * - * @param conf conf object - * @param stopper boolean to tell this thread to stop + * @param conf conf object + * @param stopper boolean to tell this thread to stop * @throws IOException thrown when HDFS goes bad or bad file name */ public ReplicationSink(Configuration conf, Stoppable stopper) @@ -104,16 +104,15 @@ public class ReplicationSink { decorateConf(); this.metrics = new MetricsSink(); this.walEntrySinkFilter = setupWALEntrySinkFilter(); - String className = - conf.get("hbase.replication.source.fs.conf.provider", - DefaultSourceFSConfigurationProvider.class.getCanonicalName()); + String className = conf.get("hbase.replication.source.fs.conf.provider", + DefaultSourceFSConfigurationProvider.class.getCanonicalName()); try { Class c = Class.forName(className).asSubclass(SourceFSConfigurationProvider.class); this.provider = c.getDeclaredConstructor().newInstance(); } catch (Exception e) { throw new IllegalArgumentException( - "Configured source fs configuration provider class " + className + " throws error.", e); + "Configured source fs configuration provider class " + className + " throws error.", e); } } @@ -178,8 +177,7 @@ public class ReplicationSink { Map>>> bulkLoadHFileMap = null; for (WALEntry entry : entries) { - TableName table = - TableName.valueOf(entry.getKey().getTableName().toByteArray()); + TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); if (this.walEntrySinkFilter != null) { if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) { // Skip Cells in CellScanner associated with this entry. http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java index 66fe3be..cfe525a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java @@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Optional; import java.util.function.BiPredicate; -import org.apache.hadoop.hbase.client.RegionInfo; + +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -31,17 +32,17 @@ import org.apache.yetus.audience.InterfaceAudience; public interface SyncReplicationPeerInfoProvider { /** - * Return the peer id and remote WAL directory if the region is synchronously replicated and the + * Return the peer id and remote WAL directory if the table is synchronously replicated and the * state is {@link SyncReplicationState#ACTIVE}. */ - Optional> getPeerIdAndRemoteWALDir(RegionInfo info); + Optional> getPeerIdAndRemoteWALDir(TableName table); /** - * Check whether the give region is contained in a sync replication peer which can pass the state + * Check whether the given table is contained in a sync replication peer which can pass the state * checker. *

* Will call the checker with current sync replication state and new sync replication state. */ - boolean checkState(RegionInfo info, + boolean checkState(TableName table, BiPredicate checker); } http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java index cb33dab..75274ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java @@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Optional; import java.util.function.BiPredicate; -import org.apache.hadoop.hbase.client.RegionInfo; + +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.SyncReplicationState; @@ -40,11 +41,11 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv } @Override - public Optional> getPeerIdAndRemoteWALDir(RegionInfo info) { - if (info == null) { + public Optional> getPeerIdAndRemoteWALDir(TableName table) { + if (table == null) { return Optional.empty(); } - String peerId = mapping.getPeerId(info); + String peerId = mapping.getPeerId(table); if (peerId == null) { return Optional.empty(); } @@ -65,9 +66,9 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv } @Override - public boolean checkState(RegionInfo info, + public boolean checkState(TableName table, BiPredicate checker) { - String peerId = mapping.getPeerId(info); + String peerId = mapping.getPeerId(table); if (peerId == null) { return false; } http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java index 64216cb..5d19f72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.yetus.audience.InterfaceAudience; @@ -42,7 +41,7 @@ class SyncReplicationPeerMappingManager { peerConfig.getTableCFsMap().keySet().forEach(table2PeerId::remove); } - String getPeerId(RegionInfo info) { - return table2PeerId.get(info.getTable()); + String getPeerId(TableName tableName) { + return table2PeerId.get(tableName); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 3cd356d42..3b56aa2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -33,6 +33,7 @@ import java.util.function.BiPredicate; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -160,7 +161,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen } WAL wal = null; Optional> peerIdAndRemoteWALDir = - peerInfoProvider.getPeerIdAndRemoteWALDir(region); + peerInfoProvider.getPeerIdAndRemoteWALDir(region.getTable()); if (peerIdAndRemoteWALDir.isPresent()) { Pair pair = peerIdAndRemoteWALDir.get(); wal = getWAL(pair.getFirst(), pair.getSecond()); @@ -273,12 +274,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen implements SyncReplicationPeerInfoProvider { @Override - public Optional> getPeerIdAndRemoteWALDir(RegionInfo info) { + public Optional> getPeerIdAndRemoteWALDir(TableName table) { return Optional.empty(); } @Override - public boolean checkState(RegionInfo info, + public boolean checkState(TableName table, BiPredicate checker) { return false; } http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index 30dbdb5..0d5fce8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -25,11 +25,13 @@ import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -37,9 +39,15 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; @@ -182,4 +190,28 @@ public class SyncReplicationTestBase { Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); return new Path(remoteWALDir, PEER_ID); } + + protected void verifyReplicationRequestRejection(HBaseTestingUtility utility, + boolean expectedRejection) throws Exception { + HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME); + ClusterConnection connection = regionServer.getClusterConnection(); + Entry[] entries = new Entry[10]; + for (int i = 0; i < entries.length; i++) { + entries[i] = + new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit()); + } + if (!expectedRejection) { + ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()), + entries, null, null, null); + } else { + try { + ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()), + entries, null, null, null); + Assert.fail("Should throw IOException when sync-replication state is in A or DA"); + } catch (DoNotRetryIOException e) { + Assert.assertTrue(e.getMessage().contains("Reject to apply to sink cluster")); + Assert.assertTrue(e.getMessage().contains(TABLE_NAME.toString())); + } + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java index f4fb5fe..bff4572 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java @@ -29,7 +29,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestSyncReplicationActive.class); + HBaseClassTestRule.forClass(TestSyncReplicationActive.class); + @Test public void testActive() throws Exception { @@ -37,13 +38,21 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { SyncReplicationState.STANDBY); UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, SyncReplicationState.ACTIVE); + + // confirm that peer with state A will reject replication request. + verifyReplicationRequestRejection(UTIL1, true); + verifyReplicationRequestRejection(UTIL2, false); + UTIL1.getAdmin().disableReplicationPeer(PEER_ID); write(UTIL1, 0, 100); Thread.sleep(2000); // peer is disabled so no data have been replicated verifyNotReplicatedThroughRegion(UTIL2, 0, 100); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, SyncReplicationState.DOWNGRADE_ACTIVE); + // confirm that peer with state DA will reject replication request. + verifyReplicationRequestRejection(UTIL2, true); // confirm that the data is there after we convert the peer to DA verify(UTIL2, 0, 100); @@ -59,6 +68,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { // confirm that we can convert to DA even if the remote slave cluster is down UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, SyncReplicationState.DOWNGRADE_ACTIVE); + // confirm that peer with state DA will reject replication request. + verifyReplicationRequestRejection(UTIL2, true); write(UTIL2, 200, 300); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index aa6c39c..2d6c28f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -129,8 +129,7 @@ public class TestReplicationSink { TestSourceFSConfigurationProvider.class.getCanonicalName()); TEST_UTIL.startMiniCluster(3); - SINK = - new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE); + SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE); table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1); table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2); Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration()); @@ -419,7 +418,7 @@ public class TestReplicationSink { return builder.build(); } - private WALEntry.Builder createWALEntryBuilder(TableName table) { + public static WALEntry.Builder createWALEntryBuilder(TableName table) { WALEntry.Builder builder = WALEntry.newBuilder(); builder.setAssociatedCellCount(1); WALKey.Builder keyBuilder = WALKey.newBuilder(); http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java index 6299065..fd9ff29 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java @@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilder; @@ -127,7 +128,7 @@ public class TestWALEntrySinkFilter { conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class); conf.setClass("hbase.client.connection.impl", DevNullConnection.class, - Connection.class); + Connection.class); ReplicationSink sink = new ReplicationSink(conf, STOPPABLE); // Create some dumb walentries. List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries = http://git-wip-us.apache.org/repos/asf/hbase/blob/3bf93a3d/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java index 3263fe8..69ed44d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java @@ -75,8 +75,8 @@ public class TestSyncReplicationWALProvider { public static final class InfoProvider implements SyncReplicationPeerInfoProvider { @Override - public Optional> getPeerIdAndRemoteWALDir(RegionInfo info) { - if (info.getTable().equals(TABLE)) { + public Optional> getPeerIdAndRemoteWALDir(TableName table) { + if (table != null && table.equals(TABLE)) { return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR)); } else { return Optional.empty(); @@ -84,7 +84,7 @@ public class TestSyncReplicationWALProvider { } @Override - public boolean checkState(RegionInfo info, + public boolean checkState(TableName table, BiPredicate checker) { // TODO Implement SyncReplicationPeerInfoProvider.isInState return false;