bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-841: Bookie should calculate ledgers map writing a new entry log file
Date Tue, 23 Feb 2016 23:44:40 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 9bd9e061b -> 2c567d008


BOOKKEEPER-841: Bookie should calculate ledgers map writing a new entry log file

sijie I've addressed all comments from https://reviews.apache.org/r/33061

Author: Matteo Merli <mmerli@apache.org>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #5 from merlimat/bk-841


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/2c567d00
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/2c567d00
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/2c567d00

Branch: refs/heads/master
Commit: 2c567d008c644df5db8c441fb9aaf135ed36db95
Parents: 9bd9e06
Author: Matteo Merli <mmerli@apache.org>
Authored: Tue Feb 23 15:44:31 2016 -0800
Committer: Matteo Merli <mmerli@apache.org>
Committed: Tue Feb 23 15:44:31 2016 -0800

----------------------------------------------------------------------
 .../bookkeeper/bookie/EntryLogMetadata.java     | 101 ++++++++
 .../apache/bookkeeper/bookie/EntryLogger.java   | 256 ++++++++++++++++++-
 .../bookie/GarbageCollectorThread.java          | 110 +-------
 .../apache/bookkeeper/bookie/EntryLogTest.java  | 100 +++++++-
 4 files changed, 456 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/2c567d00/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
new file mode 100644
index 0000000..461736c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java
@@ -0,0 +1,101 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Records the total size, remaining size and the set of ledgers that comprise a entry log.
+ */
+public class EntryLogMetadata {
+    private final long entryLogId;
+    private long totalSize;
+    private long remainingSize;
+    private ConcurrentHashMap<Long, Long> ledgersMap;
+
+    public EntryLogMetadata(long logId) {
+        this.entryLogId = logId;
+
+        totalSize = remainingSize = 0;
+        ledgersMap = new ConcurrentHashMap<Long, Long>();
+    }
+
+    public void addLedgerSize(long ledgerId, long size) {
+        totalSize += size;
+        remainingSize += size;
+        Long ledgerSize = ledgersMap.get(ledgerId);
+        if (null == ledgerSize) {
+            ledgerSize = 0L;
+        }
+        ledgerSize += size;
+        ledgersMap.put(ledgerId, ledgerSize);
+    }
+
+    public void removeLedger(long ledgerId) {
+        Long size = ledgersMap.remove(ledgerId);
+        if (null == size) {
+            return;
+        }
+        remainingSize -= size;
+    }
+
+    public boolean containsLedger(long ledgerId) {
+        return ledgersMap.containsKey(ledgerId);
+    }
+
+    public double getUsage() {
+        if (totalSize == 0L) {
+            return 0.0f;
+        }
+        return (double) remainingSize / totalSize;
+    }
+
+    public boolean isEmpty() {
+        return ledgersMap.isEmpty();
+    }
+
+    public long getEntryLogId() {
+        return entryLogId;
+    }
+
+    public long getTotalSize() {
+        return totalSize;
+    }
+
+    public long getRemainingSize() {
+        return remainingSize;
+    }
+
+    Map<Long, Long> getLedgersMap() {
+        return ledgersMap;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ totalSize = ").append(totalSize).append(", remainingSize = ").append(remainingSize)
+                .append(", ledgersMap = ").append(ledgersMap).append(" }");
+        return sb.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/2c567d00/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 1030c80..a970a96 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -39,9 +39,11 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -74,14 +76,25 @@ public class EntryLogger {
 
     private static class BufferedLogChannel extends BufferedChannel {
         final private long logId;
+        private final EntryLogMetadata entryLogMetada;
+
         public BufferedLogChannel(FileChannel fc, int writeCapacity,
                                   int readCapacity, long logId) throws IOException {
             super(fc, writeCapacity, readCapacity);
             this.logId = logId;
+            this.entryLogMetada = new EntryLogMetadata(logId);
         }
         public long getLogId() {
             return logId;
         }
+
+        public void registerWrittenEntry(long ledgerId, long entrySize) {
+            entryLogMetada.addLedgerSize(ledgerId, entrySize);
+        }
+
+        public Map<Long, Long> getLedgersMap() {
+            return entryLogMetada.getLedgersMap();
+        }
     }
 
     volatile File currentDir;
@@ -101,14 +114,60 @@ public class EntryLogger {
     private final CopyOnWriteArrayList<EntryLogListener> listeners
         = new CopyOnWriteArrayList<EntryLogListener>();
 
+    private static final int HEADER_V0 = 0; // Old log file format (no ledgers map index)
+    private static final int HEADER_V1 = 1; // Introduced ledger map index
+    private static final int HEADER_CURRENT_VERSION = HEADER_V1;
+
+    private static class Header {
+        final int version;
+        final long ledgersMapOffset;
+        final int ledgersCount;
+
+        Header(int version, long ledgersMapOffset, int ledgersCount) {
+            this.version = version;
+            this.ledgersMapOffset = ledgersMapOffset;
+            this.ledgersCount = ledgersCount;
+        }
+    }
+
     /**
      * The 1K block at the head of the entry logger file
-     * that contains the fingerprint and (future) meta-data
+     * that contains the fingerprint and meta-data.
+     *
+     * Header is composed of:
+     * Fingerprint: 4 bytes "BKLO"
+     * Log file HeaderVersion enum: 4 bytes
+     * Ledger map offset: 8 bytes
+     * Ledgers Count: 4 bytes
      */
     final static int LOGFILE_HEADER_SIZE = 1024;
     final ByteBuffer LOGFILE_HEADER = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
+
+    final static int HEADER_VERSION_POSITION = 4;
+    final static int LEDGERS_MAP_OFFSET_POSITION = HEADER_VERSION_POSITION + 4;
+
+    /**
+     * Ledgers map is composed of multiple parts that can be split into separated entries.
Each of them is composed of:
+     *
+     * <pre>
+     * length: (4 bytes) [0-3]
+     * ledger id (-1): (8 bytes) [4 - 11]
+     * entry id: (8 bytes) [12-19]
+     * num ledgers stored in current metadata entry: (4 bytes) [20 - 23]
+     * ledger entries: sequence of (ledgerid, size) (8 + 8 bytes each) [24..]
+     * </pre>
+     */
+    final static int LEDGERS_MAP_HEADER_SIZE = 4 + 8 + 8 + 4;
+    final static int LEDGERS_MAP_ENTRY_SIZE = 8 + 8;
+
+    // Break the ledgers map into multiple batches, each of which can contain up to 10K ledgers
+    final static int LEDGERS_MAP_MAX_BATCH_SIZE = 10000;
+
     final static long INVALID_LID = -1L;
 
+    // EntryId used to mark an entry (belonging to INVALID_ID) as a component of the serialized
ledgers map
+    final static long LEDGERS_MAP_ENTRY_ID = -2L;
+
     final static int MIN_SANE_ENTRY_SIZE = 8 + 8;
     final static long MB = 1024 * 1024;
 
@@ -177,6 +236,7 @@ public class EntryLogger {
         // so there can be race conditions when entry logs are rolled over and
         // this header buffer is cleared before writing it into the new logChannel.
         LOGFILE_HEADER.put("BKLO".getBytes(UTF_8));
+        LOGFILE_HEADER.putInt(HEADER_CURRENT_VERSION);
 
         // Find the largest logId
         long logId = INVALID_LID;
@@ -370,9 +430,14 @@ public class EntryLogger {
             if (null == logChannelsToFlush) {
                 logChannelsToFlush = new LinkedList<BufferedLogChannel>();
             }
+
             // flush the internal buffer back to filesystem but not sync disk
             // so the readers could access the data from filesystem.
             logChannel.flush(false);
+
+            // Append ledgers map at the end of entry log
+            appendLedgersMap(logChannel);
+
             BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog();
             logChannelsToFlush.add(logChannel);
             LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry
loggers : {}.",
@@ -387,6 +452,55 @@ public class EntryLogger {
     }
 
     /**
+     * Append the ledger map at the end of the entry log.
+     * Updates the entry log file header with the offset and size of the map.
+     */
+    private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException
{
+        long ledgerMapOffset = entryLogChannel.position();
+
+        Map<Long, Long> ledgersMap = entryLogChannel.getLedgersMap();
+
+        Iterator<Entry<Long, Long>> iterator = ledgersMap.entrySet().iterator();
+        int numberOfLedgers = ledgersMap.size();
+        int remainingLedgers = numberOfLedgers;
+
+        // Write the ledgers map into several batches
+        while (iterator.hasNext()) {
+            // Start new batch
+            int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE);
+            int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
+            ByteBuffer serializedMap = ByteBuffer.allocate(ledgerMapSize);
+
+            serializedMap.putInt(ledgerMapSize - 4);
+            serializedMap.putLong(INVALID_LID);
+            serializedMap.putLong(LEDGERS_MAP_ENTRY_ID);
+            serializedMap.putInt(batchSize);
+
+            // Dump all ledgers for this batch
+            for (int i = 0; i < batchSize; i++) {
+                Entry<Long, Long> entry = iterator.next();
+                long ledgerId = entry.getKey();
+                long size = entry.getValue();
+
+                serializedMap.putLong(ledgerId);
+                serializedMap.putLong(size);
+                --remainingLedgers;
+            }
+
+            // Close current batch
+            serializedMap.flip();
+            entryLogChannel.fileChannel.write(serializedMap);
+        }
+
+        // Update the headers with the map offset and count of ledgers
+        ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
+        mapInfo.putLong(ledgerMapOffset);
+        mapInfo.putInt(numberOfLedgers);
+        mapInfo.flip();
+        entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
+    }
+
+    /**
      * An allocator pre-allocates entry log files.
      */
     class EntryLoggerAllocator {
@@ -642,12 +756,14 @@ public class EntryLogger {
                 shouldCreateNewEntryLog.set(false);
             }
         }
+
         ByteBuffer buff = ByteBuffer.allocate(4);
         buff.putInt(entry.remaining());
         buff.flip();
         logChannel.write(buff);
         long pos = logChannel.position();
         logChannel.write(entry);
+        logChannel.registerWrittenEntry(ledger, entrySize);
 
         return (logChannel.getLogId() << 32L) | pos;
     }
@@ -721,6 +837,30 @@ public class EntryLogger {
         return data;
     }
 
+    /**
+     * Read the header of an entry log
+     */
+    private Header getHeaderForLogId(long entryLogId) throws IOException {
+        BufferedReadChannel bc = getChannelForLogId(entryLogId);
+
+        // Allocate buffer to read (version, ledgersMapOffset, ledgerCount)
+        ByteBuffer headers = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
+        bc.read(headers, 0);
+        headers.flip();
+
+        // Skip marker string "BKLO"
+        headers.getInt();
+
+        int headerVersion = headers.getInt();
+        if (headerVersion < HEADER_V0 || headerVersion > HEADER_CURRENT_VERSION) {
+            LOG.info("Unknown entry log header version for log {}: {}", entryLogId, headerVersion);
+        }
+
+        long ledgersMapOffset = headers.getLong();
+        int ledgersCount = headers.getInt();
+        return new Header(headerVersion, ledgersMapOffset, ledgersCount);
+    }
+
     private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
         BufferedReadChannel fc = getFromChannels(entryLogId);
         if (fc != null) {
@@ -788,6 +928,7 @@ public class EntryLogger {
         // Start the read position in the current entry log file to be after
         // the header where all of the ledger entries are.
         long pos = LOGFILE_HEADER_SIZE;
+
         // Read through the entry log file and extract the ledger ID's.
         while (true) {
             // Check if we've finished reading the entry log file.
@@ -812,7 +953,7 @@ public class EntryLogger {
             lidBuff.flip();
             long lid = lidBuff.getLong();
             lidBuff.clear();
-            if (!scanner.accept(lid)) {
+            if (lid == INVALID_LID || !scanner.accept(lid)) {
                 // skip this entry
                 pos += entrySize;
                 continue;
@@ -834,6 +975,117 @@ public class EntryLogger {
         }
     }
 
+    public EntryLogMetadata getEntryLogMetadata(long entryLogId) throws IOException {
+        // First try to extract the EntryLogMetada from the index, if there's no index then
fallback to scanning the
+        // entry log
+        try {
+            return extractEntryLogMetadataFromIndex(entryLogId);
+        } catch (Exception e) {
+            LOG.info("Failed to get ledgers map index from: {}.log : {}", entryLogId, e.getMessage());
+
+            // Fall-back to scanning
+            return extractEntryLogMetadataByScanning(entryLogId);
+        }
+    }
+
+    EntryLogMetadata extractEntryLogMetadataFromIndex(long entryLogId) throws IOException
{
+        Header header = getHeaderForLogId(entryLogId);
+
+        if (header.version < HEADER_V1) {
+            throw new IOException("Old log file header without ledgers map on entryLogId
" + entryLogId);
+        }
+
+        if (header.ledgersMapOffset == 0L) {
+            // The index was not stored in the log file (possibly because the bookie crashed
before flushing it)
+            throw new IOException("No ledgers map index found on entryLogId" + entryLogId);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Recovering ledgers maps for log {} at offset: {}", entryLogId, header.ledgersMapOffset);
+        }
+
+        BufferedReadChannel bc = getChannelForLogId(entryLogId);
+
+        // There can be multiple entries containing the various components of the serialized
ledgers map
+        long offset = header.ledgersMapOffset;
+        EntryLogMetadata meta = new EntryLogMetadata(entryLogId);
+
+        while (offset < bc.size()) {
+            // Read ledgers map size
+            ByteBuffer sizeBuf = ByteBuffer.allocate(4);
+            bc.read(sizeBuf, offset);
+            sizeBuf.flip();
+
+            int ledgersMapSize = sizeBuf.getInt();
+
+            // Read the index into a buffer
+            ByteBuffer ledgersMapBuffer = ByteBuffer.allocate(ledgersMapSize);
+            bc.read(ledgersMapBuffer, offset + 4);
+            ledgersMapBuffer.flip();
+
+            // Discard ledgerId and entryId
+            long lid = ledgersMapBuffer.getLong();
+            if (lid != INVALID_LID) {
+                throw new IOException("Cannot deserialize ledgers map from ledger " + lid
+ " -- entryLogId: " + entryLogId);
+            }
+
+            long entryId = ledgersMapBuffer.getLong();
+            if (entryId != LEDGERS_MAP_ENTRY_ID) {
+                throw new IOException("Cannot deserialize ledgers map from ledger " + lid
+ ":" + entryId + " -- entryLogId: " + entryLogId);
+            }
+
+            // Read the number of ledgers in the current entry batch
+            int ledgersCount = ledgersMapBuffer.getInt();
+
+            // Extract all (ledger,size) tuples from buffer
+            for (int i = 0; i < ledgersCount; i++) {
+                long ledgerId = ledgersMapBuffer.getLong();
+                long size = ledgersMapBuffer.getLong();
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Recovering ledgers maps for log {} -- Found ledger: {} with
size: {}",
+                            new Object[] { entryLogId, ledgerId, size });
+                }
+                meta.addLedgerSize(ledgerId, size);
+            }
+
+            if (ledgersMapBuffer.hasRemaining()) {
+                throw new IOException("Invalid entry size when reading ledgers map on entryLogId:
" + entryLogId);
+            }
+
+            // Move to next entry, if any
+            offset += ledgersMapSize + 4;
+        }
+
+        if (meta.getLedgersMap().size() != header.ledgersCount) {
+            throw new IOException("Not all ledgers were found in ledgers map index. expected:
" + header.ledgersCount
+                    + " -- found: " + meta.getLedgersMap().size() + " -- entryLogId: " +
entryLogId);
+        }
+
+        return meta;
+    }
+
+    private EntryLogMetadata extractEntryLogMetadataByScanning(long entryLogId) throws IOException
{
+        final EntryLogMetadata meta = new EntryLogMetadata(entryLogId);
+
+        // Read through the entry log file and extract the entry log meta
+        scanEntryLog(entryLogId, new EntryLogScanner() {
+            @Override
+            public void process(long ledgerId, long offset, ByteBuffer entry) throws IOException
{
+                // add new entry size of a ledger to entry log meta
+                meta.addLedgerSize(ledgerId, entry.limit() + 4);
+            }
+
+            @Override
+            public boolean accept(long ledgerId) {
+                return true;
+            }
+        });
+
+        LOG.debug("Retrieved entry log meta data entryLogId: {}, meta: {}", entryLogId, meta);
+        return meta;
+    }
+
     /**
      * Shutdown method to gracefully stop entry logger.
      */

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/2c567d00/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 1ca43e0..1c9c7e7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -426,7 +426,7 @@ public class GarbageCollectorThread extends BookieThread {
         // Loop through all of the entry logs and remove the non-active ledgers.
         for (Long entryLogId : entryLogMetaMap.keySet()) {
             EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
-            for (Long entryLogLedger : meta.ledgersMap.keySet()) {
+            for (Long entryLogLedger : meta.getLedgersMap().keySet()) {
                 // Remove the entry log ledger from the set if it isn't active.
                 if (!activeLedgers.containsKey(entryLogLedger)) {
                     meta.removeLedger(entryLogLedger);
@@ -457,8 +457,8 @@ public class GarbageCollectorThread extends BookieThread {
         Comparator<EntryLogMetadata> sizeComparator = new Comparator<EntryLogMetadata>()
{
             @Override
             public int compare(EntryLogMetadata m1, EntryLogMetadata m2) {
-                long unusedSize1 = m1.totalSize - m1.remainingSize;
-                long unusedSize2 = m2.totalSize - m2.remainingSize;
+                long unusedSize1 = m1.getTotalSize() - m1.getRemainingSize();
+                long unusedSize2 = m2.getTotalSize() - m2.getRemainingSize();
                 if (unusedSize1 > unusedSize2) {
                     return -1;
                 } else if (unusedSize1 < unusedSize2) {
@@ -477,10 +477,13 @@ public class GarbageCollectorThread extends BookieThread {
             if (meta.getUsage() >= threshold) {
                 break;
             }
-            LOG.debug("Compacting entry log {} below threshold {}.", meta.entryLogId, threshold);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Compacting entry log {} below threshold {}.", meta.getEntryLogId(),
threshold);
+            }
             try {
                 compactEntryLog(scannerFactory, meta);
-                toRemove.add(meta.entryLogId);
+                toRemove.add(meta.getEntryLogId());
             } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) {
                 LOG.warn("No writable ledger directory available, aborting compaction", nwlde);
                 break;
@@ -557,10 +560,10 @@ public class GarbageCollectorThread extends BookieThread {
             return;
         }
 
-        LOG.info("Compacting entry log : {}", entryLogMeta.entryLogId);
+        LOG.info("Compacting entry log : {}", entryLogMeta.getEntryLogId());
 
         try {
-            entryLogger.scanEntryLog(entryLogMeta.entryLogId,
+            entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(),
                                      scannerFactory.newScanner(entryLogMeta));
         } finally {
             // clear compacting flag
@@ -569,86 +572,6 @@ public class GarbageCollectorThread extends BookieThread {
     }
 
     /**
-     * Records the total size, remaining size and the set of ledgers that comprise a entry
log.
-     */
-    static class EntryLogMetadata {
-        long entryLogId;
-        long totalSize;
-        long remainingSize;
-        ConcurrentHashMap<Long, Long> ledgersMap;
-
-        public EntryLogMetadata(long logId) {
-            this.entryLogId = logId;
-
-            totalSize = remainingSize = 0;
-            ledgersMap = new ConcurrentHashMap<Long, Long>();
-        }
-
-        public void addLedgerSize(long ledgerId, long size) {
-            totalSize += size;
-            remainingSize += size;
-            Long ledgerSize = ledgersMap.get(ledgerId);
-            if (null == ledgerSize) {
-                ledgerSize = 0L;
-            }
-            ledgerSize += size;
-            ledgersMap.put(ledgerId, ledgerSize);
-        }
-
-        public void removeLedger(long ledgerId) {
-            Long size = ledgersMap.remove(ledgerId);
-            if (null == size) {
-                return;
-            }
-            remainingSize -= size;
-        }
-
-        public boolean containsLedger(long ledgerId) {
-            return ledgersMap.containsKey(ledgerId);
-        }
-
-        public double getUsage() {
-            if (totalSize == 0L) {
-                return 0.0f;
-            }
-            return (double)remainingSize / totalSize;
-        }
-
-        public boolean isEmpty() {
-            return ledgersMap.isEmpty();
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("{ totalSize = ").append(totalSize).append(", remainingSize = ")
-              .append(remainingSize).append(", ledgersMap = ").append(ledgersMap).append("
}");
-            return sb.toString();
-        }
-    }
-
-    /**
-     * A scanner used to extract entry log meta from entry log files.
-     */
-    static class ExtractionScanner implements EntryLogScanner {
-        EntryLogMetadata meta;
-
-        public ExtractionScanner(EntryLogMetadata meta) {
-            this.meta = meta;
-        }
-
-        @Override
-        public boolean accept(long ledgerId) {
-            return true;
-        }
-        @Override
-        public void process(long ledgerId, long offset, ByteBuffer entry) {
-            // add new entry size of a ledger to entry log meta
-            meta.addLedgerSize(ledgerId, entry.limit() + 4);
-        }
-    }
-
-    /**
      * Method to read in all of the entry logs (those that we haven't done so yet),
      * and find the set of ledger ID's that make up each entry log file.
      *
@@ -678,7 +601,7 @@ public class GarbageCollectorThread extends BookieThread {
 
             try {
                 // Read through the entry log file and extract the entry log meta
-                EntryLogMetadata entryLogMeta = extractMetaFromEntryLog(entryLogger, entryLogId);
+                EntryLogMetadata entryLogMeta = entryLogger.getEntryLogMetadata(entryLogId);
                 entryLogMetaMap.put(entryLogId, entryLogMeta);
             } catch (IOException e) {
                 hasExceptionWhenScan = true;
@@ -695,15 +618,4 @@ public class GarbageCollectorThread extends BookieThread {
         }
         return entryLogMetaMap;
     }
-
-    static EntryLogMetadata extractMetaFromEntryLog(EntryLogger entryLogger, long entryLogId)
-            throws IOException {
-        EntryLogMetadata entryLogMeta = new EntryLogMetadata(entryLogId);
-        ExtractionScanner scanner = new ExtractionScanner(entryLogMeta);
-        // Read through the entry log file and extract the entry log meta
-        entryLogger.scanEntryLog(entryLogId, scanner);
-        LOG.debug("Retrieved entry log meta data entryLogId: {}, meta: {}",
-                  entryLogId, entryLogMeta);
-        return entryLogMeta;
-    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/2c567d00/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 6b0ecd8..4e1004c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -28,8 +28,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.bookkeeper.bookie.GarbageCollectorThread.EntryLogMetadata;
-import org.apache.bookkeeper.bookie.GarbageCollectorThread.ExtractionScanner;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
@@ -86,15 +84,11 @@ public class EntryLogTest {
         // now see which ledgers are in the log
         logger = new EntryLogger(conf, bookie.getLedgerDirsManager());
 
-        EntryLogMetadata meta = new EntryLogMetadata(0L);
-        ExtractionScanner scanner = new ExtractionScanner(meta);
-
-        logger.scanEntryLog(0L, scanner);
-
+        EntryLogMetadata meta = logger.getEntryLogMetadata(0L);
         LOG.info("Extracted Meta From Entry Log {}", meta);
-        assertNotNull(meta.ledgersMap.get(1L));
-        assertNull(meta.ledgersMap.get(2L));
-        assertNotNull(meta.ledgersMap.get(3L));
+        assertNotNull(meta.getLedgersMap().get(1L));
+        assertNull(meta.getLedgersMap().get(2L));
+        assertNotNull(meta.getLedgersMap().get(3L));
     }
 
     private ByteBuffer generateEntry(long ledger, long entry) {
@@ -220,4 +214,90 @@ public class EntryLogTest {
         Assert.assertTrue(0 == generateEntry(3, 1).compareTo(ledgerStorage.getEntry(3, 1)));
     }
 
+    /**
+     * Explicitely try to recover using the ledgers map index at the end of the entry log
+     */
+    @Test(timeout=60000)
+    public void testRecoverFromLedgersMap() throws Exception {
+        File tmpDir = createTempDir("bkTest", ".dir");
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+        Bookie bookie = new Bookie(conf);
+
+        // create some entries
+        EntryLogger logger = ((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger;
+        logger.addEntry(1, generateEntry(1, 1));
+        logger.addEntry(3, generateEntry(3, 1));
+        logger.addEntry(2, generateEntry(2, 1));
+        logger.addEntry(1, generateEntry(1, 2));
+        logger.rollLog();
+        logger.flushRotatedLogs();
+
+        EntryLogMetadata meta = logger.extractEntryLogMetadataFromIndex(0L);
+        LOG.info("Extracted Meta From Entry Log {}", meta);
+        assertEquals(60, meta.getLedgersMap().get(1L).longValue());
+        assertEquals(30, meta.getLedgersMap().get(2L).longValue());
+        assertEquals(30, meta.getLedgersMap().get(3L).longValue());
+        assertNull(meta.getLedgersMap().get(4L));
+        assertEquals(120, meta.getTotalSize());
+        assertEquals(120, meta.getRemainingSize());
+    }
+
+    /**
+     * Explicitely try to recover using the ledgers map index at the end of the entry log
+     */
+    @Test(timeout = 60000)
+    public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception {
+        File tmpDir = createTempDir("bkTest", ".dir");
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        Bookie bookie = new Bookie(conf);
+
+        // create some entries
+        EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger;
+        logger.addEntry(1, generateEntry(1, 1));
+        logger.addEntry(3, generateEntry(3, 1));
+        logger.addEntry(2, generateEntry(2, 1));
+        logger.addEntry(1, generateEntry(1, 2));
+        logger.rollLog();
+
+        // Rewrite the entry log header to be on V0 format
+        File f = new File(curDir, "0.log");
+        RandomAccessFile raf = new RandomAccessFile(f, "rw");
+        raf.seek(EntryLogger.HEADER_VERSION_POSITION);
+        // Write zeros to indicate V0 + no ledgers map info
+        raf.write(new byte[4 + 8]);
+        raf.close();
+
+        // now see which ledgers are in the log
+        logger = new EntryLogger(conf, bookie.getLedgerDirsManager());
+
+        try {
+            logger.extractEntryLogMetadataFromIndex(0L);
+            fail("Should not be possible to recover from ledgers map index");
+        } catch (IOException e) {
+            // Ok
+        }
+
+        // Public method should succeed by falling back to scanning the file
+        EntryLogMetadata meta = logger.getEntryLogMetadata(0L);
+        LOG.info("Extracted Meta From Entry Log {}", meta);
+        assertEquals(60, meta.getLedgersMap().get(1L).longValue());
+        assertEquals(30, meta.getLedgersMap().get(2L).longValue());
+        assertEquals(30, meta.getLedgersMap().get(3L).longValue());
+        assertNull(meta.getLedgersMap().get(4L));
+        assertEquals(120, meta.getTotalSize());
+        assertEquals(120, meta.getRemainingSize());
+    }
+
 }


Mime
View raw message