hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhara...@apache.org
Subject [hbase] branch master updated: HBASE-25932: Ensure replication reads the trailer bytes from WAL. (#3332)
Date Tue, 01 Jun 2021 05:13:16 GMT
This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new b04c3c7  HBASE-25932: Ensure replication reads the trailer bytes from WAL. (#3332)
b04c3c7 is described below

commit b04c3c77860fe430308a6bfd7dd8aebe1ad3510c
Author: Bharath Vissapragada <bharathv@apache.org>
AuthorDate: Mon May 31 22:12:47 2021 -0700

    HBASE-25932: Ensure replication reads the trailer bytes from WAL. (#3332)
    
    This bug was exposed by the test from HBASE-25924. Since these WAL
    implementations close the WAL asynchronously, replication can potentially
    miss the trailer bytes (see the JIRA comment for a detailed analysis).
    
    While this is not a correctness problem (since the trailer does not contain any entry data),
    it erroneously bumps a metric that tracks skipped bytes in WALs, resulting
    in false alarms, which is something we should avoid.
    
    Reviewed-by: Rushabh Shah <rushabh.shah@salesforce.com>
    Signed-off-by: Viraj Jasani <vjasani@apache.org>
    Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org>
---
 .../hbase/regionserver/wal/AbstractFSWAL.java      | 20 +++++++++
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  | 10 +++--
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |  5 ++-
 .../replication/regionserver/WALEntryStream.java   |  7 ++-
 .../regionserver/TestFSHLogWALEntryStream.java     | 51 ++++++++++++++++++++++
 .../regionserver/TestWALEntryStream.java           | 51 +++++++++++++++++-----
 6 files changed, 125 insertions(+), 19 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index b569779..52f3f71 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -192,6 +193,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   /** Listeners that are called on WAL events. */
   protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
 
+  /** Tracks the logs in the process of being closed. */
+  protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();
+
   /**
    * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding
sequence
    * id as yet not flushed as well as the most recent edit sequence id appended to the WAL.
Has
@@ -1028,6 +1032,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   }
 
   /**
+   * @return number of WALs currently in the process of closing.
+   */
+  public int getInflightWALCloseCount() {
+    return inflightWALClosures.size();
+  }
+
+  /**
    * updates the sequence number of a specific store. depending on the flag: replaces current
seq
    * number if the given seq id is bigger, or even if it is lower than existing one
    */
@@ -1190,9 +1201,18 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     try {
       Path currentPath = getOldPath();
       if (path.equals(currentPath)) {
+        // Currently active path.
         W writer = this.writer;
         return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
       } else {
+        W temp = inflightWALClosures.get(path.getName());
+        if (temp != null) {
+          // In the process of being closed, trailer bytes may or may not be flushed.
+          // Ensuring that we read all the bytes in a file is critical for correctness of
tailing
+          // use cases like replication, see HBASE-25924/HBASE-25932.
+          return OptionalLong.of(temp.getSyncedLength());
+        }
+        // Log rolled successfully.
         return OptionalLong.empty();
       }
     } finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index c11496b..4b0f041 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -31,6 +31,7 @@ import java.util.Comparator;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
+import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -712,14 +713,17 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
-  protected final long closeWriter(AsyncWriter writer) {
+  protected final long closeWriter(AsyncWriter writer, Path path) {
     if (writer != null) {
+      inflightWALClosures.put(path.getName(), writer);
       long fileLength = writer.getLength();
       closeExecutor.execute(() -> {
         try {
           writer.close();
         } catch (IOException e) {
           LOG.warn("close old writer failed", e);
+        } finally {
+          inflightWALClosures.remove(path.getName());
         }
       });
       return fileLength;
@@ -733,7 +737,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       throws IOException {
     Preconditions.checkNotNull(nextWriter);
     waitForSafePoint();
-    long oldFileLen = closeWriter(this.writer);
+    long oldFileLen = closeWriter(this.writer, oldPath);
     logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
     this.writer = nextWriter;
     if (nextWriter instanceof AsyncProtobufLogWriter) {
@@ -759,7 +763,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   @Override
   protected void doShutdown() throws IOException {
     waitForSafePoint();
-    closeWriter(this.writer);
+    closeWriter(this.writer, getOldPath());
     this.writer = null;
     closeExecutor.shutdown();
     try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index cad0821..3efadc1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * The default implementation of FSWAL.
+ * The original implementation of FSWAL.
  */
 @InterfaceAudience.Private
 public class FSHLog extends AbstractFSWAL<Writer> {
@@ -380,6 +380,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
         // In case of having unflushed entries or we already reached the
         // closeErrorsTolerated count, call the closeWriter inline rather than in async
         // way so that in case of an IOE we will throw it back and abort RS.
+        inflightWALClosures.put(oldPath.getName(), writer);
         if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated)
{
           closeWriter(this.writer, oldPath, true);
         } else {
@@ -448,6 +449,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       }
       LOG.warn("Riding over failed WAL close of " + path
           + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
+    } finally {
+      inflightWALClosures.remove(path.getName());
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 5e63e5e..53cd084 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -223,10 +223,9 @@ class WALEntryStream implements Closeable {
       if (trailerSize < 0) {
         if (currentPositionOfReader < stat.getLen()) {
           final long skippedBytes = stat.getLen() - currentPositionOfReader;
-          LOG.debug(
-            "Reached the end of WAL {}. It was not closed cleanly," +
-              " so we did not parse {} bytes of data. This is normally ok.",
-            currentPath, skippedBytes);
+          // See the commits in HBASE-25924/HBASE-25932 for context.
+          LOG.warn("Reached the end of WAL {}. It was not closed cleanly," +
+              " so we did not parse {} bytes of data.", currentPath, skippedBytes);
           metrics.incrUncleanlyClosedWALs();
           metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
         }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java
new file mode 100644
index 0000000..32d6ec4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+/**
+ * TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} as the WAL
provider.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestFSHLogWALEntryStream extends TestWALEntryStream {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    CONF = TEST_UTIL.getConfiguration();
+    CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, AbstractFSWALProvider.class);
+    CONF.setLong("replication.source.sleepforretries", 10);
+    TEST_UTIL.startMiniDFSCluster(3);
+    cluster = TEST_UTIL.getDFSCluster();
+    fs = cluster.getFileSystem();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 5507972..d4bdaaa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -53,11 +53,14 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
@@ -91,10 +94,11 @@ public class TestWALEntryStream {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestWALEntryStream.class);
 
-  private static HBaseTestingUtility TEST_UTIL;
-  private static Configuration CONF;
-  private static FileSystem fs;
-  private static MiniDFSCluster cluster;
+  private static final long TEST_TIMEOUT_MS = 5000;
+  protected static HBaseTestingUtility TEST_UTIL;
+  protected static Configuration CONF;
+  protected static FileSystem fs;
+  protected static MiniDFSCluster cluster;
   private static final TableName tableName = TableName.valueOf("tablename");
   private static final byte[] family = Bytes.toBytes("column");
   private static final byte[] qualifier = Bytes.toBytes("qualifier");
@@ -103,6 +107,27 @@ public class TestWALEntryStream {
   private static final NavigableMap<byte[], Integer> scopes = getScopes();
   private final String fakeWalGroupId = "fake-wal-group-id";
 
+  /**
+   * Test helper that waits until a non-null entry is available in the stream next or times
out.
+   */
+  private static class WALEntryStreamWithRetries extends WALEntryStream {
+    // Class member to be able to set a non-final from within a lambda.
+    private Entry result;
+
+    public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf,
+         long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
+         MetricsSource metrics, String walGroupId) throws IOException {
+      super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId);
+    }
+
+    @Override
+    public Entry next() {
+      Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> (
+          result = WALEntryStreamWithRetries.super.next()) != null);
+      return result;
+    }
+  }
+
   private static NavigableMap<byte[], Integer> getScopes() {
     NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     scopes.put(family, 1);
@@ -148,7 +173,9 @@ public class TestWALEntryStream {
 
   @After
   public void tearDown() throws Exception {
-    log.close();
+    if (log != null) {
+      log.close();
+    }
   }
 
   // Try out different combinations of row count and KeyValue count
@@ -215,7 +242,7 @@ public class TestWALEntryStream {
 
     appendToLogAndSync();
 
-    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
+    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos,
         log, null, new MetricsSource("1"), fakeWalGroupId)) {
       // Read the newly added entry, make sure we made progress
       WAL.Entry entry = entryStream.next();
@@ -229,7 +256,7 @@ public class TestWALEntryStream {
     log.rollWriter();
     appendToLogAndSync();
 
-    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
+    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos,
         log, null, new MetricsSource("1"), fakeWalGroupId)) {
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
@@ -255,7 +282,7 @@ public class TestWALEntryStream {
     appendToLog("1");
     appendToLog("2");// 2
     try (WALEntryStream entryStream =
-        new WALEntryStream(logQueue, CONF, 0, log, null,
+        new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
           new MetricsSource("1"), fakeWalGroupId)) {
       assertEquals("1", getRow(entryStream.next()));
 
@@ -530,7 +557,8 @@ public class TestWALEntryStream {
 
       @Override
       public boolean evaluate() throws Exception {
-        return fs.getFileStatus(walPath).getLen() > 0;
+        return fs.getFileStatus(walPath).getLen() > 0 &&
+            ((AbstractFSWAL) log).getInflightWALCloseCount() == 0;
       }
 
       @Override
@@ -539,12 +567,13 @@ public class TestWALEntryStream {
       }
 
     });
-    long walLength = fs.getFileStatus(walPath).getLen();
 
     ReplicationSourceWALReader reader = createReader(false, CONF);
 
     WALEntryBatch entryBatch = reader.take();
     assertEquals(walPath, entryBatch.getLastWalPath());
+
+    long walLength = fs.getFileStatus(walPath).getLen();
     assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length
is " +
       walLength, entryBatch.getLastWalPosition() <= walLength);
     assertEquals(1, entryBatch.getNbEntries());
@@ -869,7 +898,7 @@ public class TestWALEntryStream {
    */
   @Test
   public void testCleanClosedWALs() throws Exception {
-    try (WALEntryStream entryStream = new WALEntryStream(
+    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(
       logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
       assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
       appendToLogAndSync();

Mime
View raw message