bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [bookkeeper] branch master updated: ISSUE #659: Fix Checkpoint logic in SortedLedgerStorage
Date Sat, 02 Dec 2017 05:05:59 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 81cbba3  ISSUE #659: Fix Checkpoint logic in SortedLedgerStorage
81cbba3 is described below

commit 81cbba3cf620f2a6df48c0da6ee1ac019de24fbc
Author: Sijie Guo <sijie@apache.org>
AuthorDate: Fri Dec 1 22:05:53 2017 -0700

    ISSUE #659: Fix Checkpoint logic in SortedLedgerStorage
    
    Descriptions of the changes in this PR:
    
    - add a test case to reproduce the behavior
    
    Author: Sijie Guo <sijie@apache.org>
    
    Reviewers: Ivan Kelly <ivank@apache.org>, Charan Reddy Guttapalem <reddycharan18@gmail.com>
    
    This closes #677 from sijie/fix_checkpoint, closes #659
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  18 +-
 .../apache/bookkeeper/bookie/CacheCallback.java    |   3 +-
 .../{CacheCallback.java => Checkpointer.java}      |  20 +-
 .../apache/bookkeeper/bookie/EntryMemTable.java    |   2 +-
 .../bookie/InterleavedLedgerStorage.java           |  57 ++---
 .../apache/bookkeeper/bookie/LedgerStorage.java    |  11 +-
 .../bookkeeper/bookie/SortedLedgerStorage.java     |  55 +++--
 .../org/apache/bookkeeper/bookie/SyncThread.java   |  89 ++++----
 .../apache/bookkeeper/bookie/CompactionTest.java   |  68 ++++--
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  |  19 +-
 .../bookkeeper/bookie/LedgerStorageTestBase.java   |  88 ++++++++
 .../bookie/SortedLedgerStorageCheckpointTest.java  | 231 +++++++++++++++++++++
 .../bookkeeper/bookie/TestEntryMemTable.java       |  11 +-
 .../apache/bookkeeper/bookie/TestSyncThread.java   |  59 +++---
 .../org/apache/bookkeeper/meta/GcLedgersTest.java  |  17 +-
 .../bookkeeper/meta/LedgerManagerTestCase.java     |  15 +-
 16 files changed, 579 insertions(+), 184 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 0983395..c08ff46 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -705,9 +705,18 @@ public class Bookie extends BookieCriticalThread {
         String ledgerStorageClass = conf.getLedgerStorageClass();
         LOG.info("Using ledger storage: {}", ledgerStorageClass);
         ledgerStorage = LedgerStorageFactory.createLedgerStorage(ledgerStorageClass);
-        ledgerStorage.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, checkpointSource,
-                                 statsLogger);
         syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource);
+
+        ledgerStorage.initialize(
+            conf,
+            ledgerManager,
+            ledgerDirsManager,
+            indexDirsManager,
+            checkpointSource,
+            syncThread,
+            statsLogger);
+
+
         handles = new HandleFactoryImpl(ledgerStorage);
 
         // Expose Stats
@@ -818,11 +827,6 @@ public class Bookie extends BookieCriticalThread {
             idxMonitor.start();
         }
 
-        // start sync thread first, so during replaying journals, we could do checkpoint
-        // which reduce the chance that we need to replay journals again if bookie restarted
-        // again before finished journal replays.
-        syncThread.start();
-
         // replay journals
         try {
             readJournal();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
index df9a848..fb41cc5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
@@ -22,6 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 
 /**
  * Interface plugged into caching to receive callback notifications.
@@ -30,5 +31,5 @@ public interface CacheCallback {
     /**
      * Process notification that cache size limit reached.
      */
-    void onSizeLimitReached() throws IOException;
+    void onSizeLimitReached(Checkpoint cp) throws IOException;
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java
similarity index 67%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
copy to bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java
index df9a848..967d3e9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CacheCallback.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,19 +15,26 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 package org.apache.bookkeeper.bookie;
 
-import java.io.IOException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 
 /**
- * Interface plugged into caching to receive callback notifications.
+ * The instance that is responsible for checkpointing ledger storage.
  */
-public interface CacheCallback {
+public interface Checkpointer {
+
+    Checkpointer NULL = checkpoint -> {
+        // do nothing;
+    };
+
     /**
-     * Process notification that cache size limit reached.
+     * Start checkpointing for a given <i>checkpoint</i> location.
+     *
+     * @param checkpoint the checkpoint location to checkpoint.
      */
-    void onSizeLimitReached() throws IOException;
+    void startCheckpoint(Checkpoint checkpoint);
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index c26c8e3..c8f9483 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -309,7 +309,7 @@ public class EntryMemTable {
             if (isSizeLimitReached() || (!previousFlushSucceeded.get())) {
                 Checkpoint cp = snapshot();
                 if ((null != cp) || (!previousFlushSucceeded.get())) {
-                    cb.onSizeLimitReached();
+                    cb.onSizeLimitReached(cp);
                 } else {
                     throttleWriters();
                 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 19a6105..6db3e6d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -21,13 +21,12 @@
 
 package org.apache.bookkeeper.bookie;
 
-
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
 
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -60,33 +59,10 @@ import org.slf4j.LoggerFactory;
 public class InterleavedLedgerStorage implements CompactableLedgerStorage, EntryLogListener {
     private static final Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
 
-    /**
-     * Hold the last checkpoint.
-     */
-    protected static class CheckpointHolder {
-        Checkpoint lastCheckpoint = Checkpoint.MAX;
-
-        protected synchronized void setNextCheckpoint(Checkpoint cp) {
-            if (Checkpoint.MAX.equals(lastCheckpoint) || lastCheckpoint.compareTo(cp) < 0) {
-                lastCheckpoint = cp;
-            }
-        }
-
-        protected synchronized void clearLastCheckpoint(Checkpoint done) {
-            if (0 == lastCheckpoint.compareTo(done)) {
-                lastCheckpoint = Checkpoint.MAX;
-            }
-        }
-
-        protected synchronized Checkpoint getLastCheckpoint() {
-            return lastCheckpoint;
-        }
-    }
-
     EntryLogger entryLogger;
     LedgerCache ledgerCache;
-    private CheckpointSource checkpointSource;
-    protected final CheckpointHolder checkpointHolder = new CheckpointHolder();
+    protected CheckpointSource checkpointSource;
+    protected Checkpointer checkpointer;
     private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners =
             Lists.newCopyOnWriteArrayList();
 
@@ -110,12 +86,19 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
     }
 
     @Override
-    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
-                           LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
-                           CheckpointSource checkpointSource, StatsLogger statsLogger)
+    public void initialize(ServerConfiguration conf,
+                           LedgerManager ledgerManager,
+                           LedgerDirsManager ledgerDirsManager,
+                           LedgerDirsManager indexDirsManager,
+                           CheckpointSource checkpointSource,
+                           Checkpointer checkpointer,
+                           StatsLogger statsLogger)
             throws IOException {
+        checkNotNull(checkpointSource, "invalid null checkpoint source");
+        checkNotNull(checkpointer, "invalid null checkpointer");
 
         this.checkpointSource = checkpointSource;
+        this.checkpointer = checkpointer;
         entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
                 null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
@@ -361,21 +344,12 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
     }
 
     @Override
-    public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException {
-        Checkpoint lastCheckpoint = checkpointHolder.getLastCheckpoint();
-        // if checkpoint is less than last checkpoint, we don't need to do checkpoint again.
-        if (lastCheckpoint.compareTo(checkpoint) > 0) {
-            return lastCheckpoint;
-        }
+    public void checkpoint(Checkpoint checkpoint) throws IOException {
         // we don't need to check somethingwritten since checkpoint
         // is scheduled when rotate an entry logger file. and we could
         // not set somethingWritten to false after checkpoint, since
         // current entry logger file isn't flushed yet.
         flushOrCheckpoint(true);
-        // after the ledger storage finished checkpointing, try to clear the done checkpoint
-
-        checkpointHolder.clearLastCheckpoint(lastCheckpoint);
-        return lastCheckpoint;
     }
 
     @Override
@@ -465,6 +439,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
         // TODO: we could consider remove checkpointSource and checkpointSouce#newCheckpoint
         // later if we provide kind of LSN (Log/Journal Squeuence Number)
         // mechanism when adding entry. {@link https://github.com/apache/bookkeeper/issues/279}
-        checkpointHolder.setNextCheckpoint(checkpointSource.newCheckpoint());
+        Checkpoint checkpoint = checkpointSource.newCheckpoint();
+        checkpointer.startCheckpoint(checkpoint);
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 0207ba0..d2055a8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -42,8 +42,13 @@ public interface LedgerStorage {
      * @param ledgerManager
      * @param ledgerDirsManager
      */
-    void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
-                    LedgerDirsManager indexDirsManager, CheckpointSource checkpointSource, StatsLogger statsLogger)
+    void initialize(ServerConfiguration conf,
+                    LedgerManager ledgerManager,
+                    LedgerDirsManager ledgerDirsManager,
+                    LedgerDirsManager indexDirsManager,
+                    CheckpointSource checkpointSource,
+                    Checkpointer checkpointer,
+                    StatsLogger statsLogger)
             throws IOException;
 
     /**
@@ -138,7 +143,7 @@ public interface LedgerStorage {
      * @throws IOException
      * @return the checkpoint that the ledger storage finished.
      */
-    Checkpoint checkpoint(Checkpoint checkpoint) throws IOException;
+    void checkpoint(Checkpoint checkpoint) throws IOException;
 
     /**
      * @param ledgerId
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index ea1ff5a..972f085 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
@@ -53,11 +54,22 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
     }
 
     @Override
-    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
-                           LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
-                           final CheckpointSource checkpointSource, StatsLogger statsLogger)
+    public void initialize(ServerConfiguration conf,
+                           LedgerManager ledgerManager,
+                           LedgerDirsManager ledgerDirsManager,
+                           LedgerDirsManager indexDirsManager,
+                           CheckpointSource checkpointSource,
+                           Checkpointer checkpointer,
+                           StatsLogger statsLogger)
             throws IOException {
-        super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, checkpointSource, statsLogger);
+        super.initialize(
+            conf,
+            ledgerManager,
+            ledgerDirsManager,
+            indexDirsManager,
+            checkpointSource,
+            checkpointer,
+            statsLogger);
         this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger);
         this.scheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder()
@@ -65,6 +77,11 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
                 .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2).build());
     }
 
+    @VisibleForTesting
+    ScheduledExecutorService getScheduler() {
+        return scheduler;
+    }
+
     @Override
     public void start() {
         try {
@@ -146,14 +163,16 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
     }
 
     @Override
-    public Checkpoint checkpoint(final Checkpoint checkpoint) throws IOException {
-        Checkpoint lastCheckpoint = checkpointHolder.getLastCheckpoint();
-        // if checkpoint is less than last checkpoint, we don't need to do checkpoint again.
-        if (lastCheckpoint.compareTo(checkpoint) > 0) {
-            return lastCheckpoint;
+    public void checkpoint(final Checkpoint checkpoint) throws IOException {
+        long numBytesFlushed = memTable.flush(this, checkpoint);
+        if (numBytesFlushed > 0) {
+            // if bytes are added between the previous flush and this checkpoint,
+            // those bytes might live in the current active entry log, so we need
+            // to roll the current entry log and then issue the checkpoint to the
+            // underlying interleaved ledger storage.
+            entryLogger.rollLog();
         }
-        memTable.flush(this, checkpoint);
-        return super.checkpoint(checkpoint);
+        super.checkpoint(checkpoint);
     }
 
     @Override
@@ -170,7 +189,8 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
 
     // CacheCallback functions.
     @Override
-    public void onSizeLimitReached() throws IOException {
+    public void onSizeLimitReached(final Checkpoint cp) throws IOException {
+        LOG.info("Reached size {}", cp);
         // when size limit reached, we get the previous checkpoint from snapshot mem-table.
         // at this point, we are safer to schedule a checkpoint, since the entries added before
         // this checkpoint already written to entry logger.
@@ -194,8 +214,9 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
                     // for performance consideration: since we don't wanna checkpoint a new log file that ledger
                     // storage is writing to.
                     if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) {
-                        entryLogger.rollLog();
                         LOG.info("Rolling entry logger since it reached size limitation");
+                        entryLogger.rollLog();
+                        checkpointer.startCheckpoint(cp);
                     }
                 } catch (IOException e) {
                     // TODO: if we failed to flush data, we should switch the bookie back to readonly mode
@@ -205,4 +226,12 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
             }
         });
     }
+
+    @Override
+    public void onRotateEntryLog() {
+        // override the behavior of interleaved ledger storage.
+        // we don't trigger any checkpoint logic when an entry log file is rotated, because entry log file rotation
+        // can happen because of compaction. in a sorted ledger storage, checkpoint should happen after the data is
+        // flushed to the entry log file.
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index ca001a4..44bf6ae 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -24,18 +24,16 @@ package org.apache.bookkeeper.bookie;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.MathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * SyncThread is a background thread which help checkpointing ledger storage
@@ -54,8 +52,8 @@ import org.slf4j.LoggerFactory;
  * for manual recovery in critical disaster.
  * </p>
  */
-class SyncThread {
-    private static final Logger LOG = LoggerFactory.getLogger(SyncThread.class);
+@Slf4j
+class SyncThread implements Checkpointer {
 
     final ScheduledExecutorService executor;
     final int flushInterval;
@@ -78,47 +76,43 @@ class SyncThread {
             .setNameFormat("SyncThread-" + conf.getBookiePort() + "-%d");
         this.executor = Executors.newSingleThreadScheduledExecutor(tfb.build());
         flushInterval = conf.getFlushInterval();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Flush Interval : {}", flushInterval);
+        if (log.isDebugEnabled()) {
+            log.debug("Flush Interval : {}", flushInterval);
         }
     }
 
-    void start() {
-        executor.scheduleAtFixedRate(new Runnable() {
-            public void run() {
-                try {
-                    synchronized (suspensionLock) {
-                        while (suspended) {
-                            try {
-                                suspensionLock.wait();
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                                continue;
-                            }
+    @Override
+    public void startCheckpoint(Checkpoint checkpoint) {
+        executor.submit(() -> {
+            try {
+                synchronized (suspensionLock) {
+                    while (suspended) {
+                        try {
+                            suspensionLock.wait();
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            continue;
                         }
                     }
-                    if (!disableCheckpoint) {
-                        checkpoint(checkpointSource.newCheckpoint());
-                    }
-                } catch (Throwable t) {
-                    LOG.error("Exception in SyncThread", t);
-                    dirsListener.fatalError();
                 }
+                if (!disableCheckpoint) {
+                    checkpoint(checkpoint);
+                }
+            } catch (Throwable t) {
+                log.error("Exception in SyncThread", t);
+                dirsListener.fatalError();
             }
-        }, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
+        });
     }
 
     public Future<Void> requestFlush() {
-        return executor.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                try {
-                    flush();
-                } catch (Throwable t) {
-                    LOG.error("Exception flushing ledgers ", t);
-                }
-                return null;
+        return executor.submit(() -> {
+            try {
+                flush();
+            } catch (Throwable t) {
+                log.error("Exception flushing ledgers ", t);
             }
+            return null;
         });
     }
 
@@ -127,11 +121,11 @@ class SyncThread {
         try {
             ledgerStorage.flush();
         } catch (NoWritableLedgerDirException e) {
-            LOG.error("No writeable ledger directories", e);
+            log.error("No writeable ledger directories", e);
             dirsListener.allDisksFull();
             return;
         } catch (IOException e) {
-            LOG.error("Exception flushing ledgers", e);
+            log.error("Exception flushing ledgers", e);
             return;
         }
 
@@ -139,32 +133,37 @@ class SyncThread {
             return;
         }
 
-        LOG.info("Flush ledger storage at checkpoint {}.", checkpoint);
+        log.info("Flush ledger storage at checkpoint {}.", checkpoint);
         try {
             checkpointSource.checkpointComplete(checkpoint, false);
         } catch (IOException e) {
-            LOG.error("Exception marking checkpoint as complete", e);
+            log.error("Exception marking checkpoint as complete", e);
             dirsListener.allDisksFull();
         }
     }
 
     @VisibleForTesting
     public void checkpoint(Checkpoint checkpoint) {
+        if (null == checkpoint) {
+            // do nothing if checkpoint is null
+            return;
+        }
+
         try {
-            checkpoint = ledgerStorage.checkpoint(checkpoint);
+            ledgerStorage.checkpoint(checkpoint);
         } catch (NoWritableLedgerDirException e) {
-            LOG.error("No writeable ledger directories", e);
+            log.error("No writeable ledger directories", e);
             dirsListener.allDisksFull();
             return;
         } catch (IOException e) {
-            LOG.error("Exception flushing ledgers", e);
+            log.error("Exception flushing ledgers", e);
             return;
         }
 
         try {
             checkpointSource.checkpointComplete(checkpoint, true);
         } catch (IOException e) {
-            LOG.error("Exception marking checkpoint as complete", e);
+            log.error("Exception marking checkpoint as complete", e);
             dirsListener.allDisksFull();
         }
     }
@@ -197,13 +196,13 @@ class SyncThread {
 
     // shutdown sync thread
     void shutdown() throws InterruptedException {
-        LOG.info("Shutting down SyncThread");
+        log.info("Shutting down SyncThread");
         requestFlush();
         executor.shutdown();
         long start = MathUtils.now();
         while (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
             long now = MathUtils.now();
-            LOG.info("SyncThread taking a long time to shutdown. Has taken {}"
+            log.info("SyncThread taking a long time to shutdown. Has taken {}"
                     + " seconds so far", now - start);
         }
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 17cc04f..08bb131 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -20,9 +20,14 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTED_SUFFIX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -34,7 +39,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
-
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -56,14 +60,11 @@ import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.TestUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.AsyncCallback;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTED_SUFFIX;
-import static org.junit.Assert.*;
 /**
  * This class tests the entry log compaction functionality.
  */
@@ -240,9 +241,14 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             Bookie.checkDirectoryStructure(dir);
         }
         InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
-        storage.initialize(conf,
-                LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(),
-                dirManager, dirManager, cp, NullStatsLogger.INSTANCE);
+        storage.initialize(
+            conf,
+            LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(),
+            dirManager,
+            dirManager,
+            cp,
+            Checkpointer.NULL,
+            NullStatsLogger.INSTANCE);
         storage.start();
         long startTime = MathUtils.now();
         storage.gcThread.enableForceGC();
@@ -623,7 +629,14 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         assertFalse("Log shouldnt exist", log0.exists());
         InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
-        storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE);
+        storage.initialize(
+            conf,
+            manager,
+            dirs,
+            dirs,
+            checkpointSource,
+            Checkpointer.NULL,
+            NullStatsLogger.INSTANCE);
         ledgers.add(1L);
         ledgers.add(2L);
         ledgers.add(3L);
@@ -642,7 +655,13 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         ledgers.remove(3L);
 
         storage = new InterleavedLedgerStorage();
-        storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE);
+        storage.initialize(
+            conf,
+            manager,
+            dirs, dirs,
+            checkpointSource,
+            Checkpointer.NULL,
+            NullStatsLogger.INSTANCE);
         storage.start();
         for (int i = 0; i < 10; i++) {
             if (!log0.exists()) {
@@ -658,7 +677,14 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         storage.addEntry(genEntry(4, 1, ENTRY_SIZE)); // force ledger 1 page to flush
 
         storage = new InterleavedLedgerStorage();
-        storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE);
+        storage.initialize(
+            conf,
+            manager,
+            dirs,
+            dirs,
+            checkpointSource,
+            Checkpointer.NULL,
+            NullStatsLogger.INSTANCE);
         storage.getEntry(1, 1); // entry should exist
     }
 
@@ -755,7 +781,14 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             }
         };
         InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
-        storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE);
+        storage.initialize(
+            conf,
+            manager,
+            dirs,
+            dirs,
+            checkpointSource,
+            Checkpointer.NULL,
+            NullStatsLogger.INSTANCE);
 
         double threshold = 0.1;
         // shouldn't throw exception
@@ -803,9 +836,14 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             Bookie.checkDirectoryStructure(dir);
         }
         InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
-        storage.initialize(conf,
-                LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(),
-                dirManager, dirManager, cp, NullStatsLogger.INSTANCE);
+        storage.initialize(
+            conf,
+            LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(),
+            dirManager,
+            dirManager,
+            cp,
+            Checkpointer.NULL,
+            NullStatsLogger.INSTANCE);
         storage.start();
 
         // test suspend Major GC.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 5f62ecc..8de2bfc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -475,10 +475,21 @@ public class LedgerCacheTest {
         }
 
         @Override
-        public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
-                LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
-                final CheckpointSource checkpointSource, StatsLogger statsLogger) throws IOException {
-            super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, checkpointSource, statsLogger);
+        public void initialize(ServerConfiguration conf,
+                               LedgerManager ledgerManager,
+                               LedgerDirsManager ledgerDirsManager,
+                               LedgerDirsManager indexDirsManager,
+                               CheckpointSource checkpointSource,
+                               Checkpointer checkpointer,
+                               StatsLogger statsLogger) throws IOException {
+            super.initialize(
+                conf,
+                ledgerManager,
+                ledgerDirsManager,
+                indexDirsManager,
+                checkpointSource,
+                checkpointer,
+                statsLogger);
             this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger) {
                 @Override
                 boolean isSizeLimitReached() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTestBase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTestBase.java
new file mode 100644
index 0000000..29268eb
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTestBase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ * Base class for ledger storage tests: sets up temporary journal/ledger directories and a {@link LedgerDirsManager}.
+ */
+@Slf4j
+public abstract class LedgerStorageTestBase {
+
+    @Rule
+    public TestName testName = new TestName();
+
+    protected ServerConfiguration conf;
+    protected File journalDir, ledgerDir;
+    protected LedgerDirsManager ledgerDirsManager;
+
+    private File createTempDir(String suffix) throws Exception {
+        File dir = File.createTempFile(testName.getMethodName(), suffix);
+        dir.delete();
+        dir.mkdirs();
+        return dir;
+    }
+
+    protected LedgerStorageTestBase() {
+        conf = TestBKConfiguration.newServerConfiguration();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        journalDir = createTempDir("journal");
+        ledgerDir = createTempDir("ledger");
+
+        // create current directories
+        Bookie.getCurrentDirectory(journalDir).mkdir();
+        Bookie.getCurrentDirectory(ledgerDir).mkdir();
+
+        // build the configuration
+        conf.setZkServers(null);
+        conf.setJournalDirName(journalDir.getPath());
+        conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+        // build the ledger monitor
+        DiskChecker checker = new DiskChecker(
+            conf.getDiskUsageThreshold(),
+            conf.getDiskUsageWarnThreshold());
+        ledgerDirsManager = new LedgerDirsManager(
+            conf,
+            conf.getLedgerDirs(),
+            checker);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        FileUtils.deleteDirectory(journalDir);
+        FileUtils.deleteDirectory(ledgerDir);
+    }
+
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
new file mode 100644
index 0000000..c70b0e0
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the checkpoint logic of {@link SortedLedgerStorage}.
+ */
+@Slf4j
+public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
+
+    @Data
+    @RequiredArgsConstructor
+    @ToString
+    @EqualsAndHashCode
+    private static class TestCheckpoint implements Checkpoint {
+
+        private final long offset;
+
+        @Override
+        public int compareTo(Checkpoint o) {
+            if (Checkpoint.MAX == o) {
+                return -1;
+            }
+
+            TestCheckpoint other = (TestCheckpoint) o;
+            return Long.compare(offset, other.offset);
+        }
+
+    }
+
+    @RequiredArgsConstructor
+    private static class TestCheckpointSource implements CheckpointSource {
+
+        private long currentOffset = 0;
+
+        void advanceOffset(long numBytes) {
+            currentOffset += numBytes;
+        }
+
+        @Override
+        public Checkpoint newCheckpoint() {
+            TestCheckpoint cp = new TestCheckpoint(currentOffset);
+            log.info("New checkpoint : {}", cp);
+            return cp;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact)
+            throws IOException {
+            log.info("Complete checkpoint : {}", checkpoint);
+        }
+    }
+
+    private SortedLedgerStorage storage;
+    private Checkpointer checkpointer;
+    private final LinkedBlockingQueue<Checkpoint> checkpoints;
+    private final TestCheckpointSource checkpointSrc = new TestCheckpointSource();
+
+    public SortedLedgerStorageCheckpointTest() {
+        super();
+        conf.setEntryLogSizeLimit(1);
+        this.checkpoints = new LinkedBlockingQueue<>();
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        // initial checkpoint
+
+        this.storage = new SortedLedgerStorage();
+        this.checkpointer = checkpoint -> storage.getScheduler().submit(() -> {
+            log.info("Checkpoint the storage at {}", checkpoint);
+            try {
+                storage.checkpoint(checkpoint);
+                checkpoints.add(checkpoint);
+            } catch (IOException e) {
+                log.error("Failed to checkpoint at {}", checkpoint, e);
+            }
+        });
+        this.storage.initialize(
+            conf,
+            mock(LedgerManager.class),
+            ledgerDirsManager,
+            ledgerDirsManager,
+            checkpointSrc,
+            checkpointer,
+            NullStatsLogger.INSTANCE);
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        if (null != storage) {
+            storage.shutdown();
+        }
+        super.tearDown();
+    }
+
+    ByteBuf prepareEntry(long ledgerId, long entryId) {
+        ByteBuf entry = Unpooled.buffer(4 * Long.BYTES);
+        // ledger id, entry id, lac
+        entry.writeLong(ledgerId);
+        entry.writeLong(entryId);
+        entry.writeLong(entryId - 1);
+        // data
+        entry.writeLong(entryId);
+        return entry;
+    }
+
+    @Test
+    public void testCheckpoint() throws Exception {
+        // memory table holds the first checkpoint, but it is not completed yet.
+        Checkpoint memtableCp = storage.memTable.kvmap.cp;
+        assertEquals(new TestCheckpoint(0), memtableCp);
+
+        // write entries into ledger storage
+        long lid = System.currentTimeMillis();
+        storage.setMasterKey(lid, new byte[0]);
+        for (int i = 0; i < 20; i++) {
+            storage.addEntry(prepareEntry(lid, i));
+        }
+        // simulate the journal persisting the entries
+        checkpointSrc.advanceOffset(100);
+
+        // memory table holds the first checkpoint, but it is not completed yet.
+        memtableCp = storage.memTable.kvmap.cp;
+        assertEquals(new TestCheckpoint(0), memtableCp);
+
+        // trigger a memtable flush
+        storage.onSizeLimitReached(checkpointSrc.newCheckpoint());
+        // wait for checkpoint to complete
+        checkpoints.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        assertEquals(new TestCheckpoint(100), storage.memTable.kvmap.cp);
+        assertEquals(0, storage.memTable.kvmap.size());
+    }
+
+    @Test
+    public void testCheckpointAfterEntryLogRotated() throws Exception {
+        // memory table holds the first checkpoint, but it is not completed yet.
+        Checkpoint memtableCp = storage.memTable.kvmap.cp;
+        assertEquals(new TestCheckpoint(0), memtableCp);
+
+        // write entries into ledger storage
+        long lid = System.currentTimeMillis();
+        storage.setMasterKey(lid, new byte[0]);
+        for (int i = 0; i < 20; i++) {
+            storage.addEntry(prepareEntry(lid, i));
+        }
+        // simulate the journal persisting the entries
+        checkpointSrc.advanceOffset(100);
+
+        // memory table holds the first checkpoint, but it is not completed yet.
+        memtableCp = storage.memTable.kvmap.cp;
+        assertEquals(new TestCheckpoint(0), memtableCp);
+        assertEquals(20, storage.memTable.kvmap.size());
+
+        final CountDownLatch readyLatch = new CountDownLatch(1);
+        storage.getScheduler().submit(() -> {
+            try {
+                readyLatch.await();
+            } catch (InterruptedException e) {
+            }
+        });
+
+        // simulate the entry log being rotated (e.g. due to compaction)
+        storage.entryLogger.rollLog();
+        long leastUnflushedLogId = storage.entryLogger.getLeastUnflushedLogId();
+        long currentLogId = storage.entryLogger.getCurrentLogId();
+        log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId);
+
+        readyLatch.countDown();
+        assertNull(checkpoints.poll());
+        assertEquals(new TestCheckpoint(0), storage.memTable.kvmap.cp);
+        assertEquals(20, storage.memTable.kvmap.size());
+
+        // trigger a memtable flush
+        storage.onSizeLimitReached(checkpointSrc.newCheckpoint());
+        assertEquals(new TestCheckpoint(100), checkpoints.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
+
+        // all the entries are flushed out
+        assertEquals(new TestCheckpoint(100), storage.memTable.kvmap.cp);
+        assertEquals(0, storage.memTable.kvmap.size());
+        assertTrue(
+            "current log " + currentLogId + " contains entries added from memtable should be forced to disk"
+            + " but least unflushed log is " + storage.entryLogger.getLeastUnflushedLogId(),
+            storage.entryLogger.getLeastUnflushedLogId() > currentLogId);
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
index 0ca108e..ec60799 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
@@ -18,20 +18,21 @@
  */
 package org.apache.bookkeeper.bookie;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.HashSet;
-
+import java.util.Random;
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.Test;
 import org.junit.Before;
-import java.nio.ByteBuffer;
-import java.util.Random;
 
 public class TestEntryMemTable implements CacheCallback, SkipListFlusher, CheckpointSource {
 
@@ -92,7 +93,7 @@ public class TestEntryMemTable implements CacheCallback, SkipListFlusher, Checkp
     }
 
     @Override
-    public void onSizeLimitReached() throws IOException {
+    public void onSizeLimitReached(Checkpoint cp) throws IOException {
         // No-op
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index 0d8f8a4..5f30557 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -21,7 +21,6 @@
 package org.apache.bookkeeper.bookie;
 
 import io.netty.buffer.ByteBuf;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.Observable;
@@ -34,7 +33,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -101,7 +99,7 @@ public class TestSyncThread {
                 }
 
                 @Override
-                public Checkpoint checkpoint(Checkpoint checkpoint)
+                public void checkpoint(Checkpoint checkpoint)
                         throws IOException {
                     checkpointCalledLatch.countDown();
                     try {
@@ -111,27 +109,24 @@ public class TestSyncThread {
                         LOG.error("Interrupted in checkpoint thread", ie);
                         failedSomewhere.set(true);
                     }
-                    return checkpoint;
                 }
             };
 
         final SyncThread t = new SyncThread(conf, listener, storage, checkpointSource);
-        t.start();
+        t.startCheckpoint(Checkpoint.MAX);
         assertTrue("Checkpoint should have been called",
                    checkpointCalledLatch.await(10, TimeUnit.SECONDS));
-        Future<Boolean> done = executor.submit(new Callable<Boolean>() {
-                public Boolean call() {
-                    try {
-                        t.shutdown();
-                    } catch (InterruptedException ie) {
-                        Thread.currentThread().interrupt();
-                        LOG.error("Interrupted shutting down sync thread", ie);
-                        failedSomewhere.set(true);
-                        return false;
-                    }
-                    return true;
-                }
-            });
+        Future<Boolean> done = executor.submit(() -> {
+            try {
+                t.shutdown();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                LOG.error("Interrupted shutting down sync thread", ie);
+                failedSomewhere.set(true);
+                return false;
+            }
+            return true;
+        });
         checkpointLatch.countDown();
         assertFalse("Shutdown shouldn't have finished", done.isDone());
         assertTrue("Flush should have been called",
@@ -160,14 +155,13 @@ public class TestSyncThread {
         final AtomicInteger checkpointCount = new AtomicInteger(0);
         LedgerStorage storage = new DummyLedgerStorage() {
                 @Override
-                public Checkpoint checkpoint(Checkpoint checkpoint)
+                public void checkpoint(Checkpoint checkpoint)
                         throws IOException {
                     checkpointCount.incrementAndGet();
-                    return checkpoint;
                 }
             };
         final SyncThread t = new SyncThread(conf, listener, storage, checkpointSource);
-        t.start();
+        t.startCheckpoint(Checkpoint.MAX);
         while (checkpointCount.get() == 0) {
             Thread.sleep(flushInterval);
         }
@@ -175,6 +169,7 @@ public class TestSyncThread {
         Thread.sleep(flushInterval);
         int count = checkpointCount.get();
         for (int i = 0; i < 10; i++) {
+            t.startCheckpoint(Checkpoint.MAX);
             assertEquals("Checkpoint count shouldn't change", count, checkpointCount.get());
         }
         t.resumeSync();
@@ -210,13 +205,13 @@ public class TestSyncThread {
 
         LedgerStorage storage = new DummyLedgerStorage() {
                 @Override
-                public Checkpoint checkpoint(Checkpoint checkpoint)
+                public void checkpoint(Checkpoint checkpoint)
                         throws IOException {
                     throw new RuntimeException("Fatal error in sync thread");
                 }
             };
         final SyncThread t = new SyncThread(conf, listener, storage, checkpointSource);
-        t.start();
+        t.startCheckpoint(Checkpoint.MAX);
         assertTrue("Should have called fatal error", fatalLatch.await(10, TimeUnit.SECONDS));
         t.shutdown();
     }
@@ -242,13 +237,13 @@ public class TestSyncThread {
 
         LedgerStorage storage = new DummyLedgerStorage() {
                 @Override
-                public Checkpoint checkpoint(Checkpoint checkpoint)
+                public void checkpoint(Checkpoint checkpoint)
                         throws IOException {
                     throw new NoWritableLedgerDirException("Disk full error in sync thread");
                 }
             };
         final SyncThread t = new SyncThread(conf, listener, storage, checkpointSource);
-        t.start();
+        t.startCheckpoint(Checkpoint.MAX);
         assertTrue("Should have disk full error", diskFullLatch.await(10, TimeUnit.SECONDS));
         t.shutdown();
     }
@@ -267,9 +262,14 @@ public class TestSyncThread {
 
     private static class DummyLedgerStorage implements LedgerStorage {
         @Override
-        public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
-                LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
-                CheckpointSource checkpointSource, StatsLogger statsLogger)
+        public void initialize(
+            ServerConfiguration conf,
+            LedgerManager ledgerManager,
+            LedgerDirsManager ledgerDirsManager,
+            LedgerDirsManager indexDirsManager,
+            CheckpointSource checkpointSource,
+            Checkpointer checkpointer,
+            StatsLogger statsLogger)
                 throws IOException {
         }
 
@@ -346,9 +346,8 @@ public class TestSyncThread {
         }
 
         @Override
-        public Checkpoint checkpoint(Checkpoint checkpoint)
+        public void checkpoint(Checkpoint checkpoint)
                 throws IOException {
-            return checkpoint;
         }
 
         @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index ecdfa77..ad21fc1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import io.netty.buffer.ByteBuf;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -48,10 +47,10 @@ import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
 import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
 import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.EntryLogger;
@@ -320,9 +319,14 @@ public class GcLedgersTest extends LedgerManagerTestCase {
     class MockLedgerStorage implements CompactableLedgerStorage {
 
         @Override
-        public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
-                LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
-                CheckpointSource checkpointSource, StatsLogger statsLogger) throws IOException {
+        public void initialize(
+            ServerConfiguration conf,
+            LedgerManager ledgerManager,
+            LedgerDirsManager ledgerDirsManager,
+            LedgerDirsManager indexDirsManager,
+            CheckpointSource checkpointSource,
+            Checkpointer checkpointer,
+            StatsLogger statsLogger) throws IOException {
         }
 
         @Override
@@ -386,8 +390,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         }
 
         @Override
-        public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException {
-            return null;
+        public void checkpoint(Checkpoint checkpoint) throws IOException {
         }
 
         @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index c6f2a36..5d357c3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -32,6 +32,7 @@ import java.util.Observer;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
 import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
 import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.EntryLogger;
@@ -112,9 +113,14 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
     public class MockLedgerStorage implements CompactableLedgerStorage {
 
         @Override
-        public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
-                LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
-                CheckpointSource checkpointSource, StatsLogger statsLogger) throws IOException {
+        public void initialize(
+            ServerConfiguration conf,
+            LedgerManager ledgerManager,
+            LedgerDirsManager ledgerDirsManager,
+            LedgerDirsManager indexDirsManager,
+            CheckpointSource checkpointSource,
+            Checkpointer checkpointer,
+            StatsLogger statsLogger) throws IOException {
         }
 
         @Override
@@ -169,8 +175,7 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         }
 
         @Override
-        public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException {
-            return null;
+        public void checkpoint(Checkpoint checkpoint) throws IOException {
         }
 
         @Override

-- 
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>.

Mime
View raw message