bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [bookkeeper] branch master updated: ISSUE #228: BookKeeper Server: Index Page Management Memory Growth
Date Mon, 17 Jul 2017 20:53:46 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 387dc83  ISSUE #228: BookKeeper Server: Index Page Management Memory Growth
387dc83 is described below

commit 387dc83d6d2db6a1d30bac3216a92f875083d144
Author: Sijie Guo <sijie@apache.org>
AuthorDate: Mon Jul 17 13:53:38 2017 -0700

    ISSUE #228: BookKeeper Server: Index Page Management Memory Growth
    
    Descriptions of the changes in this PR:
    
    ** Improvements **
    
    - Never discard a LedgerEntryPage once it has been allocated; recycle it instead.
    - Track free LedgerEntryPages in a separate list
    - Reuse pages from the free-page list as though they were freshly allocated
    - This guarantees that direct buffer allocation never exceeds the space allotted for index pages
    
    Author: Sijie Guo <sijie@apache.org>
    
    Reviewers: Enrico Olivelli <eolivelli@gmail.com>
    
    This closes #229 from sijie/lru_related_changes, closes #228
---
 .../bookkeeper/bookie/BookKeeperServerStats.java   |  10 ++
 .../bookkeeper/bookie/IndexInMemPageMgr.java       | 186 +++++++++++++++------
 .../bookkeeper/bookie/IndexPersistenceMgr.java     |  12 +-
 .../apache/bookkeeper/bookie/LedgerEntryPage.java  |  27 ++-
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  |   2 +-
 5 files changed, 180 insertions(+), 57 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index dc16b52..e0690fb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -86,6 +86,9 @@ public interface BookKeeperServerStats {
     // Ledger Storage Stats
     String STORAGE_GET_OFFSET = "STORAGE_GET_OFFSET";
     String STORAGE_GET_ENTRY = "STORAGE_GET_ENTRY";
+    /** Ledger Cache Stats **/
+    String LEDGER_CACHE_READ_PAGE = "LEDGER_CACHE_READ_PAGE";
+    /** SkipList Stats **/
     String SKIP_LIST_GET_ENTRY = "SKIP_LIST_GET_ENTRY";
     String SKIP_LIST_PUT_ENTRY = "SKIP_LIST_PUT_ENTRY";
     String SKIP_LIST_SNAPSHOT = "SKIP_LIST_SNAPSHOT";
@@ -95,8 +98,15 @@ public interface BookKeeperServerStats {
     String JOURNAL_QUEUE_SIZE = "JOURNAL_QUEUE_SIZE";
     String READ_BYTES = "READ_BYTES";
     String WRITE_BYTES = "WRITE_BYTES";
+    /** Ledger Cache Counters **/
+    String LEDGER_CACHE_HIT = "LEDGER_CACHE_HIT";
+    String LEDGER_CACHE_MISS = "LEDGER_CACHE_MISS";
+    /** Compaction/Garbage Collection Related Counters **/
     String NUM_MINOR_COMP = "NUM_MINOR_COMP";
     String NUM_MAJOR_COMP = "NUM_MAJOR_COMP";
+    /** Index Related Counters **/
+    String INDEX_INMEM_ILLEGAL_STATE_RESET = "INDEX_INMEM_ILLEGAL_STATE_RESET";
+    String INDEX_INMEM_ILLEGAL_STATE_DELETE = "INDEX_INMEM_ILLEGAL_STATE_DELETE";
     String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE";
     String JOURNAL_NUM_FORCE_WRITES = "JOURNAL_NUM_FORCE_WRITES";
     String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
index b9b774f..361bc6e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
@@ -20,8 +20,12 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DirectMemoryUtils;
 import org.slf4j.Logger;
@@ -42,6 +46,11 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.INDEX_INMEM_ILLEGAL_STATE_DELETE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.INDEX_INMEM_ILLEGAL_STATE_RESET;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_HIT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_MISS;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_READ_PAGE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_INDEX_PAGES;
 
 class IndexInMemPageMgr {
@@ -51,14 +60,20 @@ class IndexInMemPageMgr {
 
     private static class InMemPageCollection implements LEPStateChangeCallback {
 
-        ConcurrentMap<Long, ConcurrentMap<Long,LedgerEntryPage>> pages;
+        final ConcurrentMap<Long, ConcurrentMap<Long,LedgerEntryPage>> pages;
+        final Map<EntryKey, LedgerEntryPage> lruCleanPageMap;
+        final ConcurrentLinkedQueue<LedgerEntryPage> listOfFreePages;
 
-        Map<EntryKey, LedgerEntryPage> lruCleanPageMap;
+        // Stats
+        final Counter illegalStateResetCounter;
+        final Counter illegalStateDeleteCounter;
 
-        public InMemPageCollection() {
+        public InMemPageCollection(StatsLogger statsLogger) {
             pages = new ConcurrentHashMap<Long, ConcurrentMap<Long,LedgerEntryPage>>();
-            lruCleanPageMap =
-                Collections.synchronizedMap(new LinkedHashMap<EntryKey, LedgerEntryPage>(16,
0.75f, true));
+            lruCleanPageMap = Collections.synchronizedMap(new LinkedHashMap<EntryKey,
LedgerEntryPage>(16, 0.75f, true));
+            listOfFreePages = new ConcurrentLinkedQueue<LedgerEntryPage>();
+            illegalStateResetCounter = statsLogger.getCounter(INDEX_INMEM_ILLEGAL_STATE_RESET);
+            illegalStateDeleteCounter = statsLogger.getCounter(INDEX_INMEM_ILLEGAL_STATE_DELETE);
         }
 
         /**
@@ -140,18 +155,28 @@ class IndexInMemPageMgr {
          *          Ledger id
          * @returns number of pages removed
          */
-        private int removeEntriesForALedger(long ledgerId) {
+        private void removeEntriesForALedger(long ledgerId) {
             // remove pages first to avoid page flushed when deleting file info
             ConcurrentMap<Long, LedgerEntryPage> lPages = pages.remove(ledgerId);
             if (null != lPages) {
-                for (long entryId: lPages.keySet()) {
+                for (Map.Entry<Long, LedgerEntryPage> pageEntry : lPages.entrySet())
{
+                    long entryId = pageEntry.getKey();
                     synchronized(lruCleanPageMap) {
                         lruCleanPageMap.remove(new EntryKey(ledgerId, entryId));
                     }
+
+                    LedgerEntryPage lep = pageEntry.getValue();
+                    // Cannot imagine under what circumstances we would have a null entry
here
+                    // Just being safe
+                    if (null != lep) {
+                        if (lep.inUse()) {
+                            illegalStateDeleteCounter.inc();
+                        }
+                        listOfFreePages.add(lep);
+                    }
                 }
-                return lPages.size();
+
             }
-            return 0;
         }
 
         /**
@@ -232,7 +257,13 @@ class IndexInMemPageMgr {
          * @returns LedgerEntryPage if present
          */
         LedgerEntryPage grabCleanPage(long ledgerId, long firstEntry) {
-            LedgerEntryPage lep = null;
+            LedgerEntryPage lep = listOfFreePages.poll();
+            if (null != lep) {
+                lep.resetPage();
+                lep.setLedgerAndFirstEntry(ledgerId, firstEntry);
+                lep.usePage();
+                return lep;
+            }
             while (lruCleanPageMap.size() > 0) {
                 lep = null;
                 synchronized(lruCleanPageMap) {
@@ -286,6 +317,15 @@ class IndexInMemPageMgr {
             return lep;
         }
 
+        public void addToListOfFreePages(LedgerEntryPage lep) {
+            if ((null == lep) || lep.inUse()) {
+                illegalStateResetCounter.inc();
+            }
+            if (null != lep) {
+                listOfFreePages.add(lep);
+            }
+        }
+
         @Override
         public void onSetInUse(LedgerEntryPage lep) {
             removeFromCleanPageList(lep);
@@ -325,6 +365,11 @@ class IndexInMemPageMgr {
     private final ConcurrentLinkedQueue<Long> ledgersToFlush = new ConcurrentLinkedQueue<Long>();
     private final ConcurrentSkipListSet<Long> ledgersFlushing = new ConcurrentSkipListSet<Long>();
 
+    // Stats
+    private final Counter ledgerCacheHitCounter;
+    private final Counter ledgerCacheMissCounter;
+    private final OpStatsLogger ledgerCacheReadPageStats;
+
     public IndexInMemPageMgr(int pageSize,
                              int entriesPerPage,
                              ServerConfiguration conf,
@@ -333,7 +378,7 @@ class IndexInMemPageMgr {
         this.pageSize = pageSize;
         this.entriesPerPage = entriesPerPage;
         this.indexPersistenceManager = indexPersistenceManager;
-        this.pageMapAndList = new InMemPageCollection();
+        this.pageMapAndList = new InMemPageCollection(statsLogger);
 
         long maxDirectMemory = DirectMemoryUtils.maxDirectMemory();
 
@@ -346,17 +391,23 @@ class IndexInMemPageMgr {
         LOG.info("maxDirectMemory = {}, pageSize = {}, pageLimit = {}",
                 new Object[] { maxDirectMemory, pageSize, pageLimit });
         // Expose Stats
-        statsLogger.registerGauge(NUM_INDEX_PAGES, new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return getNumUsedPages();
-            }
-        });
+        this.ledgerCacheHitCounter = statsLogger.getCounter(LEDGER_CACHE_HIT);
+        this.ledgerCacheMissCounter = statsLogger.getCounter(LEDGER_CACHE_MISS);
+        this.ledgerCacheReadPageStats = statsLogger.getOpStatsLogger(LEDGER_CACHE_READ_PAGE);
+        // Export sampled stats for index pages, ledgers.
+        statsLogger.registerGauge(
+                NUM_INDEX_PAGES,
+                new Gauge<Integer>() {
+                    @Override
+                    public Integer getDefaultValue() {
+                        return 0;
+                    }
+                    @Override
+                    public Integer getSample() {
+                        return getNumUsedPages();
+                    }
+                }
+        );
     }
 
     /**
@@ -387,7 +438,31 @@ class IndexInMemPageMgr {
         return pageCount.get();
     }
 
-    LedgerEntryPage getLedgerEntryPage(Long ledger, Long firstEntry, boolean onlyDirty) {
+        /**
+     * Get the ledger entry page for a given <i>pageEntry</i>.
+     *
+     * @param ledger
+     *          ledger id
+     * @param pageEntry
+     *          first entry id of a given page
+     * @return ledger entry page
+     * @throws IOException
+     */
+    public LedgerEntryPage getLedgerEntryPage(long ledger,
+                                              long pageEntry) throws IOException {
+        LedgerEntryPage lep = getLedgerEntryPageFromCache(ledger, pageEntry, false);
+        if (lep == null) {
+            ledgerCacheMissCounter.inc();
+            lep = grabLedgerEntryPage(ledger, pageEntry);
+        } else {
+            ledgerCacheHitCounter.inc();
+        }
+        return lep;
+    }
+
+    LedgerEntryPage getLedgerEntryPageFromCache(long ledger,
+                                                       long firstEntry,
+                                                       boolean onlyDirty) {
         LedgerEntryPage lep = pageMapAndList.getPage(ledger, firstEntry);
         if (onlyDirty && null != lep && lep.isClean()) {
             return null;
@@ -415,33 +490,37 @@ class IndexInMemPageMgr {
             // should get the up to date page from the persistence manager
             // before we put it into table otherwise we would put
             // an empty page in it
-            indexPersistenceManager.updatePage(lep);
-            LedgerEntryPage oldLep;
-            if (lep != (oldLep = pageMapAndList.putPage(lep))) {
-                lep.releasePage();
-                // Decrement the page count because we couldn't put this lep in the page
cache.
-                pageCount.decrementAndGet();
-                // Increment the use count of the old lep because this is unexpected
-                oldLep.usePage();
-                lep = oldLep;
+            Stopwatch readPageStopwatch = Stopwatch.createStarted();
+            boolean isNewPage = indexPersistenceManager.updatePage(lep);
+            if (!isNewPage) {
+                ledgerCacheReadPageStats.registerSuccessfulEvent(
+                        readPageStopwatch.elapsed(TimeUnit.MICROSECONDS),
+                        TimeUnit.MICROSECONDS);
             }
         } catch (IOException ie) {
             // if we grab a clean page, but failed to update the page
-            // we are exhausting the count of ledger entry pages.
-            // since this page will be never used, so we need to decrement
-            // page count of ledger cache.
-            lep.releasePage();
-            pageCount.decrementAndGet();
+            // we should put this page in the free page list so that it
+            // can be reassigned to the next grabPage request
+            lep.releasePageNoCallback();
+            pageMapAndList.addToListOfFreePages(lep);
             throw ie;
         }
+        LedgerEntryPage oldLep;
+        if (lep != (oldLep = pageMapAndList.putPage(lep))) {
+            // if we grab a clean page, but failed to put it in the cache
+            // we should put this page in the free page list so that it
+            // can be reassigned to the next grabPage request
+            lep.releasePageNoCallback();
+            pageMapAndList.addToListOfFreePages(lep);
+            // Increment the use count of the old lep because this is unexpected
+            oldLep.usePage();
+            lep = oldLep;
+        }
         return lep;
     }
 
     void removePagesForLedger(long ledgerId) {
-        int removedPageCount = pageMapAndList.removeEntriesForALedger(ledgerId);
-        if (pageCount.addAndGet(-removedPageCount) < 0) {
-            throw new RuntimeException("Page count of ledger cache has been decremented to
be less than zero.");
-        }
+        pageMapAndList.removeEntriesForALedger(ledgerId);
         ledgersToFlush.remove(ledgerId);
     }
 
@@ -524,7 +603,7 @@ class IndexInMemPageMgr {
         List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size());
         try {
             for(Long firstEntry: firstEntryList) {
-                LedgerEntryPage lep = getLedgerEntryPage(ledger, firstEntry, true);
+                LedgerEntryPage lep = getLedgerEntryPageFromCache(ledger, firstEntry, true);
                 if (lep != null) {
                     entries.add(lep);
                 }
@@ -542,13 +621,16 @@ class IndexInMemPageMgr {
         // find the id of the first entry of the page that has the entry
         // we are looking for
         long pageEntry = entry - offsetInPage;
-        LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
-        if (lep == null) {
-            lep = grabLedgerEntryPage(ledger, pageEntry);
+        LedgerEntryPage lep = null;
+        try {
+            lep = getLedgerEntryPage(ledger, pageEntry);
+            assert lep != null;
+            lep.setOffset(offset, offsetInPage * LedgerEntryPage.getIndexEntrySize());
+        } finally {
+            if (null != lep) {
+                lep.releasePage();
+            }
         }
-        assert lep != null;
-        lep.setOffset(offset, offsetInPage * LedgerEntryPage.getIndexEntrySize());
-        lep.releasePage();
     }
 
     long getEntryOffset(long ledger, long entry) throws IOException {
@@ -556,12 +638,10 @@ class IndexInMemPageMgr {
         // find the id of the first entry of the page that has the entry
         // we are looking for
         long pageEntry = entry - offsetInPage;
-        LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
+        LedgerEntryPage lep = null;
         try {
-            if (lep == null) {
-                lep = grabLedgerEntryPage(ledger, pageEntry);
-            }
-            return lep.getOffset(offsetInPage * LedgerEntryPage.getIndexEntrySize());
+            lep = getLedgerEntryPage(ledger, pageEntry);
+            return lep.getOffset(offsetInPage  * LedgerEntryPage.getIndexEntrySize());
         } finally {
             if (lep != null) {
                 lep.releasePage();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 9a34101..6332996 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -629,7 +629,15 @@ public class IndexPersistenceMgr {
         }
     }
 
-    void updatePage(LedgerEntryPage lep) throws IOException {
+    /**
+     * Update the ledger entry page
+     *
+     * @param lep
+     *          ledger entry page
+     * @return true if it is a new page, otherwise false.
+     * @throws IOException
+     */
+    boolean updatePage(LedgerEntryPage lep) throws IOException {
         if (!lep.isClean()) {
             throw new IOException("Trying to update a dirty page");
         }
@@ -639,8 +647,10 @@ public class IndexPersistenceMgr {
             long pos = lep.getFirstEntryPosition();
             if (pos >= fi.size()) {
                 lep.zeroPage();
+                return true;
             } else {
                 lep.readPage(fi);
+                return false;
             }
         } finally {
             if (fi != null) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
index 5aee2fe..ea96dff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
@@ -43,7 +43,7 @@ public class LedgerEntryPage {
     volatile private EntryKey entryKey = new EntryKey(-1, BookieProtocol.INVALID_ENTRY_ID);
     private final ByteBuffer page;
     volatile private boolean clean = true;
-    private final AtomicInteger useCount = new AtomicInteger();
+    private final AtomicInteger useCount = new AtomicInteger(0);
     private final AtomicInteger version = new AtomicInteger(0);
     volatile private int last = -1; // Last update position
     private final LEPStateChangeCallback callback;
@@ -66,6 +66,21 @@ public class LedgerEntryPage {
         }
     }
 
+    // Except for not allocating a new direct byte buffer; this should do everything that
+    // the constructor does
+    public void resetPage() {
+        page.clear();
+        ZeroBuffer.put(page);
+        last = -1;
+        entryKey = new EntryKey(-1, BookieProtocol.INVALID_ENTRY_ID);
+        clean = true;
+        useCount.set(0);
+        if (null != this.callback) {
+            callback.onResetInUse(this);
+        }
+    }
+
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
@@ -84,12 +99,20 @@ public class LedgerEntryPage {
         }
     }
 
+    public void releasePageNoCallback() {
+        releasePageInternal(false);
+    }
+
     public void releasePage() {
+        releasePageInternal(true);
+    }
+
+    private void releasePageInternal(boolean shouldCallback) {
         int newUseCount = useCount.decrementAndGet();
         if (newUseCount < 0) {
             throw new IllegalStateException("Use count has gone below 0");
         }
-        if ((null != callback) && (newUseCount == 0)) {
+        if (shouldCallback && (null != callback) && (newUseCount == 0)) {
             callback.onResetInUse(this);
         }
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 41ab89c..8f4aacd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -374,7 +374,7 @@ public class LedgerCacheTest {
     public void testSyncThreadNPE() throws IOException {
         newLedgerCache();
         try {
-            ((LedgerCacheImpl) ledgerCache).getIndexPageManager().getLedgerEntryPage(0L,
0L, true);
+            ((LedgerCacheImpl) ledgerCache).getIndexPageManager().getLedgerEntryPageFromCache(0L,
0L, true);
         } catch (Exception e) {
             LOG.error("Exception when trying to get a ledger entry page", e);
             fail("Shouldn't have thrown an exception");

-- 
To stop receiving notification emails like this one, please contact
commits@bookkeeper.apache.org.

Mime
View raw message