bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-851: Configurable LedgerStorage implementation
Date Tue, 08 Mar 2016 05:50:01 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 2c567d008 -> 96adbf1d6


BOOKKEEPER-851: Configurable LedgerStorage implementation

sijie Addressed almost all comments from https://reviews.apache.org/r/33096

Only point still open is how to treat the `SortedLedgerStorage` and the flag `sortedLedgerStorageEnabled=true` in the config file

Author: Matteo Merli <mmerli@apache.org>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #6 from merlimat/bk-851-configurable-ledger-storage


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/96adbf1d
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/96adbf1d
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/96adbf1d

Branch: refs/heads/master
Commit: 96adbf1d613a63602c8b1b4aad1b0a7d17e6eee3
Parents: 2c567d0
Author: Matteo Merli <mmerli@apache.org>
Authored: Mon Mar 7 21:49:52 2016 -0800
Committer: Sijie Guo <sijie@apache.org>
Committed: Mon Mar 7 21:49:52 2016 -0800

----------------------------------------------------------------------
 bookkeeper-server/conf/bk_server.conf           |   3 +
 .../org/apache/bookkeeper/bookie/Bookie.java    |  19 ++-
 .../bookie/CompactableLedgerStorage.java        |  61 +++++++++
 .../apache/bookkeeper/bookie/EntryLocation.java |  34 +++++
 .../org/apache/bookkeeper/bookie/FileInfo.java  |   1 +
 .../bookie/GarbageCollectorThread.java          |  77 +++++------
 .../bookkeeper/bookie/IndexPersistenceMgr.java  |   9 +-
 .../bookie/InterleavedLedgerStorage.java        |  72 ++++++++---
 .../apache/bookkeeper/bookie/LedgerStorage.java |  26 +++-
 .../bookkeeper/bookie/LedgerStorageFactory.java |  35 +++++
 .../bookkeeper/bookie/ReadOnlyEntryLogger.java  |   2 +-
 .../bookie/ScanAndCompareGarbageCollector.java  |  47 ++++---
 .../bookkeeper/bookie/SortedLedgerStorage.java  |  21 +--
 .../bookkeeper/conf/ServerConfiguration.java    |  48 ++++++-
 .../bookkeeper/bookie/CompactionTest.java       |  39 +++---
 .../bookkeeper/bookie/LedgerCacheTest.java      |   2 +-
 .../bookkeeper/bookie/TestSyncThread.java       |  14 ++
 .../apache/bookkeeper/meta/GcLedgersTest.java   | 127 ++++++++++++++++++-
 .../test/ForceReadOnlyBookieTest.java           |   7 +-
 .../bookkeeper/test/LedgerDeleteTest.java       |   4 +-
 .../bookkeeper/test/ReadOnlyBookieTest.java     |   4 +-
 21 files changed, 502 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/conf/bk_server.conf
----------------------------------------------------------------------
diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index 53db095..f73e633 100644
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -67,6 +67,9 @@ ledgerDirectories=/tmp/bk-data
 # store all ledgers.
 # zkLedgersRootPath=/ledgers
 
+# Ledger storage implementation class
+# ledgerStorageClass=org.apache.bookkeeper.bookie.SortedLedgerStorage
+
 # Enable/Disable entry logger preallocation
 # entryLogFilePreallocationEnabled=true
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 74876ff..6272200 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -503,16 +503,11 @@ public class Bookie extends BookieCriticalThread {
         // instantiate the journal
         journal = new Journal(conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE));
 
-        // Check the type of storage.
-        if (conf.getSortedLedgerStorageEnabled()) {
-            ledgerStorage = new SortedLedgerStorage(conf, ledgerManager,
-                                                    ledgerDirsManager, indexDirsManager,
-                                                    journal, statsLogger);
-        } else {
-            ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
-                                                         ledgerDirsManager, indexDirsManager,
-                                                         journal, statsLogger);
-        }
+        // Instantiate the ledger storage implementation
+        String ledgerStorageClass = conf.getLedgerStorageClass();
+        LOG.info("Using ledger storage: {}", ledgerStorageClass);
+        ledgerStorage = LedgerStorageFactory.createLedgerStorage(ledgerStorageClass);
+        ledgerStorage.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, journal, statsLogger);
         syncThread = new SyncThread(conf, getLedgerDirsListener(),
                                     ledgerStorage, journal);
 
@@ -712,7 +707,9 @@ public class Bookie extends BookieCriticalThread {
 
             try {
                 jmxLedgerStorageBean = this.ledgerStorage.getJMXBean();
-                BKMBeanRegistry.getInstance().register(jmxLedgerStorageBean, jmxBookieBean);
+                if (jmxLedgerStorageBean != null) {
+                    BKMBeanRegistry.getInstance().register(jmxLedgerStorageBean, jmxBookieBean);
+                }
             } catch (Exception e) {
                 LOG.warn("Failed to register with JMX for ledger cache", e);
                 jmxLedgerStorageBean = null;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java
new file mode 100644
index 0000000..c76a485
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+
+/**
+ * Interface that identifies LedgerStorage implementations using EntryLogger and running periodic entries compaction
+ */
+public interface CompactableLedgerStorage extends LedgerStorage {
+
+    /**
+     * @return the EntryLogger used by the ledger storage
+     */
+    EntryLogger getEntryLogger();
+
+    /**
+     * Get an iterator over a range of ledger ids stored in the bookie.
+     *
+     * @param firstLedgerId first ledger id in the sequence (included)
+     * @param lastLedgerId last ledger id in the sequence (not included)
+     * @return
+     */
+    Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId)
+            throws IOException;
+
+    /**
+     * Update the location of several entries
+     *
+     * @param locations
+     *            the list of locations to update
+     * @throws IOException
+     */
+    void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException;
+
+    /**
+     * Flush the entries locations index for the compacted entries
+     *
+     * @throws IOException
+     */
+    void flushEntriesLocationsIndex() throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java
new file mode 100644
index 0000000..dbb85a1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+public class EntryLocation {
+    public final long ledger;
+    public final long entry;
+    public final long location;
+
+    public EntryLocation(long ledger, long entry, long location) {
+        this.ledger = ledger;
+        this.entry = entry;
+        this.location = location;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index 44f004a..3b7c8dc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -265,6 +265,7 @@ class FileInfo {
         }
         if (useCount.get() == 0 && fc != null) {
             fc.close();
+            fc = null;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 1c9c7e7..4d7da25 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -39,7 +39,6 @@ import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.SnapshotMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,9 +79,7 @@ public class GarbageCollectorThread extends BookieThread {
     // Entry Logger Handle
     final EntryLogger entryLogger;
 
-    // Ledger Cache Handle
-    final LedgerCache ledgerCache;
-    final SnapshotMap<Long, Boolean> activeLedgers;
+    final CompactableLedgerStorage ledgerStorage;
 
     // flag to ensure gc thread will not be interrupted during compaction
     // to reduce the risk getting entry log corrupted
@@ -103,35 +100,23 @@ public class GarbageCollectorThread extends BookieThread {
     final GarbageCollector garbageCollector;
     final GarbageCleaner garbageCleaner;
 
-    private static class Offset {
-        final long ledger;
-        final long entry;
-        final long offset;
-
-        Offset(long ledger, long entry, long offset) {
-            this.ledger = ledger;
-            this.entry = entry;
-            this.offset = offset;
-        }
-    }
- 
     private static class Throttler {
         final RateLimiter rateLimiter;
         final boolean isThrottleByBytes;
         final int compactionRateByBytes;
         final int compactionRateByEntries;
 
-        Throttler(boolean isThrottleByBytes, 
-                  int compactionRateByBytes, 
+        Throttler(boolean isThrottleByBytes,
+                  int compactionRateByBytes,
                   int compactionRateByEntries) {
             this.isThrottleByBytes  = isThrottleByBytes;
             this.compactionRateByBytes = compactionRateByBytes;
             this.compactionRateByEntries = compactionRateByEntries;
-            this.rateLimiter = RateLimiter.create(this.isThrottleByBytes ? 
-                                                  this.compactionRateByBytes : 
+            this.rateLimiter = RateLimiter.create(this.isThrottleByBytes ?
+                                                  this.compactionRateByBytes :
                                                   this.compactionRateByEntries);
         }
-        
+
         // acquire. if bybytes: bytes of this entry; if byentries: 1.
         void acquire(int permits) {
             rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
@@ -142,11 +127,11 @@ public class GarbageCollectorThread extends BookieThread {
      * A scanner wrapper to check whether a ledger is alive in an entry log file
      */
     class CompactionScannerFactory implements EntryLogger.EntryLogListener {
-        List<Offset> offsets = new ArrayList<Offset>();
+        List<EntryLocation> offsets = new ArrayList<EntryLocation>();
 
         EntryLogScanner newScanner(final EntryLogMetadata meta) {
             final Throttler throttler = new Throttler (isThrottleByBytes,
-                                                       compactionRateByBytes, 
+                                                       compactionRateByBytes,
                                                        compactionRateByEntries);
 
             return new EntryLogScanner() {
@@ -168,7 +153,7 @@ public class GarbageCollectorThread extends BookieThread {
                         entry.rewind();
 
                         long newoffset = entryLogger.addEntry(ledgerId, entry);
-                        offsets.add(new Offset(ledgerId, entryId, newoffset));
+                        offsets.add(new EntryLocation(ledgerId, entryId, newoffset));
                     }
                 }
             };
@@ -190,15 +175,15 @@ public class GarbageCollectorThread extends BookieThread {
                     return;
                 }
 
-                Offset lastOffset = offsets.get(offsets.size()-1);
-                long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset);
+                EntryLocation lastOffset = offsets.get(offsets.size()-1);
+                long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location);
                 while (lastOffsetLogId < entryLogger.getLeastUnflushedLogId() && running) {
                     synchronized (flushLock) {
                         flushLock.wait(1000);
                     }
 
                     lastOffset = offsets.get(offsets.size()-1);
-                    lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset);
+                    lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location);
                 }
                 if (lastOffsetLogId >= entryLogger.getLeastUnflushedLogId() && !running) {
                     throw new IOException("Shutdown before flushed");
@@ -208,16 +193,14 @@ public class GarbageCollectorThread extends BookieThread {
                 throw new IOException("Interrupted waiting for flush", ie);
             }
 
-            for (Offset o : offsets) {
-                ledgerCache.putEntryOffset(o.ledger, o.entry, o.offset);
-            }
+            ledgerStorage.updateEntriesLocations(offsets);
             offsets.clear();
         }
 
         synchronized void flush() throws IOException {
             waitEntrylogFlushed();
 
-            ledgerCache.flushLedger(true);
+            ledgerStorage.flushEntriesLocationsIndex();
         }
     }
 
@@ -230,16 +213,13 @@ public class GarbageCollectorThread extends BookieThread {
      * @throws IOException
      */
     public GarbageCollectorThread(ServerConfiguration conf,
-                                  final LedgerCache ledgerCache,
-                                  EntryLogger entryLogger,
-                                  SnapshotMap<Long, Boolean> activeLedgers,
-                                  LedgerManager ledgerManager)
+                                  LedgerManager ledgerManager,
+                                  final CompactableLedgerStorage ledgerStorage)
         throws IOException {
         super("GarbageCollectorThread");
 
-        this.ledgerCache = ledgerCache;
-        this.entryLogger = entryLogger;
-        this.activeLedgers = activeLedgers;
+        this.entryLogger = ledgerStorage.getEntryLogger();
+        this.ledgerStorage = ledgerStorage;
 
         this.gcWaitTime = conf.getGcWaitTime();
         this.isThrottleByBytes = conf.getIsThrottleByBytes();
@@ -256,14 +236,15 @@ public class GarbageCollectorThread extends BookieThread {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("delete ledger : " + ledgerId);
                     }
-                    ledgerCache.deleteLedger(ledgerId);
+
+                    ledgerStorage.deleteLedger(ledgerId);
                 } catch (IOException e) {
                     LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
                 }
             }
         };
 
-        this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, activeLedgers);
+        this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage);
 
         // compaction parameters
         minorCompactionThreshold = conf.getMinorCompactionThreshold();
@@ -333,7 +314,7 @@ public class GarbageCollectorThread extends BookieThread {
             LOG.info("Suspend Major Compaction triggered by thread: {}", Thread.currentThread().getName());
         }
     }
-    
+
     public void resumeMajorGC() {
         if (suspendMajorCompaction.compareAndSet(true, false)) {
             LOG.info("{} Major Compaction back to normal since bookie has enough space now.", Thread.currentThread().getName());
@@ -345,7 +326,7 @@ public class GarbageCollectorThread extends BookieThread {
             LOG.info("Suspend Minor Compaction triggered by thread: {}", Thread.currentThread().getName());
         }
     }
-    
+
     public void resumeMinorGC() {
         if (suspendMinorCompaction.compareAndSet(true, false)) {
             LOG.info("{} Minor Compaction back to normal since bookie has enough space now.", Thread.currentThread().getName());
@@ -389,7 +370,7 @@ public class GarbageCollectorThread extends BookieThread {
             }
 
             long curTime = MathUtils.now();
-            if (enableMajorCompaction && (!suspendMajor) && 
+            if (enableMajorCompaction && (!suspendMajor) &&
                 (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) {
                 // enter major compaction
                 LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
@@ -400,7 +381,7 @@ public class GarbageCollectorThread extends BookieThread {
                 continue;
             }
 
-            if (enableMinorCompaction && (!suspendMinor) && 
+            if (enableMinorCompaction && (!suspendMinor) &&
                 (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) {
                 // enter minor compaction
                 LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
@@ -428,8 +409,12 @@ public class GarbageCollectorThread extends BookieThread {
             EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
             for (Long entryLogLedger : meta.getLedgersMap().keySet()) {
                 // Remove the entry log ledger from the set if it isn't active.
-                if (!activeLedgers.containsKey(entryLogLedger)) {
-                    meta.removeLedger(entryLogLedger);
+                try {
+                    if (!ledgerStorage.ledgerExists(entryLogLedger)) {
+                        meta.removeLedger(entryLogLedger);
+                    }
+                } catch (IOException e) {
+                    LOG.error("Error reading from ledger storage", e);
                 }
             }
             if (meta.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 60b4099..573ad45 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -267,14 +267,7 @@ public class IndexPersistenceMgr {
     }
 
     boolean ledgerExists(long ledgerId) throws IOException {
-        FileInfo fi = fileInfoCache.get(ledgerId);
-        if (fi == null) {
-            File lf = findIndexFile(ledgerId);
-            if (lf == null) {
-                return false;
-            }
-        }
-        return true;
+        return activeLedgers.containsKey(ledgerId);
     }
 
     int getNumOpenLedgers() {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index eb27757..e85c8db 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -25,17 +25,20 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 
 import java.util.concurrent.TimeUnit;
 
+import java.util.Map;
+import java.util.NavigableMap;
+
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
@@ -51,7 +54,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFF
  * This ledger storage implementation stores all entries in a single
  * file and maintains an index file for each ledger.
  */
-class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener {
+public class InterleavedLedgerStorage implements CompactableLedgerStorage, EntryLogListener {
     private final static Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
 
     // Hold the last checkpoint
@@ -77,7 +80,7 @@ class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener {
 
     EntryLogger entryLogger;
     LedgerCache ledgerCache;
-    private final CheckpointSource checkpointSource;
+    private CheckpointSource checkpointSource;
     protected final CheckpointHolder checkpointHolder = new CheckpointHolder();
 
     // A sorted map to stored all active ledger ids
@@ -86,32 +89,30 @@ class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener {
     // This is the thread that garbage collects the entry logs that do not
     // contain any active ledgers in them; and compacts the entry logs that
     // has lower remaining percentage to reclaim disk space.
-    final GarbageCollectorThread gcThread;
+    GarbageCollectorThread gcThread;
 
     // this indicates that a write has happened since the last flush
     private volatile boolean somethingWritten = false;
 
     // Expose Stats
-    private final OpStatsLogger getOffsetStats;
-    private final OpStatsLogger getEntryStats;
+    private OpStatsLogger getOffsetStats;
+    private OpStatsLogger getEntryStats;
 
-    InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
-                             LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource)
-            throws IOException {
-        this(conf, ledgerManager, ledgerDirsManager, ledgerDirsManager, checkpointSource, NullStatsLogger.INSTANCE);
+    InterleavedLedgerStorage() {
+        activeLedgers = new SnapshotMap<Long, Boolean>();
     }
 
-    InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
-                             LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
-                             CheckpointSource checkpointSource, StatsLogger statsLogger)
+    @Override
+    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
+                           LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+                           CheckpointSource checkpointSource, StatsLogger statsLogger)
             throws IOException {
-        activeLedgers = new SnapshotMap<Long, Boolean>();
+
         this.checkpointSource = checkpointSource;
         entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
                 null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
-        gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
-                activeLedgers, ledgerManager);
+        gcThread = new GarbageCollectorThread(conf, ledgerManager, this);
         ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
         // Expose Stats
         getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
@@ -342,6 +343,45 @@ class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener {
     }
 
     @Override
+    public void deleteLedger(long ledgerId) throws IOException {
+        activeLedgers.remove(ledgerId);
+        ledgerCache.deleteLedger(ledgerId);
+    }
+
+    @Override
+    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) {
+        NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot();
+        Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot
+                .subMap(firstLedgerId, true, lastLedgerId, false);
+
+        return subBkActiveLedgers.keySet();
+    }
+
+    @Override
+    public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
+        for (EntryLocation l : locations) {
+            try {
+                ledgerCache.putEntryOffset(l.ledger, l.entry, l.location);
+            } catch (NoLedgerException e) {
+                // Ledger was already deleted, we can skip it in the compaction
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Compaction failed for deleted ledger ledger: {} entry: {}", l.ledger, l.entry);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void flushEntriesLocationsIndex() throws IOException {
+        ledgerCache.flushLedger(true);
+    }
+
+    @Override
+    public EntryLogger getEntryLogger() {
+        return entryLogger;
+    }
+
+    @Override
     public BKMBeanInfo getJMXBean() {
         return ledgerCache.getJMXBean();
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index e992d03..f2f00c4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -25,13 +25,30 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
+import org.apache.bookkeeper.meta.LedgerManager;
 
 /**
  * Interface for storing ledger data
  * on persistant storage.
  */
-interface LedgerStorage {
+public interface LedgerStorage {
+
+    /**
+     * Initialize the LedgerStorage implementation
+     *
+     * @param conf
+     * @param ledgerManager
+     * @param ledgerDirsManager
+     */
+    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
+                           LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+                           CheckpointSource checkpointSource, StatsLogger statsLogger)
+            throws IOException;
+
     /**
      * Start any background threads
      * belonging to the storage system. For example,
@@ -111,6 +128,13 @@ interface LedgerStorage {
      */
     Checkpoint checkpoint(Checkpoint checkpoint) throws IOException;
 
+    /*
+     *
+     * @param ledgerId
+     * @throws IOException
+     */
+    void deleteLedger(long ledgerId) throws IOException;
+
     /**
      * Get the JMX management bean for this LedgerStorage
      */

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageFactory.java
new file mode 100644
index 0000000..d2bac64
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageFactory.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.util.ReflectionUtils;
+
+public class LedgerStorageFactory {
+    public static LedgerStorage createLedgerStorage(String name) throws IOException {
+        try {
+            return ReflectionUtils.newInstance(name, LedgerStorage.class);
+        } catch (Throwable t) {
+            throw new IOException("Failed to instantiate ledger storage : " + name, t);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
index a2ce9e3..c0af6c3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
@@ -52,7 +52,7 @@ public class ReadOnlyEntryLogger extends EntryLogger {
     }
 
     @Override
-    synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+    public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
         throw new IOException("Can't add entry to a readonly entry logger.");
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
index d8a87e4..2b4e3c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
@@ -21,17 +21,17 @@
 
 package org.apache.bookkeeper.bookie;
 
-import java.util.Map;
-import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.Set;
 
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
-import org.apache.bookkeeper.util.SnapshotMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 /**
  * Garbage collector implementation using scan and compare.
  *
@@ -49,50 +49,49 @@ import org.slf4j.LoggerFactory;
 public class ScanAndCompareGarbageCollector implements GarbageCollector{
 
     static final Logger LOG = LoggerFactory.getLogger(ScanAndCompareGarbageCollector.class);
-    private SnapshotMap<Long, Boolean> activeLedgers;
-    private LedgerManager ledgerManager;
+    private final LedgerManager ledgerManager;
+    private final CompactableLedgerStorage ledgerStorage;
 
-    public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, SnapshotMap<Long, Boolean> activeLedgers) {
+    public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, CompactableLedgerStorage ledgerStorage) {
         this.ledgerManager = ledgerManager;
-        this.activeLedgers = activeLedgers;
+        this.ledgerStorage = ledgerStorage;
     }
 
     @Override
     public void gc(GarbageCleaner garbageCleaner) {
-        // create a snapshot first
-        NavigableMap<Long, Boolean> bkActiveLedgersSnapshot =
-                this.activeLedgers.snapshot();
-        LedgerRangeIterator ledgerRangeIterator = ledgerManager.getLedgerRanges();
         try {
-            // Empty global active ledgers, need to remove all local active ledgers.
+            // Get a set of all ledgers on the bookie
+            NavigableSet<Long> bkActiveLedgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+
+            // Iterate over all the ledger on the metadata store
+            LedgerRangeIterator ledgerRangeIterator = ledgerManager.getLedgerRanges();
+
             if (!ledgerRangeIterator.hasNext()) {
-                for (Long bkLid : bkActiveLedgersSnapshot.keySet()) {
-                    // remove it from current active ledger
-                    bkActiveLedgersSnapshot.remove(bkLid);
-                    garbageCleaner.clean(bkLid);
+                // Empty global active ledgers, need to remove all local active ledgers.
+                for (long ledgerId : bkActiveLedgers) {
+                    garbageCleaner.clean(ledgerId);
                 }
             }
+
             long lastEnd = -1;
 
             while(ledgerRangeIterator.hasNext()) {
                 LedgerRange lRange = ledgerRangeIterator.next();
-                Map<Long, Boolean> subBkActiveLedgers = null;
 
                 Long start = lastEnd + 1;
                 Long end = lRange.end();
                 if (!ledgerRangeIterator.hasNext()) {
                     end = Long.MAX_VALUE;
                 }
-                subBkActiveLedgers = bkActiveLedgersSnapshot.subMap(
-                        start, true, end, true);
+
+                Iterable<Long> subBkActiveLedgers = bkActiveLedgers.subSet(start, true, end, true);
 
                 Set<Long> ledgersInMetadata = lRange.getLedgers();
-                LOG.debug("Active in metadata {}, Active in bookie {}",
-                          ledgersInMetadata, subBkActiveLedgers.keySet());
-                for (Long bkLid : subBkActiveLedgers.keySet()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Active in metadata {}, Active in bookie {}", ledgersInMetadata, subBkActiveLedgers);
+                }
+                for (Long bkLid : subBkActiveLedgers) {
                     if (!ledgersInMetadata.contains(bkLid)) {
-                        // remove it from current active ledger
-                        subBkActiveLedgers.remove(bkLid);
                         garbageCleaner.clean(bkLid);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index da14885..b67e01e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -39,14 +39,19 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
         implements LedgerStorage, CacheCallback, SkipListFlusher {
     private final static Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class);
 
-    private final EntryMemTable memTable;
-    private final ScheduledExecutorService scheduler;
-
-    public SortedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
-                               LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
-                               final CheckpointSource checkpointSource, StatsLogger statsLogger)
-                                       throws IOException {
-        super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, checkpointSource, statsLogger);
+    private EntryMemTable memTable;
+    private ScheduledExecutorService scheduler;
+
+    public SortedLedgerStorage() {
+        super();
+    }
+
+    @Override
+    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
+                           LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+                           final CheckpointSource checkpointSource, StatsLogger statsLogger)
+            throws IOException {
+        super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, null, statsLogger);
         this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger);
         this.scheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder()

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index d4305ca..76e5037 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -26,6 +26,8 @@ import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
 import org.apache.commons.lang.StringUtils;
 
 /**
@@ -118,6 +120,8 @@ public class ServerConfiguration extends AbstractConfiguration {
     protected final static String ENABLE_STATISTICS = "enableStatistics";
     protected final static String STATS_PROVIDER_CLASS = "statsProviderClass";
 
+    protected final static String LEDGER_STORAGE_CLASS = "ledgerStorageClass";
+
     /**
      * Construct a default configuration object
      */
@@ -833,9 +837,9 @@ public class ServerConfiguration extends AbstractConfiguration {
      *
      * Force GC may get some space back, but may also fill up disk space more
      * quickly. This is because new log files are created before GC, while old
-     * garbage log files deleted after GC. 
+     * garbage log files deleted after GC.
      *
-     * @return true  - do force GC when disk full, 
+     * @return true  - do force GC when disk full,
      *         false - suspend GC when disk full.
      */
     public boolean getIsForceGCAllowWhenNoSpace() {
@@ -844,7 +848,7 @@ public class ServerConfiguration extends AbstractConfiguration {
 
     /**
      * Set whether force GC is allowed when disk full or almost full.
-     * 
+     *
      * @param force true to allow force GC; false to suspend GC
      *
      * @return ServerConfiguration
@@ -993,6 +997,7 @@ public class ServerConfiguration extends AbstractConfiguration {
     /**
      * Set sorted-ledger storage enabled or not
      *
+     * @deprecated Use {@link #setLedgerStorageClass(String)} to configure the implementation class
      * @param enabled
      */
     public ServerConfiguration setSortedLedgerStorageEnabled(boolean enabled) {
@@ -1320,7 +1325,7 @@ public class ServerConfiguration extends AbstractConfiguration {
     /**
      * Get whether use bytes to throttle garbage collector compaction or not
      *
-     * @return true  - use Bytes, 
+     * @return true  - use Bytes,
      *         false - use Entries.
      */
     public boolean getIsThrottleByBytes() {
@@ -1369,7 +1374,7 @@ public class ServerConfiguration extends AbstractConfiguration {
         setProperty(COMPACTION_MAX_OUTSTANDING_REQUESTS, maxOutstandingRequests);
         return this;
     }
-    
+
     /**
      * Get the rate of compaction adds. Default is 1,000.
      *
@@ -1457,6 +1462,39 @@ public class ServerConfiguration extends AbstractConfiguration {
         return this;
     }
 
+    /*
+     * Get the {@link LedgerStorage} implementation class name
+     *
+     * @return the class name
+     */
+    public String getLedgerStorageClass() {
+        String ledgerStorageClass = getString(LEDGER_STORAGE_CLASS, SortedLedgerStorage.class.getName());
+        if (ledgerStorageClass.equals(SortedLedgerStorage.class.getName())
+                && getSortedLedgerStorageEnabled() == false) {
+            // This is to retain compatibility with BK-4.3 configuration
+            // In BK-4.3, the ledger storage is configured through the "sortedLedgerStorageEnabled" flag :
+            // sortedLedgerStorageEnabled == true (default) ---> use SortedLedgerStorage
+            // sortedLedgerStorageEnabled == false ---> use InterleavedLedgerStorage
+            //
+            // Since BK-4.4, one can specify the implementation class, but if it was using InterleavedLedgerStorage it
+            // should continue to use that with the same configuration
+            ledgerStorageClass = InterleavedLedgerStorage.class.getName();
+        }
+
+        return ledgerStorageClass;
+    }
+
+    /**
+     * Set the {@link LedgerStorage} implementation class name
+     *
+     * @param ledgerStorageClass the class name
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setLedgerStorageClass(String ledgerStorageClass) {
+        setProperty(LEDGER_STORAGE_CLASS, ledgerStorageClass);
+        return this;
+    }
+
     /**
      * Get whether bookie is using hostname for registration and in ledger
      * metadata. Defaults to false.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 16bd4da..4f3bb87 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -68,8 +69,8 @@ public class CompactionTest extends BookKeeperClusterTestCase {
         return Arrays.asList(new Object[][] {{true}, {false}});
     }
 
-    private boolean isThrottleByBytes; 
-   
+    private boolean isThrottleByBytes;
+
     private final static Logger LOG = LoggerFactory.getLogger(CompactionTest.class);
     DigestType digestType;
 
@@ -118,7 +119,7 @@ public class CompactionTest extends BookKeeperClusterTestCase {
         baseConf.setMinorCompactionInterval(minorCompactionInterval);
         baseConf.setMajorCompactionInterval(majorCompactionInterval);
         baseConf.setEntryLogFilePreAllocationEnabled(false);
-        baseConf.setSortedLedgerStorageEnabled(false);
+        baseConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
         baseConf.setIsThrottleByBytes(this.isThrottleByBytes);
 
         super.setUp();
@@ -219,9 +220,10 @@ public class CompactionTest extends BookKeeperClusterTestCase {
         for (File dir : dirManager.getAllLedgerDirs()) {
             Bookie.checkDirectoryStructure(dir);
         }
-        InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf,
-                        LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(),
-                        dirManager, cp);
+        InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
+        storage.initialize(conf,
+                LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(),
+                dirManager, dirManager, cp, NullStatsLogger.INSTANCE);
         storage.start();
         long startTime = MathUtils.now();
         Thread.sleep(2000);
@@ -424,8 +426,8 @@ public class CompactionTest extends BookKeeperClusterTestCase {
         File log0 = new File(curDir, "0.log");
         LedgerDirsManager dirs = new LedgerDirsManager(conf, conf.getLedgerDirs());
         assertFalse("Log shouldnt exist", log0.exists());
-        InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf, manager,
-                                                                        dirs, checkpointSource);
+        InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
+        storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE);
         ledgers.add(1l);
         ledgers.add(2l);
         ledgers.add(3l);
@@ -443,7 +445,8 @@ public class CompactionTest extends BookKeeperClusterTestCase {
         ledgers.remove(2l);
         ledgers.remove(3l);
 
-        storage = new InterleavedLedgerStorage(conf, manager, dirs, checkpointSource);
+        storage = new InterleavedLedgerStorage();
+        storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE);
         storage.start();
         for (int i = 0; i < 10; i++) {
             if (!log0.exists()) {
@@ -458,7 +461,8 @@ public class CompactionTest extends BookKeeperClusterTestCase {
         storage.setMasterKey(4, KEY);
         storage.addEntry(genEntry(4, 1, ENTRY_SIZE)); // force ledger 1 page to flush
 
-        storage = new InterleavedLedgerStorage(conf, manager, dirs, checkpointSource);
+        storage = new InterleavedLedgerStorage();
+        storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE);
         storage.getEntry(1, 1); // entry should exist
     }
 
@@ -553,8 +557,8 @@ public class CompactionTest extends BookKeeperClusterTestCase {
                     boolean compact) throws IOException {
             }
         };
-        InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf,
-                manager, dirs, checkpointSource);
+        InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
+        storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE);
 
         double threshold = 0.1;
         // shouldn't throw exception
@@ -599,11 +603,12 @@ public class CompactionTest extends BookKeeperClusterTestCase {
         for (File dir : dirManager.getAllLedgerDirs()) {
             Bookie.checkDirectoryStructure(dir);
         }
-        InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf,
-                        LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(),
-                        dirManager, cp);
+        InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
+        storage.initialize(conf,
+                LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(),
+                dirManager, dirManager, cp, NullStatsLogger.INSTANCE);
         storage.start();
-        
+
         // test suspend Major GC.
         Thread.sleep(conf.getMajorCompactionInterval() * 1000
                    + conf.getGcWaitTime());
@@ -613,7 +618,7 @@ public class CompactionTest extends BookKeeperClusterTestCase {
         Thread.sleep(conf.getMajorCompactionInterval() * 1000
                    + conf.getGcWaitTime());
         assertTrue("major compaction triggered while set suspend",
-                storage.gcThread.lastMajorCompactionTime < startTime); 
+                storage.gcThread.lastMajorCompactionTime < startTime);
 
         // test suspend Minor GC.
         storage.gcThread.suspendMinorGC();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 19b6bcb..8812c90 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -321,7 +321,7 @@ public class LedgerCacheTest {
             .setLedgerDirNames(new String[] { ledgerDir.getPath() })
             .setFlushInterval(1000)
             .setPageLimit(1)
-            .setSortedLedgerStorageEnabled(false);
+            .setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
 
         Bookie b = new Bookie(conf);
         b.start();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index d947dff..49c7e42 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -35,6 +35,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -260,8 +262,20 @@ public class TestSyncThread {
                 throws IOException {
         }
     }
+
     private static class DummyLedgerStorage implements LedgerStorage {
         @Override
+        public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
+                LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+                CheckpointSource checkpointSource, StatsLogger statsLogger)
+                throws IOException {
+        }
+
+        @Override
+        public void deleteLedger(long ledgerId) throws IOException {
+        }
+
+        @Override
         public void start() {
         }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index de352b5..3875ec4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -22,11 +22,14 @@
 package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
@@ -36,11 +39,21 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.GarbageCollector;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -160,15 +173,24 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         final CountDownLatch inGcProgress = new CountDownLatch(1);
         final CountDownLatch createLatch = new CountDownLatch(1);
         final CountDownLatch endLatch = new CountDownLatch(2);
-        final GarbageCollector garbageCollector =
-                new ScanAndCompareGarbageCollector(getLedgerManager(), activeLedgers);
+        final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage();
+        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
+                mockLedgerStorage);
         Thread gcThread = new Thread() {
             @Override
             public void run() {
                 garbageCollector.gc(new GarbageCollector.GarbageCleaner() {
                     boolean paused = false;
+
                     @Override
                     public void clean(long ledgerId) {
+                        try {
+                            mockLedgerStorage.deleteLedger(ledgerId);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                            return;
+                        }
+
                         if (!paused) {
                             inGcProgress.countDown();
                             try {
@@ -223,8 +245,8 @@ public class GcLedgersTest extends LedgerManagerTestCase {
 
         createLedgers(numLedgers, createdLedgers);
 
-        final GarbageCollector garbageCollector =
-                new ScanAndCompareGarbageCollector(getLedgerManager(), activeLedgers);
+        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
+                new MockLedgerStorage());
         GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
                 @Override
                 public void clean(long ledgerId) {
@@ -259,8 +281,8 @@ public class GcLedgersTest extends LedgerManagerTestCase {
 
         createLedgers(numLedgers, createdLedgers);
 
-        final GarbageCollector garbageCollector =
-                new ScanAndCompareGarbageCollector(getLedgerManager(), activeLedgers);
+        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
+                new MockLedgerStorage());
         GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
                 @Override
                 public void clean(long ledgerId) {
@@ -287,4 +309,97 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         assertEquals("Should have cleaned something", 1, cleaned.size());
         assertEquals("Should have cleaned first ledger" + first, (long)first, (long)cleaned.get(0));
     }
+
+    class MockLedgerStorage implements CompactableLedgerStorage {
+
+        @Override
+        public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
+                               LedgerDirsManager ledgerDirsManager,
+                               LedgerDirsManager indexDirsManager,
+                               CheckpointSource checkpointSource, StatsLogger statsLogger)
+                throws IOException {}
+
+        @Override
+        public void start() {
+        }
+
+        @Override
+        public void shutdown() throws InterruptedException {
+        }
+
+        @Override
+        public boolean ledgerExists(long ledgerId) throws IOException {
+            return false;
+        }
+
+        @Override
+        public boolean setFenced(long ledgerId) throws IOException {
+            return false;
+        }
+
+        @Override
+        public boolean isFenced(long ledgerId) throws IOException {
+            return false;
+        }
+
+        @Override
+        public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+        }
+
+        @Override
+        public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+            return null;
+        }
+
+        @Override
+        public long addEntry(ByteBuffer entry) throws IOException {
+            return 0;
+        }
+
+        @Override
+        public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+            return null;
+        }
+
+        @Override
+        public void flush() throws IOException {
+        }
+
+        @Override
+        public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException {
+            return null;
+        }
+
+        @Override
+        public void deleteLedger(long ledgerId) throws IOException {
+            activeLedgers.remove(ledgerId);
+        }
+
+        @Override
+        public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) {
+            NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot();
+            Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot
+                    .subMap(firstLedgerId, true, lastLedgerId, false);
+
+            return subBkActiveLedgers.keySet();
+        }
+
+        @Override
+        public BKMBeanInfo getJMXBean() {
+            return null;
+        }
+
+        @Override
+        public EntryLogger getEntryLogger() {
+            return null;
+        }
+
+        @Override
+        public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
+        }
+
+        @Override
+        public void flushEntriesLocationsIndex() throws IOException {
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java
index 914c7e2..161802f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.util.Enumeration;
 
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -42,7 +43,7 @@ public class ForceReadOnlyBookieTest extends BookKeeperClusterTestCase {
     private final static Logger LOG = LoggerFactory.getLogger(ForceReadOnlyBookieTest.class);
     public ForceReadOnlyBookieTest() {
         super(2);
-        baseConf.setSortedLedgerStorageEnabled(false);
+        baseConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
         baseConf.setEntryLogFilePreAllocationEnabled(false);
     }
 
@@ -65,11 +66,11 @@ public class ForceReadOnlyBookieTest extends BookKeeperClusterTestCase {
         bsConfs.get(1).setForceReadOnlyBookie(true);
         restartBookies();
         Bookie bookie = bs.get(1).getBookie();
-        
+
         assertTrue("Bookie should be running and in readonly mode",
                 bookie.isRunning() && bookie.isReadOnly());
         LOG.info("successed force start ReadOnlyBookie");
- 
+
         // Check new bookie with readonly mode enabled.
         File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
         assertEquals("Only one ledger dir should be present", 1, ledgerDirs.length);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
index d45a2f3..0c1a187 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
@@ -24,6 +24,8 @@ package org.apache.bookkeeper.test;
 import java.io.File;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.CountDownLatch;
+
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -60,7 +62,7 @@ public class LedgerDeleteTest extends MultiLedgerManagerTestCase {
         baseConf.setEntryLogSizeLimit(2 * 1024 * 1024L);
         baseConf.setGcWaitTime(1000);
         baseConf.setEntryLogFilePreAllocationEnabled(false);
-        baseConf.setSortedLedgerStorageEnabled(false);
+        baseConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
         super.setUp();
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
index 124a420..8cf5618 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
@@ -24,13 +24,13 @@ import java.io.File;
 import java.util.Enumeration;
 
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.util.IOUtils;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -42,7 +42,7 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
 
     public ReadOnlyBookieTest() {
         super(2);
-        baseConf.setSortedLedgerStorageEnabled(false);
+        baseConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
         baseConf.setEntryLogFilePreAllocationEnabled(false);
     }
 


Mime
View raw message