Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6955B200C06 for ; Fri, 27 Jan 2017 16:10:44 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 67D98160B5B; Fri, 27 Jan 2017 15:10:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8B3B2160B47 for ; Fri, 27 Jan 2017 16:10:42 +0100 (CET) Received: (qmail 75632 invoked by uid 500); 27 Jan 2017 15:10:41 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 75623 invoked by uid 99); 27 Jan 2017 15:10:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Jan 2017 15:10:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8FD1FDFC15; Fri, 27 Jan 2017 15:10:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-3477 lock free freelist Date: Fri, 27 Jan 2017 15:10:41 +0000 (UTC) archived-at: Fri, 27 Jan 2017 15:10:44 -0000 Repository: ignite Updated Branches: refs/heads/ignite-3477-freelist [created] 260aee1c8 ignite-3477 lock free freelist Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/260aee1c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/260aee1c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/260aee1c Branch: refs/heads/ignite-3477-freelist Commit: 260aee1c8ffe3778f3fd030ac0c7d1e454cde218 Parents: ad06102 Author: sboikov Authored: Fri Jan 27 18:10:30 2017 +0300 Committer: sboikov Committed: Fri Jan 27 18:10:30 2017 +0300 ---------------------------------------------------------------------- .../configuration/MemoryConfiguration.java | 2 +- .../apache/ignite/internal/IgniteKernal.java | 1 + .../apache/ignite/internal/pagemem/Page.java | 2 + .../internal/pagemem/impl/PageNoStoreImpl.java | 5 + .../cache/IgniteCacheOffheapManagerImpl.java | 2 +- .../IgniteCacheDatabaseSharedManager.java | 15 +- .../cache/database/freelist/DataPageList.java | 107 +++ .../cache/database/freelist/FreeListImpl2.java | 623 +++++++++++++++ .../cache/database/tree/io/DataPageIO.java | 38 +- .../apache/ignite/internal/util/GridUnsafe.java | 4 + .../database/FreeListImpl2SelfTest.java | 759 +++++++++++++++++++ .../database/FreeListImplSelfTest.java | 32 +- 12 files changed, 1582 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/260aee1c/modules/core/src/main/java/org/apache/ignite/configuration/MemoryConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/MemoryConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/MemoryConfiguration.java index 242354e..ed37784 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/MemoryConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/MemoryConfiguration.java @@ -29,7 +29,7 @@ public class MemoryConfiguration implements Serializable { private static final long serialVersionUID = 0L; /** Default cache size is 1Gb. */ - public static final long DFLT_PAGE_CACHE_SIZE = 1024 * 1024 * 1024; + public static final long DFLT_PAGE_CACHE_SIZE = 4L * 1024 * 1024 * 1024; /** Default page size. */ public static final int DFLT_PAGE_SIZE = 2 * 1024; http://git-wip-us.apache.org/repos/asf/ignite/blob/260aee1c/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index c280b30..459e474 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1131,6 +1131,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { String msg = NL + "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL + " ^-- Node [id=" + id + ", name=" + name() + ", uptime=" + getUpTimeFormatted() + "]" + NL + + " ^-- Memory [allocPages=" + ctx.cache().context().database().pages()+ "]" + NL + " ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL + " ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" + dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL + http://git-wip-us.apache.org/repos/asf/ignite/blob/260aee1c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java index 2667e44..125b438 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java @@ -40,6 +40,8 @@ public interface Page extends AutoCloseable { */ public long getForReadPointer(); + public long pageAddress(); + /** * Releases reserved page. Released page can be evicted from RAM after flushing modifications to disk. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/260aee1c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java index e82b5d2..22f5381 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java @@ -58,6 +58,11 @@ public class PageNoStoreImpl implements Page { } /** {@inheritDoc} */ + @Override public long pageAddress() { + return pointer(); + } + + /** {@inheritDoc} */ @Override public long id() { return pageId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/260aee1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index e1ea6d1..5e40c4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -895,7 +895,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple */ private boolean canUpdateOldRow(@Nullable CacheDataRow oldRow, DataRow dataRow) throws IgniteCheckedException { - if (oldRow == null || indexingEnabled) + if (true || oldRow == null || indexingEnabled) return false; CacheObjectContext coCtx = cctx.cacheObjectContext(); http://git-wip-us.apache.org/repos/asf/ignite/blob/260aee1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java index 9c10057..b1da249 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; import org.apache.ignite.internal.processors.cache.database.freelist.FreeListImpl; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeListImpl2; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.util.typedef.internal.U; @@ -49,7 +50,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap protected PageMemory pageMem; /** */ - private FreeListImpl freeList; + private FreeListImpl2 freeList; /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { @@ -79,7 +80,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed. */ protected void initDataStructures() throws IgniteCheckedException { - freeList = new FreeListImpl(0, cctx.gridName(), pageMem, null, cctx.wal(), 0L, true); + freeList = new FreeListImpl2(0, cctx.gridName(), pageMem, null, cctx.wal(), 0L, true); } /** @@ -109,6 +110,9 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** {@inheritDoc} */ @Override protected void stop0(boolean cancel) { + if (freeList != null) + freeList.close(); + if (pageMem != null) pageMem.stop(); } @@ -120,6 +124,13 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap return false; } + public long pages() { + if (pageMem instanceof PageMemoryNoStoreImpl) + return ((PageMemoryNoStoreImpl) pageMem).loadedPages(); + + return -1; + } + /** * @return Page memory instance. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/260aee1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/DataPageList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/DataPageList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/DataPageList.java new file mode 100644 index 0000000..33ed2aa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/DataPageList.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.database.freelist; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.Page; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; +import org.apache.ignite.internal.util.GridUnsafe; + +/** + * + */ +public class DataPageList { + static { + try { + headOffset = GridUnsafe.objectFieldOffset(DataPageList.class.getDeclaredField("head")); + } + catch (NoSuchFieldException e) { + throw new NoSuchFieldError(); + } + } + + /** */ + private volatile Head head = new Head(0); + + /** */ + private static final long headOffset; + + /** */ + private final PageMemory pageMem; + + /** */ + private final DataPageIO io; + + /** + * @param pageMem Page memory. + */ + public DataPageList(PageMemory pageMem) { + this.pageMem = pageMem; + + io = DataPageIO.VERSIONS.latest(); + } + + public void put(Page page) throws IgniteCheckedException { + while (true) { + Head head = this.head; + + Head newHead = new Head(page.id()); + + io.setNextPageId(page.pageAddress(), head.pageId); + + if (GridUnsafe.compareAndSwapObject(this, headOffset, head, newHead)) + break; + } + } + + public Page take(int cacheId) throws IgniteCheckedException { + while (true) { + Head head = this.head; + + if (head.pageId == 0L) + return null; + + Page page = pageMem.page(cacheId, head.pageId); + + long pageAddr = page.pageAddress(); + + long nextPageId = io.getNextPageId(pageAddr); + + Head newHead = new Head(nextPageId); + + if (GridUnsafe.compareAndSwapObject(this, headOffset, head, newHead)) + return page; + } + } + + /** + * + */ + private static class Head { + /** */ + final long pageId; + + /** + * @param pageId Page ID. + */ + public Head(long pageId) { + this.pageId = pageId; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/260aee1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl2.java new file mode 100644 index 0000000..81186a6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl2.java @@ -0,0 +1,623 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.database.freelist; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReferenceArray; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.pagemem.Page; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; +import org.apache.ignite.internal.processors.cache.database.DataStructure; +import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; +import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseBag; +import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; +import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseListImpl; +import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jsr166.LongAdder8; + +import static org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler.writePage; + +/** + */ +public class FreeListImpl2 extends DataStructure implements FreeList, ReuseList { + /** */ + private static final int BUCKETS = 256; // Must be power of 2. + + /** */ + private static final Integer COMPLETE = Integer.MAX_VALUE; + + /** */ + private static final Integer FAIL_I = Integer.MIN_VALUE; + + /** */ + private static final Long FAIL_L = Long.MAX_VALUE; + + /** */ + private static final int MIN_PAGE_FREE_SPACE = 8; + + /** */ + private final int shift; + + /** */ + private final int MIN_SIZE_FOR_DATA_PAGE; + + /** */ + private final PageHandler updateRow = + new PageHandler() { + @Override public Boolean run(Page page, PageIO iox, long pageAddr, CacheDataRow row, int itemId) + throws IgniteCheckedException { + DataPageIO io = (DataPageIO)iox; + + int rowSize = getRowSize(row); + + boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, rowSize); + + return updated; + } + }; + + /** */ + private final PageHandler compact = + new PageHandler() { + @Override public Boolean run(Page page, PageIO iox, long pageAddr, Void row, int itemId) + throws IgniteCheckedException { + DataPageIO io = (DataPageIO)iox; + + int freeSpace = io.getFreeSpace(pageAddr); + + int newFreeSpace = io.compact(pageAddr, freeSpace, pageSize()); + + assert freeSpace == newFreeSpace; + + if (newFreeSpace > MIN_PAGE_FREE_SPACE) { + int newBucket = bucket(newFreeSpace); + +// System.out.println("End compact [freeSpace=" + freeSpace + +// ", newSpace=" + newFreeSpace + +// ", b=" + newBucket + ']'); + + putInBucket(newBucket, page); + } +// else +// System.out.println("End compact, no reuse [freeSpace=" + freeSpace + +// ", newSpace=" + newFreeSpace + ']'); + + return Boolean.TRUE; + } + }; + + /** */ + private final PageHandler compact2 = + new PageHandler() { + @Override public Boolean run(Page page, PageIO iox, long pageAddr, Void ignore, int reqSpace) + throws IgniteCheckedException { + DataPageIO io = (DataPageIO)iox; + + int freeSpace = io.getFreeSpace(pageAddr); + int ts1 = io.getFreeSpace2(pageAddr); + + int newFreeSpace = io.compact(pageAddr, freeSpace, pageSize()); + + int ts2 = io.getFreeSpace2(pageAddr); + + assert freeSpace == newFreeSpace; + + if (newFreeSpace > MIN_PAGE_FREE_SPACE) { + if (newFreeSpace > reqSpace) + return Boolean.TRUE; + + int newBucket = bucket(newFreeSpace); + + putInBucket(newBucket, page); + } + + return Boolean.FALSE; + } + }; + + /** */ + private final PageHandler writeRow = + new PageHandler() { + @Override public Integer run(Page page, PageIO iox, long pageAddr, CacheDataRow row, int written) + throws IgniteCheckedException { + DataPageIO io = (DataPageIO)iox; + + int rowSize = getRowSize(row); + int oldFreeSpace = io.getFreeSpace(pageAddr); + + assert oldFreeSpace > 0 : oldFreeSpace; + + // If the full row does not fit into this page write only a fragment. + written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(page, pageAddr, io, row, rowSize): + addRowFragment(page, pageAddr, io, row, written, rowSize); + + // Reread free space after update. + int newFreeSpace = io.getFreeSpace(pageAddr); + + if (newFreeSpace > MIN_PAGE_FREE_SPACE) { + int bucket = bucket(newFreeSpace); + + putInBucket(bucket, page); + } + + // Avoid boxing with garbage generation for usual case. + return written == rowSize ? COMPLETE : written; + } + + /** + * @param page Page. + * @param pageAddr Page address. + * @param io IO. + * @param row Row. + * @param rowSize Row size. + * @return Written size which is always equal to row size here. + * @throws IgniteCheckedException If failed. + */ + private int addRow( + Page page, + long pageAddr, + DataPageIO io, + CacheDataRow row, + int rowSize + ) throws IgniteCheckedException { + io.addRow(pageAddr, row, rowSize, pageSize()); + + return rowSize; + } + + /** + * @param page Page. + * @param pageAddr Page address. + * @param io IO. + * @param row Row. + * @param written Written size. + * @param rowSize Row size. + * @return Updated written size. + * @throws IgniteCheckedException If failed. + */ + private int addRowFragment( + Page page, + long pageAddr, + DataPageIO io, + CacheDataRow row, + int written, + int rowSize + ) throws IgniteCheckedException { + int payloadSize = io.addRowFragment(pageMem, pageAddr, row, written, rowSize, pageSize()); + + assert payloadSize > 0 : payloadSize; + + return written + payloadSize; + } + }; + + /** */ + private final PageHandler rmvRow = new PageHandler() { + @Override public Long run(Page page, PageIO iox, long pageAddr, Void arg, int itemId) + throws IgniteCheckedException { + DataPageIO io = (DataPageIO)iox; + + int oldFreeSpace = io.getFreeSpace(pageAddr); + + assert oldFreeSpace >= 0: oldFreeSpace; + + long nextLink = io.removeRow(pageAddr, itemId, pageSize()); + + int newFreeSpace = io.getFreeSpace(pageAddr); + + assert newFreeSpace > oldFreeSpace; + + // For common case boxed 0L will be cached inside of Long, so no garbage will be produced. + return nextLink; + } + }; + + /** */ + private final int STACKS_PER_BUCKET = 5; + + /** */ + private final AtomicReferenceArray[] buckets = new AtomicReferenceArray[BUCKETS]; + + final Thread compacter; + + private final ReuseListImpl reuseList; + + /** + * @param cacheId Cache ID. + * @param name Name (for debug purpose). + * @param pageMem Page memory. + * @param reuseList Reuse list or {@code null} if this free list will be a reuse list for itself. + * @param wal Write ahead log manager. + * @param metaPageId Metadata page ID. + * @param initNew {@code True} if new metadata should be initialized. + * @throws IgniteCheckedException If failed. + */ + public FreeListImpl2( + int cacheId, + String name, + PageMemory pageMem, + ReuseList reuseList, + IgniteWriteAheadLogManager wal, + long metaPageId, + boolean initNew) throws IgniteCheckedException { + super(cacheId, pageMem, wal); + this.reuseList = new ReuseListImpl(cacheId, name, pageMem, wal, 0, true); + + int pageSize = pageMem.pageSize(); + + assert U.isPow2(pageSize) : "Page size must be a power of 2: " + pageSize; + assert U.isPow2(BUCKETS); + assert BUCKETS <= pageSize : pageSize; + + // TODO this constant is used because currently we cannot reuse data pages as index pages + // TODO and vice-versa. It should be removed when data storage format is finalized. + MIN_SIZE_FOR_DATA_PAGE = pageSize - DataPageIO.MIN_DATA_PAGE_OVERHEAD; + + int shift = 0; + + while (pageSize > BUCKETS) { + shift++; + pageSize >>>= 1; + } + + this.shift = shift; + + for (int i = 0; i < BUCKETS; i++) { + AtomicReferenceArray stacks = new AtomicReferenceArray<>(STACKS_PER_BUCKET); + + for (int j = 0; j < STACKS_PER_BUCKET; j++) + stacks.set(j, new DataPageList(pageMem)); + + buckets[i] = stacks; + } + + compacter = new Thread(new Runnable() { + @Override public void run() { + try { + while (!Thread.currentThread().isInterrupted()) { + compact(); + + Thread.sleep(100); + } + } + catch (InterruptedException ignore) { + // No-op. + } + catch (Exception e) { + e.printStackTrace(); + } + } + }); + + compacter.setName("compacter"); + compacter.setDaemon(true); + + //compacter.start(); + } + + private void putInBucket(int bucket, Page page) throws IgniteCheckedException { + AtomicReferenceArray b = buckets[bucket]; + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (true) { + int idx = rnd.nextInt(STACKS_PER_BUCKET); + + DataPageList list = b.get(idx); + + if (list != null) { + //System.out.println(Thread.currentThread().getName() + " put in bucket [b=" + bucket + ", stripe=" + idx + ']'); + + list.put(page); + + return; + } + } + } + + private Page takeFromBucket(int bucket) throws IgniteCheckedException { + AtomicReferenceArray b = buckets[bucket]; + +// for (int i = 0; i < STACKS_PER_BUCKET; i++) { +// DataPageList list = b.get(i); +// +// if (list != null) { +// Page page = list.take(cacheId); +// +// if (page != null) +// return page; +// } +// } +// +// return null; + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (true) { + int idx = rnd.nextInt(STACKS_PER_BUCKET); + + DataPageList list = b.get(idx); + + if (list != null) + return list.take(cacheId); + } + } + + public void close() { + compacter.interrupt(); + + U.interrupt(compacter); + + try { + U.join(compacter); + } + catch (IgniteInterruptedCheckedException e) { + e.printStackTrace(); + } + } + + /** + * @param freeSpace Page free space. + * @return Bucket. + */ + private int bucket(int freeSpace) { + assert freeSpace > 0 : freeSpace; + + int bucket = freeSpace >>> shift; + + assert bucket >= 0 && bucket < BUCKETS : bucket; + + return bucket; + } + + /** + * @param part Partition. + * @return Page. + * @throws IgniteCheckedException If failed. + */ + private Page allocateDataPage(int part) throws IgniteCheckedException { + assert part <= PageIdAllocator.MAX_PARTITION_ID; + assert part != PageIdAllocator.INDEX_PARTITION; + + long pageId = pageMem.allocatePage(cacheId, part, PageIdAllocator.FLAG_DATA); + + return pageMem.page(cacheId, pageId); + } + + public void compact() throws IgniteCheckedException { + for (int b = 0; b < BUCKETS; b++) + compactBucket(b); + } + + private void compactBucket(int b) throws IgniteCheckedException { + AtomicReferenceArray stacks = buckets[b]; + + for (int i = 0; i < stacks.length(); i++) { + DataPageList pageList = stacks.get(i); + + if (pageList != null) { + boolean take = stacks.compareAndSet(i, pageList, null); + + if (take) { + compactStack(pageList); + + stacks.set(i, pageList); + } + } + } + } + + private void compactStack(DataPageList pageList) throws IgniteCheckedException { + Page page; + + while ((page = pageList.take(cacheId)) != null) { + //System.out.println("Start compact [b=" + b + ", stripe=" + i + ']'); + + Boolean ok = writePage(pageMem, page, this, compact, null, wal, null, 0, Boolean.FALSE); + + assert ok; + } + } + + private final AtomicBoolean cg = new AtomicBoolean(); + + public boolean locCompact; + + /** {@inheritDoc} */ + @Override public void insertDataRow(CacheDataRow row) throws IgniteCheckedException { + int rowSize = getRowSize(row); + + int written = 0; + + do { + int freeSpace = Math.min(MIN_SIZE_FOR_DATA_PAGE, rowSize - written); + + int bucket = bucket(freeSpace); + + Page foundPage = null; + + // TODO: properly handle reuse bucket. + for (int b = bucket; b < BUCKETS; b++) { + foundPage = takeFromBucket(b); + + if (foundPage != null) + break; + } + + if (locCompact && foundPage == null && bucket > 0 && cg.compareAndSet(false, true)) { + try { + for (int b = 0; b < bucket; b++) { + AtomicReferenceArray stacks = buckets[b]; + + for (int i = 0; i < STACKS_PER_BUCKET; i++) { + DataPageList pageList = stacks.get(i); + + if (pageList != null) { + boolean take = stacks.compareAndSet(i, pageList, null); + + if (take) { + Page page; + + while ((page = pageList.take(cacheId)) != null) { + Boolean found = writePage(pageMem, + page, + this, + compact2, + null, + wal, + null, + freeSpace, + null); + + assert found != null; + + if (found) { + foundPage = page; + + break; + } + } + + stacks.set(i, pageList); + } + } + } + + if (foundPage != null) + break; + } + } + finally { + cg.set(false); + } + } + + try (Page page = foundPage == null ? allocateDataPage(row.partition()) : foundPage) { + // If it is an existing page, we do not need to initialize it. + DataPageIO init = foundPage == null ? DataPageIO.VERSIONS.latest() : null; + + written = writePage(pageMem, page, this, writeRow, init, wal, row, written, FAIL_I); + + assert written != FAIL_I; // We can't fail here. + } + } + while (written != COMPLETE); + } + + /** {@inheritDoc} */ + @Override public boolean updateDataRow(long link, CacheDataRow row) throws IgniteCheckedException { + assert link != 0; + + long pageId = PageIdUtils.pageId(link); + int itemId = PageIdUtils.itemId(link); + + try (Page page = pageMem.page(cacheId, pageId)) { + Boolean updated = writePage(pageMem, page, this, updateRow, row, itemId, null); + + assert updated != null; // Can't fail here. + + return updated != null ? updated : false; + } + } + + /** {@inheritDoc} */ + @Override public void removeDataRowByLink(long link) throws IgniteCheckedException { + assert link != 0; + + long pageId = PageIdUtils.pageId(link); + int itemId = PageIdUtils.itemId(link); + + long nextLink; + + try (Page page = pageMem.page(cacheId, pageId)) { + nextLink = writePage(pageMem, page, this, rmvRow, null, itemId, FAIL_L); + + assert nextLink != FAIL_L; // Can't fail here. + } + + while (nextLink != 0L) { + itemId = PageIdUtils.itemId(nextLink); + pageId = PageIdUtils.pageId(nextLink); + + try (Page page = pageMem.page(cacheId, pageId)) { + nextLink = writePage(pageMem, page, this, rmvRow, null, itemId, FAIL_L); + + assert nextLink != FAIL_L; // Can't fail here. + } + } + } + + /** {@inheritDoc} */ + @Override public void addForRecycle(ReuseBag bag) throws IgniteCheckedException { + reuseList.addForRecycle(bag); +// assert reuseList == this: "not allowed to be a reuse list"; +// +// //put(bag, null, 0L, REUSE_BUCKET); +// +// long pageId; +// +// while ((pageId = bag.pollFreePage()) != 0) { +// try (Page page = pageMem.page(cacheId, pageId)) { +// //putInBucket(REUSE_BUCKET, page); +// buckets[REUSE_BUCKET].get(0).put(page); +// } +// } + } + + /** {@inheritDoc} */ + @Override public long takeRecycledPage() throws IgniteCheckedException { + return reuseList.takeRecycledPage(); +// assert reuseList == this: "not allowed to be a reuse list"; +// +// Page page = buckets[REUSE_BUCKET].get(0).take(cacheId); +// +// return page != null ? page.id() : 0; + } + + /** {@inheritDoc} */ + @Override public long recycledPagesCount() throws IgniteCheckedException { + //assert reuseList == this: "not allowed to be a reuse list"; + + return reuseList.recycledPagesCount(); + } + + /** + * @param row Row. + * @return Entry size on page. + * @throws IgniteCheckedException If failed. + */ + private static int getRowSize(CacheDataRow row) throws IgniteCheckedException { + int keyLen = row.key().valueBytesLength(null); + int valLen = row.value().valueBytesLength(null); + + return keyLen + valLen + CacheVersionIO.size(row.version(), false) + 8; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "FreeList [name=" + "new" + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/260aee1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java index fdb812f..23ee970 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java @@ -51,7 +51,10 @@ public class DataPageIO extends PageIO { private static final int SHOW_LINK = 0b0100; /** */ - private static final int FREE_LIST_PAGE_ID_OFF = COMMON_HEADER_END; + private static final int NEXT_PAGE_ID_OFF = COMMON_HEADER_END; + + /** */ + private static final int FREE_LIST_PAGE_ID_OFF = NEXT_PAGE_ID_OFF + 8; /** */ private static final int FREE_SPACE_OFF = FREE_LIST_PAGE_ID_OFF + 8; @@ -127,6 +130,22 @@ public class DataPageIO extends PageIO { /** * @param pageAddr Page address. + * @param nextPageId Next data page ID. + */ + public void setNextPageId(long pageAddr, long nextPageId) { + PageUtils.putLong(pageAddr, NEXT_PAGE_ID_OFF, nextPageId); + } + + /** + * @param pageAddr Page address. + * @return Next data page ID. + */ + public long getNextPageId(long pageAddr) { + return PageUtils.getLong(pageAddr, NEXT_PAGE_ID_OFF); + } + + /** + * @param pageAddr Page address. * @param dataOff Data offset. * @param show What elements of data page entry to show in the result. * @return Data page entry size. @@ -194,6 +213,14 @@ public class DataPageIO extends PageIO { PageUtils.putShort(pageAddr, FREE_SPACE_OFF, (short)freeSpace); } + public int getFreeSpace2(long pageAddr) { + int directCnt = getDirectCount(pageAddr); + int indirectCnt = getIndirectCount(pageAddr); + int firstEntryOff = getFirstEntryOffset(pageAddr); + + return firstEntryOff - (ITEMS_OFF + ITEM_SIZE * (directCnt + indirectCnt)); + } + /** * Free space refers to a "max row size (without any data page specific overhead) which is * guaranteed to fit into this data page". @@ -471,6 +498,15 @@ public class DataPageIO extends PageIO { nextLink); } + public int compact(long pageAddr, int freeSpace, int pageSize) { + int directCnt = getDirectCount(pageAddr); + int indirectCnt = getIndirectCount(pageAddr); + + getDataOffsetForWrite(pageAddr, freeSpace, directCnt, indirectCnt, pageSize); + + return getFreeSpace(pageAddr); + } + /** * @param pageAddr Page address. * @param idx Item index. http://git-wip-us.apache.org/repos/asf/ignite/blob/260aee1c/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java index bbbf22b..1210000 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java @@ -1187,6 +1187,10 @@ public abstract class GridUnsafe { return UNSAFE.compareAndSwapLong(obj, off, exp, upd); } + public static boolean compareAndSwapObject(Object obj, long off, Object exp, Object upd) { + return UNSAFE.compareAndSwapObject(obj, off, exp, upd); + } + /** * Gets byte value with volatile semantic. * http://git-wip-us.apache.org/repos/asf/ignite/blob/260aee1c/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImpl2SelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImpl2SelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImpl2SelfTest.java new file mode 100644 index 0000000..bcb4c58 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImpl2SelfTest.java @@ -0,0 +1,759 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.database; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.Stack; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; +import org.apache.ignite.internal.pagemem.Page; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.PageUtils; +import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; +import org.apache.ignite.internal.processors.cache.database.freelist.DataPageList; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeListImpl; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeListImpl2; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class FreeListImpl2SelfTest extends GridCommonAbstractTest { + /** */ + private static final int CPUS = Runtime.getRuntime().availableProcessors(); + + /** */ + private static final long MB = 1024L * 1024L; + + /** */ + private PageMemoryNoStoreImpl pageMem; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + if (pageMem != null) + pageMem.stop(); + + pageMem = null; + } + + /** + * @throws Exception If failed. + */ + public void testCompact() throws Exception { + pageMem = createPageMemory(1024); + + FreeListImpl2 fl = new FreeListImpl2(1, "freelist", pageMem, null, null, 0, true); + + for (int iter = 0; iter < 100_000; iter++) { + System.out.println("Iter: " + iter + ", allocated=" + pageMem.loadedPages()); + + List links = new ArrayList<>(); + + for (int i = 0; i < 1000; i++) { + TestDataRow row = new TestDataRow(64, 64); + + fl.insertDataRow(row); + + links.add(row.link()); + } + + for (Long link : links) + fl.removeDataRowByLink(link); + } + + fl.close(); + } + + /** + * @throws Exception If failed. + */ + public void testCompact1() throws Exception { + pageMem = createPageMemory(1024); + + FreeListImpl2 fl = new FreeListImpl2(1, "freelist", pageMem, null, null, 0, true); + + for (int iter = 0; iter < 1; iter++) { + System.out.println("Iter: " + iter + ", allocated=" + pageMem.loadedPages()); + + List links = new ArrayList<>(); + + for (int i = 0; i < 100_000; i++) { + TestDataRow row = new TestDataRow(64, 64); + + fl.insertDataRow(row); + + links.add(row.link()); + } + + for (Long link : links) + fl.removeDataRowByLink(link); + + fl.locCompact = true; + + TestDataRow row = new TestDataRow(64, 64); + + fl.insertDataRow(row); + } + + fl.close(); + } + + /** + * @throws Exception If failed. + */ + public void testStack() throws Exception { + pageMem = createPageMemory(1024); + + DataPageList pl = new DataPageList(pageMem); + + Deque ids = new LinkedList<>(); + + for (int i = 0; i < 20_000; i++) { + long id = pageMem.allocatePage(1, 1, PageIdAllocator.FLAG_DATA); + + assert id != 0; + + pl.put(pageMem.page(1, id)); + + ids.push(id); + } + + for (int i = 0; i < 20_000; i++) { + Page page = pl.take(1); + Long id = ids.pop(); + + assertEquals((Long)page.id(), id); + } + + assertNull(pl.take(1)); + } + + /** + * @throws Exception If failed. + */ + public void testStackConcurrent1() throws Exception { + pageMem = createPageMemory(1024); + + final DataPageList pl = new DataPageList(pageMem); + + Set ids = new HashSet<>(); + + for (int i = 0; i < 100_000; i++) { + long id = pageMem.allocatePage(1, 1, PageIdAllocator.FLAG_DATA); + + ids.add(id); + } + + for (int i = 0; i < 100; i++) { + System.out.println("Iter: " + i); + + for (Long id : ids) + pl.put(pageMem.page(1, id)); + + final int THREADS = 16; + + final CyclicBarrier b = new CyclicBarrier(THREADS); + + final ConcurrentHashSet taken = new ConcurrentHashSet<>(); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Void call() throws Exception { + b.await(); + + for (int i = 0; i < 20_000; i++) { + Page page = pl.take(1); + + if (page != null) + taken.add(page.id()); + } + + Page page; + + while ((page = pl.take(1)) != null) + taken.add(page.id()); + + return null; + } + }, THREADS, "take"); + + assertEquals(ids.size(), taken.size()); + } + } + + /** + * @throws Exception If failed. + */ + public void testStackConcurrent2() throws Exception { + pageMem = createPageMemory(1024); + + final DataPageList pl = new DataPageList(pageMem); + + Set ids = new HashSet<>(); + + for (int i = 0; i < 100_000; i++) { + long id = pageMem.allocatePage(1, 1, PageIdAllocator.FLAG_DATA); + + ids.add(id); + } + + for (int i = 0; i < 100; i++) { + System.out.println("Iter: " + i); + + for (Long id : ids) + pl.put(pageMem.page(1, id)); + + final int THREADS = 16; + + final CyclicBarrier b = new CyclicBarrier(THREADS); + + final AtomicLong takeCnt = new AtomicLong(); + + final AtomicLong putCnt = new AtomicLong(ids.size()); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Void call() throws Exception { + b.await(); + + for (int i = 0; i < 200_000; i++) { + Page page = pl.take(1); + + if (page != null) { + takeCnt.incrementAndGet(); + + if (ThreadLocalRandom.current().nextBoolean()) { + pl.put(page); + + putCnt.incrementAndGet(); + } + } + } + + Page page; + + while ((page = pl.take(1)) != null) + takeCnt.incrementAndGet(); + + return null; + } + }, THREADS, "take"); + + assertEquals(putCnt.get(), takeCnt.get()); + } + } + + /** + * @throws Exception if failed. + */ + public void testInsertDeleteSingleThreaded_1024() throws Exception { + checkInsertDeleteSingleThreaded(1024); + } + + /** + * @throws Exception if failed. + */ + public void testInsertDeleteSingleThreaded_2048() throws Exception { + checkInsertDeleteSingleThreaded(2048); + } + + /** + * @throws Exception if failed. + */ + public void testInsertDeleteSingleThreaded_4096() throws Exception { + checkInsertDeleteSingleThreaded(4096); + } + + /** + * @throws Exception if failed. + */ + public void testInsertDeleteSingleThreaded_8192() throws Exception { + checkInsertDeleteSingleThreaded(8192); + } + + /** + * @throws Exception if failed. + */ + public void testInsertDeleteSingleThreaded_16384() throws Exception { + checkInsertDeleteSingleThreaded(16384); + } + + /** + * @throws Exception if failed. + */ + public void testInsertDeleteMultiThreaded_1024() throws Exception { + checkInsertDeleteMultiThreaded(1024); + } + + /** + * @throws Exception if failed. + */ + public void testInsertDeleteMultiThreaded_2048() throws Exception { + checkInsertDeleteMultiThreaded(2048); + } + + /** + * @throws Exception if failed. + */ + public void testInsertDeleteMultiThreaded_4096() throws Exception { + checkInsertDeleteMultiThreaded(4096); + } + + /** + * @throws Exception if failed. + */ + public void testInsertDeleteMultiThreaded_8192() throws Exception { + checkInsertDeleteMultiThreaded(8192); + } + + /** + * @throws Exception if failed. + */ + public void testInsertDeleteMultiThreaded_16384() throws Exception { + checkInsertDeleteMultiThreaded(16384); + } + + /** + * @param pageSize Page size. + * @throws Exception + */ + protected void checkInsertDeleteMultiThreaded(final int pageSize) throws Exception { + final FreeList list = createFreeList(pageSize); + + Random rnd = new Random(); + + final ConcurrentMap stored = new ConcurrentHashMap<>(); + + for (int i = 0; i < 100; i++) { + int keySize = rnd.nextInt(pageSize * 3 / 2) + 10; + int valSize = rnd.nextInt(pageSize * 5 / 2) + 10; + + TestDataRow row = new TestDataRow(keySize, valSize); + + list.insertDataRow(row); + + assertTrue(row.link() != 0L); + + TestDataRow old = stored.put(row.link(), row); + + assertNull(old); + } + + final AtomicBoolean grow = new AtomicBoolean(true); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Object call() throws Exception { + Random rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 1_000_000; i++) { + boolean grow0 = grow.get(); + + if (grow0) { + if (stored.size() > 20_000) { + if (grow.compareAndSet(true, false)) + info("Shrink... [" + stored.size() + ']'); + + grow0 = false; + } + } + else { + if (stored.size() < 1_000) { + if (grow.compareAndSet(false, true)) + info("Grow... [" + stored.size() + ']'); + + grow0 = true; + } + } + + boolean insert = rnd.nextInt(100) < 70 == grow0; + + if (insert) { + int keySize = rnd.nextInt(pageSize * 3 / 2) + 10; + int valSize = rnd.nextInt(pageSize * 3 / 2) + 10; + + TestDataRow row = new TestDataRow(keySize, valSize); + + list.insertDataRow(row); + + assertTrue(row.link() != 0L); + + TestDataRow old = stored.put(row.link(), row); + + assertNull(old); + } + else { + while (true) { + Iterator it = stored.values().iterator(); + + if (it.hasNext()) { + TestDataRow row = it.next(); + + TestDataRow rmvd = stored.remove(row.link); + + if (rmvd != null) { + list.removeDataRowByLink(row.link); + + break; + } + } + } + } + } + + return null; + } + }, 8, "runner"); + } + + /** + * @throws Exception if failed. + */ + protected void checkInsertDeleteSingleThreaded(int pageSize) throws Exception { + FreeList list = createFreeList(pageSize); + + Random rnd = new Random(); + + Map stored = new HashMap<>(); + + for (int i = 0; i < 100; i++) { + int keySize = rnd.nextInt(pageSize * 3 / 2) + 10; + int valSize = rnd.nextInt(pageSize * 5 / 2) + 10; + + TestDataRow row = new TestDataRow(keySize, valSize); + + list.insertDataRow(row); + + assertTrue(row.link() != 0L); + + TestDataRow old = stored.put(row.link(), row); + + assertNull(old); + } + + boolean grow = true; + + for (int i = 0; i < 1_000_000; i++) { + if (grow) { + if (stored.size() > 20_000) { + grow = false; + + info("Shrink... [" + stored.size() + ']'); + } + } + else { + if (stored.size() < 1_000) { + grow = true; + + info("Grow... [" + stored.size() + ']'); + } + } + + boolean insert = rnd.nextInt(100) < 70 == grow; + + if (insert) { + int keySize = rnd.nextInt(pageSize * 3 / 2) + 10; + int valSize = rnd.nextInt(pageSize * 3 / 2) + 10; + + TestDataRow row = new TestDataRow(keySize, valSize); + + list.insertDataRow(row); + + assertTrue(row.link() != 0L); + + TestDataRow old = stored.put(row.link(), row); + + assertNull(old); + } + else { + Iterator it = stored.values().iterator(); + + if (it.hasNext()) { + TestDataRow row = it.next(); + + TestDataRow rmvd = stored.remove(row.link); + + assertTrue(rmvd == row); + + list.removeDataRowByLink(row.link); + } + } + } + } + + /** + * @return Page memory. + */ + protected PageMemoryNoStoreImpl createPageMemory(int pageSize) throws Exception { + long[] sizes = new long[CPUS]; + + for (int i = 0; i < sizes.length; i++) + sizes[i] = 1024 * MB / CPUS; + + PageMemoryNoStoreImpl pageMem = new PageMemoryNoStoreImpl(log, new UnsafeMemoryProvider(sizes), null, pageSize, true); + + pageMem.start(); + + return pageMem; + } + + /** + * @param pageSize Page size. + * @return Free list. + * @throws Exception If failed. + */ + protected FreeList createFreeList(int pageSize) throws Exception { + pageMem = createPageMemory(pageSize); + + long metaPageId = pageMem.allocatePage(1, 1, PageIdAllocator.FLAG_DATA); + + return new FreeListImpl(1, "freelist", pageMem, null, null, metaPageId, true); + } + + /** + * + */ + private static class TestDataRow implements CacheDataRow { + /** */ + private long link; + + /** */ + private TestCacheObject key; + + /** */ + private TestCacheObject val; + + /** */ + private GridCacheVersion ver; + + /** + * @param keySize Key size. + * @param valSize Value size. + */ + private TestDataRow(int keySize, int valSize) { + key = new TestCacheObject(keySize); + val = new TestCacheObject(valSize); + ver = new GridCacheVersion(keySize, valSize, 0L, 1); + } + + /** {@inheritDoc} */ + @Override public KeyCacheObject key() { + return key; + } + + /** {@inheritDoc} */ + @Override public CacheObject value() { + return val; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return ver; + } + + /** {@inheritDoc} */ + @Override public long expireTime() { + return 0; + } + + /** {@inheritDoc} */ + @Override public int partition() { + return 0; + } + + /** {@inheritDoc} */ + @Override public long link() { + return link; + } + + /** {@inheritDoc} */ + @Override public void link(long link) { + this.link = link; + } + + /** {@inheritDoc} */ + @Override public int hash() { + throw new UnsupportedOperationException(); + } + } + + /** + * + */ + private static class TestCacheObject implements KeyCacheObject { + /** */ + private byte[] data; + + /** + * @param size Object size. + */ + private TestCacheObject(int size) { + data = new byte[size]; + + Arrays.fill(data, (byte)size); + } + + /** {@inheritDoc} */ + @Override public boolean internal() { + return false; + } + + /** {@inheritDoc} */ + @Override public int partition() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void partition(int part) { + assert false; + } + + /** {@inheritDoc} */ + @Override public KeyCacheObject copy(int part) { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public T value(CacheObjectContext ctx, boolean cpy) { + return (T)data; + } + + /** {@inheritDoc} */ + @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { + return data; + } + + /** {@inheritDoc} */ + @Override public int valueBytesLength(CacheObjectContext ctx) throws IgniteCheckedException { + return data.length; + } + + /** {@inheritDoc} */ + @Override public boolean putValue(ByteBuffer buf) throws IgniteCheckedException { + buf.put(data); + + return true; + } + + /** {@inheritDoc} */ + @Override public int putValue(long addr) throws IgniteCheckedException { + PageUtils.putBytes(addr, 0, data); + + return data.length; + } + + /** {@inheritDoc} */ + @Override public boolean putValue(ByteBuffer buf, int off, int len) throws IgniteCheckedException { + buf.put(data, off, len); + + return true; + } + + /** {@inheritDoc} */ + @Override public byte cacheObjectType() { + return 42; + } + + /** {@inheritDoc} */ + @Override public boolean isPlatformType() { + return false; + } + + /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(CacheObjectContext ctx) { + assert false; + + return this; + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException { + assert false; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { + assert false; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + assert false; + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + assert false; + + return false; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + assert false; + + return 0; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + assert false; + + return 0; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + assert false; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/260aee1c/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java index d0d495e..027e0ed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java @@ -18,9 +18,11 @@ package org.apache.ignite.internal.processors.database; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; @@ -40,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; import org.apache.ignite.internal.processors.cache.database.freelist.FreeListImpl; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeListImpl2; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -58,7 +61,7 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest { private static final long MB = 1024L * 1024L; /** */ - private PageMemory pageMem; + private PageMemoryNoStoreImpl pageMem; /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { @@ -69,6 +72,29 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest { pageMem = null; } + /** + * @throws Exception If failed. + */ + public void testCompact() throws Exception { + FreeList fl = createFreeList(1024); + + for (int iter = 0; iter < 10_000; iter++) { + System.out.println("Iter: " + iter + ", allocated=" + pageMem.loadedPages()); + + List links = new ArrayList<>(); + + for (int i = 0; i < 1000; i++) { + TestDataRow row = new TestDataRow(64, 64); + + fl.insertDataRow(row); + + links.add(row.link()); + } + + for (Long link : links) + fl.removeDataRowByLink(link); + } + } /** * @throws Exception if failed. @@ -310,13 +336,13 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest { /** * @return Page memory. */ - protected PageMemory createPageMemory(int pageSize) throws Exception { + protected PageMemoryNoStoreImpl createPageMemory(int pageSize) throws Exception { long[] sizes = new long[CPUS]; for (int i = 0; i < sizes.length; i++) sizes[i] = 1024 * MB / CPUS; - PageMemory pageMem = new PageMemoryNoStoreImpl(log, new UnsafeMemoryProvider(sizes), null, pageSize, true); + PageMemoryNoStoreImpl pageMem = new PageMemoryNoStoreImpl(log, new UnsafeMemoryProvider(sizes), null, pageSize, true); pageMem.start();