zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1302851 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/
Date Tue, 20 Mar 2012 11:51:04 GMT
Author: ivank
Date: Tue Mar 20 11:51:03 2012
New Revision: 1302851

URL: http://svn.apache.org/viewvc?rev=1302851&view=rev
Log:
BOOKKEEPER-187: Create well defined interface for LedgerCache (ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
      - copied, changed from r1302458, zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheBean.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Mar 20 11:51:03 2012
@@ -90,6 +90,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-160: bookie server needs to do compaction over entry log files to reclaim disk space (sijie via ivank)
 
+        BOOKKEEPER-187: Create well defined interface for LedgerCache (ivank)
+
       hedwig-server/
 
         BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Mar 20 11:51:03 2012
@@ -400,8 +400,9 @@ public class Bookie extends Thread {
 
         syncThread = new SyncThread(conf);
         entryLogger = new EntryLogger(conf);
-        ledgerCache = new LedgerCache(conf, ledgerManager);
+        ledgerCache = new LedgerCacheImpl(conf, ledgerManager);
         gcThread = new GarbageCollectorThread(conf, this.zk, ledgerCache, entryLogger,
+                                              ledgerManager,
                                               new EntryLogCompactionScanner());
         // replay journals
         readJournal();
@@ -565,7 +566,7 @@ public class Bookie extends Thread {
             BKMBeanRegistry.getInstance().register(jmxBookieBean, parent);
 
             try {
-                jmxLedgerCacheBean = new LedgerCacheBean(this.ledgerCache);
+                jmxLedgerCacheBean = this.ledgerCache.getJMXBean();
                 BKMBeanRegistry.getInstance().register(jmxLedgerCacheBean, jmxBookieBean);
             } catch (Exception e) {
                 LOG.warn("Failed to register with JMX for ledger cache", e);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Tue Mar 20 11:51:03 2012
@@ -74,6 +74,8 @@ public class GarbageCollectorThread exte
     // Ledger Cache Handle
     final LedgerCache ledgerCache;
 
+    final LedgerManager ledgerManager;
+
     // ZooKeeper Client
     final ZooKeeper zk;
 
@@ -116,6 +118,7 @@ public class GarbageCollectorThread exte
                                   ZooKeeper zookeeper,
                                   LedgerCache ledgerCache,
                                   EntryLogger entryLogger,
+                                  LedgerManager ledgerManager,
                                   EntryLogScanner scanner)
         throws IOException {
         super("GarbageCollectorThread");
@@ -123,6 +126,7 @@ public class GarbageCollectorThread exte
         this.zk = zookeeper;
         this.ledgerCache = ledgerCache;
         this.entryLogger = entryLogger;
+        this.ledgerManager = ledgerManager;
         this.scanner = scanner;
 
         this.gcWaitTime = conf.getGcWaitTime();
@@ -231,7 +235,7 @@ public class GarbageCollectorThread exte
      * Do garbage collection ledger index files
      */
     private void doGcLedgers() {
-        ledgerCache.activeLedgerManager.garbageCollectLedgers(
+        ledgerManager.garbageCollectLedgers(
         new LedgerManager.GarbageCollector() {
             @Override
             public void gc(long ledgerId) {
@@ -253,7 +257,7 @@ public class GarbageCollectorThread exte
             EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
             for (Long entryLogLedger : meta.ledgersMap.keySet()) {
                 // Remove the entry log ledger from the set if it isn't active.
-                if (!ledgerCache.activeLedgerManager.containsActiveLedger(entryLogLedger)) {
+                if (!ledgerManager.containsActiveLedger(entryLogLedger)) {
                     meta.removeLedger(entryLogLedger);
                 }
             }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java Tue Mar 20 11:51:03 2012
@@ -21,556 +21,25 @@
 
 package org.apache.bookkeeper.bookie;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This class maps a ledger entry number into a location (entrylogid, offset) in
  * an entry log file. It does user level caching to more efficiently manage disk
  * head scheduling.
  */
-public class LedgerCache {
-    private final static Logger LOG = LoggerFactory.getLogger(LedgerDescriptor.class);
-
-    final File ledgerDirectories[];
-
-    public LedgerCache(ServerConfiguration conf, LedgerManager alm) {
-        this.ledgerDirectories = Bookie.getCurrentDirectories(conf.getLedgerDirs());
-        this.openFileLimit = conf.getOpenFileLimit();
-        this.pageSize = conf.getPageSize();
-        this.entriesPerPage = pageSize / 8;
-        
-        if (conf.getPageLimit() <= 0) {
-            // allocate half of the memory to the page cache
-            this.pageLimit = (int)((Runtime.getRuntime().maxMemory() / 3) / this.pageSize);
-        } else {
-            this.pageLimit = conf.getPageLimit();
-        }
-        LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
-        LOG.info("openFileLimit is " + openFileLimit + ", pageSize is " + pageSize + ", pageLimit is " + pageLimit);
-        activeLedgerManager = alm;
-        // Retrieve all of the active ledgers.
-        getActiveLedgers();
-    }
-    /**
-     * the list of potentially clean ledgers
-     */
-    LinkedList<Long> cleanLedgers = new LinkedList<Long>();
-
-    /**
-     * the list of potentially dirty ledgers
-     */
-    LinkedList<Long> dirtyLedgers = new LinkedList<Long>();
-
-    HashMap<Long, FileInfo> fileInfoCache = new HashMap<Long, FileInfo>();
-
-    LinkedList<Long> openLedgers = new LinkedList<Long>();
-
-    // Manage all active ledgers in LedgerManager
-    // so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
-    final LedgerManager activeLedgerManager;
-
-    final int openFileLimit;
-    final int pageSize;
-    final int pageLimit;
-    final int entriesPerPage;
-
-    /**
-     * @return page size used in ledger cache
-     */
-    public int getPageSize() {
-        return pageSize;
-    }
-
-    /**
-     * @return entries per page used in ledger cache
-     */
-    public int getEntriesPerPage() {
-        return entriesPerPage;
-    }
-
-    /**
-     * @return page limitation in ledger cache
-     */
-    public int getPageLimit() {
-        return pageLimit;
-    }
-
-    // The number of pages that have actually been used
-    private int pageCount = 0;
-    HashMap<Long, HashMap<Long,LedgerEntryPage>> pages = new HashMap<Long, HashMap<Long,LedgerEntryPage>>();
-
-    /**
-     * @return number of page used in ledger cache
-     */
-    public int getNumUsedPages() {
-        return pageCount;
-    }
-
-    private void putIntoTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, LedgerEntryPage lep) {
-        HashMap<Long, LedgerEntryPage> map = table.get(lep.getLedger());
-        if (map == null) {
-            map = new HashMap<Long, LedgerEntryPage>();
-            table.put(lep.getLedger(), map);
-        }
-        map.put(lep.getFirstEntry(), lep);
-    }
-
-    private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, Long ledger, Long firstEntry) {
-        HashMap<Long, LedgerEntryPage> map = table.get(ledger);
-        if (map != null) {
-            return map.get(firstEntry);
-        }
-        return null;
-    }
-
-    synchronized private LedgerEntryPage getLedgerEntryPage(Long ledger, Long firstEntry, boolean onlyDirty) {
-        LedgerEntryPage lep = getFromTable(pages, ledger, firstEntry);
-        try {
-            if (onlyDirty && lep.isClean()) {
-                return null;
-            }
-            return lep;
-        } finally {
-            if (lep != null) {
-                lep.usePage();
-            }
-        }
-    }
-
-    public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
-        int offsetInPage = (int) (entry % entriesPerPage);
-        // find the id of the first entry of the page that has the entry
-        // we are looking for
-        long pageEntry = entry-offsetInPage;
-        LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
-        if (lep == null) {
-            // find a free page
-            lep = grabCleanPage(ledger, pageEntry);
-            updatePage(lep);
-            synchronized(this) {
-                putIntoTable(pages, lep);
-            }
-        }
-        if (lep != null) {
-            lep.setOffset(offset, offsetInPage*8);
-            lep.releasePage();
-            return;
-        }
-    }
-
-    public long getEntryOffset(long ledger, long entry) throws IOException {
-        int offsetInPage = (int) (entry%entriesPerPage);
-        // find the id of the first entry of the page that has the entry
-        // we are looking for
-        long pageEntry = entry-offsetInPage;
-        LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
-        try {
-            if (lep == null) {
-                lep = grabCleanPage(ledger, pageEntry);
-                // should update page before we put it into table
-                // otherwise we would put an empty page in it
-                updatePage(lep);
-                synchronized(this) {
-                    putIntoTable(pages, lep);
-                }
-            }
-            return lep.getOffset(offsetInPage*8);
-        } finally {
-            if (lep != null) {
-                lep.releasePage();
-            }
-        }
-    }
-
-    static final String getLedgerName(long ledgerId) {
-        int parent = (int) (ledgerId & 0xff);
-        int grandParent = (int) ((ledgerId & 0xff00) >> 8);
-        StringBuilder sb = new StringBuilder();
-        sb.append(Integer.toHexString(grandParent));
-        sb.append('/');
-        sb.append(Integer.toHexString(parent));
-        sb.append('/');
-        sb.append(Long.toHexString(ledgerId));
-        sb.append(".idx");
-        return sb.toString();
-    }
-
-    static final private Random rand = new Random();
-
-    static final private File pickDirs(File dirs[]) {
-        return dirs[rand.nextInt(dirs.length)];
-    }
-
-    FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
-        synchronized(fileInfoCache) {
-            FileInfo fi = fileInfoCache.get(ledger);
-            if (fi == null) {
-                String ledgerName = getLedgerName(ledger);
-                File lf = null;
-                for(File d: ledgerDirectories) {
-                    lf = new File(d, ledgerName);
-                    if (lf.exists()) {
-                        break;
-                    }
-                    lf = null;
-                }
-                if (lf == null) {
-                    if (masterKey == null) {
-                        throw new Bookie.NoLedgerException(ledger);
-                    }
-                    File dir = pickDirs(ledgerDirectories);
-                    lf = new File(dir, ledgerName);
-                    // A new ledger index file has been created for this Bookie.
-                    // Add this new ledger to the set of active ledgers.
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("New ledger index file created for ledgerId: " + ledger);
-                    }
-                    activeLedgerManager.addActiveLedger(ledger, true);
-                }
-                if (openLedgers.size() > openFileLimit) {
-                    fileInfoCache.remove(openLedgers.removeFirst()).close();
-                }
-                fi = new FileInfo(lf, masterKey);
-                fileInfoCache.put(ledger, fi);
-                openLedgers.add(ledger);
-            }
-            if (fi != null) {
-                fi.use();
-            }
-            return fi;
-        }
-    }
-    private void updatePage(LedgerEntryPage lep) throws IOException {
-        if (!lep.isClean()) {
-            throw new IOException("Trying to update a dirty page");
-        }
-        FileInfo fi = null;
-        try {
-            fi = getFileInfo(lep.getLedger(), null);
-            long pos = lep.getFirstEntry()*8;
-            if (pos >= fi.size()) {
-                lep.zeroPage();
-            } else {
-                lep.readPage(fi);
-            }
-        } finally {
-            if (fi != null) {
-                fi.release();
-            }
-        }
-    }
-
-    void flushLedger(boolean doAll) throws IOException {
-        synchronized(dirtyLedgers) {
-            if (dirtyLedgers.isEmpty()) {
-                synchronized(this) {
-                    for(Long l: pages.keySet()) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Adding " + Long.toHexString(l) + " to dirty pages");
-                        }
-                        dirtyLedgers.add(l);
-                    }
-                }
-            }
-            if (dirtyLedgers.isEmpty()) {
-                return;
-            }
-            while(!dirtyLedgers.isEmpty()) {
-                Long l = dirtyLedgers.removeFirst();
-
-                flushLedger(l);
-
-                if (!doAll) {
-                    break;
-                }
-                // Yeild. if we are doing all the ledgers we don't want to block other flushes that
-                // need to happen
-                try {
-                    dirtyLedgers.wait(1);
-                } catch (InterruptedException e) {
-                    // just pass it on
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-    }
-
-    /**
-     * Flush a specified ledger
-     *
-     * @param l
-     *          Ledger Id
-     * @throws IOException
-     */
-    private void flushLedger(long l) throws IOException {
-        LinkedList<Long> firstEntryList;
-        synchronized(this) {
-            HashMap<Long, LedgerEntryPage> pageMap = pages.get(l);
-            if (pageMap == null || pageMap.isEmpty()) {
-                return;
-            }
-            firstEntryList = new LinkedList<Long>();
-            for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) {
-                LedgerEntryPage lep = entry.getValue();
-                if (lep.isClean()) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Page is clean " + lep);
-                    }
-                    continue;
-                }
-                firstEntryList.add(lep.getFirstEntry());
-            }
-        }
-
-        if (firstEntryList.size() == 0) {
-            LOG.debug("Nothing to flush for ledger {}.", l);
-            // nothing to do
-            return;
-        }
-
-        // Now flush all the pages of a ledger
-        List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size());
-        FileInfo fi = null;
-        try {
-            for(Long firstEntry: firstEntryList) {
-                LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true);
-                if (lep != null) {
-                    entries.add(lep);
-                }
-            }
-            Collections.sort(entries, new Comparator<LedgerEntryPage>() {
-                    @Override
-                    public int compare(LedgerEntryPage o1, LedgerEntryPage o2) {
-                    return (int)(o1.getFirstEntry()-o2.getFirstEntry());
-                    }
-                    });
-            ArrayList<Integer> versions = new ArrayList<Integer>(entries.size());
-            fi = getFileInfo(l, null);
-            int start = 0;
-            long lastOffset = -1;
-            for(int i = 0; i < entries.size(); i++) {
-                versions.add(i, entries.get(i).getVersion());
-                if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != entriesPerPage) {
-                    // send up a sequential list
-                    int count = i - start;
-                    if (count == 0) {
-                        System.out.println("Count cannot possibly be zero!");
-                    }
-                    writeBuffers(l, entries, fi, start, count);
-                    start = i;
-                }
-                lastOffset = entries.get(i).getFirstEntry();
-            }
-            if (entries.size()-start == 0 && entries.size() != 0) {
-                System.out.println("Nothing to write, but there were entries!");
-            }
-            writeBuffers(l, entries, fi, start, entries.size()-start);
-            synchronized(this) {
-                for(int i = 0; i < entries.size(); i++) {
-                    LedgerEntryPage lep = entries.get(i);
-                    lep.setClean(versions.get(i));
-                }
-            }
-        } finally {
-            for(LedgerEntryPage lep: entries) {
-                lep.releasePage();
-            }
-            if (fi != null) {
-                fi.release();
-            }
-        }
-    }
-
-    private void writeBuffers(Long ledger,
-                              List<LedgerEntryPage> entries, FileInfo fi,
-                              int start, int count) throws IOException {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Writing " + count + " buffers of " + Long.toHexString(ledger));
-        }
-        if (count == 0) {
-            //System.out.println("Count is zero!");
-            return;
-        }
-        ByteBuffer buffs[] = new ByteBuffer[count];
-        for(int j = 0; j < count; j++) {
-            buffs[j] = entries.get(start+j).getPageToWrite();
-            if (entries.get(start+j).getLedger() != ledger) {
-                throw new IOException("Writing to " + ledger + " but page belongs to " + entries.get(start+j).getLedger());
-            }
-        }
-        long totalWritten = 0;
-        while(buffs[buffs.length-1].remaining() > 0) {
-            long rc = fi.write(buffs, entries.get(start+0).getFirstEntry()*8);
-            if (rc <= 0) {
-                throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
-            }
-            //System.out.println("Wrote " + rc + " to " + ledger);
-            totalWritten += rc;
-        }
-        if (totalWritten != count * pageSize) {
-            throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten + " expected " + count * pageSize);
-        }
-    }
-    private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOException {
-        if (entry % entriesPerPage != 0) {
-            throw new IllegalArgumentException(entry + " is not a multiple of " + entriesPerPage);
-        }
-        synchronized(this) {
-            if (pageCount  < pageLimit) {
-                // let's see if we can allocate something
-                LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
-                lep.setLedger(ledger);
-                lep.setFirstEntry(entry);
-
-                // note, this will not block since it is a new page
-                lep.usePage();
-                pageCount++;
-                return lep;
-            }
-        }
-
-        outerLoop:
-        while(true) {
-            synchronized(cleanLedgers) {
-                if (cleanLedgers.isEmpty()) {
-                    flushLedger(false);
-                    synchronized(this) {
-                        for(Long l: pages.keySet()) {
-                            cleanLedgers.add(l);
-                        }
-                    }
-                }
-                synchronized(this) {
-                    Long cleanLedger = cleanLedgers.getFirst();
-                    Map<Long, LedgerEntryPage> map = pages.get(cleanLedger);
-                    if (map == null || map.isEmpty()) {
-                        cleanLedgers.removeFirst();
-                        continue;
-                    }
-                    Iterator<Map.Entry<Long, LedgerEntryPage>> it = map.entrySet().iterator();
-                    LedgerEntryPage lep = it.next().getValue();
-                    while((lep.inUse() || !lep.isClean())) {
-                        if (!it.hasNext()) {
-                            continue outerLoop;
-                        }
-                        lep = it.next().getValue();
-                    }
-                    it.remove();
-                    if (map.isEmpty()) {
-                        pages.remove(lep.getLedger());
-                    }
-                    lep.usePage();
-                    lep.zeroPage();
-                    lep.setLedger(ledger);
-                    lep.setFirstEntry(entry);
-                    return lep;
-                }
-            }
-        }
-    }
-
-    public long getLastEntry(long ledgerId) {
-        long lastEntry = 0;
-        // Find the last entry in the cache
-        synchronized(this) {
-            Map<Long, LedgerEntryPage> map = pages.get(ledgerId);
-            if (map != null) {
-                for(LedgerEntryPage lep: map.values()) {
-                    if (lep.getFirstEntry() + entriesPerPage < lastEntry) {
-                        continue;
-                    }
-                    lep.usePage();
-                    long highest = lep.getLastEntry();
-                    if (highest > lastEntry) {
-                        lastEntry = highest;
-                    }
-                    lep.releasePage();
-                }
-            }
-        }
-
-        return lastEntry;
-    }
-
-    /**
-     * This method will look within the ledger directories for the ledger index
-     * files. That will comprise the set of active ledgers this particular
-     * BookieServer knows about that have not yet been deleted by the BookKeeper
-     * Client. This is called only once during initialization.
-     */
-    private void getActiveLedgers() {
-        // Ledger index files are stored in a file hierarchy with a parent and
-        // grandParent directory. We'll have to go two levels deep into these
-        // directories to find the index files.
-        for (File ledgerDirectory : ledgerDirectories) {
-            for (File grandParent : ledgerDirectory.listFiles()) {
-                if (grandParent.isDirectory()) {
-                    for (File parent : grandParent.listFiles()) {
-                        if (parent.isDirectory()) {
-                            for (File index : parent.listFiles()) {
-                                if (!index.isFile() || !index.getName().endsWith(".idx")) {
-                                    continue;
-                                }
-                                // We've found a ledger index file. The file name is the
-                                // HexString representation of the ledgerId.
-                                String ledgerIdInHex = index.getName().substring(0, index.getName().length() - 4);
-                                activeLedgerManager.addActiveLedger(Long.parseLong(ledgerIdInHex, 16), true);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
+interface LedgerCache {
+    void setMasterKey(long ledgerId, byte[] masterKey) throws IOException;
+    byte[] readMasterKey(long ledgerId) throws IOException, BookieException;
+    boolean ledgerExists(long ledgerId) throws IOException;
 
-    /**
-     * This method is called whenever a ledger is deleted by the BookKeeper Client
-     * and we want to remove all relevant data for it stored in the LedgerCache.
-     */
-    void deleteLedger(long ledgerId) throws IOException {
-        if (LOG.isDebugEnabled())
-            LOG.debug("Deleting ledgerId: " + ledgerId);
-        // Delete the ledger's index file and close the FileInfo
-        FileInfo fi = getFileInfo(ledgerId, null);
-        fi.delete();
-        fi.close();
+    void putEntryOffset(long ledger, long entry, long offset) throws IOException;
+    long getEntryOffset(long ledger, long entry) throws IOException;
 
-        // Remove it from the active ledger manager
-        activeLedgerManager.removeActiveLedger(ledgerId);
+    void flushLedger(boolean doAll) throws IOException;
+    long getLastEntry(long ledgerId) throws IOException;
 
-        // Now remove it from all the other lists and maps.
-        // These data structures need to be synchronized first before removing entries.
-        synchronized(this) {
-            pages.remove(ledgerId);
-        }
-        synchronized(fileInfoCache) {
-            fileInfoCache.remove(ledgerId);
-        }
-        synchronized(cleanLedgers) {
-            cleanLedgers.remove(ledgerId);
-        }
-        synchronized(dirtyLedgers) {
-            dirtyLedgers.remove(ledgerId);
-        }
-        synchronized(openLedgers) {
-            openLedgers.remove(ledgerId);
-        }
-    }
+    void deleteLedger(long ledgerId) throws IOException;
 
+    LedgerCacheBean getJMXBean();
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheBean.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheBean.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheBean.java Tue Mar 20 11:51:03 2012
@@ -23,57 +23,5 @@ import org.apache.bookkeeper.jmx.BKMBean
 /**
  * Ledger Cache Bean
  */
-public class LedgerCacheBean implements LedgerCacheMXBean, BKMBeanInfo {
-
-    final LedgerCache lc;
-
-    public LedgerCacheBean(LedgerCache lc) {
-        this.lc = lc;
-    }
-
-    @Override
-    public String getName() {
-        return "LedgerCache";
-    }
-
-    @Override
-    public boolean isHidden() {
-        return false;
-    }
-
-    @Override
-    public int getPageCount() {
-        return lc.getNumUsedPages();
-    }
-
-    @Override
-    public int getPageSize() {
-        return lc.getPageSize();
-    }
-
-    @Override
-    public int getOpenFileLimit() {
-        return lc.openFileLimit;
-    }
-
-    @Override
-    public int getPageLimit() {
-        return lc.getPageLimit();
-    }
-
-    @Override
-    public int getNumCleanLedgers() {
-        return lc.cleanLedgers.size();
-    }
-
-    @Override
-    public int getNumDirtyLedgers() {
-        return lc.dirtyLedgers.size();
-    }
-
-    @Override
-    public int getNumOpenLedgers() {
-        return lc.openLedgers.size();
-    }
-
+public interface LedgerCacheBean extends LedgerCacheMXBean, BKMBeanInfo {
 }

Copied: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (from r1302458, zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?p2=zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java&p1=zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java&r1=1302458&r2=1302851&rev=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Tue Mar 20 11:51:03 2012
@@ -40,21 +40,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class maps a ledger entry number into a location (entrylogid, offset) in
- * an entry log file. It does user level caching to more efficiently manage disk
- * head scheduling.
+ * Implementation of LedgerCache interface.
+ * This class serves two purposes.
  */
-public class LedgerCache {
+public class LedgerCacheImpl implements LedgerCache {
     private final static Logger LOG = LoggerFactory.getLogger(LedgerDescriptor.class);
 
     final File ledgerDirectories[];
 
-    public LedgerCache(ServerConfiguration conf, LedgerManager alm) {
+    public LedgerCacheImpl(ServerConfiguration conf, LedgerManager alm) {
         this.ledgerDirectories = Bookie.getCurrentDirectories(conf.getLedgerDirs());
         this.openFileLimit = conf.getOpenFileLimit();
         this.pageSize = conf.getPageSize();
         this.entriesPerPage = pageSize / 8;
-        
+
         if (conf.getPageLimit() <= 0) {
             // allocate half of the memory to the page cache
             this.pageLimit = (int)((Runtime.getRuntime().maxMemory() / 3) / this.pageSize);
@@ -131,7 +130,8 @@ public class LedgerCache {
         map.put(lep.getFirstEntry(), lep);
     }
 
-    private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>>
table, Long ledger, Long firstEntry) {
+    private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>>
table,
+                                                Long ledger, Long firstEntry) {
         HashMap<Long, LedgerEntryPage> map = table.get(ledger);
         if (map != null) {
             return map.get(firstEntry);
@@ -153,6 +153,7 @@ public class LedgerCache {
         }
     }
 
+    @Override
     public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
         int offsetInPage = (int) (entry % entriesPerPage);
         // find the id of the first entry of the page that has the entry
@@ -174,6 +175,7 @@ public class LedgerCache {
         }
     }
 
+    @Override
     public long getEntryOffset(long ledger, long entry) throws IOException {
         int offsetInPage = (int) (entry%entriesPerPage);
         // find the id of the first entry of the page that has the entry
@@ -221,20 +223,13 @@ public class LedgerCache {
         synchronized(fileInfoCache) {
             FileInfo fi = fileInfoCache.get(ledger);
             if (fi == null) {
-                String ledgerName = getLedgerName(ledger);
-                File lf = null;
-                for(File d: ledgerDirectories) {
-                    lf = new File(d, ledgerName);
-                    if (lf.exists()) {
-                        break;
-                    }
-                    lf = null;
-                }
+                File lf = findIndexFile(ledger);
                 if (lf == null) {
                     if (masterKey == null) {
                         throw new Bookie.NoLedgerException(ledger);
                     }
                     File dir = pickDirs(ledgerDirectories);
+                    String ledgerName = getLedgerName(ledger);
                     lf = new File(dir, ledgerName);
                     // A new ledger index file has been created for this Bookie.
                     // Add this new ledger to the set of active ledgers.
@@ -276,7 +271,8 @@ public class LedgerCache {
         }
     }
 
-    void flushLedger(boolean doAll) throws IOException {
+    @Override
+    public void flushLedger(boolean doAll) throws IOException {
         synchronized(dirtyLedgers) {
             if (dirtyLedgers.isEmpty()) {
                 synchronized(this) {
@@ -411,7 +407,8 @@ public class LedgerCache {
         for(int j = 0; j < count; j++) {
             buffs[j] = entries.get(start+j).getPageToWrite();
             if (entries.get(start+j).getLedger() != ledger) {
-                throw new IOException("Writing to " + ledger + " but page belongs to " +
entries.get(start+j).getLedger());
+                throw new IOException("Writing to " + ledger + " but page belongs to "
+                                      + entries.get(start+j).getLedger());
             }
         }
         long totalWritten = 0;
@@ -424,7 +421,8 @@ public class LedgerCache {
             totalWritten += rc;
         }
         if (totalWritten != count * pageSize) {
-            throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten
+ " expected " + count * pageSize);
+            throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten
+                                  + " expected " + count * pageSize);
         }
     }
     private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOException {
@@ -485,7 +483,8 @@ public class LedgerCache {
         }
     }
 
-    public long getLastEntry(long ledgerId) {
+    @Override
+    public long getLastEntry(long ledgerId) throws IOException {
         long lastEntry = 0;
         // Find the last entry in the cache
         synchronized(this) {
@@ -505,6 +504,41 @@ public class LedgerCache {
             }
         }
 
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            long size = fi.size();
+            // make sure the file size is aligned with index entry size
+            // otherwise we may read incorrect data
+            if (0 != size % 8) {
+                LOG.warn("Index file of ledger {} is not aligned with index entry size.",
ledgerId);
+                size = size - size % 8;
+            }
+            // we may not have the last entry in the cache
+            if (size > lastEntry*8) {
+                ByteBuffer bb = ByteBuffer.allocate(getPageSize());
+                long position = size - getPageSize();
+                if (position < 0) {
+                    position = 0;
+                }
+                fi.read(bb, position);
+                bb.flip();
+                long startingEntryId = position/8;
+                for(int i = getEntriesPerPage()-1; i >= 0; i--) {
+                    if (bb.getLong(i*8) != 0) {
+                        if (lastEntry < startingEntryId+i) {
+                            lastEntry = startingEntryId+i;
+                        }
+                        break;
+                    }
+                }
+            }
+        } finally {
+            if (fi != null) {
+                fi.release();
+            }
+        }
+
         return lastEntry;
     }
 
@@ -543,7 +577,8 @@ public class LedgerCache {
      * This method is called whenever a ledger is deleted by the BookKeeper Client
      * and we want to remove all relevant data for it stored in the LedgerCache.
      */
-    void deleteLedger(long ledgerId) throws IOException {
+    @Override
+    public void deleteLedger(long ledgerId) throws IOException {
         if (LOG.isDebugEnabled())
             LOG.debug("Deleting ledgerId: " + ledgerId);
         // Delete the ledger's index file and close the FileInfo
@@ -573,4 +608,105 @@ public class LedgerCache {
         }
     }
 
+    private File findIndexFile(long ledgerId) throws IOException {
+        String ledgerName = getLedgerName(ledgerId);
+        for(File d: ledgerDirectories) {
+            File lf = new File(d, ledgerName);
+            if (lf.exists()) {
+                return lf;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+        synchronized(fileInfoCache) {
+            FileInfo fi = fileInfoCache.get(ledgerId);
+            if (fi == null) {
+                File lf = findIndexFile(ledgerId);
+                if (lf == null) {
+                    throw new Bookie.NoLedgerException(ledgerId);
+                }
+                if (openLedgers.size() > openFileLimit) {
+                    fileInfoCache.remove(openLedgers.removeFirst()).close();
+                }
+                fi = new FileInfo(lf, null);
+                byte[] key = fi.getMasterKey();
+                fileInfoCache.put(ledgerId, fi);
+                openLedgers.add(ledgerId);
+                return key;
+            }
+            return fi.getMasterKey();
+        }
+    }
+
+    @Override
+    public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException { // opens (or creates) the ledger's index file with this key
+        getFileInfo(ledgerId, masterKey).release(); // release like getLastEntry does; NOTE(review): assumes getFileInfo returns a non-null, use-counted FileInfo - confirm
+    }
+
+    @Override
+    public boolean ledgerExists(long ledgerId) throws IOException {
+        synchronized(fileInfoCache) {
+            FileInfo fi = fileInfoCache.get(ledgerId);
+            if (fi == null) {
+                File lf = findIndexFile(ledgerId);
+                if (lf == null) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public LedgerCacheBean getJMXBean() { // returns a bean view over this cache's page and ledger counters
+        return new LedgerCacheBean() {
+            @Override
+            public String getName() {
+                return "LedgerCache";
+            }
+
+            @Override
+            public boolean isHidden() {
+                return false;
+            }
+
+            @Override
+            public int getPageCount() {
+                return getNumUsedPages();
+            }
+
+            @Override
+            public int getPageSize() {
+                return LedgerCacheImpl.this.getPageSize(); // qualified: an unqualified call binds to this bean method and recurses forever
+            }
+
+            @Override
+            public int getOpenFileLimit() {
+                return openFileLimit;
+            }
+
+            @Override
+            public int getPageLimit() {
+                return LedgerCacheImpl.this.getPageLimit(); // qualified for the same reason as getPageSize
+            }
+
+            @Override
+            public int getNumCleanLedgers() {
+                return cleanLedgers.size();
+            }
+
+            @Override
+            public int getNumDirtyLedgers() {
+                return dirtyLedgers.size();
+            }
+
+            @Override
+            public int getNumOpenLedgers() {
+                return openLedgers.size();
+            }
+        };
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
Tue Mar 20 11:51:03 2012
@@ -37,11 +37,13 @@ import org.slf4j.LoggerFactory;
  */
 public class LedgerDescriptor {
     final static Logger LOG = LoggerFactory.getLogger(LedgerDescriptor.class);
-    LedgerCache ledgerCache;
+    LedgerCacheImpl ledgerCache;
     LedgerDescriptor(long ledgerId, EntryLogger entryLogger, LedgerCache ledgerCache) {
         this.ledgerId = ledgerId;
         this.entryLogger = entryLogger;
-        this.ledgerCache = ledgerCache;
+        // This cast is only here until ledgerDescriptor changes go in to make it
+        // unnecessary
+        this.ledgerCache = (LedgerCacheImpl)ledgerCache;
     }
 
     private byte[] masterKey = null;
@@ -136,42 +138,7 @@ public class LedgerDescriptor {
          * If entryId is -1, then return the last written.
          */
         if (entryId == -1) {
-            long lastEntry = ledgerCache.getLastEntry(ledgerId);
-            FileInfo fi = null;
-            try {
-                fi = ledgerCache.getFileInfo(ledgerId, null);
-                long size = fi.size();
-                // make sure the file size is aligned with index entry size
-                // otherwise we may read incorret data
-                if (0 != size % 8) {
-                    LOG.warn("Index file of ledger {} is not aligned with index entry size.",
ledgerId);
-                    size = size - size % 8;
-                }
-                // we may not have the last entry in the cache
-                if (size > lastEntry*8) {
-                    ByteBuffer bb = ByteBuffer.allocate(ledgerCache.getPageSize());
-                    long position = size - ledgerCache.getPageSize();
-                    if (position < 0) {
-                        position = 0;
-                    }
-                    fi.read(bb, position);
-                    bb.flip();
-                    long startingEntryId = position/8;
-                    for(int i = ledgerCache.getEntriesPerPage()-1; i >= 0; i--) {
-                        if (bb.getLong(i*8) != 0) {
-                            if (lastEntry < startingEntryId+i) {
-                                lastEntry = startingEntryId+i;
-                            }
-                            break;
-                        }
-                    }
-                }
-            } finally {
-                if (fi != null) {
-                    fi.release();
-                }
-            }
-            entryId = lastEntry;
+            entryId = ledgerCache.getLastEntry(ledgerId);
         }
 
         offset = ledgerCache.getEntryOffset(ledgerId, entryId);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
Tue Mar 20 11:51:03 2012
@@ -59,7 +59,7 @@ public class BookieJournalTest {
     private void writeIndexFileForLedger(File indexDir, long ledgerId,
                                          byte[] masterKey)
             throws Exception {
-        File fn = new File(indexDir, LedgerCache.getLedgerName(ledgerId));
+        File fn = new File(indexDir, LedgerCacheImpl.getLedgerName(ledgerId));
         fn.getParentFile().mkdirs();
         FileInfo fi = new FileInfo(fn, masterKey);
         // force creation of index file
@@ -70,7 +70,7 @@ public class BookieJournalTest {
     private void writePartialIndexFileForLedger(File indexDir, long ledgerId,
                                                 byte[] masterKey, boolean truncateToMasterKey)
             throws Exception {
-        File fn = new File(indexDir, LedgerCache.getLedgerName(ledgerId));
+        File fn = new File(indexDir, LedgerCacheImpl.getLedgerName(ledgerId));
         fn.getParentFile().mkdirs();
         FileInfo fi = new FileInfo(fn, masterKey);
         // force creation of index file

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java?rev=1302851&r1=1302850&r2=1302851&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
Tue Mar 20 11:51:03 2012
@@ -72,7 +72,7 @@ public class UpgradeTest {
             throws Exception {
         long ledgerId = 1;
 
-        File fn = new File(dir, LedgerCache.getLedgerName(ledgerId));
+        File fn = new File(dir, LedgerCacheImpl.getLedgerName(ledgerId));
         fn.getParentFile().mkdirs();
         FileInfo fi = new FileInfo(fn, masterKey);
         // force creation of index file



Mime
View raw message