From commits-return-73292-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Mon May 28 09:41:01 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 46249180771 for ; Mon, 28 May 2018 09:40:58 +0200 (CEST) Received: (qmail 87657 invoked by uid 500); 28 May 2018 07:40:57 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 85599 invoked by uid 99); 28 May 2018 07:40:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 28 May 2018 07:40:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7A4DAE1111; Mon, 28 May 2018 07:40:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Date: Mon, 28 May 2018 07:41:07 -0000 Message-Id: In-Reply-To: <8a356c8908484abeaee1340a7e909bc7@git.apache.org> References: <8a356c8908484abeaee1340a7e909bc7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/34] hbase git commit: HBASE-19747 Introduce a special WALProvider for synchronous replication HBASE-19747 Introduce a special WALProvider for synchronous replication Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ce1b1a71 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ce1b1a71 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ce1b1a71 Branch: refs/heads/HBASE-19064 Commit: ce1b1a71b20831f008de26b8dbaae328521e79e9 Parents: 64d104d Author: zhangduo Authored: Fri Jan 19 18:38:39 2018 
+0800 Committer: zhangduo Committed: Mon May 28 15:40:03 2018 +0800 ---------------------------------------------------------------------- .../hbase/regionserver/wal/AbstractFSWAL.java | 7 + .../hbase/regionserver/wal/AsyncFSWAL.java | 1 - .../hbase/regionserver/wal/DualAsyncFSWAL.java | 4 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 4 - .../regionserver/PeerActionListener.java | 33 +++ .../SynchronousReplicationPeerProvider.java | 35 +++ .../hadoop/hbase/wal/AbstractFSWALProvider.java | 1 + .../hadoop/hbase/wal/AsyncFSWALProvider.java | 18 +- .../hbase/wal/NettyAsyncFSWALConfigHelper.java | 8 +- .../hbase/wal/RegionGroupingProvider.java | 13 +- .../wal/SynchronousReplicationWALProvider.java | 225 +++++++++++++++++++ .../org/apache/hadoop/hbase/wal/WALFactory.java | 37 ++- .../org/apache/hadoop/hbase/wal/WALKeyImpl.java | 16 +- .../regionserver/TestCompactionPolicy.java | 1 + .../regionserver/TestFailedAppendAndSync.java | 122 +++++----- .../hadoop/hbase/regionserver/TestHRegion.java | 24 +- .../TestHRegionWithInMemoryFlush.java | 7 - .../hbase/regionserver/TestRegionIncrement.java | 20 +- .../hbase/regionserver/TestWALLockup.java | 1 + .../regionserver/wal/AbstractTestWALReplay.java | 1 + .../regionserver/wal/ProtobufLogTestHelper.java | 44 +++- .../hbase/regionserver/wal/TestAsyncFSWAL.java | 13 +- .../regionserver/wal/TestAsyncWALReplay.java | 4 +- .../wal/TestCombinedAsyncWriter.java | 3 +- .../hbase/regionserver/wal/TestFSHLog.java | 15 +- .../hbase/regionserver/wal/TestWALReplay.java | 1 + .../apache/hadoop/hbase/wal/IOTestProvider.java | 2 - .../TestSynchronousReplicationWALProvider.java | 153 +++++++++++++ 28 files changed, 659 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java ---------------------------------------------------------------------- diff 
--git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 825ad17..4255086 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -434,6 +434,13 @@ public abstract class AbstractFSWAL implements WAL { this.implClassName = getClass().getSimpleName(); } + /** + * Used to initialize the WAL. Usually just call rollWriter to create the first log writer. + */ + public void init() throws IOException { + rollWriter(); + } + @Override public void registerWALActionsListener(WALActionsListener listener) { this.listeners.add(listener); http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 4732f41..d98ab75 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -248,7 +248,6 @@ public class AsyncFSWAL extends AbstractFSWAL { batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS); - rollWriter(); } private static boolean waitingRoll(int epochAndState) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java index 42b0dae..0495337 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java @@ -38,14 +38,14 @@ public class DualAsyncFSWAL extends AsyncFSWAL { private final Path remoteWalDir; - public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteRootDir, + public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class channelClass) throws FailedLogCloseException, IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, eventLoopGroup, channelClass); this.remoteFs = remoteFs; - this.remoteWalDir = new Path(remoteRootDir, logDir); + this.remoteWalDir = remoteWalDir; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 61b9cfb..baa87a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -214,12 +214,8 @@ public class FSHLog extends AbstractFSWAL { this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5); this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2); 
- this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC); - // rollWriter sets this.hdfs_out if it can. - rollWriter(); - // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is // put on the ring buffer. String hostingThreadName = Thread.currentThread().getName(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java new file mode 100644 index 0000000..74ad626 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Get notification for replication peer events. 
Mainly used for telling the + * {@link org.apache.hadoop.hbase.wal.SynchronousReplicationWALProvider} to close some WAL if not + * used any more. + * <p>
+ * TODO: Also need a synchronous peer state change notification. + */ +@InterfaceAudience.Private +public interface PeerActionListener { + + default void peerRemoved(String peerId) {} +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java new file mode 100644 index 0000000..b4e04fb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.Optional; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Get the peer id and remote root dir if the region is synchronously replicated. + */ +@InterfaceAudience.Private +public interface SynchronousReplicationPeerProvider { + + /** + * Return the peer id and remote WAL directory if the region is synchronously replicated. + */ + Optional> getPeerIdAndRemoteWALDir(RegionInfo info); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 231afd5..3eb8f8f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -137,6 +137,7 @@ public abstract class AbstractFSWALProvider> implemen if (walCopy == null) { walCopy = createWAL(); wal = walCopy; + walCopy.init(); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index c920279..56edb75 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.wal; import 
java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,12 +31,10 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; -import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; -import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; /** * A WAL provider that use {@link AsyncFSWAL}. @@ -62,6 +59,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { private EventLoopGroup eventLoopGroup; private Class channelClass; + @Override protected AsyncFSWAL createWAL() throws IOException { return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), @@ -74,15 +72,9 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { @Override protected void doInit(Configuration conf) throws IOException { Pair> eventLoopGroupAndChannelClass = - NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); - if (eventLoopGroupAndChannelClass != null) { - eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); - channelClass = eventLoopGroupAndChannelClass.getSecond(); - } else { - eventLoopGroup = new NioEventLoopGroup(1, - new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY)); - channelClass = NioSocketChannel.class; - } + NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); + eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); + channelClass = eventLoopGroupAndChannelClass.getSecond(); } /** 
http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java index 12b63f5..7f33eda 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java @@ -27,6 +27,9 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; /** * Helper class for passing netty event loop config to {@link AsyncFSWALProvider}. 
@@ -57,7 +60,10 @@ public final class NettyAsyncFSWALConfigHelper { static Pair> getEventLoopConfig(Configuration conf) { String name = conf.get(EVENT_LOOP_CONFIG); if (StringUtils.isBlank(name)) { - return null; + // create new event loop group if config is empty + return Pair.> newPair( + new NioEventLoopGroup(0, new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY)), + NioSocketChannel.class); } return EVENT_LOOP_CONFIG_MAP.get(name); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index 28817e9..0b7b8da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -31,6 +31,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; // imports for classes still in regionserver.wal import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.util.Bytes; @@ -132,6 +133,7 @@ public class RegionGroupingProvider implements WALProvider { private RegionGroupingStrategy strategy; private WALFactory factory; + private Configuration conf; private List listeners = new ArrayList<>(); private String providerId; private Class providerClass; @@ -141,6 +143,7 @@ public class RegionGroupingProvider implements WALProvider { if (null != strategy) { throw new IllegalStateException("WALProvider.init should only be called once."); } + this.conf = conf; this.factory = factory; StringBuilder sb 
= new StringBuilder().append(factory.factoryId); if (providerId != null) { @@ -156,11 +159,11 @@ public class RegionGroupingProvider implements WALProvider { } private WALProvider createProvider(String group) throws IOException { - if (META_WAL_PROVIDER_ID.equals(providerId)) { - return factory.createProvider(providerClass, META_WAL_PROVIDER_ID); - } else { - return factory.createProvider(providerClass, group); - } + WALProvider provider = WALFactory.createProvider(providerClass); + provider.init(factory, conf, + META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group); + provider.addWALActionsListener(new MetricsWAL()); + return provider; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java new file mode 100644 index 0000000..f60599f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener; +import org.apache.hadoop.hbase.replication.regionserver.SynchronousReplicationPeerProvider; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Streams; +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; + +/** + * The special {@link WALProvider} for synchronous replication. + *

+ * It works like an interceptor, when getting WAL, first it will check if the given region should be + * replicated synchronously, if so it will return a special WAL for it, otherwise it will delegate + * the request to the normal {@link WALProvider}. + */ +@InterfaceAudience.Private +public class SynchronousReplicationWALProvider implements WALProvider, PeerActionListener { + + private static final Logger LOG = + LoggerFactory.getLogger(SynchronousReplicationWALProvider.class); + + private static final String LOG_SUFFIX = ".syncrep"; + + private final WALProvider provider; + + private final SynchronousReplicationPeerProvider peerProvider; + + private WALFactory factory; + + private Configuration conf; + + private List listeners = new ArrayList<>(); + + private EventLoopGroup eventLoopGroup; + + private Class channelClass; + + private AtomicBoolean initialized = new AtomicBoolean(false); + + private final ConcurrentMap peerId2WAL = new ConcurrentHashMap<>(); + + private final KeyLocker createLock = new KeyLocker<>(); + + SynchronousReplicationWALProvider(WALProvider provider, + SynchronousReplicationPeerProvider peerProvider) { + this.provider = provider; + this.peerProvider = peerProvider; + } + + @Override + public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + if (!initialized.compareAndSet(false, true)) { + throw new IllegalStateException("WALProvider.init should only be called once."); + } + provider.init(factory, conf, providerId); + this.conf = conf; + this.factory = factory; + Pair> eventLoopGroupAndChannelClass = + NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); + eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); + channelClass = eventLoopGroupAndChannelClass.getSecond(); + } + + private String getLogPrefix(String peerId) { + return factory.factoryId + WAL_FILE_NAME_DELIMITER + peerId; + } + + private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException { + Path 
remoteWALDirPath = new Path(remoteWALDir); + FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf); + return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs, + CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId), + getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), + conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass); + } + + private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException { + DualAsyncFSWAL wal = peerId2WAL.get(peerId); + if (wal != null) { + return wal; + } + Lock lock = createLock.acquireLock(peerId); + try { + wal = peerId2WAL.get(peerId); + if (wal == null) { + wal = createWAL(peerId, remoteWALDir); + peerId2WAL.put(peerId, wal); + wal.init(); + } + return wal; + } finally { + lock.unlock(); + } + } + + @Override + public WAL getWAL(RegionInfo region) throws IOException { + Optional> peerIdAndRemoteWALDir = + peerProvider.getPeerIdAndRemoteWALDir(region); + if (peerIdAndRemoteWALDir.isPresent()) { + Pair pair = peerIdAndRemoteWALDir.get(); + return getWAL(pair.getFirst(), pair.getSecond()); + } else { + return provider.getWAL(region); + } + } + + private Stream getWALStream() { + return Streams.concat(peerId2WAL.values().stream(), provider.getWALs().stream()); + } + + @Override + public List getWALs() { + return getWALStream().collect(Collectors.toList()); + } + + @Override + public void shutdown() throws IOException { + // save the last exception and rethrow + IOException failure = null; + for (DualAsyncFSWAL wal : peerId2WAL.values()) { + try { + wal.shutdown(); + } catch (IOException e) { + LOG.error("Shutdown WAL failed", e); + failure = e; + } + } + provider.shutdown(); + if (failure != null) { + throw failure; + } + } + + @Override + public void close() throws IOException { + // save the last exception and rethrow + IOException failure = null; + for (DualAsyncFSWAL wal : peerId2WAL.values()) { + try { 
+ wal.close(); + } catch (IOException e) { + LOG.error("Close WAL failed", e); + failure = e; + } + } + provider.close(); + if (failure != null) { + throw failure; + } + } + + @Override + public long getNumLogFiles() { + return peerId2WAL.size() + provider.getNumLogFiles(); + } + + @Override + public long getLogFileSize() { + return peerId2WAL.values().stream().mapToLong(DualAsyncFSWAL::getLogFileSize).sum() + + provider.getLogFileSize(); + } + + @Override + public void peerRemoved(String peerId) { + WAL wal = peerId2WAL.remove(peerId); + if (wal != null) { + try { + wal.close(); + } catch (IOException e) { + LOG.error("Close WAL failed", e); + } + } + } + + @Override + public void addWALActionsListener(WALActionsListener listener) { + listeners.add(listener); + provider.addWALActionsListener(listener); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 24604d9..339fd6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; +import org.apache.hadoop.hbase.replication.regionserver.SynchronousReplicationPeerProvider; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; @@ -132,13 +133,10 @@ public class WALFactory { } } - WALProvider createProvider(Class clazz, String providerId) - throws 
IOException { - LOG.info("Instantiating WALProvider of type " + clazz); + static WALProvider createProvider(Class clazz) throws IOException { + LOG.info("Instantiating WALProvider of type {}", clazz); try { - final WALProvider result = clazz.getDeclaredConstructor().newInstance(); - result.init(this, conf, providerId); - return result; + return clazz.newInstance(); } catch (Exception e) { LOG.error("couldn't set up WALProvider, the configured class is " + clazz); LOG.debug("Exception details for failure to load WALProvider.", e); @@ -150,9 +148,10 @@ public class WALFactory { * instantiate a provider from a config property. requires conf to have already been set (as well * as anything the provider might need to read). */ - WALProvider getProvider(String key, String defaultValue, String providerId) throws IOException { - Class clazz = getProviderClass(key, defaultValue); - WALProvider provider = createProvider(clazz, providerId); + private WALProvider getProvider(String key, String defaultValue, String providerId) + throws IOException { + WALProvider provider = createProvider(getProviderClass(key, defaultValue)); + provider.init(this, conf, providerId); provider.addWALActionsListener(new MetricsWAL()); return provider; } @@ -184,6 +183,26 @@ public class WALFactory { } /** + * A temporary constructor for testing synchronous replication. + *

+ * Remove it once we can integrate the synchronous replication logic in RS. + */ + @VisibleForTesting + WALFactory(Configuration conf, String factoryId, SynchronousReplicationPeerProvider peerProvider) + throws IOException { + timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); + /* TODO Both of these are probably specific to the fs wal provider */ + logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, + AbstractFSWALProvider.Reader.class); + this.conf = conf; + this.factoryId = factoryId; + WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); + this.provider = new SynchronousReplicationWALProvider(provider, peerProvider); + this.provider.addWALActionsListener(new MetricsWAL()); + this.provider.init(this, conf, null); + } + + /** * Shutdown all WALs and clean up any underlying storage. * Use only when you will not need to replay and edits that have gone to any wals from this * factory. http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java index 8828239..a7e4670 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java @@ -130,13 +130,21 @@ public class WALKeyImpl implements WALKey { } @VisibleForTesting - public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, - long logSeqNum, + public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, long logSeqNum, final long now, UUID clusterId) { List clusterIds = new ArrayList<>(1); clusterIds.add(clusterId); - init(encodedRegionName, tablename, logSeqNum, now, clusterIds, - 
HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); + init(encodedRegionName, tablename, logSeqNum, now, clusterIds, HConstants.NO_NONCE, + HConstants.NO_NONCE, null, null); + } + + @VisibleForTesting + public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, long logSeqNum, + final long now, UUID clusterId, MultiVersionConcurrencyControl mvcc) { + List clusterIds = new ArrayList<>(1); + clusterIds.add(clusterId); + init(encodedRegionName, tablename, logSeqNum, now, clusterIds, HConstants.NO_NONCE, + HConstants.NO_NONCE, mvcc, null); } // TODO: Fix being able to pass in sequenceid. http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java index ca4b227..939f35c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java @@ -100,6 +100,7 @@ public class TestCompactionPolicy { HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); hlog = new FSHLog(fs, basedir, logName, conf); + hlog.init(); ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); region = HRegion.createHRegion(info, basedir, conf, htd, hlog); region.close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index 3cf06d4..1490653 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -102,65 +102,64 @@ public class TestFailedAppendAndSync { return name.getMethodName(); } - /** - * Reproduce locking up that happens when we get an exceptions appending and syncing. - * See HBASE-14317. - * First I need to set up some mocks for Server and RegionServerServices. I also need to - * set up a dodgy WAL that will throw an exception when we go to append to it. - */ - @Test - public void testLockupAroundBadAssignSync() throws IOException { + // Dodgy WAL. Will throw exceptions when flags set. + class DodgyFSLog extends FSHLog { + volatile boolean throwSyncException = false; + volatile boolean throwAppendException = false; final AtomicLong rolls = new AtomicLong(0); - // Dodgy WAL. Will throw exceptions when flags set. 
- class DodgyFSLog extends FSHLog { - volatile boolean throwSyncException = false; - volatile boolean throwAppendException = false; - public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) - throws IOException { - super(fs, root, logDir, conf); - } - - @Override - public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { - byte [][] regions = super.rollWriter(force); - rolls.getAndIncrement(); - return regions; - } + public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) + throws IOException { + super(fs, root, logDir, conf); + } - @Override - protected Writer createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); - return new Writer() { - @Override - public void close() throws IOException { - w.close(); - } + @Override + public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { + byte[][] regions = super.rollWriter(force); + rolls.getAndIncrement(); + return regions; + } - @Override - public void sync(boolean forceSync) throws IOException { - if (throwSyncException) { - throw new IOException("FAKE! Failed to replace a bad datanode..."); - } - w.sync(forceSync); - } + @Override + protected Writer createWriterInstance(Path path) throws IOException { + final Writer w = super.createWriterInstance(path); + return new Writer() { + @Override + public void close() throws IOException { + w.close(); + } - @Override - public void append(Entry entry) throws IOException { - if (throwAppendException) { - throw new IOException("FAKE! Failed to replace a bad datanode..."); - } - w.append(entry); - } + @Override + public void sync(boolean forceSync) throws IOException { + if (throwSyncException) { + throw new IOException("FAKE! 
Failed to replace a bad datanode..."); + } + w.sync(forceSync); + } - @Override - public long getLength() { - return w.getLength(); - } - }; + @Override + public void append(Entry entry) throws IOException { + if (throwAppendException) { + throw new IOException("FAKE! Failed to replace a bad datanode..."); } - } + w.append(entry); + } + @Override + public long getLength() { + return w.getLength(); + } + }; + } + } + /** + * Reproduce locking up that happens when we get an exceptions appending and syncing. + * See HBASE-14317. + * First I need to set up some mocks for Server and RegionServerServices. I also need to + * set up a dodgy WAL that will throw an exception when we go to append to it. + */ + @Test + public void testLockupAroundBadAssignSync() throws IOException { // Make up mocked server and services. Server server = mock(Server.class); when(server.getConfiguration()).thenReturn(CONF); @@ -172,6 +171,7 @@ public class TestFailedAppendAndSync { FileSystem fs = FileSystem.get(CONF); Path rootDir = new Path(dir + getName()); DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF); + dodgyWAL.init(); LogRoller logRoller = new LogRoller(server, services); logRoller.addWAL(dodgyWAL); logRoller.start(); @@ -192,7 +192,7 @@ public class TestFailedAppendAndSync { } catch (IOException ioe) { fail(); } - long rollsCount = rolls.get(); + long rollsCount = dodgyWAL.rolls.get(); try { dodgyWAL.throwAppendException = true; dodgyWAL.throwSyncException = false; @@ -202,8 +202,10 @@ public class TestFailedAppendAndSync { } catch (IOException ioe) { threwOnAppend = true; } - while (rollsCount == rolls.get()) Threads.sleep(100); - rollsCount = rolls.get(); + while (rollsCount == dodgyWAL.rolls.get()) { + Threads.sleep(100); + } + rollsCount = dodgyWAL.rolls.get(); // When we get to here.. we should be ok. A new WAL has been put in place. There were no // appends to sync. We should be able to continue. 
@@ -217,14 +219,16 @@ public class TestFailedAppendAndSync { } catch (IOException ioe) { threwOnBoth = true; } - while (rollsCount == rolls.get()) Threads.sleep(100); + while (rollsCount == dodgyWAL.rolls.get()) { + Threads.sleep(100); + } // Again, all should be good. New WAL and no outstanding unsync'd edits so we should be able // to just continue. // So, should be no abort at this stage. Verify. - Mockito.verify(server, Mockito.atLeast(0)). - abort(Mockito.anyString(), (Throwable)Mockito.anyObject()); + Mockito.verify(server, Mockito.atLeast(0)).abort(Mockito.anyString(), + Mockito.any(Throwable.class)); try { dodgyWAL.throwAppendException = false; dodgyWAL.throwSyncException = true; @@ -239,8 +243,8 @@ public class TestFailedAppendAndSync { // happens. If it don't we'll timeout the whole test. That is fine. while (true) { try { - Mockito.verify(server, Mockito.atLeast(1)). - abort(Mockito.anyString(), (Throwable)Mockito.anyObject()); + Mockito.verify(server, Mockito.atLeast(1)).abort(Mockito.anyString(), + Mockito.any(Throwable.class)); break; } catch (WantedButNotInvoked t) { Threads.sleep(1); http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 73c88d2..2b3acd4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -217,7 +217,6 @@ public class TestHRegion { protected static HBaseTestingUtility TEST_UTIL; public static Configuration CONF ; private String dir; - private static FileSystem FILESYSTEM; private final int MAX_VERSIONS = 2; // Test names @@ -239,7 +238,6 @@ public class TestHRegion { 
@Before public void setup() throws IOException { TEST_UTIL = HBaseTestingUtility.createLocalHTU(); - FILESYSTEM = TEST_UTIL.getTestFileSystem(); CONF = TEST_UTIL.getConfiguration(); dir = TEST_UTIL.getDataTestDir("TestHRegion").toString(); method = name.getMethodName(); @@ -341,6 +339,7 @@ public class TestHRegion { FileSystem fs = FileSystem.get(CONF); Path rootDir = new Path(dir + "testMemstoreSnapshotSize"); MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF); + faultyLog.init(); HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES); @@ -352,7 +351,6 @@ public class TestHRegion { Put put = new Put(value); put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC); - boolean threwIOE = false; try { region.put(put); @@ -389,6 +387,7 @@ public class TestHRegion { FileSystem fs = FileSystem.get(CONF); Path rootDir = new Path(dir + testName); FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF); + hLog.init(); HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES); HStore store = region.getStore(COLUMN_FAMILY_BYTES); @@ -1164,6 +1163,7 @@ public class TestHRegion { FailAppendFlushMarkerWAL wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf), method, walConf); + wal.init(); this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); try { @@ -1195,7 +1195,7 @@ public class TestHRegion { wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH}; wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf), method, walConf); - + wal.init(); this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); region.put(put); @@ -2447,6 +2447,7 @@ public 
class TestHRegion { FileSystem fs = FileSystem.get(CONF); Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL"); FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF); + hLog.init(); // This chunk creation is done throughout the code base. Do we want to move it into core? // It is missing from this test. W/o it we NPE. ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); @@ -2499,9 +2500,9 @@ public class TestHRegion { RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must // do below format (from Mockito doc). - Mockito.doAnswer(new Answer() { + Mockito.doAnswer(new Answer() { @Override - public Object answer(InvocationOnMock invocation) throws Throwable { + public Void answer(InvocationOnMock invocation) throws Throwable { MiniBatchOperationInProgress mb = invocation.getArgument(0); mb.addOperationsFromCP(0, new Mutation[]{addPut}); return null; @@ -3795,9 +3796,12 @@ public class TestHRegion { boolean previousEmpty = res.isEmpty(); res.clear(); - InternalScanner scanner = region.getScanner(scan); - while (scanner.next(res)) - ; + try (InternalScanner scanner = region.getScanner(scan)) { + boolean moreRows; + do { + moreRows = scanner.next(res); + } while (moreRows); + } if (!res.isEmpty() || !previousEmpty || i > compactInterval) { assertEquals("i=" + i, expectedCount, res.size()); long timestamp = res.get(0).getTimestamp(); @@ -3893,7 +3897,7 @@ public class TestHRegion { region.put(put); numPutsFinished++; if (numPutsFinished > 0 && numPutsFinished % 47 == 0) { - System.out.println("put iteration = " + numPutsFinished); + LOG.debug("put iteration = {}", numPutsFinished); Delete delete = new Delete(row, (long) numPutsFinished - 30); region.delete(delete); } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java index ce83326..84f7973 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java @@ -27,25 +27,18 @@ import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.wal.WAL; import org.junit.ClassRule; import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A test similar to TestHRegion, but with in-memory flush families. * Also checks wal truncation after in-memory compaction. */ @Category({VerySlowRegionServerTests.class, LargeTests.class}) -@SuppressWarnings("deprecation") public class TestHRegionWithInMemoryFlush extends TestHRegion { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHRegionWithInMemoryFlush.class); - // Do not spin up clusters in here. If you need to spin up a cluster, do it - // over in TestHRegionOnCluster. - private static final Logger LOG = LoggerFactory.getLogger(TestHRegionWithInMemoryFlush.class); - /** * @return A region on which you must call * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java index 8b96fa7..e5006ea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Scan; @@ -36,7 +37,6 @@ import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -81,12 +81,12 @@ public class TestRegionIncrement { } private HRegion getRegion(final Configuration conf, final String tableName) throws IOException { - WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(), - TEST_UTIL.getDataTestDir().toString(), conf); + FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(), + TEST_UTIL.getDataTestDir().toString(), conf); + wal.init(); ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); - return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName), - HConstants.EMPTY_BYTE_ARRAY, 
HConstants.EMPTY_BYTE_ARRAY, tableName, conf, - false, Durability.SKIP_WAL, wal, INCREMENT_BYTES); + return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY, false, Durability.SKIP_WAL, wal, INCREMENT_BYTES); } private void closeRegion(final HRegion region) throws IOException { @@ -170,8 +170,6 @@ public class TestRegionIncrement { /** * Have each thread update its own Cell. Avoid contention with another thread. - * @throws IOException - * @throws InterruptedException */ @Test public void testUnContendedSingleCellIncrement() @@ -209,13 +207,9 @@ public class TestRegionIncrement { /** * Have each thread update its own Cell. Avoid contention with another thread. - * This is - * @throws IOException - * @throws InterruptedException */ @Test - public void testContendedAcrossCellsIncrement() - throws IOException, InterruptedException { + public void testContendedAcrossCellsIncrement() throws IOException, InterruptedException { final HRegion region = getRegion(TEST_UTIL.getConfiguration(), TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName())); long startTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 29a75b8..84b8d6c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -215,6 +215,7 @@ public class TestWALLockup { FileSystem fs = FileSystem.get(CONF); Path rootDir = new Path(dir + getName()); DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF); + 
dodgyWAL.init(); Path originalWAL = dodgyWAL.getCurrentFileName(); // I need a log roller running. LogRoller logRoller = new LogRoller(server, services); http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index e7cdf1f..93c379c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -1097,6 +1097,7 @@ public abstract class AbstractTestWALReplay { private MockWAL createMockWAL() throws IOException { MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf); + wal.init(); // Set down maximum recovery so we dfsclient doesn't linger retrying something // long gone. 
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java index aece961..420585f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; @@ -41,7 +42,7 @@ import org.apache.hadoop.hbase.wal.WALProvider; /** * Helper class for testing protobuf log. 
*/ -final class ProtobufLogTestHelper { +public final class ProtobufLogTestHelper { private ProtobufLogTestHelper() { } @@ -54,17 +55,22 @@ final class ProtobufLogTestHelper { return RegionInfoBuilder.newBuilder(tableName).setRegionId(1024).build(); } + private static WAL.Entry generateEdit(int i, RegionInfo hri, TableName tableName, byte[] row, + int columnCount, long timestamp, MultiVersionConcurrencyControl mvcc) { + WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, i, timestamp, + HConstants.DEFAULT_CLUSTER_ID, mvcc); + WALEdit edit = new WALEdit(); + int prefix = i; + IntStream.range(0, columnCount).mapToObj(j -> toValue(prefix, j)) + .map(value -> new KeyValue(row, row, row, timestamp, value)).forEachOrdered(edit::add); + return new WAL.Entry(key, edit); + } + public static void doWrite(WALProvider.Writer writer, boolean withTrailer, TableName tableName, int columnCount, int recordCount, byte[] row, long timestamp) throws IOException { RegionInfo hri = toRegionInfo(tableName); for (int i = 0; i < recordCount; i++) { - WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, i, timestamp, - HConstants.DEFAULT_CLUSTER_ID); - WALEdit edit = new WALEdit(); - int prefix = i; - IntStream.range(0, columnCount).mapToObj(j -> toValue(prefix, j)) - .map(value -> new KeyValue(row, row, row, timestamp, value)).forEachOrdered(edit::add); - writer.append(new WAL.Entry(key, edit)); + writer.append(generateEdit(i, hri, tableName, row, columnCount, timestamp, null)); } writer.sync(false); if (withTrailer) { @@ -72,14 +78,24 @@ final class ProtobufLogTestHelper { } } - public static void doRead(ProtobufLogReader reader, boolean withTrailer, TableName tableName, - int columnCount, int recordCount, byte[] row, long timestamp) throws IOException { + public static void doWrite(WAL wal, RegionInfo hri, TableName tableName, int columnCount, + int recordCount, byte[] row, long timestamp, MultiVersionConcurrencyControl mvcc) + throws IOException { 
+ for (int i = 0; i < recordCount; i++) { + WAL.Entry entry = generateEdit(i, hri, tableName, row, columnCount, timestamp, mvcc); + wal.append(hri, entry.getKey(), entry.getEdit(), true); + } + wal.sync(); + } + + public static void doRead(ProtobufLogReader reader, boolean withTrailer, RegionInfo hri, + TableName tableName, int columnCount, int recordCount, byte[] row, long timestamp) + throws IOException { if (withTrailer) { assertNotNull(reader.trailer); } else { assertNull(reader.trailer); } - RegionInfo hri = toRegionInfo(tableName); for (int i = 0; i < recordCount; ++i) { WAL.Entry entry = reader.next(); assertNotNull(entry); @@ -96,4 +112,10 @@ final class ProtobufLogTestHelper { } assertNull(reader.next()); } + + public static void doRead(ProtobufLogReader reader, boolean withTrailer, TableName tableName, + int columnCount, int recordCount, byte[] row, long timestamp) throws IOException { + doRead(reader, withTrailer, toRegionInfo(tableName), tableName, columnCount, recordCount, row, + timestamp); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index 450c01b..5f0f77c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -67,8 +67,10 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { protected AbstractFSWAL newWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix) throws IOException { - return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, 
listeners, failIfWALExists, prefix, - suffix, GROUP, CHANNEL_CLASS); + AsyncFSWAL wal = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, + failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS); + wal.init(); + return wal; } @Override @@ -76,15 +78,16 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, final Runnable action) throws IOException { - return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, - suffix, GROUP, CHANNEL_CLASS) { + AsyncFSWAL wal = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, + failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) { @Override void atHeadOfRingBufferEventHandlerAppend() { action.run(); super.atHeadOfRingBufferEventHandlerAppend(); } - }; + wal.init(); + return wal; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java index 80b7477..0740954 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java @@ -66,7 +66,9 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay { @Override protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException { - return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName, + AsyncFSWAL wal = new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP, CHANNEL_CLASS); + wal.init(); + return 
wal; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java index cb8edc6..36dbe0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java @@ -77,8 +77,7 @@ public class TestCombinedAsyncWriter { CHANNEL_CLASS = NioSocketChannel.class; UTIL.startMiniDFSCluster(3); UTIL.getTestFileSystem().mkdirs(UTIL.getDataTestDirOnTestFS()); - WALS = - new WALFactory(UTIL.getConfiguration(), TestCombinedAsyncWriter.class.getSimpleName()); + WALS = new WALFactory(UTIL.getConfiguration(), TestCombinedAsyncWriter.class.getSimpleName()); } @AfterClass http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 7baaa6c..f288f74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -74,8 +74,10 @@ public class TestFSHLog extends AbstractTestFSWAL { protected AbstractFSWAL newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix) throws IOException { - return new FSHLog(fs, rootDir, 
walDir, archiveDir, conf, listeners, failIfWALExists, prefix, - suffix); + FSHLog wal = + new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); + wal.init(); + return wal; } @Override @@ -83,8 +85,8 @@ public class TestFSHLog extends AbstractTestFSWAL { String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, final Runnable action) throws IOException { - return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, - suffix) { + FSHLog wal = new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, + prefix, suffix) { @Override void atHeadOfRingBufferEventHandlerAppend() { @@ -92,6 +94,8 @@ public class TestFSHLog extends AbstractTestFSWAL { super.atHeadOfRingBufferEventHandlerAppend(); } }; + wal.init(); + return wal; } @Test @@ -100,6 +104,7 @@ public class TestFSHLog extends AbstractTestFSWAL { final String name = this.name.getMethodName(); FSHLog log = new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); + log.init(); try { Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler"); ringBufferEventHandlerField.setAccessible(true); @@ -142,7 +147,7 @@ public class TestFSHLog extends AbstractTestFSWAL { try (FSHLog log = new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) { - + log.init(); log.registerWALActionsListener(new WALActionsListener() { @Override public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 649e981..66e19a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -48,6 +48,7 @@ public class TestWALReplay extends AbstractTestWALReplay { @Override protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException { FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c); + wal.init(); // Set down maximum recovery so we dfsclient doesn't linger retrying something // long gone. HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index 01f0dc6..453b742 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -103,8 +103,6 @@ public class IOTestProvider implements WALProvider { this.factory = factory; this.conf = conf; this.providerId = providerId != null ? 
providerId : DEFAULT_PROVIDER_ID; - - } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/ce1b1a71/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java new file mode 100644 index 0000000..e6031c6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.wal; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.util.Optional; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestSynchronousReplicationWALProvider { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static String PEER_ID = "1"; + + private static String REMOTE_WAL_DIR = "/RemoteWAL"; + + private static TableName TABLE = TableName.valueOf("table"); + + private static TableName TABLE_NO_REP = TableName.valueOf("table-no-rep"); + + private static RegionInfo REGION = RegionInfoBuilder.newBuilder(TABLE).build(); + + private static RegionInfo REGION_NO_REP = RegionInfoBuilder.newBuilder(TABLE_NO_REP).build(); + + private static WALFactory FACTORY; + + private static Optional> 
getPeerIdAndRemoteWALDir(RegionInfo info) { + if (info.getTable().equals(TABLE)) { + return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR)); + } else { + return Optional.empty(); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniDFSCluster(3); + FACTORY = new WALFactory(UTIL.getConfiguration(), "test", + TestSynchronousReplicationWALProvider::getPeerIdAndRemoteWALDir); + UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID)); + } + + @AfterClass + public static void tearDownAfterClass() throws IOException { + FACTORY.close(); + UTIL.shutdownMiniDFSCluster(); + } + + private void testReadWrite(DualAsyncFSWAL wal) throws Exception { + int recordCount = 100; + int columnCount = 10; + byte[] row = Bytes.toBytes("testRow"); + long timestamp = System.currentTimeMillis(); + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + ProtobufLogTestHelper.doWrite(wal, REGION, TABLE, columnCount, recordCount, row, timestamp, + mvcc); + Path localFile = wal.getCurrentFileName(); + Path remoteFile = new Path(REMOTE_WAL_DIR + "/" + PEER_ID, localFile.getName()); + try (ProtobufLogReader reader = + (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) { + ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row, + timestamp); + } + try (ProtobufLogReader reader = + (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) { + ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row, + timestamp); + } + wal.rollWriter(); + DistributedFileSystem dfs = (DistributedFileSystem) UTIL.getDFSCluster().getFileSystem(); + UTIL.waitFor(5000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return dfs.isFileClosed(localFile) && dfs.isFileClosed(remoteFile); + } + + @Override + public String explainFailure() throws Exception { + StringBuilder sb = new 
StringBuilder(); + if (!dfs.isFileClosed(localFile)) { + sb.append(localFile + " has not been closed yet."); + } + if (!dfs.isFileClosed(remoteFile)) { + sb.append(remoteFile + " has not been closed yet."); + } + return sb.toString(); + } + }); + try (ProtobufLogReader reader = + (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) { + ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row, + timestamp); + } + try (ProtobufLogReader reader = + (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) { + ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row, + timestamp); + } + } + + @Test + public void test() throws Exception { + WAL walNoRep = FACTORY.getWAL(REGION_NO_REP); + assertThat(walNoRep, not(instanceOf(DualAsyncFSWAL.class))); + DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION); + assertEquals(2, FACTORY.getWALs().size()); + testReadWrite(wal); + SynchronousReplicationWALProvider walProvider = + (SynchronousReplicationWALProvider) FACTORY.getWALProvider(); + walProvider.peerRemoved(PEER_ID); + assertEquals(1, FACTORY.getWALs().size()); + } +}