zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1534640 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/util/
Date Tue, 22 Oct 2013 14:21:29 GMT
Author: ivank
Date: Tue Oct 22 14:21:28 2013
New Revision: 1534640

URL: http://svn.apache.org/r1534640
Log:
BOOKKEEPER-659: LRU page management in ledger cache. (Aniruddha, Robin Dhamankar & sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKey.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LEPStateChangeCallback.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZeroBuffer.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheMXBean.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1534640&r1=1534639&r2=1534640&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Oct 22 14:21:28 2013
@@ -114,6 +114,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-688: NPE exception in PerChannelBookieClient (ivank via sijie)
 
+        BOOKKEEPER-659: LRU page management in ledger cache. (Aniruddha, Robin Dhamankar & sijie via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1534640&r1=1534639&r2=1534640&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Oct 22 14:21:28 2013
@@ -35,6 +35,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -114,7 +116,7 @@ public class Bookie extends BookieThread
     BookieBean jmxBookieBean;
     BKMBeanInfo jmxLedgerStorageBean;
 
-    Map<Long, byte[]> masterKeyCache = Collections.synchronizedMap(new HashMap<Long, byte[]>());
+    final ConcurrentMap<Long, byte[]> masterKeyCache = new ConcurrentHashMap<Long, byte[]>();
 
     final private String zkBookieRegPath;
 
@@ -122,7 +124,7 @@ public class Bookie extends BookieThread
 
     public static class NoLedgerException extends IOException {
         private static final long serialVersionUID = 1L;
-        private long ledgerId;
+        private final long ledgerId;
         public NoLedgerException(long ledgerId) {
             super("Ledger " + ledgerId + " not found");
             this.ledgerId = ledgerId;
@@ -133,8 +135,8 @@ public class Bookie extends BookieThread
     }
     public static class NoEntryException extends IOException {
         private static final long serialVersionUID = 1L;
-        private long ledgerId;
-        private long entryId;
+        private final long ledgerId;
+        private final long entryId;
         public NoEntryException(long ledgerId, long entryId) {
             this("Entry " + entryId + " not found in " + ledgerId, ledgerId, entryId);
         }
@@ -885,8 +887,9 @@ public class Bookie extends BookieThread
             bb.put(masterKey);
             bb.flip();
 
-            journal.logAddEntry(bb, new NopWriteCallback(), null);
-            masterKeyCache.put(ledgerId, masterKey);
+            if (null == masterKeyCache.putIfAbsent(ledgerId, masterKey)) {
+                journal.logAddEntry(bb, new NopWriteCallback(), null);
+            }
         }
         return l;
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1534640&r1=1534639&r2=1534640&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java Tue Oct 22 14:21:28 2013
@@ -859,8 +859,7 @@ public class BookieShell implements Tool
         lep.usePage();
         try {
             while (curSize < size) {
-                lep.setLedger(ledgerId);
-                lep.setFirstEntry(curEntry);
+                lep.setLedgerAndFirstEntry(ledgerId, curEntry);
                 lep.readPage(fi);
 
                 // process a page

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java?rev=1534640&r1=1534639&r2=1534640&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java Tue Oct 22 14:21:28 2013
@@ -28,8 +28,10 @@ import java.nio.channels.FileChannel;
 /**
  * Provides a buffering layer in front of a FileChannel.
  */
-public class BufferedChannel
-{
+public class BufferedChannel {
+
+    static final byte zeroPage[] = new byte[64 * 1024];
+
     ByteBuffer writeBuffer;
     ByteBuffer readBuffer;
     private FileChannel bc;
@@ -193,7 +195,7 @@ public class BufferedChannel
                 if (readBufferStartPosition + readBuffer.capacity() >= writeBufferStartPosition) {
                     readBufferStartPosition = writeBufferStartPosition - readBuffer.capacity();
                     if (readBufferStartPosition < 0) {
-                        readBuffer.put(LedgerEntryPage.zeroPage, 0, (int)-readBufferStartPosition);
+                        readBuffer.put(zeroPage, 0, (int) -readBufferStartPosition);
                     }
                 }
                 while(readBuffer.remaining() > 0) {
@@ -201,7 +203,7 @@ public class BufferedChannel
                         throw new IOException("Short read");
                     }
                 }
-                readBuffer.put(LedgerEntryPage.zeroPage, 0, readBuffer.remaining());
+                readBuffer.put(zeroPage, 0, readBuffer.remaining());
                 readBuffer.clear();
             }
         }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKey.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKey.java?rev=1534640&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKey.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKey.java Tue Oct 22 14:21:28 2013
@@ -0,0 +1,83 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+public class EntryKey {
+    long ledgerId;
+    long entryId;
+
+    public EntryKey() {
+        this(0, 0);
+    }
+
+    public EntryKey(long ledgerId, long entryId) {
+        this.ledgerId = ledgerId;
+        this.entryId = entryId;
+    }
+
+    public long getLedgerId() {
+        return ledgerId;
+    }
+
+    public long getEntryId() {
+        return entryId;
+    }
+
+    /**
+    * Comparator that orders keys by ledger id, then by entry id
+    */
+    public static final KeyComparator COMPARATOR = new KeyComparator();
+
+    // Only compares the key portion
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof EntryKey)) {
+          return false;
+        }
+        EntryKey key = (EntryKey)other;
+        return ledgerId == key.ledgerId &&
+            entryId == key.entryId;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int)(ledgerId * 13 ^ entryId * 17);
+    }
+}
+
+/**
+ * Orders EntryKeys by ledger id, then by entry id, without risk of overflow.
+ */
+class KeyComparator implements Comparator<EntryKey>, Serializable {
+
+    private static final long serialVersionUID = 0L;
+
+    @Override
+    public int compare(EntryKey left, EntryKey right) {
+        // Compare directly: subtracting two longs can overflow and flip the sign.
+        if (left.ledgerId != right.ledgerId) {
+            return (left.ledgerId < right.ledgerId) ? -1 : 1;
+        }
+        return (left.entryId < right.entryId) ? -1 : ((left.entryId > right.entryId) ? 1 : 0);
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=1534640&r1=1534639&r2=1534640&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java Tue Oct 22 14:21:28 2013
@@ -29,6 +29,7 @@ import java.io.RandomAccessFile;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,7 +72,7 @@ class FileInfo {
 
     static final long START_OF_DATA = 1024;
     private long size;
-    private int useCount;
+    private AtomicInteger useCount = new AtomicInteger(0);
     private boolean isClosed;
     private long sizeSinceLastwrite;
 
@@ -144,7 +145,7 @@ class FileInfo {
             throw new IOException(lf + " not found");
         }
 
-        if (!exists) { 
+        if (!exists) {
             if (create) {
                 // delayed the creation of parents directories
                 checkParents(lf);
@@ -221,18 +222,23 @@ class FileInfo {
         return rc;
     }
 
-    synchronized public int read(ByteBuffer bb, long position) throws IOException {
+    public int read(ByteBuffer bb, long position) throws IOException {
         return readAbsolute(bb, position + START_OF_DATA);
     }
 
     private int readAbsolute(ByteBuffer bb, long start) throws IOException {
         checkOpen(false);
-        if (fc == null) {
-            return 0;
+        synchronized (this) {
+            if (fc == null) {
+                return 0;
+            }
         }
         int total = 0;
+        int rc = 0;
         while(bb.remaining() > 0) {
-            int rc = fc.read(bb, start);
+            synchronized (this) {
+                rc = fc.read(bb, start);
+            }
             if (rc <= 0) {
                 throw new IOException("Short read");
             }
@@ -253,7 +259,7 @@ class FileInfo {
     synchronized public void close(boolean force) throws IOException {
         isClosed = true;
         checkOpen(force);
-        if (useCount == 0 && fc != null) {
+        if (useCount.get() == 0 && fc != null) {
             fc.close();
         }
     }
@@ -288,49 +294,51 @@ class FileInfo {
      */
     public synchronized void moveToNewLocation(File newFile, long size) throws IOException {
         checkOpen(false);
-        if (fc != null) {
-            if (size > fc.size()) {
-                size = fc.size();
-            }
-            File rlocFile = new File(newFile.getParentFile(), newFile.getName() + IndexPersistenceMgr.RLOC);
-            if (!rlocFile.exists()) {
-                checkParents(rlocFile);
-                if (!rlocFile.createNewFile()) {
-                    throw new IOException("Creating new cache index file " + rlocFile + " failed ");
-                }
+        // If the channel is null, or same file path, just return.
+        if (null == fc || isSameFile(newFile)) {
+            return;
+        }
+        if (size > fc.size()) {
+            size = fc.size();
+        }
+        File rlocFile = new File(newFile.getParentFile(), newFile.getName() + IndexPersistenceMgr.RLOC);
+        if (!rlocFile.exists()) {
+            checkParents(rlocFile);
+            if (!rlocFile.createNewFile()) {
+                throw new IOException("Creating new cache index file " + rlocFile + " failed ");
             }
-            // copy contents from old.idx to new.idx.rloc
-            FileChannel newFc = new RandomAccessFile(rlocFile, "rw").getChannel();
-            try {
-                long written = 0;
-                while (written < size) {
-                    long count = fc.transferTo(written, size, newFc);
-                    if (count <= 0) {
-                        throw new IOException("Copying to new location " + rlocFile + " failed");
-                    }
-                    written += count;
-                }
-                if (written <= 0 && size > 0) {
+        }
+        // copy contents from old.idx to new.idx.rloc
+        FileChannel newFc = new RandomAccessFile(rlocFile, "rw").getChannel();
+        try {
+            long written = 0;
+            while (written < size) {
+                long count = fc.transferTo(written, size, newFc);
+                if (count <= 0) {
                     throw new IOException("Copying to new location " + rlocFile + " failed");
                 }
-            } finally {
-                newFc.force(true);
-                newFc.close();
-            }
-            // delete old.idx
-            fc.close();
-            if (!delete()) {
-                LOG.error("Failed to delete the previous index file " + lf);
-                throw new IOException("Failed to delete the previous index file " + lf);
+                written += count;
             }
-
-            // rename new.idx.rloc to new.idx
-            if (!rlocFile.renameTo(newFile)) {
-                LOG.error("Failed to rename " + rlocFile + " to " + newFile);
-                throw new IOException("Failed to rename " + rlocFile + " to " + newFile);
+            if (written <= 0 && size > 0) {
+                throw new IOException("Copying to new location " + rlocFile + " failed");
             }
-            fc = new RandomAccessFile(newFile, mode).getChannel();
+        } finally {
+            newFc.force(true);
+            newFc.close();
+        }
+        // delete old.idx
+        fc.close();
+        if (!delete()) {
+            LOG.error("Failed to delete the previous index file " + lf);
+            throw new IOException("Failed to delete the previous index file " + lf);
+        }
+
+        // rename new.idx.rloc to new.idx
+        if (!rlocFile.renameTo(newFile)) {
+            LOG.error("Failed to rename " + rlocFile + " to " + newFile);
+            throw new IOException("Failed to rename " + rlocFile + " to " + newFile);
         }
+        fc = new RandomAccessFile(newFile, mode).getChannel();
         lf = newFile;
     }
 
@@ -339,18 +347,18 @@ class FileInfo {
         return masterKey;
     }
 
-    synchronized public void use() {
-        useCount++;
+    public void use() {
+        useCount.incrementAndGet();
     }
 
     @VisibleForTesting
-    synchronized int getUseCount() {
-        return useCount;
+    int getUseCount() {
+        return useCount.get();
     }
 
     synchronized public void release() {
-        useCount--;
-        if (isClosed && useCount == 0 && fc != null) {
+        int count = useCount.decrementAndGet();
+        if (isClosed && (count == 0) && fc != null) {
             try {
                 fc.close();
             } catch (IOException e) {
@@ -372,4 +380,8 @@ class FileInfo {
             throw new IOException("Counldn't mkdirs for " + parent);
         }
     }
+
+    public boolean isSameFile(File f) {
+        return this.lf.equals(f);
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java?rev=1534640&r1=1534639&r2=1534640&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java Tue Oct 22 14:21:28 2013
@@ -22,12 +22,13 @@
 package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 class HandleFactoryImpl implements HandleFactory {
-    HashMap<Long, LedgerDescriptor> ledgers = new HashMap<Long, LedgerDescriptor>();
-    HashMap<Long, LedgerDescriptor> readOnlyLedgers
-        = new HashMap<Long, LedgerDescriptor>();
+    ConcurrentMap<Long, LedgerDescriptor> ledgers = new ConcurrentHashMap<Long, LedgerDescriptor>();
+    ConcurrentMap<Long, LedgerDescriptor> readOnlyLedgers
+        = new ConcurrentHashMap<Long, LedgerDescriptor>();
 
     final LedgerStorage ledgerStorage;
 
@@ -39,14 +40,16 @@ class HandleFactoryImpl implements Handl
     public LedgerDescriptor getHandle(long ledgerId, byte[] masterKey)
             throws IOException, BookieException {
         LedgerDescriptor handle = null;
-        synchronized (ledgers) {
-            handle = ledgers.get(ledgerId);
-            if (handle == null) {
-                handle = LedgerDescriptor.create(masterKey, ledgerId, ledgerStorage);
-                ledgers.put(ledgerId, handle);
+        if (null == (handle = ledgers.get(ledgerId))) {
+            // LedgerDescriptor#create sets the master key in the ledger storage, calling it
+            // twice on the same ledgerId is safe because it eventually puts a value in the ledger cache
+            // that guarantees synchronized access across all cached entries.
+            handle = ledgers.putIfAbsent(ledgerId, LedgerDescriptor.create(masterKey, ledgerId, ledgerStorage));
+            if (null == handle) {
+                handle = ledgers.get(ledgerId);
             }
-            handle.checkAccess(masterKey);
         }
+        handle.checkAccess(masterKey);
         return handle;
     }
 
@@ -54,13 +57,12 @@ class HandleFactoryImpl implements Handl
     public LedgerDescriptor getReadOnlyHandle(long ledgerId)
             throws IOException, Bookie.NoLedgerException {
         LedgerDescriptor handle = null;
-        synchronized (ledgers) {
-            handle = readOnlyLedgers.get(ledgerId);
-            if (handle == null) {
-                handle = LedgerDescriptor.createReadOnly(ledgerId, ledgerStorage);
-                readOnlyLedgers.put(ledgerId, handle);
+        if (null == (handle = readOnlyLedgers.get(ledgerId))) {
+            handle = readOnlyLedgers.putIfAbsent(ledgerId, LedgerDescriptor.createReadOnly(ledgerId, ledgerStorage));
+            if (null == handle) {
+                handle = readOnlyLedgers.get(ledgerId);
             }
         }
         return handle;
     }
-}
\ No newline at end of file
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java?rev=1534640&r1=1534639&r2=1534640&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java Tue Oct 22 14:21:28 2013
@@ -20,28 +20,293 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
 
 class IndexInMemPageMgr {
     private final static Logger LOG = LoggerFactory.getLogger(IndexInMemPageMgr.class);
+    private final static ConcurrentHashMap<Long, LedgerEntryPage> EMPTY_PAGE_MAP
+        = new ConcurrentHashMap<Long, LedgerEntryPage>();
+
+    private static class InMemPageCollection implements LEPStateChangeCallback {
+
+        ConcurrentMap<Long, ConcurrentMap<Long,LedgerEntryPage>> pages;
+
+        Map<EntryKey, LedgerEntryPage> lruCleanPageMap;
+
+        public InMemPageCollection() {
+            pages = new ConcurrentHashMap<Long, ConcurrentMap<Long,LedgerEntryPage>>();
+            lruCleanPageMap =
+                Collections.synchronizedMap(new LinkedHashMap<EntryKey, LedgerEntryPage>(16, 0.75f, true));
+        }
+
+        /**
+         * Retrieve the LedgerEntryPage corresponding to the ledger and firstEntry
+         *
+         * @param ledgerId
+         *          Ledger id
+         * @param firstEntry
+         *          Id of the first entry in the page
+         * @return the LedgerEntryPage if present, otherwise null
+         */
+        private LedgerEntryPage getPage(long ledgerId, long firstEntry) {
+            ConcurrentMap<Long, LedgerEntryPage> map = pages.get(ledgerId);
+            if (null != map) {
+                return map.get(firstEntry);
+            }
+            return null;
+        }
+
+        /**
+         * Add a LedgerEntryPage to the page map
+         *
+         * @param lep
+         *          Ledger Entry Page object
+         */
+        private LedgerEntryPage putPage(LedgerEntryPage lep) {
+            // Do a get here to avoid too many new ConcurrentHashMaps() as putIntoTable is called frequently.
+            ConcurrentMap<Long, LedgerEntryPage> map = pages.get(lep.getLedger());
+            if (null == map) {
+                ConcurrentMap<Long, LedgerEntryPage> mapToPut = new ConcurrentHashMap<Long, LedgerEntryPage>();
+                map = pages.putIfAbsent(lep.getLedger(), mapToPut);
+                if (null == map) {
+                    map = mapToPut;
+                }
+            }
+            LedgerEntryPage oldPage = map.putIfAbsent(lep.getFirstEntry(), lep);
+            if (null == oldPage) {
+                oldPage = lep;
+                // Also include this in the clean page map if it qualifies.
+                // Note: This is done for symmetry and correctness, however it should never
+                // get exercised since we shouldn't attempt a put without the page being in use
+                addToCleanPagesList(lep);
+            }
+            return oldPage;
+        }
+
+        /**
+         * Traverse the pages for a given ledger in memory and find the highest
+         * entry amongst these pages
+         *
+         * @param ledgerId
+         *          Ledger id
+         * @return last entry in the in-memory pages, or 0 if none is found
+         */
+        private long getLastEntryInMem(long ledgerId) {
+            long lastEntry = 0;
+            // Find the last entry in the cache
+            ConcurrentMap<Long, LedgerEntryPage> map = pages.get(ledgerId);
+            if (map != null) {
+                for(LedgerEntryPage lep: map.values()) {
+                    if (lep.getMaxPossibleEntry() < lastEntry) {
+                        continue;
+                    }
+                    lep.usePage();
+                    long highest = lep.getLastEntry();
+                    if (highest > lastEntry) {
+                        lastEntry = highest;
+                    }
+                    lep.releasePage();
+                }
+            }
+            return lastEntry;
+        }
+
+        /**
+         * Removes ledger entry pages for a given ledger
+         *
+         * @param ledgerId
+         *          Ledger id
+         * @return number of pages removed
+         */
+        private int removeEntriesForALedger(long ledgerId) {
+            // remove pages first to avoid page flushed when deleting file info
+            ConcurrentMap<Long, LedgerEntryPage> lPages = pages.remove(ledgerId);
+            if (null != lPages) {
+                for (long entryId: lPages.keySet()) {
+                    synchronized(lruCleanPageMap) {
+                        lruCleanPageMap.remove(new EntryKey(ledgerId, entryId));
+                    }
+                }
+                return lPages.size();
+            }
+            return 0;
+        }
+
+        /**
+         * Gets the list of pages in memory that have been changed and hence need to
+         * be written as a part of the flush operation that is being issued
+         *
+         * @param ledgerId
+         *          Ledger id
+         * @return first-entry ids of the dirty pages, or null if the ledger has no pages in memory
+         */
+        private LinkedList<Long> getFirstEntryListToBeFlushed(long ledgerId) {
+            ConcurrentMap<Long, LedgerEntryPage> pageMap = pages.get(ledgerId);
+            if (pageMap == null || pageMap.isEmpty()) {
+                return null;
+            }
+
+            LinkedList<Long> firstEntryList = new LinkedList<Long>();
+            for(ConcurrentMap.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) {
+                LedgerEntryPage lep = entry.getValue();
+                if (lep.isClean()) {
+                    if (!lep.inUse()) {
+                        addToCleanPagesList(lep);
+                    }
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Page is clean " + lep);
+                    }
+                } else {
+                    firstEntryList.add(lep.getFirstEntry());
+                }
+            }
+            return firstEntryList;
+        }
+
+        /**
+         * Add the LedgerEntryPage to the clean page LRU map
+         *
+         * @param lep
+         *          Ledger Entry Page object
+         */
+        private void addToCleanPagesList(LedgerEntryPage lep) {
+            synchronized(lruCleanPageMap) {
+                if (lep.isClean() && !lep.inUse()) {
+                    lruCleanPageMap.put(lep.getEntryKey(), lep);
+                }
+            }
+        }
+
+        /**
+         * Remove the LedgerEntryPage from the clean page LRU map
+         *
+         * @param lep
+         *          Ledger Entry Page object
+         */
+        private void removeFromCleanPageList(LedgerEntryPage lep) {
+            synchronized(lruCleanPageMap) {
+                if (!lep.isClean() || lep.inUse()) {
+                    lruCleanPageMap.remove(lep.getEntryKey());
+                }
+            }
+        }
+
+        /**
+         * Get the set of active ledgers
+         *
+         */
+        Set<Long> getActiveLedgers() {
+            return pages.keySet();
+        }
+
+        /**
+         * Get a clean page and provision it for the specified ledger and firstEntry within
+         * the ledger
+         *
+         * @param ledgerId
+         *          Ledger id
+         * @param firstEntry
+         *          Id of the first entry in the page
+         * @return the reclaimed LedgerEntryPage, or null if no clean page could be reclaimed
+         */
+        LedgerEntryPage grabCleanPage(long ledgerId, long firstEntry) {
+            LedgerEntryPage lep = null;
+            while (lruCleanPageMap.size() > 0) {
+                lep = null;
+                synchronized(lruCleanPageMap) {
+                    Iterator<Map.Entry<EntryKey,LedgerEntryPage>> iterator = lruCleanPageMap.entrySet().iterator();
+
+                    Map.Entry<EntryKey,LedgerEntryPage> entry = null;
+                    while (iterator.hasNext())
+                    {
+                        entry = iterator.next();
+                        iterator.remove();
+                        if (entry.getValue().isClean() &&
+                                !entry.getValue().inUse()) {
+                            lep = entry.getValue();
+                            break;
+                        }
+                    }
+
+                    if (null == lep) {
+                        LOG.debug("Did not find eligible page in the first pass");
+                        return null;
+                    }
+                }
+
+                // We found a candidate page; let's see if we can reclaim it before it's re-used
+                ConcurrentMap<Long, LedgerEntryPage> pageMap = pages.get(lep.getLedger());
+                // Remove from map only if nothing has changed since we checked this lep.
+                // It's possible for the ledger to have been deleted or the page to have already
+                // been reclaimed. The page map is the definitive source of information; if anything
+                // has changed we should leave this page alone and continue iterating to find
+                // another suitable page.
+                if ((null != pageMap) && (pageMap.remove(lep.getFirstEntry(), lep))) {
+                    if (!lep.isClean()) {
+                        // Someone wrote to this page while we were reclaiming it.
+                        pageMap.put(lep.getFirstEntry(), lep);
+                        lep = null;
+                    } else {
+                        // Do some bookkeeping on the page table
+                        pages.remove(lep.getLedger(), EMPTY_PAGE_MAP);
+                        // We can now safely reset this lep and return it.
+                        lep.usePage();
+                        lep.zeroPage();
+                        lep.setLedgerAndFirstEntry(ledgerId, firstEntry);
+                        return lep;
+                    }
+                } else {
+                    lep = null;
+                }
+            }
+            return lep;
+        }
+
+        @Override
+        public void onSetInUse(LedgerEntryPage lep) {
+            removeFromCleanPageList(lep);
+        }
+
+        @Override
+        public void onResetInUse(LedgerEntryPage lep) {
+            addToCleanPagesList(lep);
+        }
+
+        @Override
+        public void onSetClean(LedgerEntryPage lep) {
+            addToCleanPagesList(lep);
+        }
+
+        @Override
+        public void onSetDirty(LedgerEntryPage lep) {
+            removeFromCleanPageList(lep);
+        }
+    }
 
     final int pageSize;
-    final int pageLimit;
     final int entriesPerPage;
-    final HashMap<Long, HashMap<Long, LedgerEntryPage>> pages;
+    final int pageLimit;
+    final InMemPageCollection pageMapAndList;
 
     // The number of pages that have actually been used
-    private int pageCount = 0;
+    private final AtomicInteger pageCount = new AtomicInteger(0);
 
     // The persistence manager that this page manager uses to
     // flush and read pages
@@ -50,20 +315,17 @@ class IndexInMemPageMgr {
     /**
      * the list of potentially dirty ledgers
      */
-    LinkedList<Long> dirtyLedgers = new LinkedList<Long>();
-    /**
-     * the list of potentially clean ledgers
-     */
-    LinkedList<Long> cleanLedgers = new LinkedList<Long>();
+    private final ConcurrentLinkedQueue<Long> ledgersToFlush = new ConcurrentLinkedQueue<Long>();
+    private final ConcurrentSkipListSet<Long> ledgersFlushing = new ConcurrentSkipListSet<Long>();
 
     public IndexInMemPageMgr(int pageSize,
                              int entriesPerPage,
-                             ServerConfiguration conf, 
+                             ServerConfiguration conf,
                              IndexPersistenceMgr indexPersistenceManager) {
         this.pageSize = pageSize;
         this.entriesPerPage = entriesPerPage;
         this.indexPersistenceManager = indexPersistenceManager;
-        this.pages = new HashMap<Long, HashMap<Long, LedgerEntryPage>>();
+        this.pageMapAndList = new InMemPageCollection();
 
         if (conf.getPageLimit() <= 0) {
             // allocate half of the memory to the page cache
@@ -100,51 +362,21 @@ class IndexInMemPageMgr {
      * @return number of page used in ledger cache
      */
     public int getNumUsedPages() {
-        return pageCount;
-    }
-
-    public int getNumCleanLedgers() {
-        return cleanLedgers.size();
-    }
-
-    public int getNumDirtyLedgers() {
-        return dirtyLedgers.size();
+        return pageCount.get();
     }
 
-    private void putIntoTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, LedgerEntryPage lep) {
-        HashMap<Long, LedgerEntryPage> map = table.get(lep.getLedger());
-        if (map == null) {
-            map = new HashMap<Long, LedgerEntryPage>();
-            table.put(lep.getLedger(), map);
-        }
-        map.put(lep.getFirstEntry(), lep);
-    }
-
-    private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table,
-                                                Long ledger, Long firstEntry) {
-        HashMap<Long, LedgerEntryPage> map = table.get(ledger);
-        if (map != null) {
-            return map.get(firstEntry);
-        }
-        return null;
-    }
-
-    synchronized protected LedgerEntryPage getLedgerEntryPage(Long ledger, Long firstEntry, boolean onlyDirty) {
-        LedgerEntryPage lep = getFromTable(pages, ledger, firstEntry);
-        if (lep == null) {
+    LedgerEntryPage getLedgerEntryPage(Long ledger, Long firstEntry, boolean onlyDirty) {
+        LedgerEntryPage lep = pageMapAndList.getPage(ledger, firstEntry);
+        if (onlyDirty && null != lep && lep.isClean()) {
             return null;
         }
-
-        lep.usePage();
-
-        if (onlyDirty && lep.isClean()) {
-            return null;
-        } else {
-            return lep;
+        if (null != lep) {
+            lep.usePage();
         }
+        return lep;
     }
 
-    /** 
+    /**
      * Grab ledger entry page whose first entry is <code>pageEntry</code>.
      *
      * If the page doesn't existed before, we allocate a memory page.
@@ -158,11 +390,18 @@ class IndexInMemPageMgr {
     private LedgerEntryPage grabLedgerEntryPage(long ledger, long pageEntry) throws IOException {
         LedgerEntryPage lep = grabCleanPage(ledger, pageEntry);
         try {
-            // should update page before we put it into table
-            // otherwise we would put an empty page in it
+            // should get the up to date page from the persistence manager
+            // before we put it into table otherwise we would put
+            // an empty page in it
             indexPersistenceManager.updatePage(lep);
-            synchronized (this) {
-                putIntoTable(pages, lep);
+            LedgerEntryPage oldLep;
+            if (lep != (oldLep = pageMapAndList.putPage(lep))) {
+                lep.releasePage();
+                // Decrement the page count because we couldn't put this lep in the page cache.
+                pageCount.decrementAndGet();
+                // Increment the use count of the old lep because this is unexpected
+                oldLep.usePage();
+                lep = oldLep;
             }
         } catch (IOException ie) {
             // if we grab a clean page, but failed to update the page
@@ -170,154 +409,70 @@ class IndexInMemPageMgr {
             // since this page will be never used, so we need to decrement
             // page count of ledger cache.
             lep.releasePage();
-            synchronized (this) {
-                --pageCount;
-            }
+            pageCount.decrementAndGet();
             throw ie;
         }
         return lep;
     }
 
     void removePagesForLedger(long ledgerId) {
-        // remove pages first to avoid page flushed when deleting file info
-        synchronized (this) {
-            Map<Long, LedgerEntryPage> lpages = pages.remove(ledgerId);
-            if (null != lpages) {
-                pageCount -= lpages.size();
-                if (pageCount < 0) {
-                    LOG.error("Page count of ledger cache has been decremented to be less than zero.");
-                }
-            }
+        int removedPageCount = pageMapAndList.removeEntriesForALedger(ledgerId);
+        if (pageCount.addAndGet(-removedPageCount) < 0) {
+            throw new RuntimeException("Page count of ledger cache has been decremented to be less than zero.");
         }
+        ledgersToFlush.remove(ledgerId);
     }
 
     long getLastEntryInMem(long ledgerId) {
-        long lastEntry = 0;
-        // Find the last entry in the cache
-        synchronized (this) {
-            Map<Long, LedgerEntryPage> map = pages.get(ledgerId);
-            if (map != null) {
-                for (LedgerEntryPage lep : map.values()) {
-                    if (lep.getFirstEntry() + entriesPerPage < lastEntry) {
-                        continue;
-                    }
-                    lep.usePage();
-                    long highest = lep.getLastEntry();
-                    if (highest > lastEntry) {
-                        lastEntry = highest;
-                    }
-                    lep.releasePage();
-                }
-            }
-        }
-        return lastEntry;
+        return pageMapAndList.getLastEntryInMem(ledgerId);
     }
 
     private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOException {
         if (entry % entriesPerPage != 0) {
             throw new IllegalArgumentException(entry + " is not a multiple of " + entriesPerPage);
         }
-        outerLoop: while (true) {
-            synchronized (this) {
-                if (pageCount < pageLimit) {
-                    // let's see if we can allocate something
-                    LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
-                    lep.setLedger(ledger);
-                    lep.setFirstEntry(entry);
-
-                    // note, this will not block since it is a new page
-                    lep.usePage();
-                    pageCount++;
-                    return lep;
-                }
-            }
 
-            synchronized (cleanLedgers) {
-                if (cleanLedgers.isEmpty()) {
-                    flushOneOrMoreLedgers(false);
-                    synchronized (this) {
-                        for (Long l : pages.keySet()) {
-                            cleanLedgers.add(l);
-                        }
-                    }
-                }
-                synchronized (this) {
-                    // if ledgers deleted between checking pageCount and putting
-                    // ledgers into cleanLedgers list, the cleanLedgers list would be empty.
-                    // so give it a chance to go back to check pageCount again because
-                    // deleteLedger would decrement pageCount to return the number of pages
-                    // occupied by deleted ledgers.
-                    if (cleanLedgers.isEmpty()) {
-                        continue outerLoop;
-                    }
-                    Long cleanLedger = cleanLedgers.getFirst();
-                    Map<Long, LedgerEntryPage> map = pages.get(cleanLedger);
-                    while (map == null || map.isEmpty()) {
-                        cleanLedgers.removeFirst();
-                        if (cleanLedgers.isEmpty()) {
-                            continue outerLoop;
-                        }
-                        cleanLedger = cleanLedgers.getFirst();
-                        map = pages.get(cleanLedger);
-                    }
-                    Iterator<Map.Entry<Long, LedgerEntryPage>> it = map.entrySet().iterator();
-                    LedgerEntryPage lep = it.next().getValue();
-                    while ((lep.inUse() || !lep.isClean())) {
-                        if (!it.hasNext()) {
-                            // no clean page found in this ledger
-                            cleanLedgers.removeFirst();
-                            continue outerLoop;
-                        }
-                        lep = it.next().getValue();
-                    }
-                    it.remove();
-                    if (map.isEmpty()) {
-                        pages.remove(lep.getLedger());
-                    }
-                    lep.usePage();
-                    lep.zeroPage();
-                    lep.setLedger(ledger);
-                    lep.setFirstEntry(entry);
-                    return lep;
-                }
-            }
+        while(true) {
+            boolean canAllocate = false;
+            if (pageCount.incrementAndGet() <= pageLimit) {
+                canAllocate = true;
+            } else {
+                pageCount.decrementAndGet();
+            }
+
+            if (canAllocate) {
+                LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage, pageMapAndList);
+                lep.setLedgerAndFirstEntry(ledger, entry);
+                lep.usePage();
+                return lep;
+            }
+
+            LedgerEntryPage lep = pageMapAndList.grabCleanPage(ledger, entry);
+            if (null != lep) {
+                return lep;
+            }
+            LOG.info("Could not grab a clean page for ledger {}, entry {}, force flushing dirty ledgers.",
+                    ledger, entry);
+            flushOneOrMoreLedgers(false);
         }
     }
 
     void flushOneOrMoreLedgers(boolean doAll) throws IOException {
-        synchronized (dirtyLedgers) {
-            if (dirtyLedgers.isEmpty()) {
-                synchronized (this) {
-                    for (Long l : pages.keySet()) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Adding {} to dirty pages", Long.toHexString(l));
-                        }
-                        dirtyLedgers.add(l);
-                    }
-                }
-            }
-            if (dirtyLedgers.isEmpty()) {
-                return;
+        if (ledgersToFlush.isEmpty()) {
+            ledgersToFlush.addAll(pageMapAndList.getActiveLedgers());
+        }
+        Long potentiallyDirtyLedger;
+        while (null != (potentiallyDirtyLedger = ledgersToFlush.poll())) {
+            if (!ledgersFlushing.add(potentiallyDirtyLedger)) {
+                continue;
+            }
+            try {
+                flushSpecificLedger(potentiallyDirtyLedger);
+            } finally {
+                ledgersFlushing.remove(potentiallyDirtyLedger);
             }
-
-            indexPersistenceManager.relocateIndexFileIfDirFull(dirtyLedgers);
-
-            while (!dirtyLedgers.isEmpty()) {
-                Long l = dirtyLedgers.removeFirst();
-
-                flushSpecificLedger(l);
-
-                if (!doAll) {
-                    break;
-                }
-                // Yield. if we are doing all the ledgers we don't want to block other flushes that
-                // need to happen
-                try {
-                    dirtyLedgers.wait(1);
-                } catch (InterruptedException e) {
-                    // just pass it on
-                    Thread.currentThread().interrupt();
-                }
+            if (!doAll) {
+                break;
             }
         }
     }
@@ -325,31 +480,18 @@ class IndexInMemPageMgr {
     /**
      * Flush a specified ledger
      *
-     * @param l 
+     * @param ledger
      *          Ledger Id
      * @throws IOException
      */
-    private void flushSpecificLedger(long l) throws IOException {
-        LinkedList<Long> firstEntryList;
-        synchronized(this) {
-            HashMap<Long, LedgerEntryPage> pageMap = pages.get(l);
-            if (pageMap == null || pageMap.isEmpty()) {
-                indexPersistenceManager.flushLedgerHeader(l);
-                return;
-            }
-            firstEntryList = new LinkedList<Long>();
-            for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) {
-                LedgerEntryPage lep = entry.getValue();
-                if (lep.isClean()) {
-                    LOG.trace("Page is clean {}", lep);
-                    continue;
-                }
-                firstEntryList.add(lep.getFirstEntry());
-            }
-        }
+    private void flushSpecificLedger(long ledger) throws IOException {
+        LinkedList<Long> firstEntryList = pageMapAndList.getFirstEntryListToBeFlushed(ledger);
+
+        // flush ledger index file header if necessary
+        indexPersistenceManager.flushLedgerHeader(ledger);
 
-        if (firstEntryList.size() == 0) {
-            LOG.debug("Nothing to flush for ledger {}.", l);
+        if (null == firstEntryList || firstEntryList.size() == 0) {
+            LOG.debug("Nothing to flush for ledger {}.", ledger);
             // nothing to do
             return;
         }
@@ -358,12 +500,12 @@ class IndexInMemPageMgr {
         List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size());
         try {
             for(Long firstEntry: firstEntryList) {
-                LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true);
+                LedgerEntryPage lep = getLedgerEntryPage(ledger, firstEntry, true);
                 if (lep != null) {
                     entries.add(lep);
                 }
             }
-            indexPersistenceManager.flushLedgerEntries(l, entries);
+            indexPersistenceManager.flushLedgerEntries(ledger, entries);
         } finally {
             for(LedgerEntryPage lep: entries) {
                 lep.releasePage();
@@ -380,7 +522,8 @@ class IndexInMemPageMgr {
         if (lep == null) {
             lep = grabLedgerEntryPage(ledger, pageEntry);
         }
-        lep.setOffset(offset, offsetInPage * 8);
+        assert lep != null;
+        lep.setOffset(offset, offsetInPage * LedgerEntryPage.getIndexEntrySize());
         lep.releasePage();
     }
 
@@ -394,7 +537,7 @@ class IndexInMemPageMgr {
             if (lep == null) {
                 lep = grabLedgerEntryPage(ledger, pageEntry);
             }
-            return lep.getOffset(offsetInPage * 8);
+            return lep.getOffset(offsetInPage * LedgerEntryPage.getIndexEntrySize());
         } finally {
             if (lep != null) {
                 lep.releasePage();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java?rev=1534640&r1=1534639&r2=1534640&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java Tue Oct 22 14:21:28 2013
@@ -23,15 +23,13 @@ package org.apache.bookkeeper.bookie;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -62,7 +60,7 @@ public class IndexPersistenceMgr {
         return sb.toString();
     }
 
-    final HashMap<Long, FileInfo> fileInfoCache = new HashMap<Long, FileInfo>();
+    final ConcurrentMap<Long, FileInfo> fileInfoCache = new ConcurrentHashMap<Long, FileInfo>();
     final int openFileLimit;
     final int pageSize;
     final int entriesPerPage;
@@ -72,7 +70,6 @@ public class IndexPersistenceMgr {
     final SnapshotMap<Long, Boolean> activeLedgers;
     private LedgerDirsManager ledgerDirsManager;
     final LinkedList<Long> openLedgers = new LinkedList<Long>();
-    final private AtomicBoolean shouldRelocateIndexFile = new AtomicBoolean(false);
 
     public IndexPersistenceMgr(int pageSize,
                                int entriesPerPage,
@@ -91,30 +88,53 @@ public class IndexPersistenceMgr {
     }
 
     FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
-        synchronized (fileInfoCache) {
-            FileInfo fi = fileInfoCache.get(ledger);
-            if (fi == null) {
-                File lf = findIndexFile(ledger);
-                if (lf == null) {
-                    if (masterKey == null) {
+        FileInfo fi = fileInfoCache.get(ledger);
+        if (null == fi) {
+            boolean createdNewFile = false;
+            File lf = null;
+            synchronized (this) {
+                // Check if the index file exists on disk.
+                lf = findIndexFile(ledger);
+                if (null == lf) {
+                    if (null == masterKey) {
                         throw new Bookie.NoLedgerException(ledger);
                     }
+                    // We don't have a ledger index file on disk, so create it.
                     lf = getNewLedgerIndexFile(ledger, null);
-                    // A new ledger index file has been created for this Bookie.
-                    // Add this new ledger to the set of active ledgers.
-                    LOG.debug("New ledger index file created for ledgerId: {}", ledger);
-                    activeLedgers.put(ledger, true);
+                    createdNewFile = true;
                 }
-                evictFileInfoIfNecessary();
-                fi = new FileInfo(lf, masterKey);
-                fileInfoCache.put(ledger, fi);
-                openLedgers.add(ledger);
             }
-            if (fi != null) {
-                fi.use();
+            fi = putFileInfo(ledger, masterKey, lf, createdNewFile);
+        }
+
+        assert null != fi;
+        fi.use();
+        return fi;
+    }
+
+    private FileInfo putFileInfo(Long ledger, byte masterKey[], File lf, boolean createdNewFile) throws IOException {
+        FileInfo fi = new FileInfo(lf, masterKey);
+        FileInfo oldFi = fileInfoCache.putIfAbsent(ledger, fi);
+        if (null != oldFi) {
+            // Some other thread won the race. We should delete our file if we created
+            // a new one and the paths are different.
+            if (createdNewFile && !oldFi.isSameFile(lf)) {
+                fi.delete();
+            }
+            fi = oldFi;
+        } else {
+            if (createdNewFile) {
+                // Else, we won and the active ledger manager should know about this.
+                LOG.debug("New ledger index file created for ledgerId: {}", ledger);
+                activeLedgers.put(ledger, true);
+            }
+            // Evict cached items from the file info cache if necessary
+            evictFileInfoIfNecessary();
+            synchronized (openLedgers) {
+                openLedgers.offer(ledger);
             }
-            return fi;
         }
+        return fi;
     }
 
     /**
@@ -205,10 +225,7 @@ public class IndexPersistenceMgr {
         activeLedgers.remove(ledgerId);
 
         // Now remove it from all the other lists and maps.
-        // These data structures need to be synchronized first before removing entries.
-        synchronized (fileInfoCache) {
-            fileInfoCache.remove(ledgerId);
-        }
+        fileInfoCache.remove(ledgerId);
         synchronized (openLedgers) {
             openLedgers.remove(ledgerId);
         }
@@ -226,13 +243,11 @@ public class IndexPersistenceMgr {
     }
 
     boolean ledgerExists(long ledgerId) throws IOException {
-        synchronized (fileInfoCache) {
-            FileInfo fi = fileInfoCache.get(ledgerId);
-            if (fi == null) {
-                File lf = findIndexFile(ledgerId);
-                if (lf == null) {
-                    return false;
-                }
+        FileInfo fi = fileInfoCache.get(ledgerId);
+        if (fi == null) {
+            File lf = findIndexFile(ledgerId);
+            if (lf == null) {
+                return false;
             }
         }
         return true;
@@ -244,45 +259,47 @@ public class IndexPersistenceMgr {
 
     // evict file info if necessary
     private void evictFileInfoIfNecessary() throws IOException {
-        synchronized (fileInfoCache) {
-            if (openLedgers.size() > openFileLimit) {
-                long ledgerToRemove = openLedgers.removeFirst();
-                // TODO Add a statistic here, we don't care really which
-                // ledger is evicted, but the rate at which they get evicted 
-                fileInfoCache.remove(ledgerToRemove).close(true);
+        if (openLedgers.size() > openFileLimit) {
+            Long ledgerToRemove;
+            synchronized (openLedgers) {
+                ledgerToRemove = openLedgers.poll();
+            }
+            if (null == ledgerToRemove) {
+                // Should not reach here. We probably cleared this while the thread
+                // was executing.
+                return;
             }
-        }
+            // TODO Add a statistic here, we don't care really which
+            // ledger is evicted, but the rate at which they get evicted
+            FileInfo fi = fileInfoCache.remove(ledgerToRemove);
+            if (null == fi) {
+                // Seems like someone else already closed the file.
+                return;
+            }
+            fi.close(true);
+         }
     }
 
     void close() throws IOException {
-        synchronized (fileInfoCache) {
-            for (Entry<Long, FileInfo> fileInfo : fileInfoCache.entrySet()) {
-                FileInfo value = fileInfo.getValue();
-                if (value != null) {
-                    value.close(true);
-                }
+        for (Entry<Long, FileInfo> fileInfo : fileInfoCache.entrySet()) {
+            FileInfo value = fileInfo.getValue();
+            if (value != null) {
+                value.close(true);
             }
-            fileInfoCache.clear();
         }
+        fileInfoCache.clear();
     }
 
     byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
-        synchronized (fileInfoCache) {
-            FileInfo fi = fileInfoCache.get(ledgerId);
-            if (fi == null) {
-                File lf = findIndexFile(ledgerId);
-                if (lf == null) {
-                    throw new Bookie.NoLedgerException(ledgerId);
-                }
-                evictFileInfoIfNecessary();
-                fi = new FileInfo(lf, null);
-                byte[] key = fi.getMasterKey();
-                fileInfoCache.put(ledgerId, fi);
-                openLedgers.add(ledgerId);
-                return key;
+        FileInfo fi = fileInfoCache.get(ledgerId);
+        if (fi == null) {
+            File lf = findIndexFile(ledgerId);
+            if (lf == null) {
+                throw new Bookie.NoLedgerException(ledgerId);
             }
-            return fi.getMasterKey();
+            fi = putFileInfo(ledgerId, null, lf, false);
         }
+        return fi.getMasterKey();
     }
 
     void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
@@ -328,9 +345,7 @@ public class IndexPersistenceMgr {
         return new LedgerDirsListener() {
             @Override
             public void diskFull(File disk) {
-                // If the current entry log disk is full, then create new entry
-                // log.
-                shouldRelocateIndexFile.set(true);
+                // Nothing to handle here. Will be handled in Bookie
             }
 
             @Override
@@ -350,26 +365,12 @@ public class IndexPersistenceMgr {
         };
     }
 
-    void relocateIndexFileIfDirFull(Collection<Long> dirtyLedgers) throws IOException {
-        if (shouldRelocateIndexFile.get()) {
-            // if some new dir detected as full, then move all corresponding
-            // open index files to new location
-            for (Long l : dirtyLedgers) {
-                FileInfo fi = null;
-                try {
-                    fi = getFileInfo(l, null);
-                    File currentDir = getLedgerDirForLedger(fi);
-                    if (ledgerDirsManager.isDirFull(currentDir)) {
-                        moveLedgerIndexFile(l, fi);
-                    }
-                } finally {
-                    if (null != fi) {
-                        fi.release();
-                    }
-                }
-            }
-            shouldRelocateIndexFile.set(false);
+    private void relocateIndexFileAndFlushHeader(long ledger, FileInfo fi) throws IOException {
+        File currentDir = getLedgerDirForLedger(fi);
+        if (ledgerDirsManager.isDirFull(currentDir)) {
+            moveLedgerIndexFile(ledger, fi);
         }
+        fi.flushHeader();
     }
 
     /**
@@ -391,16 +392,16 @@ public class IndexPersistenceMgr {
         FileInfo fi = null;
         try {
             fi = getFileInfo(ledger, null);
-            fi.flushHeader();
+            relocateIndexFileAndFlushHeader(ledger, fi);
         } catch (Bookie.NoLedgerException nle) {
             // ledger has been deleted
+            LOG.info("No ledger {} found when flushing header.", ledger);
             return;
         } finally {
             if (null != fi) {
                 fi.release();
             }
         }
-        return;
     }
 
     void flushLedgerEntries(long l, List<LedgerEntryPage> entries) throws IOException {
@@ -412,20 +413,21 @@ public class IndexPersistenceMgr {
                     return (int) (o1.getFirstEntry() - o2.getFirstEntry());
                 }
             });
-            ArrayList<Integer> versions = new ArrayList<Integer>(entries.size());
+            int[] versions = new int[entries.size()];
             try {
                 fi = getFileInfo(l, null);
             } catch (Bookie.NoLedgerException nle) {
                 // ledger has been deleted
+                LOG.info("No ledger {} found when flushing entries.", l);
                 return;
             }
 
             // flush the header if necessary
-            fi.flushHeader();
+            relocateIndexFileAndFlushHeader(l, fi);
             int start = 0;
             long lastOffset = -1;
             for (int i = 0; i < entries.size(); i++) {
-                versions.add(i, entries.get(i).getVersion());
+                versions[i] = entries.get(i).getVersion();
                 if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != entriesPerPage) {
                     // send up a sequential list
                     int count = i - start;
@@ -441,11 +443,12 @@ public class IndexPersistenceMgr {
                 LOG.warn("Nothing to write, but there were entries!");
             }
             writeBuffers(l, entries, fi, start, entries.size() - start);
-            synchronized (this) {
-                for (int i = 0; i < entries.size(); i++) {
-                    LedgerEntryPage lep = entries.get(i);
-                    lep.setClean(versions.get(i));
-                }
+            for (int i = 0; i < entries.size(); i++) {
+                LedgerEntryPage lep = entries.get(i);
+                lep.setClean(versions[i]);
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Flushed ledger {} with {} pages.", l, entries.size());
             }
         } finally {
             if (fi != null) {
@@ -473,7 +476,7 @@ public class IndexPersistenceMgr {
         }
         long totalWritten = 0;
         while (buffs[buffs.length - 1].remaining() > 0) {
-            long rc = fi.write(buffs, entries.get(start + 0).getFirstEntry() * 8);
+            long rc = fi.write(buffs, entries.get(start + 0).getFirstEntryPosition());
             if (rc <= 0) {
                 throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
             }
@@ -492,7 +495,7 @@ public class IndexPersistenceMgr {
         FileInfo fi = null;
         try {
             fi = getFileInfo(lep.getLedger(), null);
-            long pos = lep.getFirstEntry() * 8;
+            long pos = lep.getFirstEntryPosition();
             if (pos >= fi.size()) {
                 lep.zeroPage();
             } else {
@@ -513,12 +516,12 @@ public class IndexPersistenceMgr {
             long size = fi.size();
             // make sure the file size is aligned with index entry size
             // otherwise we may read incorret data
-            if (0 != size % 8) {
+            if (0 != size % LedgerEntryPage.getIndexEntrySize()) {
                 LOG.warn("Index file of ledger {} is not aligned with index entry size.", ledgerId);
-                size = size - size % 8;
+                size = size - size % LedgerEntryPage.getIndexEntrySize();
             }
             // we may not have the last entry in the cache
-            if (size > lastEntry * 8) {
+            if (size > lastEntry * LedgerEntryPage.getIndexEntrySize()) {
                 ByteBuffer bb = ByteBuffer.allocate(pageSize);
                 long position = size - pageSize;
                 if (position < 0) {
@@ -526,9 +529,9 @@ public class IndexPersistenceMgr {
                 }
                 fi.read(bb, position);
                 bb.flip();
-                long startingEntryId = position / 8;
+                long startingEntryId = position / LedgerEntryPage.getIndexEntrySize();
                 for (int i = entriesPerPage - 1; i >= 0; i--) {
-                    if (bb.getLong(i * 8) != 0) {
+                    if (bb.getLong(i * LedgerEntryPage.getIndexEntrySize()) != 0) {
                         if (lastEntry < startingEntryId + i) {
                             lastEntry = startingEntryId + i;
                         }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LEPStateChangeCallback.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LEPStateChangeCallback.java?rev=1534640&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LEPStateChangeCallback.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LEPStateChangeCallback.java Tue Oct 22 14:21:28 2013
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+/**
+ * Callback interface invoked when the state of a ledger entry page changes.
+ */
+interface LEPStateChangeCallback {
+    public void onSetInUse(LedgerEntryPage lep);
+    public void onResetInUse(LedgerEntryPage lep);
+    public void onSetClean(LedgerEntryPage lep);
+    public void onSetDirty(LedgerEntryPage lep);
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1534640&r1=1534639&r2=1534640&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Tue Oct 22 14:21:28 2013
@@ -162,16 +162,6 @@ public class LedgerCacheImpl implements 
             }
 
             @Override
-            public int getNumCleanLedgers() {
-                return LedgerCacheImpl.this.indexPageManager.getNumCleanLedgers();
-            }
-
-            @Override
-            public int getNumDirtyLedgers() {
-                return LedgerCacheImpl.this.indexPageManager.getNumDirtyLedgers();
-            }
-
-            @Override
             public int getNumOpenLedgers() {
                 return LedgerCacheImpl.this.indexPersistenceManager.getNumOpenLedgers();
             }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheMXBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheMXBean.java?rev=1534640&r1=1534639&r2=1534640&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheMXBean.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheMXBean.java Tue Oct 22 14:21:28 2013
@@ -44,16 +44,6 @@ public interface LedgerCacheMXBean {
     public int getPageLimit();
 
     /**
-     * @return number of clean ledgers
-     */
-    public int getNumCleanLedgers();
-
-    /**
-     * @return number of dirty ledgers
-     */
-    public int getNumDirtyLedgers();
-
-    /**
      * @return number of open ledgers
      */
     public int getNumOpenLedgers();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java?rev=1534640&r1=1534639&r2=1534640&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java Tue Oct 22 14:21:28 2013
@@ -21,30 +21,45 @@
 
 package org.apache.bookkeeper.bookie;
 
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.util.ZeroBuffer;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
-
-import org.apache.bookkeeper.proto.BookieProtocol;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * This is a page in the LedgerCache. It holds the locations
  * (entrylogfile, offset) for entry ids.
  */
 public class LedgerEntryPage {
+    private final static int indexEntrySize = 8;
     private final int pageSize;
     private final int entriesPerPage;
-    private long ledger = -1;
-    private long firstEntry = BookieProtocol.INVALID_ENTRY_ID;
+    volatile private EntryKey entryKey = new EntryKey(-1, BookieProtocol.INVALID_ENTRY_ID);
     private final ByteBuffer page;
-    private boolean clean = true;
-    private boolean pinned = false;
-    private int useCount;
-    private int version;
+    volatile private boolean clean = true;
+    private final AtomicInteger useCount = new AtomicInteger();
+    private final AtomicInteger version = new AtomicInteger(0);
+    volatile private int last = -1; // Last update position
+    private final LEPStateChangeCallback callback;
+
+    public static int getIndexEntrySize() {
+        return indexEntrySize;
+    }
 
     public LedgerEntryPage(int pageSize, int entriesPerPage) {
+        this(pageSize, entriesPerPage, null);
+    }
+
+    public LedgerEntryPage(int pageSize, int entriesPerPage, LEPStateChangeCallback callback) {
         this.pageSize = pageSize;
         this.entriesPerPage = entriesPerPage;
         page = ByteBuffer.allocateDirect(pageSize);
+        this.callback = callback;
+        if (null != this.callback) {
+            callback.onResetInUse(this);
+        }
     }
 
     @Override
@@ -54,32 +69,33 @@ public class LedgerEntryPage {
         sb.append('@');
         sb.append(getFirstEntry());
         sb.append(clean ? " clean " : " dirty ");
-        sb.append(useCount);
+        sb.append(useCount.get());
         return sb.toString();
     }
-    synchronized public void usePage() {
-        useCount++;
-    }
-    synchronized public void pin() {
-        pinned = true;
-    }
-    synchronized public void unpin() {
-        pinned = false;
-    }
-    synchronized public boolean isPinned() {
-        return pinned;
+
+    public void usePage() {
+        int oldVal = useCount.getAndIncrement();
+        if ((0 == oldVal) && (null != callback)) {
+            callback.onSetInUse(this);
+        }
     }
-    synchronized public void releasePage() {
-        useCount--;
-        if (useCount < 0) {
+
+    public void releasePage() {
+        int newUseCount = useCount.decrementAndGet();
+        if (newUseCount < 0) {
             throw new IllegalStateException("Use count has gone below 0");
         }
+        if ((null != callback) && (newUseCount == 0)) {
+            callback.onResetInUse(this);
+        }
     }
-    synchronized private void checkPage() {
-        if (useCount <= 0) {
+
+    private void checkPage() {
+        if (useCount.get() <= 0) {
             throw new IllegalStateException("Page not marked in use");
         }
     }
+
     @Override
     public boolean equals(Object other) {
         if (other instanceof LedgerEntryPage) {
@@ -89,75 +105,120 @@ public class LedgerEntryPage {
             return false;
         }
     }
+
     @Override
     public int hashCode() {
         return (int)getLedger() ^ (int)(getFirstEntry());
     }
+
     void setClean(int versionOfCleaning) {
-        this.clean = (versionOfCleaning == version);
+        this.clean = (versionOfCleaning == version.get());
+
+        if ((null != callback) && clean) {
+            callback.onSetClean(this);
+        }
     }
+
     boolean isClean() {
         return clean;
     }
+
     public void setOffset(long offset, int position) {
         checkPage();
-        version++;
-        this.clean = false;
         page.putLong(position, offset);
+        version.incrementAndGet();
+        if (last < position/getIndexEntrySize()) {
+            last = position/getIndexEntrySize();
+        }
+        this.clean = false;
+
+        if (null != callback) {
+            callback.onSetDirty(this);
+        }
     }
+
     public long getOffset(int position) {
         checkPage();
         return page.getLong(position);
     }
-    static final byte zeroPage[] = new byte[64*1024];
+
     public void zeroPage() {
         checkPage();
         page.clear();
-        page.put(zeroPage, 0, page.remaining());
+        ZeroBuffer.put(page);
+        last = -1;
         clean = true;
     }
+
     public void readPage(FileInfo fi) throws IOException {
         checkPage();
         page.clear();
         while(page.remaining() != 0) {
-            if (fi.read(page, getFirstEntry()*8) <= 0) {
-                throw new IOException("Short page read of ledger " + getLedger() + " tried to get " + page.capacity() + " from position " + getFirstEntry()*8 + " still need " + page.remaining());
+            if (fi.read(page, getFirstEntryPosition()) <= 0) {
+                throw new IOException("Short page read of ledger " + getLedger()
+                                + " tried to get " + page.capacity() + " from position " + getFirstEntryPosition()
+                                + " still need " + page.remaining());
             }
         }
+        last = getLastEntryIndex();
         clean = true;
     }
+
     public ByteBuffer getPageToWrite() {
         checkPage();
         page.clear();
         return page;
     }
-    void setLedger(long ledger) {
-        this.ledger = ledger;
-    }
+
     long getLedger() {
-        return ledger;
+        return entryKey.getLedgerId();
     }
+
     int getVersion() {
-        return version;
+        return version.get();
     }
-    void setFirstEntry(long firstEntry) {
+
+    public EntryKey getEntryKey() {
+        return entryKey;
+    }
+
+    void setLedgerAndFirstEntry(long ledgerId, long firstEntry) {
         if (firstEntry % entriesPerPage != 0) {
             throw new IllegalArgumentException(firstEntry + " is not a multiple of " + entriesPerPage);
         }
-        this.firstEntry = firstEntry;
+        this.entryKey = new EntryKey(ledgerId, firstEntry);
     }
     long getFirstEntry() {
-        return firstEntry;
+        return entryKey.getEntryId();
     }
+
+    long getMaxPossibleEntry() {
+        return entryKey.getEntryId() + entriesPerPage;
+    }
+
+    long getFirstEntryPosition() {
+        return entryKey.getEntryId() * indexEntrySize;
+    }
+
     public boolean inUse() {
-        return useCount > 0;
+        return useCount.get() > 0;
     }
-    public long getLastEntry() {
+
+    private int getLastEntryIndex() {
         for(int i = entriesPerPage - 1; i >= 0; i--) {
-            if (getOffset(i*8) > 0) {
-                return i + firstEntry;
+            if (getOffset(i*getIndexEntrySize()) > 0) {
+                return i;
             }
         }
-        return 0;
+        return -1;
+    }
+
+    public long getLastEntry() {
+        if (last >= 0) {
+            return last + entryKey.getEntryId();
+        } else {
+            int index = getLastEntryIndex();
+            return index >= 0 ? (index + entryKey.getEntryId()) : 0;
+        }
     }
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZeroBuffer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZeroBuffer.java?rev=1534640&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZeroBuffer.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZeroBuffer.java Tue Oct 22 14:21:28 2013
@@ -0,0 +1,73 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.util;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Zero buffer utility.
+ *
+ */
+
+public class ZeroBuffer {
+    static final byte zeroBytes[] = new byte[64*1024];
+
+    /**
+     * Fill the remaining space of the given buffer with zeros.
+     * @param dst
+     */
+    public static void put(ByteBuffer dst) {
+        put(dst, dst.remaining());
+    }
+
+    /**
+     * Write the given number of zero bytes into the given buffer.
+     * @param dst
+     * @param length
+     */
+    public static void put(ByteBuffer dst, int length) {
+        while (length > zeroBytes.length) {
+            dst.put(zeroBytes);
+            length -= zeroBytes.length;
+        }
+        if (length > 0) {
+            dst.put(zeroBytes, 0, length);
+        }
+    }
+
+    /**
+     * Returns a read-only zero-filled buffer.
+     * @param length
+     * @return ByteBuffer
+     */
+    public static ByteBuffer readOnlyBuffer(int length) {
+        ByteBuffer buffer;
+        if (length <= zeroBytes.length) {
+            buffer = ByteBuffer.wrap(zeroBytes, 0, length);
+        }
+        else {
+            buffer = ByteBuffer.allocate(length);
+            put(buffer);
+        }
+        return buffer.asReadOnlyBuffer();
+    }
+}



Mime
View raw message