hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anoopsamj...@apache.org
Subject [hbase] branch branch-2 updated: HBASE-24695 FSHLog - close the current WAL file in a background thread. (#2183)
Date Sat, 01 Aug 2020 17:16:53 GMT
This is an automated email from the ASF dual-hosted git repository.

anoopsamjohn pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 86fccba  HBASE-24695 FSHLog - close the current WAL file in a background thread.
(#2183)
86fccba is described below

commit 86fccba0d0c1671ebf71259a3eed3abc43359714
Author: Anoop Sam John <anoopsamjohn@gmail.com>
AuthorDate: Sat Aug 1 22:46:32 2020 +0530

    HBASE-24695 FSHLog - close the current WAL file in a background thread. (#2183)
    
    Signed-off-by: Ramkrishna <ramkrishna@apache.org>
    Signed-off-by: Duo Zhang <zhangduo@apache.org>
---
 .../hadoop/hbase/regionserver/wal/FSHLog.java      | 72 +++++++++++++++++-----
 1 file changed, 56 insertions(+), 16 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 902d354..168cbed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -35,6 +35,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -62,6 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * The default implementation of FSWAL.
@@ -115,6 +118,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   private static final String MAX_BATCH_COUNT = "hbase.regionserver.wal.sync.batch.count";
   private static final int DEFAULT_MAX_BATCH_COUNT = 200;
 
+  private static final String FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = "hbase.wal.fshlog.wait.on.shutdown.seconds";
+  private static final int DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
+
   /**
    * The nexus at which all incoming handlers meet. Does appends and sync with an ordering.
Appends
    * and syncs are each put on the ring which means handlers need to smash up against the
ring twice
@@ -160,6 +166,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
 
   private final AtomicInteger closeErrorCount = new AtomicInteger();
 
+  private final int waitOnShutdownInSeconds;
+  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
+
   /**
    * Exception handler to pass the disruptor ringbuffer. Same as native implementation only
it logs
    * using our logger instead of java native logger.
@@ -224,7 +234,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       CommonFSUtils.getDefaultReplication(fs, this.walDir));
     this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);
     this.closeErrorsTolerated = conf.getInt(ROLL_ERRORS_TOLERATED, DEFAULT_ROLL_ERRORS_TOLERATED);
-
+    this.waitOnShutdownInSeconds = conf.getInt(FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS,
+        DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS);
     // This is the 'writer' -- a single threaded executor. This single thread 'consumes'
what is
     // put on the ring buffer.
     String hostingThreadName = Thread.currentThread().getName();
@@ -355,23 +366,22 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       }
       long oldFileLen = 0L;
       // It is at the safe point. Swap out writer from under the blocked writer thread.
-      // TODO: This is close is inline with critical section. Should happen in background?
       if (this.writer != null) {
         oldFileLen = this.writer.getLength();
-        try {
-          TraceUtil.addTimelineAnnotation("closing writer");
-          this.writer.close();
-          TraceUtil.addTimelineAnnotation("writer closed");
-          this.closeErrorCount.set(0);
-        } catch (IOException ioe) {
-          int errors = closeErrorCount.incrementAndGet();
-          if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated))
{
-            LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" + ioe.getMessage()
-                + "\", errors=" + errors
-                + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
-          } else {
-            throw ioe;
-          }
+        // In case of having unflushed entries or we already reached the
+        // closeErrorsTolerated count, call the closeWriter inline rather than in async
+        // way so that in case of an IOE we will throw it back and abort RS.
+        if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated)
{
+          closeWriter(this.writer, oldPath, true);
+        } else {
+          Writer localWriter = this.writer;
+          closeExecutor.execute(() -> {
+            try {
+              closeWriter(localWriter, oldPath, false);
+            } catch (IOException e) {
+              // We will never reach here.
+            }
+          });
         }
       }
       logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
@@ -413,6 +423,24 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
   }
 
+  private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws IOException
{
+    try {
+      TraceUtil.addTimelineAnnotation("closing writer");
+      writer.close();
+      TraceUtil.addTimelineAnnotation("writer closed");
+    } catch (IOException ioe) {
+      int errors = closeErrorCount.incrementAndGet();
+      boolean hasUnflushedEntries = isUnflushedEntries();
+      if (syncCloseCall && (hasUnflushedEntries || (errors > this.closeErrorsTolerated)))
{
+        LOG.error("Close of WAL " + path + " failed. Cause=\"" + ioe.getMessage() + "\",
errors="
+            + errors + ", hasUnflushedEntries=" + hasUnflushedEntries);
+        throw ioe;
+      }
+      LOG.warn("Riding over failed WAL close of " + path
+          + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
+    }
+  }
+
   @Override
   protected void doShutdown() throws IOException {
     // Shutdown the disruptor. Will stop after all entries have been processed. Make sure
we
@@ -437,6 +465,18 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       this.writer.close();
       this.writer = null;
     }
+    closeExecutor.shutdown();
+    try {
+      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
+        LOG.error("We have waited {} seconds but the close of writer(s) doesn't complete."
+            + "Please check the status of underlying filesystem"
+            + " or increase the wait time by the config \"{}\"", this.waitOnShutdownInSeconds,
+            FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS);
+      }
+    } catch (InterruptedException e) {
+      LOG.error("The wait for termination of FSHLog writer(s) is interrupted");
+      Thread.currentThread().interrupt();
+    }
   }
 
   @Override


Mime
View raw message