ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [08/50] ignite git commit: GG-11044 - Implement page evictions from PageMemory for the case when data size is bigger than allocated memory.
Date Thu, 28 Apr 2016 08:52:38 GMT
GG-11044 - Implement page evictions from PageMemory for the case when data size is bigger than allocated memory.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d2fedf6b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d2fedf6b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d2fedf6b

Branch: refs/heads/ignite-db-x-10884
Commit: d2fedf6bb95bdd5f30a800959f9c9ff93d200bc7
Parents: 4282d80
Author: dkarachentsev <dkarachentsev@gridgain.com>
Authored: Thu Apr 21 19:15:04 2016 +0300
Committer: dkarachentsev <dkarachentsev@gridgain.com>
Committed: Thu Apr 21 19:15:04 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/pagemem/PageMemory.java     |   3 +-
 .../internal/pagemem/impl/FullPageIdTable.java  |  90 +++++++-
 .../ignite/internal/pagemem/impl/PageImpl.java  |   9 +
 .../internal/pagemem/impl/PageMemoryImpl.java   | 206 +++++++++++++++++--
 4 files changed, 287 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d2fedf6b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java
index 0f10002..cf5ff69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java
@@ -73,7 +73,8 @@ public interface PageMemory extends LifecycleAware, PageIdAllocator {
      * @param pageId Page ID to get byte buffer for. The page ID must be present in the collection returned by
      *      the {@link #beginCheckpoint()} method call.
      * @param tmpBuf Temporary buffer to write changes into.
+     * @return {@code True} if data were read, {@code false} otherwise (data already saved to storage).
      * @throws IgniteException If failed to obtain page data.
      */
-    public void getForCheckpoint(FullPageId pageId, ByteBuffer tmpBuf);
+    public boolean getForCheckpoint(FullPageId pageId, ByteBuffer tmpBuf);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2fedf6b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/FullPageIdTable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/FullPageIdTable.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/FullPageIdTable.java
index e05d64c..c6529f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/FullPageIdTable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/FullPageIdTable.java
@@ -78,6 +78,24 @@ public class FullPageIdTable {
     /** */
     protected DirectMemoryUtils mem;

+    /** Addressing strategy. */
+    private final AddressingStrategy strategy;
+
+    /** Specifies types of addressing. */
+    public enum AddressingStrategy {
+        /**
+         * Probes cells one by one and may visit every cell of the table.
+         * Slower, but better suited to workloads with many removals and insertions.
+         */
+        LINEAR,
+
+        /**
+         * Probes cells with a growing step and gives up after a limited number of steps.
+         * Faster, but requires more memory to resolve collisions.
+         */
+        QUADRATIC
+    }
+
     /**
      * @return Estimated memory size required for this map to store the given number of elements.
      */
@@ -93,11 +111,20 @@ public class FullPageIdTable {
      * @param len Allocated memory length.
      * @param clear If {@code true}, then memory is considered dirty and will be cleared. Otherwise,
      *      map will assume that the given memory region is in valid state.
+     * @param stgy Addressing strategy {@link AddressingStrategy}.
      */
-    public FullPageIdTable(DirectMemoryUtils mem, long addr, long len, boolean clear) {
+    public FullPageIdTable(DirectMemoryUtils mem, long addr, long len, boolean clear, AddressingStrategy stgy) {
         valPtr = addr;
+        this.strategy = stgy;
         capacity = (int)((len - 4) / BYTES_PER_ENTRY);
-        maxSteps = (int)Math.sqrt(capacity);
+
+        if (stgy == AddressingStrategy.LINEAR)
+            maxSteps = capacity;
+        else if (stgy == AddressingStrategy.QUADRATIC)
+            maxSteps = (int)Math.sqrt(capacity);
+        else
+            throw new IllegalArgumentException("Unsupported addressing strategy: " + stgy);
+
         this.mem = mem;

         if (clear)
@@ -167,6 +194,24 @@ public class FullPageIdTable {
     }

     /**
+     * Finds the first present value at or after the given index, wrapping around the end of the table.
+     *
+     * @param idx Index to start searching from.
+     * @param absent Default value that will be returned if no values present.
+     * @return Closest value to the index or {@code absent} if no values found.
+     */
+    public long getNearestAt(final int idx, final long absent) {
+        for (int i = idx; i < capacity + idx; i++) {
+            final int idx2 = i >= capacity ? i - capacity : i;
+
+            if (isValuePresentAt(idx2))
+                return valueAt(idx2);
+        }
+
+        return absent;
+    }
+
+    /**
      * @param key Key.
      * @return Key index.
      */
@@ -178,7 +223,13 @@ public class FullPageIdTable {
+        int firstRemoved = -1;
+
         do {
             int res = testKeyAt(index, key);

             if (res == EMPTY) {
+                // Reuse the first tombstone seen on the probe chain, if any.
+                if (firstRemoved >= 0)
+                    index = firstRemoved;
+
                 setKeyAt(index, key);

                 incrementSize();
@@ -188,9 +239,29 @@ public class FullPageIdTable {
             else if (res == EQUAL)
                 return index;
-            else
-                assert res == REMOVED || res == NOT_EQUAL;
+            else if (res == REMOVED) {
+                // Do not insert here yet: the key may still be stored further along the chain,
+                // and inserting it now would create a duplicate entry.
+                if (firstRemoved < 0)
+                    firstRemoved = index;
+            }
+            else
+                assert res == NOT_EQUAL;
+
+            if (strategy == AddressingStrategy.QUADRATIC)
+                index += step;
+            else if (strategy == AddressingStrategy.LINEAR)
+                index++;

-            if ((index += step) >= capacity)
+            if (index >= capacity)
                 index -= capacity;
+
+            // Probe limit reached without an empty cell: fall back to the remembered tombstone.
+            if (step >= maxSteps && firstRemoved >= 0) {
+                setKeyAt(firstRemoved, key);
+
+                incrementSize();
+
+                return firstRemoved;
+            }
         }
         while (++step <= maxSteps);
@@ -217,7 +288,12 @@ public class FullPageIdTable {
             else
                 assert res == REMOVED || res == NOT_EQUAL;

-            if ((index += step) >= capacity)
+            if (strategy == AddressingStrategy.QUADRATIC)
+                index += step;
+            else if (strategy == AddressingStrategy.LINEAR)
+                index++;
+
+            if (index >= capacity)
                 index -= capacity;
         } while (++step <= maxSteps);

@@ -248,7 +324,12 @@ public class FullPageIdTable {
             else
                 assert res == REMOVED || res == NOT_EQUAL;

-            if ((index += step) >= capacity)
+            if (strategy == AddressingStrategy.QUADRATIC)
+                index += step;
+            else if (strategy == AddressingStrategy.LINEAR)
+                index++;
+
+            if (index >= capacity)
                 index -= capacity;
         }
         while (++step <= maxSteps);
@@ -278,6 +359,20 @@ public class FullPageIdTable {
     }

     /**
+     * @param idx Index to test.
+     * @return {@code True} if the cell at the given index holds an entry that is neither empty nor removed.
+     */
+    private boolean isValuePresentAt(final int idx) {
+        long base = valPtr + 4 + (long)idx * BYTES_PER_ENTRY;
+
+        long pageId = mem.readLong(base);
+        int cacheId = mem.readInt(base + 8);
+
+        return !((pageId == REMOVED_PAGE_ID && cacheId == REMOVED_CACHE_ID)
+            || (pageId == EMPTY_PAGE_ID && cacheId == EMPTY_CACHE_ID));
+    }
+
+    /**
      * @param fullId Full page ID to check.
      * @return {@code True} if checks succeeded.
      */
@@ -333,6 +428,6 @@ public class FullPageIdTable {
      *
      */
     private void decrementSize() {
-        mem.writeInt(valPtr, mem.readInt(valPtr) + 1);
+        mem.writeInt(valPtr, mem.readInt(valPtr) - 1);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2fedf6b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java
index 77f096d..6e33572 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java
@@ -141,6 +141,8 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page {
     @Override public ByteBuffer getForRead() {
         acquireShared(1);

+        pageMem.atomicWriteCurrentTimestamp(ptr);
+
         return reset(buf.asReadOnlyBuffer());
     }

@@ -156,6 +158,8 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page {

         markDirty();

+        pageMem.atomicWriteCurrentTimestamp(ptr);
+
         return reset(buf);
     }

@@ -167,6 +171,8 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page {
             acquire(1);

             setExclusiveOwnerThread(th);
+
+            pageMem.atomicWriteCurrentTimestamp(ptr);
         }

         return reset(buf);
@@ -211,6 +217,7 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page {
         return pageMem.isDirty(ptr);
     }

+    /** {@inheritDoc} */
     @Override public String toString() {
         SB sb = new SB("PageImpl [handle=");

@@ -230,6 +237,8 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page {
      */
     void acquireReference() {
         refCntUpd.incrementAndGet(this);
+
+        pageMem.atomicWriteCurrentTimestamp(ptr);
     }

     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2fedf6b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java
index b8d58f6..62a4e72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

 import org.apache.ignite.IgniteCheckedException;
@@ -79,8 +80,20 @@ public class  PageMemoryImpl implements PageMemory {
     /** Page ID offset  */
     public static final int PAGE_ID_OFFSET = 16;

-    /** Need a 8-byte pointer for linked list and 8 bytes for internal needs. */
-    public static final int PAGE_OVERHEAD = 24;
+    /** Page cache ID offset. */
+    public static final int PAGE_CACHE_ID_OFFSET = 24;
+
+    /** Page access timestamp offset. Kept 8-byte aligned because the timestamp is updated with a CAS. */
+    public static final int PAGE_TIMESTAMP_OFFSET = 32;
+
+    /**
+     * Need a 8-byte pointer for linked list, 8 bytes for internal needs (flags), 8 bytes page ID,
+     * 4 bytes cache ID, 4 bytes padding (keeps the timestamp 8-byte aligned), 8 bytes timestamp.
+     */
+    public static final int PAGE_OVERHEAD = 40;
+
+    /** Number of randomly sampled pages among which an eviction candidate is chosen. */
+    public static final int RANDOM_PAGES_EVICT_NUM = 5;

     /** Page size. */
     private int sysPageSize;
@@ -210,9 +223,14 @@ public class  PageMemoryImpl implements PageMemory {

             relPtr = allocateFreePage();

+            if (relPtr == INVALID_REL_PTR)
+                throw new OutOfMemoryException();
+
             absPtr = absolute(relPtr);

             writePageId(absPtr, pageId);
+            writePageCacheId(absPtr, cacheId);
+            atomicWriteCurrentTimestamp(absPtr);
         }

         // TODO pass an argument to decide whether the page should be cleaned.
@@ -309,8 +327,14 @@ public class  PageMemoryImpl implements PageMemory {

                 relPtr = borrowOrAllocateFreePage();

+                if (relPtr == INVALID_REL_PTR)
+                    relPtr = evictPage(seg);
+
                 long absPtr = absolute(relPtr);

+                writeFullPageId(absPtr, fullId);
+                atomicWriteCurrentTimestamp(absPtr);
+
                 // We can clear dirty flag after the page has been allocated.
                 setDirty(fullId, absPtr, false);

@@ -373,7 +397,7 @@ public class  PageMemoryImpl implements PageMemory {
     }

     /** {@inheritDoc} */
-    @Override public void getForCheckpoint(FullPageId pageId, ByteBuffer tmpBuf) {
+    @Override public boolean getForCheckpoint(FullPageId pageId, ByteBuffer tmpBuf) {
         assert tmpBuf.remaining() == pageSize();

         Segment seg = segment(pageId);
@@ -386,15 +410,16 @@ public class  PageMemoryImpl implements PageMemory {
             page = seg.acquiredPages.get(pageId);

             if (page != null) {
-                assert page.isDirty() : "Page is acquired for a checkpoint, but is not dirty: " + page;
+                if (!page.isDirty())
+                    return false;

                 page.acquireReference();
             }
             else {
                 long relPtr = seg.loadedPages.get(pageId, INVALID_REL_PTR);

-                assert relPtr != INVALID_REL_PTR : "Failed to get page checkpoint data (page has been evicted) " +
-                    "[pageId=" + pageId + ']';
+                if (relPtr == INVALID_REL_PTR)
+                    return false;

                 long absPtr = absolute(relPtr);

@@ -404,7 +429,7 @@ public class  PageMemoryImpl implements PageMemory {

                 setDirty(pageId, absPtr, false);

-                return;
+                return true;
             }
         }
         finally {
@@ -428,6 +453,8 @@ public class  PageMemoryImpl implements PageMemory {
         finally {
             releasePage(page);
         }
+
+        return true;
     }

     /**
@@ -529,6 +556,47 @@ public class  PageMemoryImpl implements PageMemory {
     }

     /**
+     * Reads cache ID from the page at the given absolute pointer.
+     *
+     * @param absPtr Absolute memory pointer to the page header.
+     * @return Cache ID written to the page.
+     */
+    int readPageCacheId(final long absPtr) {
+        return mem.readInt(absPtr + PAGE_CACHE_ID_OFFSET);
+    }
+
+    /**
+     * Writes cache ID to the page at the given absolute pointer.
+     *
+     * @param absPtr Absolute memory pointer to the page header.
+     * @param cacheId Cache ID to write.
+     */
+    void writePageCacheId(final long absPtr, final int cacheId) {
+        mem.writeInt(absPtr + PAGE_CACHE_ID_OFFSET, cacheId);
+    }
+
+    /**
+     * Reads page ID and cache ID from the page at the given absolute pointer.
+     *
+     * @param absPtr Absolute memory pointer to the page header.
+     * @return Full page ID written to the page.
+     */
+    FullPageId readFullPageId(final long absPtr) {
+        return new FullPageId(readPageId(absPtr), readPageCacheId(absPtr));
+    }
+
+    /**
+     * Writes page ID and cache ID to the page at the given absolute pointer.
+     *
+     * @param absPtr Absolute memory pointer to the page header.
+     * @param fullPageId Full page ID to write.
+     */
+    void writeFullPageId(final long absPtr, final FullPageId fullPageId) {
+        writePageId(absPtr, fullPageId.pageId());
+        writePageCacheId(absPtr, fullPageId.cacheId());
+    }
+
+    /**
      * @param absPtr Absolute pointer.
      * @return {@code True} if page is dirty.
      */
@@ -576,6 +644,40 @@ public class  PageMemoryImpl implements PageMemory {
             dirtyPages.remove(pageId);
     }

+    /**
+     * Atomically sets the page access timestamp to the current time using a CAS retry loop.
+     *
+     * @param absPtr Absolute memory pointer to the page header.
+     */
+    void atomicWriteCurrentTimestamp(final long absPtr) {
+        while (true) {
+            final long readTs = readTimestamp(absPtr);
+
+            if (mem.compareAndSwapLong(absPtr + PAGE_TIMESTAMP_OFFSET, readTs, U.currentTimeMillis()))
+                break;
+        }
+    }
+
+    /**
+     * Writes the page access timestamp.
+     *
+     * @param absPtr Absolute memory pointer to the page header.
+     * @param ts Timestamp to write.
+     */
+    void writeTimestamp(final long absPtr, final long ts) {
+        mem.writeLong(absPtr + PAGE_TIMESTAMP_OFFSET, ts);
+    }
+
+    /**
+     * Reads the page access timestamp.
+     *
+     * @param absPtr Absolute memory pointer to the page header.
+     * @return Page access timestamp.
+     */
+    long readTimestamp(final long absPtr) {
+        return mem.readLong(absPtr + PAGE_TIMESTAMP_OFFSET);
+    }
+
     /**
      * Attempts to restore page memory state based on the memory chunks returned by the allocator.
      */
@@ -749,14 +851,16 @@ public class  PageMemoryImpl implements PageMemory {
     /**
      * Requests next memory chunk from the system allocator.
+     *
+     * @return {@code True} if the next chunk was obtained, {@code false} if the current chunk is already the last one.
      */
-    private void requestNextChunk() {
+    private boolean requestNextChunk() {
         assert Thread.holdsLock(this);

         int curIdx = currentChunk.idx;

-        // If current chunk is the last one, fail.
+        // If current chunk is the last one, there is nothing left to switch to.
         if (curIdx == chunks.size() - 1)
-            throw new OutOfMemoryException();
+            return false;

         Chunk chunk = chunks.get(curIdx + 1);

@@ -765,6 +869,8 @@ public class  PageMemoryImpl implements PageMemory {
                 ", base=0x" + U.hexLong(chunk.fr.address()) + ", len=" + chunk.size() + ']');

         currentChunk = chunk;
+
+        return true;
     }

     /**
@@ -834,8 +940,8 @@ public class  PageMemoryImpl implements PageMemory {
                 synchronized (this) {
                     Chunk full = currentChunk;

-                    if (chunk == full)
-                        requestNextChunk();
+                    if (chunk == full && !requestNextChunk())
+                        return INVALID_REL_PTR;
                 }
             }
             else
@@ -844,6 +950,98 @@ public class  PageMemoryImpl implements PageMemory {
     }

     /**
+     * Evicts a page from the segment, choosing among randomly sampled pages; a dirty page is written to storage first.
+     *
+     * @param seg Currently locked segment.
+     * @return Relative address of the evicted page.
+     * @throws IgniteCheckedException If failed to write a dirty evicted page to storage.
+     */
+    private long evictPage(final Segment seg) throws IgniteCheckedException {
+        final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        final long[] pageRelAddrs = new long[RANDOM_PAGES_EVICT_NUM];
+
+        final int cap = seg.loadedPages.capacity();
+
+        assert seg.loadedPages.size() >= RANDOM_PAGES_EVICT_NUM;
+
+        if (seg.acquiredPages.size() >= seg.loadedPages.size())
+            throw new OutOfMemoryException("No not acquired pages left for segment. Unable to evict.");
+
+        while (true) {
+            for (int i = 0; i < RANDOM_PAGES_EVICT_NUM; i++) {
+                // We need to lookup for pages only in current segment for thread safety,
+                // so peeking random memory will lead to checking for found page segment.
+                // It's much faster to check available pages for segment right away.
+                final long addr = seg.loadedPages.getNearestAt(rnd.nextInt(cap), INVALID_REL_PTR);
+
+                assert addr != INVALID_REL_PTR;
+
+                pageRelAddrs[i] = addr;
+            }
+
+            final long relEvictAddr = findSuitablePageRelAddr(pageRelAddrs);
+
+            assert relEvictAddr != INVALID_REL_PTR;
+
+            final long absEvictAddr = absolute(relEvictAddr);
+
+            assert absEvictAddr != dbMetaPageIdPtr;
+
+            final FullPageId fullPageId = readFullPageId(absEvictAddr);
+
+            assert seg.writeLock().isHeldByCurrentThread();
+
+            if (!seg.acquiredPages.containsKey(fullPageId))
+                seg.loadedPages.remove(fullPageId);
+            else
+                continue;
+
+            // Force flush data and free page.
+            if (isDirty(absEvictAddr)) {
+                storeMgr.write(fullPageId.cacheId(), fullPageId.pageId(), wrapPointer(absEvictAddr + PAGE_OVERHEAD, pageSize()));
+
+                setDirty(fullPageId, absEvictAddr, false);
+            }
+
+            return relEvictAddr;
+        }
+    }
+
+    /**
+     * Finds the oldest page among the given ones, preferring pages that are not dirty.
+     *
+     * @param relAddrs Relative addresses of the candidate pages.
+     * @return Relative address of the oldest clean page, or of the oldest dirty page if all candidates are dirty.
+     */
+    private long findSuitablePageRelAddr(final long[] relAddrs) {
+        long addr = INVALID_REL_PTR;
+        long ts = Long.MAX_VALUE;
+        long dirtyTs = Long.MAX_VALUE;
+        long dirtyAddr = INVALID_REL_PTR;
+
+        for (final long relAddr : relAddrs) {
+            final long absPageAddr = absolute(relAddr);
+
+            final long pageTs = readTimestamp(absPageAddr);
+
+            final boolean dirty = isDirty(absPageAddr);
+
+            if (pageTs < ts && !dirty) {
+                addr = relAddr;
+
+                ts = pageTs;
+            } else if (pageTs < dirtyTs && dirty) {
+                dirtyAddr = relAddr;
+
+                dirtyTs = pageTs;
+            }
+        }
+
+        return addr == INVALID_REL_PTR ? dirtyAddr : addr;
+    }
+
+    /**
      * @param relPtr Relative pointer to free.
      */
     private void releaseFreePage(long relPtr) {
@@ -877,7 +1075,10 @@ public class  PageMemoryImpl implements PageMemory {
          * @param len Length of the allocated memory.
          */
         private Segment(long ptr, long len, boolean clear) {
-            loadedPages = new FullPageIdTable(mem, ptr, len, clear);
+            loadedPages = new FullPageIdTable(mem, ptr, len, clear,
+                storeMgr == null // if null evictions won't be used
+                    ? FullPageIdTable.AddressingStrategy.QUADRATIC
+                    : FullPageIdTable.AddressingStrategy.LINEAR);

             acquiredPages = new HashMap<>(16, 0.75f);
         }


Mime
View raw message