hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-14229 Flushing canceled by coprocessor still leads to memstoreSize set down (Yerui Sun)
Date Wed, 02 Sep 2015 14:29:07 GMT
Repository: hbase
Updated Branches:
  refs/heads/master f8dd99d73 -> 15a88d2e1


HBASE-14229 Flushing canceled by coprocessor still leads to memstoreSize set down (Yerui Sun)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15a88d2e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15a88d2e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15a88d2e

Branch: refs/heads/master
Commit: 15a88d2e1bd03ce26eb3e9f39ba31e95c2b78fc6
Parents: f8dd99d
Author: tedyu <yuzhihong@gmail.com>
Authored: Wed Sep 2 07:29:00 2015 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Wed Sep 2 07:29:00 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 25 ++++++---
 .../hadoop/hbase/regionserver/TestHRegion.java  | 56 ++++++++++++++++----
 2 files changed, 64 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/15a88d2e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 2293311..e21942c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -521,6 +521,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver,
Regi
     final FlushResult result; // indicating a failure result from prepare
     final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
     final TreeMap<byte[], List<Path>> committedFiles;
+    final TreeMap<byte[], Long> storeFlushableSize;
     final long startTime;
     final long flushOpSeqId;
     final long flushedSeqId;
@@ -528,26 +529,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver,
Regi
 
     /** Constructs an early exit case */
     PrepareFlushResult(FlushResult result, long flushSeqId) {
-      this(result, null, null, Math.max(0, flushSeqId), 0, 0, 0);
+      this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, 0);
     }
 
     /** Constructs a successful prepare flush result */
     PrepareFlushResult(
       TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
-      TreeMap<byte[], List<Path>> committedFiles, long startTime, long flushSeqId,
+      TreeMap<byte[], List<Path>> committedFiles,
+      TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
       long flushedSeqId, long totalFlushableSize) {
-      this(null, storeFlushCtxs, committedFiles, startTime,
+      this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
         flushSeqId, flushedSeqId, totalFlushableSize);
     }
 
     private PrepareFlushResult(
       FlushResult result,
       TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
-      TreeMap<byte[], List<Path>> committedFiles, long startTime, long flushSeqId,
+      TreeMap<byte[], List<Path>> committedFiles,
+      TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
       long flushedSeqId, long totalFlushableSize) {
       this.result = result;
       this.storeFlushCtxs = storeFlushCtxs;
       this.committedFiles = committedFiles;
+      this.storeFlushableSize = storeFlushableSize;
       this.startTime = startTime;
       this.flushOpSeqId = flushSeqId;
       this.flushedSeqId = flushedSeqId;
@@ -2156,6 +2160,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver,
Regi
       = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
         Bytes.BYTES_COMPARATOR);
+    TreeMap<byte[], Long> storeFlushableSize
+        = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
     // The sequence id of this flush operation which is used to log FlushMarker and pass
to
     // createFlushContext to use as the store file's sequence id. It can be in advance of
edits
     // still in the memstore, edits that are in other column families yet to be flushed.
@@ -2194,6 +2200,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver,
Regi
           totalFlushableSizeOfFlushableStores += s.getFlushableSize();
           storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
           committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
+          storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
         }
 
         // write the snapshot start to WAL
@@ -2260,7 +2267,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver,
Regi
         mvcc.advanceMemstore(w);
       }
     }
-    return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId,
+    return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
flushOpSeqId,
       flushedSeqId, totalFlushableSizeOfFlushableStores);
   }
 
@@ -2335,7 +2342,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver,
Regi
         if (needsCompaction) {
           compactionRequested = true;
         }
-        committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
+        byte[] storeName = it.next().getFamily().getName();
+        List<Path> storeCommittedFiles = flush.getCommittedFiles();
+        committedFiles.put(storeName, storeCommittedFiles);
+        // Flush committed no files, indicating flush is empty or flush was canceled
+        if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
+          totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName);
+        }
       }
       storeFlushCtxs.clear();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/15a88d2e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 826c9b3..b186f0e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -33,10 +33,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -131,12 +128,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
 import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
 import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
-import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.regionserver.wal.*;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -321,7 +313,7 @@ public class TestHRegion {
     Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
     MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize",
CONF);
     HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
-      CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
+        CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
 
     Store store = region.getStore(COLUMN_FAMILY_BYTES);
     // Get some random bytes.
@@ -363,6 +355,48 @@ public class TestHRegion {
   }
 
   /**
+   * Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set
down
+   */
+  @Test
+  public void testMemstoreSizeWithFlushCanceling() throws IOException {
+    FileSystem fs = FileSystem.get(CONF);
+    Path rootDir = new Path(dir + "testMemstoreSizeWithFlushCanceling");
+    FSHLog hLog = new FSHLog(fs, rootDir, "testMemstoreSizeWithFlushCanceling", CONF);
+    HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
+        CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES);
+    Store store = region.getStore(COLUMN_FAMILY_BYTES);
+    assertEquals(0, region.getMemstoreSize());
+
+    // Put some value and make sure flush could be completed normally
+    byte [] value = Bytes.toBytes(name.getMethodName());
+    Put put = new Put(value);
+    put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
+    region.put(put);
+    long onePutSize = region.getMemstoreSize();
+    assertTrue(onePutSize > 0);
+    region.flush(true);
+    assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize());
+    assertEquals("flushable size should be zero", 0, store.getFlushableSize());
+
+    // save normalCPHost and replaced by mockedCPHost, which will cancel flush requests
+    RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
+    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
+    when(mockedCPHost.preFlush(isA(HStore.class), isA(InternalScanner.class))).thenReturn(null);
+    region.setCoprocessorHost(mockedCPHost);
+    region.put(put);
+    region.flush(true);
+    assertEquals("memstoreSize should NOT be zero", onePutSize, region.getMemstoreSize());
+    assertEquals("flushable size should NOT be zero", onePutSize, store.getFlushableSize());
+
+    // set normalCPHost and flush again, the snapshot will be flushed
+    region.setCoprocessorHost(normalCPHost);
+    region.flush(true);
+    assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize());
+    assertEquals("flushable size should be zero", 0, store.getFlushableSize());
+    HBaseTestingUtility.closeRegionAndWAL(region);
+  }
+
+  /**
    * Test we do not lose data if we fail a flush and then close.
    * Part of HBase-10466.  Tests the following from the issue description:
    * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed
is


Mime
View raw message