hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [1/2] hbase git commit: HBASE-13811 Splitting WALs, we are filtering out too many edits -> DATALOSS
Date Mon, 08 Jun 2015 17:39:21 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 d34e9c5c5 -> 2baf3bfc9


http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index c55280b..7fbb285 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -124,6 +124,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
 
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
@@ -324,15 +325,19 @@ public class WALSplitter {
       failedServerName = (serverName == null) ? "" : serverName.getServerName();
       while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
         byte[] region = entry.getKey().getEncodedRegionName();
-        String key = Bytes.toString(region);
-        lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
+        String encodedRegionNameAsStr = Bytes.toString(region);
+        lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
         if (lastFlushedSequenceId == null) {
           if (this.distributedLogReplay) {
             RegionStoreSequenceIds ids =
                 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
-                  key);
+                  encodedRegionNameAsStr);
             if (ids != null) {
               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ":
" +
+                  TextFormat.shortDebugString(ids));
+              }
             }
           } else if (sequenceIdChecker != null) {
             RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
@@ -341,13 +346,17 @@ public class WALSplitter {
               maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
                 storeSeqId.getSequenceId());
             }
-            regionMaxSeqIdInStores.put(key, maxSeqIdInStores);
+            regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
             lastFlushedSequenceId = ids.getLastFlushedSequenceId();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ":
" +
+                  TextFormat.shortDebugString(ids));
+            }
           }
           if (lastFlushedSequenceId == null) {
             lastFlushedSequenceId = -1L;
           }
-          lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
+          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
         }
         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
           editsSkipped++;
@@ -1071,7 +1080,7 @@ public class WALSplitter {
     }
 
     private void doRun() throws IOException {
-      LOG.debug("Writer thread " + this + ": starting");
+      if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting");
       while (true) {
         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
         if (buffer == null) {
@@ -1226,7 +1235,8 @@ public class WALSplitter {
         }
       }
       controller.checkForErrors();
-      LOG.info("Split writers finished");
+      LOG.info((this.writerThreads == null? 0: this.writerThreads.size()) +
+        " split writers finished; closing...");
       return (!progress_failed);
     }
 
@@ -1317,12 +1327,14 @@ public class WALSplitter {
       CompletionService<Void> completionService =
         new ExecutorCompletionService<Void>(closeThreadPool);
       for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
-        LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
+        }
         completionService.submit(new Callable<Void>() {
           @Override
           public Void call() throws Exception {
             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
-            LOG.debug("Closing " + wap.p);
+            if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
             try {
               wap.w.close();
             } catch (IOException ioe) {
@@ -1330,8 +1342,8 @@ public class WALSplitter {
               thrown.add(ioe);
               return null;
             }
-            LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in "
-                + (wap.nanosSpent / 1000 / 1000) + "ms)");
+            LOG.info("Closed " + wap.p + "; wrote " + wap.editsWritten + " edit(s) in "
+                + (wap.nanosSpent / 1000 / 1000) + "ms");
 
             if (wap.editsWritten == 0) {
               // just remove the empty recovered.edits file
@@ -1490,8 +1502,8 @@ public class WALSplitter {
         }
       }
       Writer w = createWriter(regionedits);
-      LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
-      return (new WriterAndPath(regionedits, w));
+      LOG.debug("Creating writer path=" + regionedits);
+      return new WriterAndPath(regionedits, w);
     }
 
     private void filterCellByStore(Entry logEntry) {
@@ -1505,6 +1517,7 @@ public class WALSplitter {
         if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
           byte[] family = CellUtil.cloneFamily(cell);
           Long maxSeqId = maxSeqIdInStores.get(family);
+          LOG.info("CHANGE REMOVE " + Bytes.toString(family) + ", max=" + maxSeqId);
           // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
           // or the master was crashed before and we can not get the information.
           if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum())
{
@@ -1544,9 +1557,9 @@ public class WALSplitter {
           filterCellByStore(logEntry);
           if (!logEntry.getEdit().isEmpty()) {
             wap.w.append(logEntry);
+            this.updateRegionMaximumEditLogSeqNum(logEntry);
+            editsCount++;
           }
-          this.updateRegionMaximumEditLogSeqNum(logEntry);
-          editsCount++;
         }
         // Pass along summary statistics
         wap.incrementEdits(editsCount);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
index 9a9d784..b328e57 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.ipc;
 
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.security.UserProvider;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java
index 5e6bff8..abb6520 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java
@@ -91,6 +91,7 @@ public class TestGetLastFlushedSequenceId {
         testUtil.getHBaseCluster().getMaster()
             .getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes());
     assertEquals(HConstants.NO_SEQNUM, ids.getLastFlushedSequenceId());
+    // This will be the sequenceid just before that of the earliest edit in memstore.
     long storeSequenceId = ids.getStoreSequenceId(0).getSequenceId();
     assertTrue(storeSequenceId > 0);
     testUtil.getHBaseAdmin().flush(tableName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index b61416c..6b342d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -231,6 +231,35 @@ public class TestHRegion {
   }
 
   /**
+   * Test that I can use the max flushed sequence id after the close.
+   * @throws IOException
+   */
+  @Test (timeout = 100000)
+  public void testSequenceId() throws IOException {
+    HRegion region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES);
+    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
+    // Weird. This returns 0 if no store files or no edits. Afraid to change it.
+    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
+    region.close();
+    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
+    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
+    // Open region again.
+    region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES);
+    byte [] value = Bytes.toBytes(name.getMethodName());
+    // Make a random put against our cf.
+    Put put = new Put(value);
+    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
+    region.put(put);
+    // No flush yet so init numbers should still be in place.
+    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
+    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
+    region.flush(true);
+    long max = region.getMaxFlushedSeqId();
+    region.close();
+    assertEquals(max, region.getMaxFlushedSeqId());
+  }
+
+  /**
    * Test for Bug 2 of HBASE-10466.
    * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
    * is smaller than a certain value, or when region close starts a flush is ongoing, the
first

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
new file mode 100644
index 0000000..92e0558
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
+import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mortbay.log.Log;
+
+/**
+ * Testcase for https://issues.apache.org/jira/browse/HBASE-13811
+ */
+@Category({ MediumTests.class })
+public class TestSplitWalDataLoss {
+
+  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+
+  private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName())
+      .build();
+
+  private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss");
+
+  private byte[] family = Bytes.toBytes("f");
+
+  private byte[] qualifier = Bytes.toBytes("q");
+
+  @Before
+  public void setUp() throws Exception {
+    testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000);
+    testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+    testUtil.startMiniCluster(2);
+    HBaseAdmin admin = testUtil.getHBaseAdmin();
+    admin.createNamespace(namespace);
+    admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family)));
+    testUtil.waitTableAvailable(tableName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    testUtil.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName);
+    final HRegion region = (HRegion) rs.getOnlineRegions(tableName).get(0);
+    HRegion spiedRegion = spy(region);
+    final MutableBoolean flushed = new MutableBoolean(false);
+    final MutableBoolean reported = new MutableBoolean(false);
+    doAnswer(new Answer<FlushResult>() {
+      @Override
+      public FlushResult answer(InvocationOnMock invocation) throws Throwable {
+        synchronized (flushed) {
+          flushed.setValue(true);
+          flushed.notifyAll();
+        }
+        synchronized (reported) {
+          while (!reported.booleanValue()) {
+            reported.wait();
+          }
+        }
+        rs.getWAL(region.getRegionInfo()).abortCacheFlush(
+          region.getRegionInfo().getEncodedNameAsBytes());
+        throw new DroppedSnapshotException("testcase");
+      }
+    }).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
+      Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
+      Matchers.<Collection<Store>> any());
+    rs.onlineRegions.put(rs.onlineRegions.keySet().iterator().next(), spiedRegion);
+    Connection conn = testUtil.getConnection();
+
+    try (Table table = conn.getTable(tableName)) {
+      table.put(new Put(Bytes.toBytes("row0")).addColumn(family, qualifier, Bytes.toBytes("val0")));
+    }
+    long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
+    Log.info("CHANGE OLDEST " + oldestSeqIdOfStore);
+    assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
+    rs.cacheFlusher.requestFlush(spiedRegion, false);
+    synchronized (flushed) {
+      while (!flushed.booleanValue()) {
+        flushed.wait();
+      }
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table.put(new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, Bytes.toBytes("val1")));
+    }
+    long now = EnvironmentEdgeManager.currentTime();
+    rs.tryRegionServerReport(now - 500, now);
+    synchronized (reported) {
+      reported.setValue(true);
+      reported.notifyAll();
+    }
+    while (testUtil.getRSForFirstRegionInTable(tableName) == rs) {
+      Thread.sleep(100);
+    }
+    try (Table table = conn.getTable(tableName)) {
+      Result result = table.get(new Get(Bytes.toBytes("row0")));
+      assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index cc5191c..b3b520a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -27,9 +26,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
@@ -50,7 +47,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -58,6 +54,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -320,53 +317,6 @@ public class TestFSHLog {
     }
   }
 
-  /**
-   * Simulates WAL append ops for a region and tests
-   * {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} API.
-   * It compares the region sequenceIds with oldestFlushing and oldestUnFlushed entries.
-   * If a region's entries are larger than min of (oldestFlushing, oldestUnFlushed), then
the
-   * region should be flushed before archiving this WAL.
-  */
-  @Test
-  public void testAllRegionsFlushed() {
-    LOG.debug("testAllRegionsFlushed");
-    Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>();
-    Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>();
-    Map<byte[], Long> seqNo = new HashMap<byte[], Long>();
-    // create a table
-    TableName t1 = TableName.valueOf("t1");
-    // create a region
-    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-    // variables to mock region sequenceIds
-    final AtomicLong sequenceId1 = new AtomicLong(1);
-    // test empty map
-    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
-    // add entries in the region
-    seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet());
-    oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
-    // should say region1 is not flushed.
-    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
-    // test with entries in oldestFlushing map.
-    oldestUnFlushedSeqNo.clear();
-    oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
-    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
-    // simulate region flush, i.e., clear oldestFlushing and oldestUnflushed maps
-    oldestFlushingSeqNo.clear();
-    oldestUnFlushedSeqNo.clear();
-    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
-    // insert some large values for region1
-    oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000l);
-    seqNo.put(hri1.getEncodedNameAsBytes(), 1500l);
-    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
-
-    // tests when oldestUnFlushed/oldestFlushing contains larger value.
-    // It means region is flushed.
-    oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200l);
-    oldestUnFlushedSeqNo.clear();
-    seqNo.put(hri1.getEncodedNameAsBytes(), 1199l);
-    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
-  }
-
   @Test(expected=IOException.class)
   public void testFailedToCreateWALIfParentRenamed() throws IOException {
     final String name = "testFailedToCreateWALIfParentRenamed";

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
new file mode 100644
index 0000000..9fd0cb1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestSequenceIdAccounting {
+  private static final byte [] ENCODED_REGION_NAME = Bytes.toBytes("r");
+  private static final byte [] FAMILY_NAME = Bytes.toBytes("cf");
+  private static final Set<byte[]> FAMILIES;
+  static {
+    FAMILIES = new HashSet<byte[]>();
+    FAMILIES.add(FAMILY_NAME);
+  }
+
+  @Test
+  public void testStartCacheFlush() {
+    SequenceIdAccounting sida = new SequenceIdAccounting();
+    sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME);
+    Map<byte[], Long> m = new HashMap<byte[], Long>();
+    m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
+    assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
+    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    long sequenceid = 1;
+    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
+    // Only one family so should return NO_SEQNUM still.
+    assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
+    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    long currentSequenceId = sequenceid;
+    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
+    final Set<byte[]> otherFamily = new HashSet<byte[]>(1);
+    otherFamily.add(Bytes.toBytes("otherCf"));
+    sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
+    // Should return oldest sequence id in the region.
+    assertEquals(currentSequenceId, (long)sida.startCacheFlush(ENCODED_REGION_NAME, otherFamily));
+    sida.completeCacheFlush(ENCODED_REGION_NAME);
+  }
+
+  @Test
+  public void testAreAllLower() {
+    SequenceIdAccounting sida = new SequenceIdAccounting();
+    sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME);
+    Map<byte[], Long> m = new HashMap<byte[], Long>();
+    m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
+    assertTrue(sida.areAllLower(m));
+    long sequenceid = 1;
+    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
+    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
+    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
+    assertTrue(sida.areAllLower(m));
+    m.put(ENCODED_REGION_NAME, sequenceid);
+    assertFalse(sida.areAllLower(m));
+    long lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME);
+    assertEquals("Lowest should be first sequence id inserted", 1, lowest);
+    m.put(ENCODED_REGION_NAME, lowest);
+    assertFalse(sida.areAllLower(m));
+    // Now make sure above works when flushing.
+    sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
+    assertFalse(sida.areAllLower(m));
+    m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
+    assertTrue(sida.areAllLower(m));
+    // Let the flush complete and if we ask if the sequenceid is lower, should be yes since
no edits
+    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    m.put(ENCODED_REGION_NAME, sequenceid);
+    assertTrue(sida.areAllLower(m));
+    // Flush again but add sequenceids while we are flushing.
+    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
+    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
+    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
+    lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME);
+    m.put(ENCODED_REGION_NAME, lowest);
+    assertFalse(sida.areAllLower(m));
+    sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
+    // The cache flush will clear out all sequenceid accounting by region.
+    assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
+    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    // No new edits have gone in so no sequenceid to work with.
+    assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
+    // Make an edit behind all we'll put now into sida.
+    m.put(ENCODED_REGION_NAME, sequenceid);
+    sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
+    sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
+    sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
+    assertTrue(sida.areAllLower(m));
+  }
+
+  @Test
+  public void testFindLower() {
+    SequenceIdAccounting sida = new SequenceIdAccounting();
+    sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME);
+    Map<byte[], Long> m = new HashMap<byte[], Long>();
+    m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
+    long sequenceid = 1;
+    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
+    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
+    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
+    assertTrue(sida.findLower(m) == null);
+    m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME));
+    assertTrue(sida.findLower(m).length == 1);
+    m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1);
+    assertTrue(sida.findLower(m) == null);
+  }
+}
\ No newline at end of file


Mime
View raw message