From commits-return-98691-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Tue Jun 1 05:13:23 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id A0974180654 for ; Tue, 1 Jun 2021 07:13:23 +0200 (CEST) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id ED56360BB6 for ; Tue, 1 Jun 2021 05:13:22 +0000 (UTC) Received: (qmail 70319 invoked by uid 500); 1 Jun 2021 05:13:19 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 70310 invoked by uid 99); 1 Jun 2021 05:13:19 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Jun 2021 05:13:19 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 2134B81A86; Tue, 1 Jun 2021 05:13:19 +0000 (UTC) Date: Tue, 01 Jun 2021 05:13:16 +0000 To: "commits@hbase.apache.org" Subject: [hbase] branch master updated: HBASE-25932: Ensure replication reads the trailer bytes from WAL. 
(#3332) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <162252439437.16739.1746097381747404549@gitbox.apache.org> From: bharathv@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: hbase X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 06c6e068030f1213f1d0cd7a7577471041d035ef X-Git-Newrev: b04c3c77860fe430308a6bfd7dd8aebe1ad3510c X-Git-Rev: b04c3c77860fe430308a6bfd7dd8aebe1ad3510c X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. bharathv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hbase.git The following commit(s) were added to refs/heads/master by this push: new b04c3c7 HBASE-25932: Ensure replication reads the trailer bytes from WAL. (#3332) b04c3c7 is described below commit b04c3c77860fe430308a6bfd7dd8aebe1ad3510c Author: Bharath Vissapragada AuthorDate: Mon May 31 22:12:47 2021 -0700 HBASE-25932: Ensure replication reads the trailer bytes from WAL. (#3332) This bug was exposed by the test from HBASE-25924. Since the WAL implementations close the WAL asynchronously, replication can potentially miss the trailer bytes (see the JIRA comment for a detailed analysis). While this is not a correctness problem (since the trailer does not have any entry data), it erroneously bumps a metric that is used to track skipped bytes in the WAL, resulting in false alarms, which is something we should avoid. 
Reviewed-by: Rushabh Shah Signed-off-by: Viraj Jasani Signed-off-by Anoop Sam John --- .../hbase/regionserver/wal/AbstractFSWAL.java | 20 +++++++++ .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 10 +++-- .../hadoop/hbase/regionserver/wal/FSHLog.java | 5 ++- .../replication/regionserver/WALEntryStream.java | 7 ++- .../regionserver/TestFSHLogWALEntryStream.java | 51 ++++++++++++++++++++++ .../regionserver/TestWALEntryStream.java | 51 +++++++++++++++++----- 6 files changed, 125 insertions(+), 19 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index b569779..52f3f71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -192,6 +193,9 @@ public abstract class AbstractFSWAL implements WAL { /** Listeners that are called on WAL events. */ protected final List listeners = new CopyOnWriteArrayList<>(); + /** Tracks the logs in the process of being closed. */ + protected final Map inflightWALClosures = new ConcurrentHashMap<>(); + /** * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has @@ -1028,6 +1032,13 @@ public abstract class AbstractFSWAL implements WAL { } /** + * @return number of WALs currently in the process of closing. 
+ */ + public int getInflightWALCloseCount() { + return inflightWALClosures.size(); + } + + /** * updates the sequence number of a specific store. depending on the flag: replaces current seq * number if the given seq id is bigger, or even if it is lower than existing one */ @@ -1190,9 +1201,18 @@ public abstract class AbstractFSWAL implements WAL { try { Path currentPath = getOldPath(); if (path.equals(currentPath)) { + // Currently active path. W writer = this.writer; return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty(); } else { + W temp = inflightWALClosures.get(path.getName()); + if (temp != null) { + // In the process of being closed, trailer bytes may or may not be flushed. + // Ensuring that we read all the bytes in a file is critical for correctness of tailing + // use cases like replication, see HBASE-25924/HBASE-25932. + return OptionalLong.of(temp.getSyncedLength()); + } + // Log rolled successfully. return OptionalLong.empty(); } } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index c11496b..4b0f041 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -31,6 +31,7 @@ import java.util.Comparator; import java.util.Deque; import java.util.Iterator; import java.util.List; +import java.util.OptionalLong; import java.util.Queue; import java.util.SortedSet; import java.util.TreeSet; @@ -712,14 +713,17 @@ public class AsyncFSWAL extends AbstractFSWAL { } } - protected final long closeWriter(AsyncWriter writer) { + protected final long closeWriter(AsyncWriter writer, Path path) { if (writer != null) { + inflightWALClosures.put(path.getName(), writer); long fileLength = writer.getLength(); closeExecutor.execute(() -> { try { writer.close(); } catch 
(IOException e) { LOG.warn("close old writer failed", e); + } finally { + inflightWALClosures.remove(path.getName()); } }); return fileLength; @@ -733,7 +737,7 @@ public class AsyncFSWAL extends AbstractFSWAL { throws IOException { Preconditions.checkNotNull(nextWriter); waitForSafePoint(); - long oldFileLen = closeWriter(this.writer); + long oldFileLen = closeWriter(this.writer, oldPath); logRollAndSetupWalProps(oldPath, newPath, oldFileLen); this.writer = nextWriter; if (nextWriter instanceof AsyncProtobufLogWriter) { @@ -759,7 +763,7 @@ public class AsyncFSWAL extends AbstractFSWAL { @Override protected void doShutdown() throws IOException { waitForSafePoint(); - closeWriter(this.writer); + closeWriter(this.writer, getOldPath()); this.writer = null; closeExecutor.shutdown(); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index cad0821..3efadc1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** - * The default implementation of FSWAL. + * The original implementation of FSWAL. */ @InterfaceAudience.Private public class FSHLog extends AbstractFSWAL { @@ -380,6 +380,7 @@ public class FSHLog extends AbstractFSWAL { // In case of having unflushed entries or we already reached the // closeErrorsTolerated count, call the closeWriter inline rather than in async // way so that in case of an IOE we will throw it back and abort RS. 
+ inflightWALClosures.put(oldPath.getName(), writer); if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) { closeWriter(this.writer, oldPath, true); } else { @@ -448,6 +449,8 @@ public class FSHLog extends AbstractFSWAL { } LOG.warn("Riding over failed WAL close of " + path + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe); + } finally { + inflightWALClosures.remove(path.getName()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 5e63e5e..53cd084 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -223,10 +223,9 @@ class WALEntryStream implements Closeable { if (trailerSize < 0) { if (currentPositionOfReader < stat.getLen()) { final long skippedBytes = stat.getLen() - currentPositionOfReader; - LOG.debug( - "Reached the end of WAL {}. It was not closed cleanly," + - " so we did not parse {} bytes of data. This is normally ok.", - currentPath, skippedBytes); + // See the commits in HBASE-25924/HBASE-25932 for context. + LOG.warn("Reached the end of WAL {}. 
It was not closed cleanly," + + " so we did not parse {} bytes of data.", currentPath, skippedBytes); metrics.incrUncleanlyClosedWALs(); metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java new file mode 100644 index 0000000..32d6ec4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSHLogProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +/** + * TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} as the WAL provider. + */ +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestFSHLogWALEntryStream extends TestWALEntryStream { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + CONF = TEST_UTIL.getConfiguration(); + CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, AbstractFSWALProvider.class); + CONF.setLong("replication.source.sleepforretries", 10); + TEST_UTIL.startMiniDFSCluster(3); + cluster = TEST_UTIL.getDFSCluster(); + fs = cluster.getFileSystem(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 5507972..d4bdaaa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -53,11 +53,14 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import 
org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.replication.WALEntryFilter; @@ -91,10 +94,11 @@ public class TestWALEntryStream { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestWALEntryStream.class); - private static HBaseTestingUtility TEST_UTIL; - private static Configuration CONF; - private static FileSystem fs; - private static MiniDFSCluster cluster; + private static final long TEST_TIMEOUT_MS = 5000; + protected static HBaseTestingUtility TEST_UTIL; + protected static Configuration CONF; + protected static FileSystem fs; + protected static MiniDFSCluster cluster; private static final TableName tableName = TableName.valueOf("tablename"); private static final byte[] family = Bytes.toBytes("column"); private static final byte[] qualifier = Bytes.toBytes("qualifier"); @@ -103,6 +107,27 @@ public class TestWALEntryStream { private static final NavigableMap scopes = getScopes(); private final String fakeWalGroupId = "fake-wal-group-id"; + /** + * Test helper that waits until a non-null entry is available in the stream next or times out. + */ + private static class WALEntryStreamWithRetries extends WALEntryStream { + // Class member to be able to set a non-final from within a lambda. 
+ private Entry result; + + public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics, String walGroupId) throws IOException { + super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId); + } + + @Override + public Entry next() { + Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> ( + result = WALEntryStreamWithRetries.super.next()) != null); + return result; + } + } + private static NavigableMap getScopes() { NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); scopes.put(family, 1); @@ -148,7 +173,9 @@ public class TestWALEntryStream { @After public void tearDown() throws Exception { - log.close(); + if (log != null) { + log.close(); + } } // Try out different combinations of row count and KeyValue count @@ -215,7 +242,7 @@ public class TestWALEntryStream { appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos, + try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log, null, new MetricsSource("1"), fakeWalGroupId)) { // Read the newly added entry, make sure we made progress WAL.Entry entry = entryStream.next(); @@ -229,7 +256,7 @@ public class TestWALEntryStream { log.rollWriter(); appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos, + try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log, null, new MetricsSource("1"), fakeWalGroupId)) { WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); @@ -255,7 +282,7 @@ public class TestWALEntryStream { appendToLog("1"); appendToLog("2");// 2 try (WALEntryStream entryStream = - new WALEntryStream(logQueue, CONF, 0, log, null, + new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { 
assertEquals("1", getRow(entryStream.next())); @@ -530,7 +557,8 @@ public class TestWALEntryStream { @Override public boolean evaluate() throws Exception { - return fs.getFileStatus(walPath).getLen() > 0; + return fs.getFileStatus(walPath).getLen() > 0 && + ((AbstractFSWAL) log).getInflightWALCloseCount() == 0; } @Override @@ -539,12 +567,13 @@ public class TestWALEntryStream { } }); - long walLength = fs.getFileStatus(walPath).getLen(); ReplicationSourceWALReader reader = createReader(false, CONF); WALEntryBatch entryBatch = reader.take(); assertEquals(walPath, entryBatch.getLastWalPath()); + + long walLength = fs.getFileStatus(walPath).getLen(); assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " + walLength, entryBatch.getLastWalPosition() <= walLength); assertEquals(1, entryBatch.getNbEntries()); @@ -869,7 +898,7 @@ public class TestWALEntryStream { */ @Test public void testCleanClosedWALs() throws Exception { - try (WALEntryStream entryStream = new WALEntryStream( + try (WALEntryStream entryStream = new WALEntryStreamWithRetries( logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) { assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); appendToLogAndSync();