bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jujj...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-986: Handle memtable flush failure
Date Wed, 21 Dec 2016 00:48:54 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 9a506c261 -> 6e738d0c0


BOOKKEEPER-986: Handle memtable flush failure

- If a previous memtable flush failed, then the
next addEntry call will try to flush the
existing snapshot

Author: Charan Reddy Guttapalem <cguttapalem@salesforce.com>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #92 from reddycharan/handlememtableflushfailure


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/6e738d0c
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/6e738d0c
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/6e738d0c

Branch: refs/heads/master
Commit: 6e738d0c02ebe6710a24aad505e2fd9386bb8821
Parents: 9a506c2
Author: Charan Reddy Guttapalem <cguttapalem@salesforce.com>
Authored: Tue Dec 20 16:48:48 2016 -0800
Committer: JV <vjujjuri@salesforce.com>
Committed: Tue Dec 20 16:48:48 2016 -0800

----------------------------------------------------------------------
 .../apache/bookkeeper/bookie/EntryMemTable.java | 33 ++++++--
 .../bookkeeper/bookie/SortedLedgerStorage.java  |  2 +-
 .../bookkeeper/bookie/LedgerCacheTest.java      | 89 ++++++++++++++++++++
 3 files changed, 116 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6e738d0c/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index 60da560..ff14d03 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -23,6 +23,7 @@ import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -106,6 +107,9 @@ public class EntryMemTable {
     final long skipListSizeLimit;
 
     SkipListArena allocator;
+    
+    // flag indicating the status of the previous flush call
+    private final AtomicBoolean previousFlushSucceeded;
 
     private EntrySkipList newSkipList() {
         return new EntrySkipList(checkpointSource.newCheckpoint());
@@ -130,6 +134,7 @@ public class EntryMemTable {
         this.conf = conf;
         this.size = new AtomicLong(0);
         this.allocator = new SkipListArena(conf);
+        this.previousFlushSucceeded = new AtomicBoolean(true);
         // skip list size limit
         this.skipListSizeLimit = conf.getSkipListSizeLimit();
 
@@ -199,7 +204,14 @@ public class EntryMemTable {
      * Flush snapshot and clear it.
      */
     long flush(final SkipListFlusher flusher) throws IOException {
-        return flushSnapshot(flusher, Checkpoint.MAX);
+        try {
+            long flushSize = flushSnapshot(flusher, Checkpoint.MAX);
+            previousFlushSucceeded.set(true);
+            return flushSize;
+        } catch (IOException ioe) {
+            previousFlushSucceeded.set(false);
+            throw ioe;
+        }
     }
 
     /**
@@ -209,11 +221,17 @@ public class EntryMemTable {
      *          all data before this checkpoint need to be flushed.
      */
     public long flush(SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
-        long size = flushSnapshot(flusher, checkpoint);
-        if (null != snapshot(checkpoint)) {
-            size += flushSnapshot(flusher, checkpoint);
+        try {
+            long size = flushSnapshot(flusher, checkpoint);
+            if (null != snapshot(checkpoint)) {
+                size += flushSnapshot(flusher, checkpoint);
+            }
+            previousFlushSucceeded.set(true);
+            return size;
+        } catch (IOException ioe) {
+            previousFlushSucceeded.set(false);
+            throw ioe;
         }
-        return size;
     }
 
     /**
@@ -282,6 +300,7 @@ public class EntryMemTable {
     * Write an update
     * @param entry
     * @return approximate size of the passed key and value.
+     * @throws IOException 
     */
     public long addEntry(long ledgerId, long entryId, final ByteBuffer entry, final CacheCallback cb)
             throws IOException {
@@ -289,9 +308,9 @@ public class EntryMemTable {
         long startTimeNanos = MathUtils.nowInNano();
         boolean success = false;
         try {
-            if (isSizeLimitReached()) {
+            if (isSizeLimitReached() || (!previousFlushSucceeded.get())) {
                 Checkpoint cp = snapshot();
-                if (null != cp) {
+                if ((null != cp) || (!previousFlushSucceeded.get())) {
                     cb.onSizeLimitReached();
                 } else {
                     throttleWriters();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6e738d0c/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 8e72852..27fb5f7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -39,7 +39,7 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
         implements LedgerStorage, CacheCallback, SkipListFlusher {
     private final static Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class);
 
-    private EntryMemTable memTable;
+    EntryMemTable memTable;
     private ScheduledExecutorService scheduler;
 
     public SortedLedgerStorage() {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6e738d0c/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 8812c90..b63806e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -28,10 +28,14 @@ import java.nio.ByteBuffer;
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.SnapshotMap;
 import org.apache.bookkeeper.util.IOUtils;
+
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.ArrayList;
@@ -447,6 +451,91 @@ public class LedgerCacheTest {
         flushThread.interrupt();
     }
 
+    // Mock SortedLedgerStorage to simulate flush failure (Dependency Fault Injection)
+    static class FlushTestSortedLedgerStorage extends SortedLedgerStorage {
+        final AtomicBoolean injectMemTableSizeLimitReached;
+        final AtomicBoolean injectFlushException;
+
+        public FlushTestSortedLedgerStorage() {
+            super();
+            injectMemTableSizeLimitReached = new AtomicBoolean();
+            injectFlushException = new AtomicBoolean();
+        }
+
+        public void setInjectMemTableSizeLimitReached(boolean setValue) {
+            injectMemTableSizeLimitReached.set(setValue);
+        }
+
+        public void setInjectFlushException(boolean setValue) {
+            injectFlushException.set(setValue);
+        }
+
+        @Override
+        public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
+                LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+                final CheckpointSource checkpointSource, StatsLogger statsLogger) throws IOException {
+            super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, checkpointSource, statsLogger);
+            this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger) {
+                @Override
+                boolean isSizeLimitReached() {
+                    return (injectMemTableSizeLimitReached.get() || super.isSizeLimitReached());
+                }
+            };
+        }
+
+        @Override
+        public void process(long ledgerId, long entryId, ByteBuffer buffer) throws IOException {
+            if (injectFlushException.get()) {
+                throw new IOException("Injected Exception");
+            }
+            super.process(ledgerId, entryId, buffer);
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testEntryMemTableFlushFailure() throws Exception {
+        File tmpDir = createTempDir("bkTest", ".dir");
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        conf.setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
+
+        Bookie bookie = new Bookie(conf);
+        FlushTestSortedLedgerStorage flushTestSortedLedgerStorage = (FlushTestSortedLedgerStorage) bookie.ledgerStorage;
+        EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
+
+        // this bookie.addEntry call is required. FileInfo for Ledger 1 would be created with this call.
+        // without the fileinfo, 'flushTestSortedLedgerStorage.addEntry' calls will fail because of BOOKKEEPER-965 change.
+        bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+        
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
+        assertFalse("Bookie is expected to be in ReadWrite mode", bookie.isReadOnly());
+        assertTrue("EntryMemTable SnapShot is expected to be empty", memTable.snapshot.isEmpty());
+
+        // set flags, so that FlushTestSortedLedgerStorage simulates FlushFailure scenario
+        flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
+        flushTestSortedLedgerStorage.setInjectFlushException(true);
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
+        Thread.sleep(1000);
+
+        // since we simulated sizeLimitReached, snapshot shouldn't be empty
+        assertFalse("EntryMemTable SnapShot is not expected to be empty", memTable.snapshot.isEmpty());
+
+        // set the flags to false, so flush will succeed this time
+        flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(false);
+        flushTestSortedLedgerStorage.setInjectFlushException(false);
+
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 3));
+        Thread.sleep(1000);
+        // since we expect memtable flush to succeed, memtable snapshot should be empty
+        assertTrue("EntryMemTable SnapShot is expected to be empty, because of successful flush",
+                memTable.snapshot.isEmpty());
+    }
+    
     private ByteBuffer generateEntry(long ledger, long entry) {
         byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
         ByteBuffer bb = ByteBuffer.wrap(new byte[8 + 8 + data.length]);


Mime
View raw message