bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eolive...@apache.org
Subject [bookkeeper] branch master updated: BOOKKEEPER-1106: Introduce write FileInfo cache and read FileInfo cache
Date Tue, 03 Oct 2017 10:33:30 GMT
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bbecd7  BOOKKEEPER-1106: Introduce write FileInfo cache and read FileInfo cache
2bbecd7 is described below

commit 2bbecd7da52824c991e1a28573ccf8c8515ea5dd
Author: Yiming Zang <yzang@twitter.com>
AuthorDate: Tue Oct 3 12:31:59 2017 +0200

    BOOKKEEPER-1106: Introduce write FileInfo cache and read FileInfo cache
    
    Problem: when read-behind happens, it would quickly read a bunch of ledgers, which will
evict the currently active ledgers for writing from the ledger cache. With the ledgers being
evicted from the cache, it would impact the write performance.
    
    This feature is contributed by sijie, in which we introduced a write file info cache and
a read file info cache to cache the ledger index separately for reads and writes, so that when
catch-up read happens, it will not evict the file info for writes.
    
    Author: Yiming Zang <yzang@twitter.com>
    
    Reviewers: Enrico Olivelli <eolivelli@apache.org>, Jia Zhai
    
    This closes #513 from yzang/yzang/BOOKKEEPER-1106
---
 .../bookkeeper/bookie/BookKeeperServerStats.java   |   3 +
 .../bookkeeper/bookie/IndexPersistenceMgr.java     | 307 ++++++++++++++-------
 .../bookkeeper/conf/ServerConfiguration.java       |  54 +++-
 .../bookkeeper/bookie/IndexPersistenceMgrTest.java | 138 ++++++++-
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  |   3 +-
 5 files changed, 398 insertions(+), 107 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index cf539ed..2e104cd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -119,6 +119,9 @@ public interface BookKeeperServerStats {
     String SKIP_LIST_THROTTLING = "SKIP_LIST_THROTTLING";
     String READ_LAST_ENTRY_NOENTRY_ERROR = "READ_LAST_ENTRY_NOENTRY_ERROR";
     String LEDGER_CACHE_NUM_EVICTED_LEDGERS = "LEDGER_CACHE_NUM_EVICTED_LEDGERS";
+    String PENDING_GET_FILE_INFO = "PENDING_GET_FILE_INFO";
+    String WRITE_FILE_INFO_CACHE_SIZE = "WRITE_FILE_INFO_CACHE_SIZE";
+    String READ_FILE_INFO_CACHE_SIZE = "READ_FILE_INFO_CACHE_SIZE";
     String BOOKIES_JOINED = "BOOKIES_JOINED";
     String BOOKIES_LEFT = "BOOKIES_LEFT";
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 2fd03a2..6281eea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -21,22 +21,34 @@
 package org.apache.bookkeeper.bookie;
 
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_NUM_EVICTED_LEDGERS;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OPEN_LEDGERS;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.PENDING_GET_FILE_INFO;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_FILE_INFO_CACHE_SIZE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_FILE_INFO_CACHE_SIZE;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
 import io.netty.buffer.ByteBuf;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map.Entry;
+import java.util.Map;
 import java.util.Observable;
 import java.util.Observer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -70,19 +82,25 @@ public class IndexPersistenceMgr {
         return sb.toString();
     }
 
-    final ConcurrentMap<Long, FileInfo> fileInfoCache = new ConcurrentHashMap<Long,
FileInfo>();
+    // use two separate cache for write and read
+    final Cache<Long, FileInfo> writeFileInfoCache;
+    final Cache<Long, FileInfo> readFileInfoCache;
     final int openFileLimit;
     final int pageSize;
     final int entriesPerPage;
+    // Lock
+    final ReentrantReadWriteLock fileInfoLock = new ReentrantReadWriteLock();
+    // ThreadPool
+    final ScheduledExecutorService evictionThreadPool = Executors.newSingleThreadScheduledExecutor();
 
     // Manage all active ledgers in LedgerManager
     // so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
     final SnapshotMap<Long, Boolean> activeLedgers;
-    private LedgerDirsManager ledgerDirsManager;
-    final LinkedList<Long> openLedgers = new LinkedList<Long>();
+    final LedgerDirsManager ledgerDirsManager;
 
     // Stats
     private final Counter evictedLedgersCounter;
+    private final Counter pendingGetFileInfoCounter;
 
     public IndexPersistenceMgr(int pageSize,
                                int entriesPerPage,
@@ -100,71 +118,160 @@ public class IndexPersistenceMgr {
         getActiveLedgers();
         ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
 
+        // build the file info cache
+        int concurrencyLevel = Math.max(1, Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
+        RemovalListener<Long, FileInfo> fileInfoEvictionListener = this::handleLedgerEviction;
+        writeFileInfoCache = buildCache(
+            concurrencyLevel,
+            conf.getFileInfoCacheInitialCapacity(),
+            openFileLimit,
+            conf.getFileInfoMaxIdleTime(),
+            fileInfoEvictionListener);
+        readFileInfoCache = buildCache(
+            concurrencyLevel,
+            2 * conf.getFileInfoCacheInitialCapacity(),
+            2 * openFileLimit,
+            conf.getFileInfoMaxIdleTime(),
+            fileInfoEvictionListener);
+
         // Expose Stats
         evictedLedgersCounter = statsLogger.getCounter(LEDGER_CACHE_NUM_EVICTED_LEDGERS);
-        statsLogger.registerGauge(NUM_OPEN_LEDGERS, new Gauge<Integer>() {
+        pendingGetFileInfoCounter = statsLogger.getCounter(PENDING_GET_FILE_INFO);
+        statsLogger.registerGauge(WRITE_FILE_INFO_CACHE_SIZE, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return writeFileInfoCache.size();
+            }
+        });
+        statsLogger.registerGauge(READ_FILE_INFO_CACHE_SIZE, new Gauge<Number>() {
             @Override
-            public Integer getDefaultValue() {
+            public Number getDefaultValue() {
                 return 0;
             }
 
             @Override
-            public Integer getSample() {
-                return getNumOpenLedgers();
+            public Number getSample() {
+                return readFileInfoCache.size();
             }
         });
     }
 
-    FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
-        FileInfo fi = fileInfoCache.get(ledger);
-        if (null == fi) {
-            boolean createdNewFile = false;
-            File lf = null;
-            synchronized (this) {
-                // Check if the index file exists on disk.
-                lf = findIndexFile(ledger);
-                if (null == lf) {
-                    if (null == masterKey) {
-                        throw new Bookie.NoLedgerException(ledger);
+    private static Cache<Long, FileInfo> buildCache(int concurrencyLevel,
+                                            int initialCapacity,
+                                            int maximumSize,
+                                            long expireAfterAccessSeconds,
+                                            RemovalListener<Long, FileInfo> removalListener)
{
+        CacheBuilder<Long, FileInfo> builder = CacheBuilder.newBuilder()
+            .concurrencyLevel(concurrencyLevel)
+            .initialCapacity(initialCapacity)
+            .maximumSize(maximumSize)
+            .removalListener(removalListener);
+        if (expireAfterAccessSeconds > 0) {
+            builder.expireAfterAccess(expireAfterAccessSeconds, TimeUnit.SECONDS);
+        }
+        return builder.build();
+    }
+
+    /**
+     * When a ledger is evicted, we need to make sure there's no other thread
+     * trying to get FileInfo for that ledger at the same time when we close
+     * the FileInfo.
+     */
+    private void handleLedgerEviction(RemovalNotification<Long, FileInfo> notification)
{
+        FileInfo fileInfo = notification.getValue();
+        Long ledgerId = notification.getKey();
+        if (null == fileInfo || null == notification.getKey()) {
+            return;
+        }
+        if (notification.wasEvicted()) {
+            evictedLedgersCounter.inc();
+            // we need to acquire the write lock in another thread,
+            // otherwise there could be dead lock happening.
+            evictionThreadPool.execute(() -> {
+                fileInfoLock.writeLock().lock();
+                try {
+                    // We only close the fileInfo when we evict the FileInfo from both cache
+                    if (!readFileInfoCache.asMap().containsKey(ledgerId) &&
+                        !writeFileInfoCache.asMap().containsKey(ledgerId)) {
+                        fileInfo.close(true);
                     }
-                    // We don't have a ledger index file on disk, so create it.
-                    lf = getNewLedgerIndexFile(ledger, null);
-                    createdNewFile = true;
+                } catch (IOException e) {
+                    LOG.error("Exception closing file info when ledger {} is evicted from
file info cache.",
+                        ledgerId, e);
+                } finally {
+                    fileInfoLock.writeLock().unlock();
                 }
-            }
-            fi = putFileInfo(ledger, masterKey, lf, createdNewFile);
+            });
         }
-
-        assert null != fi;
-        fi.use();
-        return fi;
+        fileInfo.release();
     }
 
-    private FileInfo putFileInfo(Long ledger, byte masterKey[], File lf, boolean createdNewFile)
throws IOException {
-        FileInfo fi = new FileInfo(lf, masterKey);
-        FileInfo oldFi = fileInfoCache.putIfAbsent(ledger, fi);
-        if (null != oldFi) {
-            // Some other thread won the race. We should delete our file if we created
-            // a new one and the paths are different.
-            if (createdNewFile && !oldFi.isSameFile(lf)) {
-                fi.delete();
-            }
-            fi = oldFi;
-        } else {
-            if (createdNewFile) {
-                // Else, we won and the active ledger manager should know about this
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("New ledger index file created for ledgerId: {}", ledger);
+    class FileInfoLoader implements Callable<FileInfo> {
+
+        final long ledger;
+        final byte[] masterKey;
+
+        FileInfoLoader(long ledger, byte[] masterKey) {
+            this.ledger = ledger;
+            this.masterKey = masterKey;
+        }
+
+        @Override
+        public FileInfo call() throws IOException {
+            File lf = findIndexFile(ledger);
+            if (null == lf) {
+                if (null == masterKey) {
+                    throw new Bookie.NoLedgerException(ledger);
                 }
+                // We don't have a ledger index file on disk or in cache, so create it.
+                lf = getNewLedgerIndexFile(ledger, null);
                 activeLedgers.put(ledger, true);
             }
-            // Evict cached items from the file info cache if necessary
-            evictFileInfoIfNecessary();
-            synchronized (openLedgers) {
-                openLedgers.offer(ledger);
+            FileInfo fi = new FileInfo(lf, masterKey);
+            fi.use();
+            return fi;
+        }
+    }
+
+    /**
+     * Get the FileInfo and increase reference count.
+     * When we get FileInfo from cache, we need to make sure it is synchronized
+     * with eviction, otherwise there might be a race condition as we get
+     * the FileInfo from cache, that FileInfo is then evicted and closed before we
+     * could even increase the reference counter.
+     */
+    FileInfo getFileInfo(final Long ledger, final byte masterKey[]) throws IOException {
+        try {
+            FileInfo fi;
+            pendingGetFileInfoCounter.inc();
+            fileInfoLock.readLock().lock();
+            if (null != masterKey) {
+                fi = writeFileInfoCache.get(ledger,
+                    new FileInfoLoader(ledger, masterKey));
+                if (null == readFileInfoCache.asMap().putIfAbsent(ledger, fi)) {
+                    fi.use();
+                }
+            } else {
+                fi = readFileInfoCache.get(ledger,
+                    new FileInfoLoader(ledger, null));
+            }
+            fi.use();
+            return fi;
+        } catch (ExecutionException | UncheckedExecutionException ee) {
+            if (ee.getCause() instanceof IOException) {
+                throw (IOException) ee.getCause();
+            } else {
+                throw new IOException("Failed to load file info for ledger " + ledger, ee);
             }
+        } finally {
+            pendingGetFileInfoCounter.dec();
+            fileInfoLock.readLock().unlock();
         }
-        return fi;
     }
 
     /**
@@ -248,29 +355,32 @@ public class IndexPersistenceMgr {
      * This method is called whenever a ledger is deleted by the BookKeeper Client
      * and we want to remove all relevant data for it stored in the LedgerCache.
      */
-    void removeLedger(long ledgerId) throws IOException {
+    void removeLedger(Long ledgerId) throws IOException {
         // Delete the ledger's index file and close the FileInfo
         FileInfo fi = null;
+        fileInfoLock.writeLock().lock();
         try {
             fi = getFileInfo(ledgerId, null);
+            // Don't force flush. There's no need since we're deleting the ledger
+            // anyway, and recreating the file at this point, although safe, will
+            // force the garbage collector to do more work later.
             fi.close(false);
             fi.delete();
         } finally {
-            // should release use count
-            // otherwise the file channel would not be closed.
-            if (null != fi) {
-                fi.release();
+            try {
+                if (fi != null) {
+                    // should release use count
+                    fi.release();
+                    // Remove it from the active ledger manager
+                    activeLedgers.remove(ledgerId);
+                    // Now remove it from cache
+                    writeFileInfoCache.invalidate(ledgerId);
+                    readFileInfoCache.invalidate(ledgerId);
+                }
+            } finally {
+                fileInfoLock.writeLock().unlock();
             }
         }
-
-        // Remove it from the active ledger manager
-        activeLedgers.remove(ledgerId);
-
-        // Now remove it from all the other lists and maps.
-        fileInfoCache.remove(ledgerId);
-        synchronized (openLedgers) {
-            openLedgers.remove(ledgerId);
-        }
     }
 
     private File findIndexFile(long ledgerId) throws IOException {
@@ -288,40 +398,29 @@ public class IndexPersistenceMgr {
         return activeLedgers.containsKey(ledgerId);
     }
 
-    int getNumOpenLedgers() {
-        return openLedgers.size();
-    }
-
-    // evict file info if necessary
-    private void evictFileInfoIfNecessary() throws IOException {
-        if (openLedgers.size() > openFileLimit) {
-            Long ledgerToRemove;
-            synchronized (openLedgers) {
-                ledgerToRemove = openLedgers.poll();
-            }
-            if (null == ledgerToRemove) {
-                // Should not reach here. We probably cleared this while the thread
-                // was executing.
-                return;
-            }
-            evictedLedgersCounter.inc();
-            FileInfo fi = fileInfoCache.remove(ledgerToRemove);
-            if (null == fi) {
-                // Seems like someone else already closed the file.
-                return;
-            }
-            fi.close(true);
-         }
-    }
-
     void close() throws IOException {
-        for (Entry<Long, FileInfo> fileInfo : fileInfoCache.entrySet()) {
-            FileInfo value = fileInfo.getValue();
-            if (value != null) {
-                value.close(true);
+        // Don't force create the file. We may have many dirty ledgers and file create/flush
+        // can be quite expensive as a result. We can use this optimization in this case
+        // because metadata will be recovered from the journal when we restart anyway.
+        try {
+            fileInfoLock.writeLock().lock();
+            for(Map.Entry<Long, FileInfo> entry : writeFileInfoCache.asMap().entrySet())
{
+                entry.getValue().close(false);
             }
+            for(Map.Entry<Long, FileInfo> entry : readFileInfoCache.asMap().entrySet())
{
+                entry.getValue().close(false);
+            }
+            writeFileInfoCache.invalidateAll();
+            readFileInfoCache.invalidateAll();
+        } finally {
+            fileInfoLock.writeLock().unlock();
+        }
+        evictionThreadPool.shutdown();
+        try {
+            evictionThreadPool.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            //ignore
         }
-        fileInfoCache.clear();
     }
 
     Long getLastAddConfirmed(long ledgerId) throws IOException {
@@ -361,15 +460,15 @@ public class IndexPersistenceMgr {
     }
 
     byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
-        FileInfo fi = fileInfoCache.get(ledgerId);
-        if (fi == null) {
-            File lf = findIndexFile(ledgerId);
-            if (lf == null) {
-                throw new Bookie.NoLedgerException(ledgerId);
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            return fi.getMasterKey();
+        } finally {
+            if (null != fi) {
+                fi.release();
             }
-            fi = putFileInfo(ledgerId, null, lf, false);
         }
-        return fi.getMasterKey();
     }
 
     void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index ee55a0f..21e00c2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -64,6 +64,8 @@ public class ServerConfiguration extends AbstractConfiguration {
     protected final static String OPEN_FILE_LIMIT = "openFileLimit";
     protected final static String PAGE_LIMIT = "pageLimit";
     protected final static String PAGE_SIZE = "pageSize";
+    protected final static String FILEINFO_CACHE_INITIAL_CAPACITY = "fileInfoCacheInitialCapacity";
+    protected final static String FILEINFO_MAX_IDLE_TIME = "fileInfoMaxIdleTime";
     // Journal Parameters
     protected final static String MAX_JOURNAL_SIZE = "journalMaxSizeMB";
     protected final static String MAX_BACKUP_JOURNALS = "journalMaxBackups";
@@ -402,6 +404,56 @@ public class ServerConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get the minimum total size for the internal file info cache tables.
+     * Providing a large enough estimate at construction time avoids the need for
+     * expensive resizing operations later, but setting this value unnecessarily high
+     * wastes memory.
+     *
+     * @return minimum size of initial file info cache.
+     */
+    public int getFileInfoCacheInitialCapacity() {
+        return getInt(FILEINFO_CACHE_INITIAL_CAPACITY, 64);
+    }
+
+    /**
+     * Set the minimum total size for the internal file info cache tables for initialization.
+     *
+     * @param initialCapacity
+     *          Initial capacity of file info cache table.
+     * @return server configuration instance.
+     */
+    public ServerConfiguration setFileInfoCacheInitialCapacity(int initialCapacity) {
+        setProperty(FILEINFO_CACHE_INITIAL_CAPACITY, initialCapacity);
+        return this;
+    }
+
+    /**
+     * Get the max idle time allowed for a open file info existed in file info cache.
+     * If the file info is idle for a long time, exceed the given time period. The file
+     * info will be evicted and closed. If the value is zero, the file info is evicted
+     * only when opened files reached openFileLimit.
+     *
+     * @see #getOpenFileLimit
+     * @return max idle time of a file info in the file info cache.
+     */
+    public long getFileInfoMaxIdleTime() {
+        return this.getLong(FILEINFO_MAX_IDLE_TIME, 0L);
+    }
+
+    /**
+     * Set the max idle time allowed for a open file info existed in file info cache.
+     *
+     * @param idleTime
+     *          Idle time, in seconds.
+     * @see #getFileInfoMaxIdleTime
+     * @return server configuration object.
+     */
+    public ServerConfiguration setFileInfoMaxIdleTime(long idleTime) {
+        setProperty(FILEINFO_MAX_IDLE_TIME, idleTime);
+        return this;
+    }
+
+    /**
      * Max journal file size
      *
      * @return max journal file size
@@ -611,7 +663,7 @@ public class ServerConfiguration extends AbstractConfiguration {
      * {@link #setUseHostNameAsBookieID(boolean)}.
      *
      * @see #getAdvertisedAddress()
-     * @param allow
+     * @param advertisedAddress
      *            whether to allow loopback interfaces
      * @return server configuration
      */
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index d3ef23c..19862a5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -32,6 +32,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.Observable;
+import java.util.Observer;
 
 import static com.google.common.base.Charsets.UTF_8;
 import static org.junit.Assert.*;
@@ -107,7 +109,7 @@ public class IndexPersistenceMgrTest {
     }
 
     private void evictFileInfoTest(boolean createFile) throws Exception {
-        IndexPersistenceMgr indexPersistenceMgr = createIndexPersistenceManager(5);
+        IndexPersistenceMgr indexPersistenceMgr = createIndexPersistenceManager(2);
         try {
             long lid = 99999L;
             byte[] masterKey = "evict-file-info".getBytes(UTF_8);
@@ -128,4 +130,138 @@ public class IndexPersistenceMgrTest {
             indexPersistenceMgr.close();
         }
     }
+
+    final long lid = 1L;
+    final byte[] masterKey = "write".getBytes();
+
+    @Test(timeout = 60000)
+    public void testGetFileInfoReadBeforeWrite() throws Exception {
+        IndexPersistenceMgr indexPersistenceMgr = null;
+        try {
+            indexPersistenceMgr = createIndexPersistenceManager(1);
+            // get the file info for read
+            try {
+                indexPersistenceMgr.getFileInfo(lid, null);
+                fail("Should fail get file info for reading if the file doesn't exist");
+            } catch (Bookie.NoLedgerException nle) {
+                // exepcted
+            }
+            assertEquals(0, indexPersistenceMgr.writeFileInfoCache.size());
+            assertEquals(0, indexPersistenceMgr.readFileInfoCache.size());
+
+            FileInfo writeFileInfo = indexPersistenceMgr.getFileInfo(lid, masterKey);
+            assertEquals(3, writeFileInfo.getUseCount());
+            assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size());
+            assertEquals(1, indexPersistenceMgr.readFileInfoCache.size());
+            writeFileInfo.release();
+            assertEquals(2, writeFileInfo.getUseCount());
+        } finally {
+            if (null != indexPersistenceMgr) {
+                indexPersistenceMgr.close();
+            }
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testGetFileInfoWriteBeforeRead() throws Exception {
+        IndexPersistenceMgr indexPersistenceMgr = null;
+        try {
+            indexPersistenceMgr = createIndexPersistenceManager(1);
+
+            FileInfo writeFileInfo = indexPersistenceMgr.getFileInfo(lid, masterKey);
+            assertEquals(3, writeFileInfo.getUseCount());
+            assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size());
+            assertEquals(1, indexPersistenceMgr.readFileInfoCache.size());
+            writeFileInfo.release();
+
+            FileInfo readFileInfo = indexPersistenceMgr.getFileInfo(lid, null);
+            assertEquals(3, readFileInfo.getUseCount());
+            assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size());
+            assertEquals(1, indexPersistenceMgr.readFileInfoCache.size());
+            readFileInfo.release();
+            assertEquals(2, writeFileInfo.getUseCount());
+            assertEquals(2, readFileInfo.getUseCount());
+        } finally {
+            if (null != indexPersistenceMgr) {
+                indexPersistenceMgr.close();
+            }
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testReadFileInfoCacheEviction() throws Exception {
+        IndexPersistenceMgr indexPersistenceMgr = null;
+        try {
+            indexPersistenceMgr = createIndexPersistenceManager(1);
+            for (int i = 0; i < 3; i++) {
+                FileInfo fileInfo = indexPersistenceMgr.getFileInfo(lid+i, masterKey);
+                // We need to make sure index file is created, otherwise the test case can
be flaky
+                fileInfo.checkOpen(true);
+            }
+
+            indexPersistenceMgr.getFileInfo(lid, masterKey);
+            assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size());
+            assertEquals(2, indexPersistenceMgr.readFileInfoCache.size());
+
+            // trigger file info eviction on read file info cache
+            for (int i = 1; i <= 2; i++) {
+                indexPersistenceMgr.getFileInfo(lid + i, null);
+            }
+            assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size());
+            assertEquals(2, indexPersistenceMgr.readFileInfoCache.size());
+
+            FileInfo fileInfo = indexPersistenceMgr.writeFileInfoCache.asMap().get(lid);
+            assertNotNull(fileInfo);
+            assertEquals(2, fileInfo.getUseCount());
+            fileInfo = indexPersistenceMgr.writeFileInfoCache.asMap().get(lid+1);
+            assertNull(fileInfo);
+            fileInfo = indexPersistenceMgr.writeFileInfoCache.asMap().get(lid+2);
+            assertNull(fileInfo);
+            fileInfo = indexPersistenceMgr.readFileInfoCache.asMap().get(lid);
+            assertNull(fileInfo);
+            fileInfo = indexPersistenceMgr.readFileInfoCache.asMap().get(lid+1);
+            assertNotNull(fileInfo);
+            assertEquals(2, fileInfo.getUseCount());
+            fileInfo = indexPersistenceMgr.readFileInfoCache.asMap().get(lid+2);
+            assertNotNull(fileInfo);
+            assertEquals(2, fileInfo.getUseCount());
+        } finally {
+            if (null != indexPersistenceMgr) {
+                indexPersistenceMgr.close();
+            }
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testEvictionShouldNotAffectLongPollRead() throws Exception {
+        IndexPersistenceMgr indexPersistenceMgr = null;
+        Observer observer = (obs, obj) -> {
+            //no-ops
+        };
+        try {
+            indexPersistenceMgr = createIndexPersistenceManager(1);
+            indexPersistenceMgr.getFileInfo(lid, masterKey);
+            indexPersistenceMgr.getFileInfo(lid, null);
+            indexPersistenceMgr.updateLastAddConfirmed(lid, 1);
+            Observable observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid,
1, observer);
+            // observer shouldn't be null because ledger is not evicted or closed
+            assertNotNull("Observer should not be null", observable);
+            // now evict ledger 1 from write cache
+            indexPersistenceMgr.getFileInfo(lid + 1, masterKey);
+            observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
+            // even if ledger 1 is evicted from write cache, observer still shouldn't be
null
+            assertNotNull("Observer should not be null", observable);
+            // now evict ledger 1 from read cache
+            indexPersistenceMgr.getFileInfo(lid + 2, masterKey);
+            indexPersistenceMgr.getFileInfo(lid + 2, null);
+            // even if ledger 1 is evicted from both cache, observer still shouldn't be null
because it
+            // will create a new FileInfo when cache miss
+            observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
+            assertNotNull("Observer should not be null", observable);
+        } finally {
+            if (null != indexPersistenceMgr) {
+                indexPersistenceMgr.close();
+            }
+        }
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 2a7e5c3..3f5d565 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -283,7 +283,8 @@ public class LedgerCacheTest {
 
         // Simulate the flush failure
         FileInfo newFileInfo = new FileInfo(fileInfo.getLf(), fileInfo.getMasterKey());
-        ledgerCache.getIndexPersistenceManager().fileInfoCache.put(Long.valueOf(1), newFileInfo);
+        ledgerCache.getIndexPersistenceManager().writeFileInfoCache.put(Long.valueOf(1),
newFileInfo);
+        ledgerCache.getIndexPersistenceManager().readFileInfoCache.put(Long.valueOf(1), newFileInfo);
         // Add entries
         ledgerStorage.addEntry(generateEntry(1, 1));
         ledgerStorage.addEntry(generateEntry(1, 2));

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>'].

Mime
View raw message