hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject [1/5] hbase git commit: HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll
Date Wed, 19 Oct 2016 02:41:42 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 66941910b -> 019c7f930
  refs/heads/branch-1.1 382f88ae8 -> 4e304b3f9
  refs/heads/branch-1.2 bcc74e5ee -> 571814425
  refs/heads/branch-1.3 d38310aa4 -> c51722629
  refs/heads/master 6c89c6251 -> ef8c65e54


HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ef8c65e5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ef8c65e5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ef8c65e5

Branch: refs/heads/master
Commit: ef8c65e54201b37edfb9a8f4f4d24137544b8ec1
Parents: 6c89c62
Author: Enis Soztutar <enis@apache.org>
Authored: Tue Oct 18 18:46:02 2016 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Tue Oct 18 18:46:02 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 48 +++++++++++++++-----
 .../wal/TestLogRollingNoCluster.java            | 42 +++++++++++------
 2 files changed, 65 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ef8c65e5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 3e0e829..142ab63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -30,15 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.ExceptionHandler;
-import com.lmax.disruptor.LifecycleAware;
-import com.lmax.disruptor.TimeoutException;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -68,6 +59,15 @@ import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
 /**
  * The default implementation of FSWAL.
  */
@@ -499,6 +499,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     private volatile long sequence;
     // Keep around last exception thrown. Clear on successful sync.
     private final BlockingQueue<SyncFuture> syncFutures;
+    private volatile SyncFuture takeSyncFuture = null;
 
     /**
      * UPDATE!
@@ -546,6 +547,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       if (!syncFuture.done(currentSequence, t)) {
         throw new IllegalStateException();
       }
+
       // This function releases one sync future only.
       return 1;
     }
@@ -589,13 +591,21 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       return sequence;
     }
 
+    boolean areSyncFuturesReleased() {
+      // check whether there is no sync futures offered, and no in-flight sync futures that is being
+      // processed.
+      return syncFutures.size() <= 0
+          && takeSyncFuture == null;
+    }
+
     public void run() {
       long currentSequence;
       while (!isInterrupted()) {
         int syncCount = 0;
-        SyncFuture takeSyncFuture;
+
         try {
           while (true) {
+            takeSyncFuture = null;
             // We have to process what we 'take' from the queue
             takeSyncFuture = this.syncFutures.take();
             currentSequence = this.sequence;
@@ -975,11 +985,23 @@ public class FSHLog extends AbstractFSWAL<Writer> {
      * @return True if outstanding sync futures still
      */
     private boolean isOutstandingSyncs() {
+      // Look at SyncFutures in the EventHandler
       for (int i = 0; i < this.syncFuturesCount; i++) {
         if (!this.syncFutures[i].isDone()) {
           return true;
         }
       }
+
+      return false;
+    }
+
+    private boolean isOutstandingSyncsFromRunners() {
+      // Look at SyncFutures in the SyncRunners
+      for (SyncRunner syncRunner: syncRunners) {
+        if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
+          return true;
+        }
+      }
       return false;
     }
 
@@ -1095,11 +1117,13 @@ public class FSHLog extends AbstractFSWAL<Writer> {
         // Wait on outstanding syncers; wait for them to finish syncing (unless we've been
         // shutdown or unless our latch has been thrown because we have been aborted or unless
         // this WAL is broken and we can't get a sync/append to complete).
-        while (!this.shutdown && this.zigzagLatch.isCocked()
+        while ((!this.shutdown && this.zigzagLatch.isCocked()
             && highestSyncedTxid.get() < currentSequence &&
             // We could be in here and all syncs are failing or failed. Check for this. Otherwise
             // we'll just be stuck here for ever. In other words, ensure there syncs running.
-        isOutstandingSyncs()) {
+            isOutstandingSyncs())
+            // Wait for all SyncRunners to finish their work so that we can replace the writer
+            || isOutstandingSyncsFromRunners()) {
           synchronized (this.safePointWaiter) {
             this.safePointWaiter.wait(0, 1);
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef8c65e5/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index eda7df7..7412128 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -18,11 +18,10 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertFalse;
-
 import java.io.IOException;
 import java.util.NavigableMap;
 import java.util.TreeMap;
-
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -56,7 +56,18 @@ public class TestLogRollingNoCluster {
       withLookingForStuckThread(true).build();
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
-  private static final int THREAD_COUNT = 100; // Spin up this many threads
+  private static final int NUM_THREADS = 100; // Spin up this many threads
+  private static final int NUM_ENTRIES = 100; // How many entries to write
+
+  /** ProtobufLogWriter that simulates higher latencies in sync() call */
+  public static class HighLatencySyncWriter extends  ProtobufLogWriter {
+    @Override
+    public void sync() throws IOException {
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+      super.sync();
+      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
+    }
+  }
 
   /**
    * Spin up a bunch of threads and have them all append to a WAL.  Roll the
@@ -65,38 +76,42 @@ public class TestLogRollingNoCluster {
    * @throws InterruptedException
    */
   @Test
-  public void testContendedLogRolling() throws IOException, InterruptedException {
-    Path dir = TEST_UTIL.getDataTestDir();
+  public void testContendedLogRolling() throws Exception {
+    TEST_UTIL.startMiniDFSCluster(3);
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS();
+
     // The implementation needs to know the 'handler' count.
-    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
+    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
     final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.set(WALFactory.WAL_PROVIDER, "filesystem");
     FSUtils.setRootDir(conf, dir);
+    conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
     final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
     final WAL wal = wals.getWAL(new byte[]{}, null);
     
     Appender [] appenders = null;
 
-    final int count = THREAD_COUNT;
-    appenders = new Appender[count];
+    final int numThreads = NUM_THREADS;
+    appenders = new Appender[numThreads];
     try {
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         // Have each appending thread write 'count' entries
-        appenders[i] = new Appender(wal, i, count);
+        appenders[i] = new Appender(wal, i, NUM_ENTRIES);
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         appenders[i].start();
       }
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < numThreads; i++) {
         //ensure that all threads are joined before closing the wal
         appenders[i].join();
       }
     } finally {
       wals.close();
     }
-    for (int i = 0; i < count; i++) {
+    for (int i = 0; i < numThreads; i++) {
       assertFalse(appenders[i].isException());
     }
+    TEST_UTIL.shutdownMiniDFSCluster();
   }
 
   /**
@@ -149,6 +164,7 @@ public class TestLogRollingNoCluster {
           }
           final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(),
               TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true);
+          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
           wal.sync(txid);
         }
         String msg = getName() + " finished";


Mime
View raw message