hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject hbase git commit: HBASE-16721 Concurrency issue in WAL unflushed seqId tracking
Date Thu, 29 Sep 2016 20:45:13 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 76396714e -> bf3c928b7


HBASE-16721 Concurrency issue in WAL unflushed seqId tracking


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bf3c928b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bf3c928b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bf3c928b

Branch: refs/heads/master
Commit: bf3c928b7499797735f71974992b68c9d876b97c
Parents: 7639671
Author: Enis Soztutar <enis@apache.org>
Authored: Thu Sep 29 12:56:22 2016 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Thu Sep 29 12:56:22 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |   2 +-
 .../regionserver/wal/AbstractTestFSWAL.java     |   2 +-
 .../hbase/regionserver/wal/TestFSHLog.java      | 105 +++++++++++++++++++
 3 files changed, 107 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/bf3c928b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 79321b3..20ae602 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface WAL {
+public interface WAL extends AutoCloseable {
 
   /**
    * Registers WALActionsListener

http://git-wip-us.apache.org/repos/asf/hbase/blob/bf3c928b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
index 9eaeda4..19759d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -72,7 +72,7 @@ import org.junit.rules.TestName;
 
 public abstract class AbstractTestFSWAL {
 
-  private static final Log LOG = LogFactory.getLog(AbstractTestFSWAL.class);
+  protected static final Log LOG = LogFactory.getLog(AbstractTestFSWAL.class);
 
   protected static Configuration CONF;
   protected static FileSystem FS;

http://git-wip-us.apache.org/repos/asf/hbase/blob/bf3c928b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index bf56afe..640e851 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -23,6 +23,10 @@ import java.lang.reflect.Field;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,14 +36,21 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Provides FSHLog test cases.
  */
@@ -101,4 +112,98 @@ public class TestFSHLog extends AbstractTestFSWAL {
       log.close();
     }
   }
+
+  /**
+   * Test case for https://issues.apache.org/jira/browse/HBASE-16721
+   */
+  @Test (timeout = 30000)
+  public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
+    final String name = "testSyncRunnerIndexOverflow";
+    final byte[] b = Bytes.toBytes("b");
+
+    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    final CountDownLatch holdAppend = new CountDownLatch(1);
+    final CountDownLatch flushFinished = new CountDownLatch(1);
+    final CountDownLatch putFinished = new CountDownLatch(1);
+
+    try (FSHLog log =
+        new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME,
CONF,
+            null, true, null, null)) {
+
+      log.registerWALActionsListener(new WALActionsListener.Base() {
+        @Override
+        public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit)
+            throws IOException {
+          if (startHoldingForAppend.get()) {
+            try {
+              holdAppend.await();
+            } catch (InterruptedException e) {
+              LOG.error(e);
+            }
+          }
+        }
+      });
+
+      // open a new region which uses this WAL
+      HTableDescriptor htd =
+          new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor(b));
+      HRegionInfo hri =
+          new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+
+      final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
+      ExecutorService exec = Executors.newFixedThreadPool(2);
+
+      // do a regular write first because of memstore size calculation.
+      region.put(new Put(b).addColumn(b, b,b));
+
+      startHoldingForAppend.set(true);
+      exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            region.put(new Put(b).addColumn(b, b,b));
+            putFinished.countDown();
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+        }
+      });
+
+      // give the put a chance to start
+      Threads.sleep(3000);
+
+      exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            Region.FlushResult flushResult = region.flush(true);
+            LOG.info("Flush result:" +  flushResult.getResult());
+            LOG.info("Flush succeeded:" +  flushResult.isFlushSucceeded());
+            flushFinished.countDown();
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+        }
+      });
+
+      // give the flush a chance to start. Flush should have got the region lock, and
+      // should have been waiting on the mvcc complete after this.
+      Threads.sleep(3000);
+
+      // let the append to WAL go through now that the flush already started
+      holdAppend.countDown();
+      putFinished.await();
+      flushFinished.await();
+
+      // check whether flush went through
+      assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size());
+
+      // now check the region's unflushed seqIds.
+      long seqId = log.getEarliestMemstoreSeqNum(hri.getEncodedNameAsBytes());
+      assertEquals("Found seqId for the region which is already flushed",
+          HConstants.NO_SEQNUM, seqId);
+
+      region.close();
+    }
+  }
 }


Mime
View raw message