Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 293A6200C8A for ; Sun, 4 Jun 2017 10:03:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 28283160BEA; Sun, 4 Jun 2017 08:03:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BA9C8160BE0 for ; Sun, 4 Jun 2017 10:03:01 +0200 (CEST) Received: (qmail 49927 invoked by uid 500); 4 Jun 2017 08:03:01 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 49756 invoked by uid 99); 4 Jun 2017 08:03:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Jun 2017 08:03:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7EE1AE04F2; Sun, 4 Jun 2017 08:03:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Sun, 04 Jun 2017 08:03:01 -0000 Message-Id: <7b637eece1384019a2c97433a92021d1@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/18] ignite git commit: ignite-5075 'logical' caches sharing the same 'physical' cache group archived-at: Sun, 04 Jun 2017 08:03:04 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git 
a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java index 9bced42..3904205 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.processors.cache.database; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCheckedException; @@ -39,6 +41,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSna import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl; @@ -48,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.database.freelist.FreeListImp import org.apache.ignite.internal.processors.cache.database.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.database.tree.io.PageMetaIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.PagePartitionCountersIO; import 
org.apache.ignite.internal.processors.cache.database.tree.io.PagePartitionMetaIO; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseListImpl; @@ -56,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.T2; @@ -80,60 +85,70 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple RootPage reuseListRoot = metas.reuseListRoot; - reuseList = new ReuseListImpl(cctx.cacheId(), - cctx.name(), - cctx.memoryPolicy().pageMemory(), - cctx.shared().wal(), + reuseList = new ReuseListImpl(grp.groupId(), + grp.cacheOrGroupName(), + grp.memoryPolicy().pageMemory(), + ctx.wal(), reuseListRoot.pageId().pageId(), reuseListRoot.isAllocated()); RootPage metastoreRoot = metas.treeRoot; - metaStore = new MetadataStorage(cctx.memoryPolicy().pageMemory(), - cctx.shared().wal(), + metaStore = new MetadataStorage(grp.memoryPolicy().pageMemory(), + ctx.wal(), globalRemoveId(), - cctx.cacheId(), + grp.groupId(), PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX, reuseList, metastoreRoot.pageId().pageId(), metastoreRoot.isAllocated()); - if (cctx.shared().ttl().eagerTtlEnabled()) { - final String name = "PendingEntries"; + ((GridCacheDatabaseSharedManager)ctx.database()).addCheckpointListener(this); + } - RootPage pendingRootPage = metaStore.getOrAllocateForTree(name); + /** {@inheritDoc} */ + @Override public void 
onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException { + if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) { + ctx.database().checkpointReadLock(); - pendingEntries = new PendingEntriesTree( - cctx, - name, - cctx.memoryPolicy().pageMemory(), - pendingRootPage.pageId().pageId(), - reuseList, - pendingRootPage.isAllocated() - ); + try { + final String name = "PendingEntries"; + + RootPage pendingRootPage = metaStore.getOrAllocateForTree(name); + + pendingEntries = new PendingEntriesTree( + grp, + name, + grp.memoryPolicy().pageMemory(), + pendingRootPage.pageId().pageId(), + reuseList, + pendingRootPage.isAllocated() + ); + } + finally { + ctx.database().checkpointReadUnlock(); + } } - - ((GridCacheDatabaseSharedManager)cctx.shared().database()).addCheckpointListener(this); } /** {@inheritDoc} */ @Override protected CacheDataStore createCacheDataStore0(final int p) throws IgniteCheckedException { - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.shared().database(); + GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ctx.database(); - if (!cctx.allowFastEviction()) - dbMgr.cancelOrWaitPartitionDestroy(cctx, p); + if (!grp.allowFastEviction()) + dbMgr.cancelOrWaitPartitionDestroy(grp.groupId(), p); - boolean exists = cctx.shared().pageStore() != null - && cctx.shared().pageStore().exists(cctx.cacheId(), p); + boolean exists = ctx.pageStore() != null + && ctx.pageStore().exists(grp.groupId(), p); return new GridCacheDataStore(p, exists); } /** {@inheritDoc} */ @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException { - assert cctx.memoryPolicy().pageMemory() instanceof PageMemoryEx; + assert grp.memoryPolicy().pageMemory() instanceof PageMemoryEx; reuseList.saveMetadata(); @@ -167,11 +182,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple freeList.saveMetadata(); long updCntr = store.updateCounter(); - int size 
= store.size(); + int size = store.fullSize(); long rmvId = globalRemoveId().get(); - PageMemoryEx pageMem = (PageMemoryEx)cctx.memoryPolicy().pageMemory(); - IgniteWriteAheadLogManager wal = cctx.shared().wal(); + PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); + IgniteWriteAheadLogManager wal = this.ctx.wal(); if (size > 0 || updCntr > 0) { int state = -1; @@ -180,7 +195,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple state = GridDhtPartitionState.EVICTED.ordinal(); else { // localPartition will not acquire writeLock here because create=false. - GridDhtLocalPartition part = cctx.topology().localPartition(store.partId(), + GridDhtLocalPartition part = grp.topology().localPartition(store.partId(), AffinityTopologyVersion.NONE, false); if (part != null && part.state() != GridDhtPartitionState.EVICTED) @@ -191,12 +206,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (state == -1) return false; - int cacheId = cctx.cacheId(); - long partMetaId = pageMem.partitionMetaPageId(cacheId, store.partId()); - long partMetaPage = pageMem.acquirePage(cacheId, partMetaId); + int grpId = grp.groupId(); + long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId()); + long partMetaPage = pageMem.acquirePage(grpId, partMetaId); try { - long pageAddr = pageMem.writeLock(cacheId, partMetaId, partMetaPage); + long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); try { PagePartitionMetaIO io = PageIO.getPageIO(pageAddr); @@ -207,69 +222,132 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple io.setPartitionState(pageAddr, (byte)state); - int pageCount; + long cntrsPageId; + + if (grp.sharedGroup()) { + cntrsPageId = io.getCountersPageId(pageAddr); + + byte[] data = serializeCacheSizes(store.cacheSizes()); + + int items = data.length / 12; + int written = 0; + int pageSize = pageMem.pageSize(); + + boolean init = cntrsPageId == 0; + + 
if (init) { + cntrsPageId = pageMem.allocatePage(grpId, store.partId(), PageIdAllocator.FLAG_DATA); + io.setCountersPageId(pageAddr, cntrsPageId); + } + + long nextId = cntrsPageId; + + while (written != items) { + final long curId = nextId; + final long curPage = pageMem.acquirePage(grpId, curId); + + try { + final long curAddr = pageMem.writeLock(grpId, curId, curPage); + + assert curAddr != 0; + + try { + PagePartitionCountersIO partMetaIo; + + if (init) { + partMetaIo = PagePartitionCountersIO.VERSIONS.latest(); + partMetaIo.initNewPage(curAddr, curId, pageSize); + } + else + partMetaIo = PageIO.getPageIO(curAddr); + + written += partMetaIo.writeCacheSizes(pageSize, curAddr, data, written); + + nextId = partMetaIo.getNextCountersPageId(curAddr); + + if(written != items && (init = nextId == 0)) { + //allocate new counters page + nextId = pageMem.allocatePage(grpId, store.partId(), PageIdAllocator.FLAG_DATA); + partMetaIo.setNextCountersPageId(curAddr, nextId); + } + } + finally { + // Write full page + pageMem.writeUnlock(grpId, curId, curPage, Boolean.TRUE, true); + } + } + finally { + pageMem.releasePage(grpId, curId, curPage); + } + } + } + else + cntrsPageId = 0L; + + int pageCnt; if (beforeSnapshot) { - pageCount = cctx.shared().pageStore().pages(cctx.cacheId(), store.partId()); - io.setCandidatePageCount(pageAddr, pageCount); + pageCnt = this.ctx.pageStore().pages(grpId, store.partId()); + io.setCandidatePageCount(pageAddr, pageCnt); if (saveMeta) { - long metaPageId = pageMem.metaPageId(cctx.cacheId()); - long metaPage = pageMem.acquirePage(cctx.cacheId(), metaPageId); + long metaPageId = pageMem.metaPageId(grpId); + long metaPage = pageMem.acquirePage(grpId, metaPageId); try { - long metaPageAddr = pageMem.writeLock(cctx.cacheId(), metaPageId, metaPage); + long metaPageAddr = pageMem.writeLock(grpId, metaPageId, metaPage); try { long nextSnapshotTag = io.getNextSnapshotTag(metaPageAddr); io.setNextSnapshotTag(metaPageAddr, nextSnapshotTag + 1); - if 
(PageHandler.isWalDeltaRecordNeeded(pageMem, cctx.cacheId(), metaPageId, + if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaPageId, metaPage, wal, null)) - wal.log(new MetaPageUpdateNextSnapshotId(cctx.cacheId(), metaPageId, + wal.log(new MetaPageUpdateNextSnapshotId(grpId, metaPageId, nextSnapshotTag + 1)); - addPartition(ctx.partitionStatMap(), metaPageAddr, io, cctx.cacheId(), PageIdAllocator.INDEX_PARTITION, - cctx.kernalContext().cache().context().pageStore().pages(cacheId, PageIdAllocator.INDEX_PARTITION)); + addPartition(ctx.partitionStatMap(), metaPageAddr, io, grpId, PageIdAllocator.INDEX_PARTITION, + this.ctx.kernalContext().cache().context().pageStore().pages(grpId, PageIdAllocator.INDEX_PARTITION)); } finally { - pageMem.writeUnlock(cctx.cacheId(), metaPageId, metaPage, null, true); + pageMem.writeUnlock(grpId, metaPageId, metaPage, null, true); } } finally { - pageMem.releasePage(cctx.cacheId(), metaPageId, metaPage); + pageMem.releasePage(grpId, metaPageId, metaPage); } wasSaveToMeta = true; } - GridDhtPartitionMap partMap = cctx.topology().localPartitionMap(); + GridDhtPartitionMap partMap = grp.topology().localPartitionMap(); if (partMap.containsKey(store.partId()) && partMap.get(store.partId()) == GridDhtPartitionState.OWNING) - addPartition(ctx.partitionStatMap(), pageAddr, io, cctx.cacheId(), store.partId(), - cctx.kernalContext().cache().context().pageStore().pages(cctx.cacheId(), store.partId())); + addPartition(ctx.partitionStatMap(), pageAddr, io, grpId, store.partId(), + this.ctx.pageStore().pages(grpId, store.partId())); } else - pageCount = io.getCandidatePageCount(pageAddr); + pageCnt = io.getCandidatePageCount(pageAddr); - if (PageHandler.isWalDeltaRecordNeeded(pageMem, cacheId, partMetaId, partMetaPage, wal, null)) + if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) wal.log(new MetaPageUpdatePartitionDataRecord( - cacheId, + grpId, partMetaId, updCntr, rmvId, size, + cntrsPageId, 
(byte)state, - pageCount + pageCnt )); } finally { - pageMem.writeUnlock(cacheId, partMetaId, partMetaPage, null, true); + pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, true); } } finally { - pageMem.releasePage(cacheId, partMetaId, partMetaPage); + pageMem.releasePage(grpId, partMetaId, partMetaPage); } } } @@ -278,6 +356,23 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** + * @param cacheSizes Cache sizes. + * @return Serialized cache sizes + */ + private byte[] serializeCacheSizes(Map cacheSizes) { + // Item size = 4 bytes (cache ID) + 8 bytes (cache size) = 12 bytes + byte[] data = new byte[cacheSizes.size() * 12]; + long off = GridUnsafe.BYTE_ARR_OFF; + + for (Map.Entry entry : cacheSizes.entrySet()) { + GridUnsafe.putInt(data, off, entry.getKey()); off += 4; + GridUnsafe.putLong(data, off, entry.getValue()); off += 8; + } + + return data; + } + + /** * @param map Map to add values to. * @param pageAddr page address * @param io Page Meta IO @@ -304,23 +399,23 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override protected void destroyCacheDataStore0(CacheDataStore store) throws IgniteCheckedException { - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); try { int p = store.partId(); saveStoreMetadata(store, null, false, true); - PageMemoryEx pageMemory = (PageMemoryEx)cctx.memoryPolicy().pageMemory(); + PageMemoryEx pageMemory = (PageMemoryEx)grp.memoryPolicy().pageMemory(); - int tag = pageMemory.invalidate(cctx.cacheId(), p); + int tag = pageMemory.invalidate(grp.groupId(), p); - cctx.shared().wal().log(new PartitionDestroyRecord(cctx.cacheId(), p)); + ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), p)); - cctx.shared().pageStore().onPartitionDestroyed(cctx.cacheId(), p, tag); + ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag); } finally { - cctx.shared().database().checkpointReadUnlock(); + 
ctx.database().checkpointReadUnlock(); } } @@ -354,12 +449,18 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public RootPage rootPageForIndex(String idxName) throws IgniteCheckedException { + @Override public RootPage rootPageForIndex(int cacheId, String idxName) throws IgniteCheckedException { + if (grp.sharedGroup()) + idxName = Integer.toString(cacheId) + "_" + idxName; + return metaStore.getOrAllocateForTree(idxName); } /** {@inheritDoc} */ - @Override public void dropRootPageForIndex(String idxName) throws IgniteCheckedException { + @Override public void dropRootPageForIndex(int cacheId, String idxName) throws IgniteCheckedException { + if (grp.sharedGroup()) + idxName = Integer.toString(cacheId) + "_" + idxName; + metaStore.dropRootPage(idxName); } @@ -369,10 +470,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override protected void destroyCacheDataStructures() { - assert cctx.affinityNode(); - - ((GridCacheDatabaseSharedManager)cctx.shared().database()).removeCheckpointListener(this); + @Override public void stop() { + if (grp.affinityNode()) + ((GridCacheDatabaseSharedManager)ctx.database()).removeCheckpointListener(this); } /** @@ -380,14 +480,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple * @throws IgniteCheckedException If failed. 
*/ private Metas getOrAllocateCacheMetas() throws IgniteCheckedException { - PageMemoryEx pageMem = (PageMemoryEx) cctx.memoryPolicy().pageMemory(); - IgniteWriteAheadLogManager wal = cctx.shared().wal(); + PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); + IgniteWriteAheadLogManager wal = ctx.wal(); + + int grpId = grp.groupId(); + long metaId = pageMem.metaPageId(grpId); + long metaPage = pageMem.acquirePage(grpId, metaId); - int cacheId = cctx.cacheId(); - long metaId = pageMem.metaPageId(cacheId); - long metaPage = pageMem.acquirePage(cacheId, metaId); try { - final long pageAddr = pageMem.writeLock(cacheId, metaId, metaPage); + final long pageAddr = pageMem.writeLock(grpId, metaId, metaPage); boolean allocated = false; @@ -399,15 +500,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple pageIO.initNewPage(pageAddr, metaId, pageMem.pageSize()); - metastoreRoot = pageMem.allocatePage(cacheId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); - reuseListRoot = pageMem.allocatePage(cacheId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); + metastoreRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); + reuseListRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); pageIO.setTreeRoot(pageAddr, metastoreRoot); pageIO.setReuseListRoot(pageAddr, reuseListRoot); - if (PageHandler.isWalDeltaRecordNeeded(pageMem, cacheId, metaId, metaPage, wal, null)) + if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaId, metaPage, wal, null)) wal.log(new MetaPageInitRecord( - cacheId, + grpId, metaId, pageIO.getType(), pageIO.getVersion(), @@ -427,15 +528,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } return new Metas( - new RootPage(new FullPageId(metastoreRoot, cacheId), allocated), - new RootPage(new FullPageId(reuseListRoot, cacheId), allocated)); + new RootPage(new FullPageId(metastoreRoot, 
grpId), allocated), + new RootPage(new FullPageId(reuseListRoot, grpId), allocated)); } finally { - pageMem.writeUnlock(cacheId, metaId, metaPage, null, allocated); + pageMem.writeUnlock(grpId, metaId, metaPage, null, allocated); } } finally { - pageMem.releasePage(cacheId, metaId, metaPage); + pageMem.releasePage(grpId, metaId, metaPage); } } @@ -445,10 +546,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (partCntrSince == null) return super.rebalanceIterator(part, topVer, partCntrSince); - GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)cctx.shared().database(); + GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)ctx.database(); try { - WALPointer startPtr = database.searchPartitionCounter(cctx, part, partCntrSince); + WALPointer startPtr = database.searchPartitionCounter(grp.groupId(), part, partCntrSince); if (startPtr == null) { assert false : "partCntr=" + partCntrSince + ", reservations=" + S.toString(Map.class, database.reservedForPreloading()); @@ -456,9 +557,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple return super.rebalanceIterator(part, topVer, partCntrSince); } - WALIterator it = cctx.shared().wal().replay(startPtr); + WALIterator it = ctx.wal().replay(startPtr); - return new RebalanceIteratorAdapter(cctx, it, part); + return new RebalanceIteratorAdapter(grp, it, part); } catch (IgniteCheckedException e) { U.warn(log, "Failed to create WAL-based rebalance iterator (a full partition will transferred to a " + @@ -475,14 +576,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** Serial version uid. */ private static final long serialVersionUID = 0L; - /** Cache context. */ - private GridCacheContext cctx; + /** Cache group caches. */ + private final Set cacheGrpCaches; /** WAL iterator. */ - private WALIterator walIt; + private final WALIterator walIt; /** Partition to scan. 
*/ - private int part; + private final int part; /** */ private Iterator entryIt; @@ -491,12 +592,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple private CacheDataRow next; /** - * @param cctx Cache context. + * @param grp Cache group. * @param walIt WAL iterator. * @param part Partition ID. */ - private RebalanceIteratorAdapter(GridCacheContext cctx, WALIterator walIt, int part) { - this.cctx = cctx; + private RebalanceIteratorAdapter(CacheGroupContext grp, WALIterator walIt, int part) { + this.cacheGrpCaches = grp.cacheIds(); this.walIt = walIt; this.part = part; @@ -571,8 +672,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple while (entryIt.hasNext()) { DataEntry entry = entryIt.next(); - if (entry.cacheId() == cctx.cacheId() && - entry.partitionId() == part) { + if (entry.partitionId() == part && cacheGrpCaches.contains(entry.cacheId())) { next = new DataEntryRow(entry); @@ -745,7 +845,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple return null; } - IgniteCacheDatabaseSharedManager dbMgr = cctx.shared().database(); + IgniteCacheDatabaseSharedManager dbMgr = ctx.database(); dbMgr.checkpointReadLock(); @@ -756,12 +856,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple RootPage reuseRoot = metas.reuseListRoot; freeList = new FreeListImpl( - cctx.cacheId(), - cctx.name() + "-" + partId, - (MemoryMetricsImpl)cctx.memoryPolicy().memoryMetrics(), - cctx.memoryPolicy(), + grp.groupId(), + grp.cacheOrGroupName() + "-" + partId, + grp.memoryPolicy().memoryMetrics(), + grp.memoryPolicy(), null, - cctx.shared().wal(), + ctx.wal(), reuseRoot.pageId().pageId(), reuseRoot.isAllocated()) { @Override protected long allocatePageNoReuse() throws IgniteCheckedException { @@ -769,15 +869,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } }; - CacheDataRowStore rowStore = new CacheDataRowStore(cctx, 
freeList, partId); + CacheDataRowStore rowStore = new CacheDataRowStore(grp, freeList, partId); RootPage treeRoot = metas.treeRoot; CacheDataTree dataTree = new CacheDataTree( + grp, name, freeList, rowStore, - cctx, treeRoot.pageId().pageId(), treeRoot.isAllocated()) { @Override protected long allocatePageNoReuse() throws IgniteCheckedException { @@ -785,31 +885,72 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } }; - PageMemoryEx pageMem = (PageMemoryEx)cctx.memoryPolicy().pageMemory(); + PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); delegate0 = new CacheDataStoreImpl(partId, name, rowStore, dataTree); - int cacheId = cctx.cacheId(); - long partMetaId = pageMem.partitionMetaPageId(cacheId, partId); - long partMetaPage = pageMem.acquirePage(cacheId, partMetaId); + int grpId = grp.groupId(); + long partMetaId = pageMem.partitionMetaPageId(grpId, partId); + long partMetaPage = pageMem.acquirePage(grpId, partMetaId); + try { - long pageAddr = pageMem.readLock(cacheId, partMetaId, partMetaPage); + long pageAddr = pageMem.readLock(grpId, partMetaId, partMetaPage); try { if (PageIO.getType(pageAddr) != 0) { PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest(); - delegate0.init(io.getSize(pageAddr), io.getUpdateCounter(pageAddr)); + Map cacheSizes = null; + + if (grp.sharedGroup()) { + long cntrsPageId = io.getCountersPageId(pageAddr); + + if (cntrsPageId != 0L) { + cacheSizes = new HashMap<>(); + + long nextId = cntrsPageId; + + while (true){ + final long curId = nextId; + final long curPage = pageMem.acquirePage(grpId, curId); + + try { + final long curAddr = pageMem.readLock(grpId, curId, curPage); + + assert curAddr != 0; - cctx.offheap().globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr)); + try { + PagePartitionCountersIO cntrsIO = PageIO.getPageIO(curAddr); + + if (cntrsIO.readCacheSizes(curAddr, cacheSizes)) + break; + + nextId = cntrsIO.getNextCountersPageId(curAddr); + + assert 
nextId != 0; + } + finally { + pageMem.readUnlock(grpId, curId, curPage); + } + } + finally { + pageMem.releasePage(grpId, curId, curPage); + } + } + } + } + + delegate0.init(io.getSize(pageAddr), io.getUpdateCounter(pageAddr), cacheSizes); + + globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr)); } } finally { - pageMem.readUnlock(cacheId, partMetaId, partMetaPage); + pageMem.readUnlock(grpId, partMetaId, partMetaPage); } } finally { - pageMem.releasePage(cacheId, partMetaId, partMetaPage); + pageMem.releasePage(grpId, partMetaId, partMetaPage); } delegate = delegate0; @@ -838,15 +979,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple * @return Partition metas. */ private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException { - PageMemoryEx pageMem = (PageMemoryEx)cctx.memoryPolicy().pageMemory(); - IgniteWriteAheadLogManager wal = cctx.shared().wal(); + PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); + IgniteWriteAheadLogManager wal = ctx.wal(); - int cacheId = cctx.cacheId(); - long partMetaId = pageMem.partitionMetaPageId(cacheId, partId); - long partMetaPage = pageMem.acquirePage(cacheId, partMetaId); + int grpId = grp.groupId(); + long partMetaId = pageMem.partitionMetaPageId(grpId, partId); + long partMetaPage = pageMem.acquirePage(grpId, partMetaId); try { boolean allocated = false; - long pageAddr = pageMem.writeLock(cacheId, partMetaId, partMetaPage); + long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); try { long treeRoot, reuseListRoot; @@ -857,8 +998,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple io.initNewPage(pageAddr, partMetaId, pageMem.pageSize()); - treeRoot = pageMem.allocatePage(cacheId, partId, PageMemory.FLAG_DATA); - reuseListRoot = pageMem.allocatePage(cacheId, partId, PageMemory.FLAG_DATA); + treeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); + reuseListRoot = pageMem.allocatePage(grpId, 
partId, PageMemory.FLAG_DATA); assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA; assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA; @@ -866,9 +1007,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple io.setTreeRoot(pageAddr, treeRoot); io.setReuseListRoot(pageAddr, reuseListRoot); - if (PageHandler.isWalDeltaRecordNeeded(pageMem, cacheId, partMetaId, partMetaPage, wal, null)) + if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) wal.log(new MetaPageInitRecord( - cctx.cacheId(), + grpId, partMetaId, io.getType(), io.getVersion(), @@ -885,21 +1026,21 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple reuseListRoot = io.getReuseListRoot(pageAddr); assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA : - U.hexLong(treeRoot) + ", part=" + partId + ", cacheId=" + cacheId; + U.hexLong(treeRoot) + ", part=" + partId + ", grpId=" + grpId; assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA : - U.hexLong(reuseListRoot) + ", part=" + partId + ", cacheId=" + cacheId; + U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId; } return new Metas( - new RootPage(new FullPageId(treeRoot, cacheId), allocated), - new RootPage(new FullPageId(reuseListRoot, cacheId), allocated)); + new RootPage(new FullPageId(treeRoot, grpId), allocated), + new RootPage(new FullPageId(reuseListRoot, grpId), allocated)); } finally { - pageMem.writeUnlock(cacheId, partMetaId, partMetaPage, null, allocated); + pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated); } } finally { - pageMem.releasePage(cacheId, partMetaId, partMetaPage); + pageMem.releasePage(grpId, partMetaId, partMetaPage); } } @@ -921,11 +1062,35 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public int size() { + @Override public int fullSize() { + try { + CacheDataStore delegate0 = init0(true); 
+ + return delegate0 == null ? 0 : delegate0.fullSize(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public int cacheSize(int cacheId) { + try { + CacheDataStore delegate0 = init0(true); + + return delegate0 == null ? 0 : delegate0.cacheSize(cacheId); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public Map cacheSizes() { try { CacheDataStore delegate0 = init0(true); - return delegate0 == null ? 0 : delegate0.size(); + return delegate0 == null ? null : delegate0.cacheSizes(); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -945,7 +1110,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public void init(long size, long updCntr) { + @Override public void init(long size, long updCntr, @Nullable Map cacheSizes) { throw new IllegalStateException("Should be never called."); } @@ -1001,6 +1166,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override public void update( + GridCacheContext cctx, KeyCacheObject key, CacheObject val, GridCacheVersion ver, @@ -1009,47 +1175,51 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple ) throws IgniteCheckedException { CacheDataStore delegate = init0(false); - delegate.update(key, val, ver, expireTime, oldRow); + delegate.update(cctx, key, val, ver, expireTime, oldRow); } /** {@inheritDoc} */ - @Override public void updateIndexes(KeyCacheObject key) throws IgniteCheckedException { + @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { CacheDataStore delegate = init0(false); - delegate.updateIndexes(key); + delegate.updateIndexes(cctx, key); } /** {@inheritDoc} */ - @Override public CacheDataRow createRow(KeyCacheObject key, + @Override public 
CacheDataRow createRow( + GridCacheContext cctx, + KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { CacheDataStore delegate = init0(false); - return delegate.createRow(key, val, ver, expireTime, oldRow); + return delegate.createRow(cctx, key, val, ver, expireTime, oldRow); } /** {@inheritDoc} */ - @Override public void invoke(KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException { + @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) + throws IgniteCheckedException { CacheDataStore delegate = init0(false); - delegate.invoke(key, c); + delegate.invoke(cctx, key, c); } /** {@inheritDoc} */ - @Override public void remove(KeyCacheObject key, int partId) throws IgniteCheckedException { + @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) + throws IgniteCheckedException { CacheDataStore delegate = init0(false); - delegate.remove(key, partId); + delegate.remove(cctx, key, partId); } /** {@inheritDoc} */ - @Override public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException { + @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { CacheDataStore delegate = init0(true); if (delegate != null) - return delegate.find(key); + return delegate.find(cctx, key); return null; } @@ -1065,12 +1235,28 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public GridCursor cursor(KeyCacheObject lower, + @Override public GridCursor cursor( + int cacheId, + KeyCacheObject lower, KeyCacheObject upper) throws IgniteCheckedException { CacheDataStore delegate = init0(true); if (delegate != null) - return delegate.cursor(lower, upper); + return delegate.cursor(cacheId, lower, upper); + + return EMPTY_CURSOR; + } + + /** {@inheritDoc} */ + @Override public GridCursor 
cursor(int cacheId, + KeyCacheObject lower, + KeyCacheObject upper, + Object x) + throws IgniteCheckedException { + CacheDataStore delegate = init0(true); + + if (delegate != null) + return delegate.cursor(cacheId, lower, upper, x); return EMPTY_CURSOR; } @@ -1079,6 +1265,24 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple @Override public void destroy() throws IgniteCheckedException { // No need to destroy delegate. } + + /** {@inheritDoc} */ + @Override public GridCursor cursor(int cacheId) throws IgniteCheckedException { + CacheDataStore delegate = init0(true); + + if (delegate != null) + return delegate.cursor(cacheId); + + return EMPTY_CURSOR; + } + + /** {@inheritDoc} */ + @Override public void clear(int cacheId) throws IgniteCheckedException { + CacheDataStore delegate = init0(true); + + if (delegate != null) + delegate.clear(cacheId); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java index c233b1e..1343589 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java @@ -29,7 +29,7 @@ import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; -import java.util.HashSet; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -43,10 +43,10 @@ import org.apache.ignite.internal.pagemem.PageIdUtils; import 
org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.pagemem.store.PageStore; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; import org.apache.ignite.internal.processors.cache.database.IgniteCacheSnapshotManager; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.jdk.JdkMarshaller; @@ -72,6 +72,9 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen public static final String CACHE_DIR_PREFIX = "cache-"; /** */ + public static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-"; + + /** */ public static final String CACHE_CONF_FILENAME = "conf.dat"; /** Marshaller. */ @@ -93,7 +96,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen private final long metaPageId = PageIdUtils.pageId(-1, PageMemory.FLAG_IDX, 0); /** */ - private final Set cachesWithoutIdx = Collections.newSetFromMap(new ConcurrentHashMap()); + private final Set grpsWithoutIdx = Collections.newSetFromMap(new ConcurrentHashMap()); /** * @param ctx Kernal context. 
@@ -189,25 +192,59 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** {@inheritDoc} */ - @Override public void initializeForCache(CacheConfiguration ccfg) throws IgniteCheckedException { - int cacheId = CU.cacheId(ccfg.getName()); + @Override public void initializeForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) + throws IgniteCheckedException { + int grpId = grpDesc.groupId(); - if (!idxCacheStores.containsKey(cacheId)) { - CacheStoreHolder holder = initForCache(ccfg); + if (!idxCacheStores.containsKey(grpId)) { + CacheStoreHolder holder = initForCache(grpDesc, ccfg); - CacheStoreHolder old = idxCacheStores.put(cacheId, holder); + CacheStoreHolder old = idxCacheStores.put(grpId, holder); assert old == null : "Non-null old store holder for cache: " + ccfg.getName(); } + + storeCacheConfiguration(grpDesc, ccfg); + } + + /** + * @param grpDesc Cache group descriptor. + * @param ccfg Cache configuration. + * @throws IgniteCheckedException If failed. 
+ */ + private void storeCacheConfiguration(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) + throws IgniteCheckedException { + File cacheWorkDir = cacheWorkDirectory(grpDesc, ccfg); + File file; + + assert cacheWorkDir.exists() : "Work directory does not exist: " + cacheWorkDir; + + if (grpDesc.sharedGroup()) + file = new File(cacheWorkDir, ccfg.getName() + CACHE_CONF_FILENAME); + else + file = new File(cacheWorkDir, CACHE_CONF_FILENAME); + + if (!file.exists() || file.length() == 0) { + try { + file.createNewFile(); + + try (OutputStream stream = new BufferedOutputStream(new FileOutputStream(file))) { + marshaller.marshal(ccfg, stream); + } + } + catch (IOException ex) { + throw new IgniteCheckedException("Failed to persist cache configuration: " + ccfg.getName(), ex); + } + } } /** {@inheritDoc} */ - @Override public void shutdownForCache(GridCacheContext cacheCtx, boolean destroy) throws IgniteCheckedException { - cachesWithoutIdx.remove(cacheCtx.cacheId()); + @Override public void shutdownForCacheGroup(CacheGroupContext grp, boolean destroy) throws IgniteCheckedException { + grpsWithoutIdx.remove(grp.groupId()); - CacheStoreHolder old = idxCacheStores.remove(cacheCtx.cacheId()); + CacheStoreHolder old = idxCacheStores.remove(grp.groupId()); - assert old != null : "Missing cache store holder [cache=" + cacheCtx.name() + + assert old != null : "Missing cache store holder [cache=" + grp.cacheOrGroupName() + ", locNodeId=" + cctx.localNodeId() + ", gridName=" + cctx.igniteInstanceName() + ']'; IgniteCheckedException ex = shutdown(old, /*clean files if destroy*/destroy, null); @@ -217,17 +254,17 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** {@inheritDoc} */ - @Override public void onPartitionCreated(int cacheId, int partId) throws IgniteCheckedException { + @Override public void onPartitionCreated(int grpId, int partId) throws IgniteCheckedException { // No-op. 
} /** {@inheritDoc} */ - @Override public void onPartitionDestroyed(int cacheId, int partId, int tag) throws IgniteCheckedException { + @Override public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException { assert partId <= PageIdAllocator.MAX_PARTITION_ID; - PageStore store = getStore(cacheId, partId); + PageStore store = getStore(grpId, partId); - assert store instanceof FilePageStore; + assert store instanceof FilePageStore : store; ((FilePageStore)store).truncate(tag); } @@ -253,8 +290,8 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** {@inheritDoc} */ - @Override public boolean exists(int cacheId, int partId) throws IgniteCheckedException { - PageStore store = getStore(cacheId, partId); + @Override public boolean exists(int grpId, int partId) throws IgniteCheckedException { + PageStore store = getStore(grpId, partId); return store.exists(); } @@ -296,12 +333,31 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** + * @param grpDesc Cache group descriptor. + * @param ccfg Cache configuration. + * @return Cache work directory. + */ + private File cacheWorkDirectory(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) { + String dirName; + + if (grpDesc.sharedGroup()) + dirName = CACHE_GRP_DIR_PREFIX + ccfg.getGroupName(); + else + dirName = CACHE_DIR_PREFIX + ccfg.getName(); + + return new File(storeWorkDir, dirName); + } + + /** + * @param grpDesc Cache group descriptor. * @param ccfg Cache configuration. * @return Cache store holder. * @throws IgniteCheckedException If failed. 
*/ - private CacheStoreHolder initForCache(CacheConfiguration ccfg) throws IgniteCheckedException { - File cacheWorkDir = new File(storeWorkDir, CACHE_DIR_PREFIX + ccfg.getName()); + private CacheStoreHolder initForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) throws IgniteCheckedException { + assert !grpDesc.sharedGroup() || ccfg.getGroupName() != null : ccfg.getName(); + + File cacheWorkDir = cacheWorkDirectory(grpDesc, ccfg); boolean dirExisted = false; @@ -358,32 +414,17 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen dirExisted = true; } - File file = new File(cacheWorkDir, CACHE_CONF_FILENAME); - - if (!file.exists() || file.length() == 0) { - try { - file.createNewFile(); - - try (OutputStream stream = new BufferedOutputStream(new FileOutputStream(file))) { - marshaller.marshal(ccfg, stream); - } - } - catch (IOException ex) { - throw new IgniteCheckedException("Failed to persist cache configuration: " + ccfg.getName(), ex); - } - } - File idxFile = new File(cacheWorkDir, INDEX_FILE_NAME); if (dirExisted && !idxFile.exists()) - cachesWithoutIdx.add(CU.cacheId(ccfg.getName())); + grpsWithoutIdx.add(grpDesc.groupId()); FilePageStore idxStore = new FilePageStore( PageMemory.FLAG_IDX, idxFile, cctx.kernalContext().config().getMemoryConfiguration()); - FilePageStore[] partStores = new FilePageStore[ccfg.getAffinity().partitions()]; + FilePageStore[] partStores = new FilePageStore[grpDesc.config().getAffinity().partitions()]; for (int partId = 0; partId < partStores.length; partId++) { FilePageStore partStore = new FilePageStore( @@ -432,52 +473,74 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** {@inheritDoc} */ - @Override public Set savedCacheNames() { + @Override public Map readCacheConfigurations() throws IgniteCheckedException { if (cctx.kernalContext().clientNode()) - return Collections.emptySet(); + return Collections.emptyMap(); File[] files = 
storeWorkDir.listFiles(); if (files == null) - return Collections.emptySet(); + return Collections.emptyMap(); - Set cacheNames = new HashSet<>(); + Map ccfgs = new HashMap<>(); for (File file : files) { - if (file.isDirectory() && file.getName().startsWith(CACHE_DIR_PREFIX)) { - File conf = new File(file, CACHE_CONF_FILENAME); - if (conf.exists() && conf.length() > 0) { - String name = file.getName().substring(CACHE_DIR_PREFIX.length()); + if (file.isDirectory()) { + if (file.getName().startsWith(CACHE_DIR_PREFIX)) { + File conf = new File(file, CACHE_CONF_FILENAME); - // TODO remove when fixed null cache names. - if ("null".equals(name)) - name = null; + if (conf.exists() && conf.length() > 0) { + CacheConfiguration ccfg = readCacheConfig(conf); - cacheNames.add(name); + ccfgs.put(ccfg.getName(), ccfg); + } } + else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX)) + readCacheGroupCaches(file, ccfgs); } } - return cacheNames; + return ccfgs; } - /** {@inheritDoc} */ - @Override public CacheConfiguration readConfiguration(String cacheName) { - File file = new File(storeWorkDir, CACHE_DIR_PREFIX + cacheName); + /** + * @param grpDir Group directory. + * @param ccfgs Cache configurations. + * @throws IgniteCheckedException If failed. + */ + private void readCacheGroupCaches(File grpDir, Map ccfgs) throws IgniteCheckedException { + File[] files = grpDir.listFiles(); - assert file.exists() && file.isDirectory(); + if (files == null) + return; - try (InputStream stream = new BufferedInputStream(new FileInputStream(new File(file, CACHE_CONF_FILENAME)))) { + for (File file : files) { + if (!file.isDirectory() && file.getName().endsWith(CACHE_CONF_FILENAME) && file.length() > 0) { + CacheConfiguration ccfg = readCacheConfig(file); + + ccfgs.put(ccfg.getName(), ccfg); + } + } + } + + /** + * @param conf File with stored configuration. + * @return Cache configuration. + * @throws IgniteCheckedException If failed. 
+ */ + private CacheConfiguration readCacheConfig(File conf) throws IgniteCheckedException { + try (InputStream stream = new BufferedInputStream(new FileInputStream(conf))) { return marshaller.unmarshal(stream, U.resolveClassLoader(igniteCfg)); } - catch (IOException | IgniteCheckedException e) { - throw new IllegalStateException("Failed to read cache configuration from disk for cache: " + cacheName, e); + catch (IOException e) { + throw new IgniteCheckedException("Failed to read cache configuration from disk for cache: " + + conf.getAbsolutePath(), e); } } /** {@inheritDoc} */ - @Override public boolean hasIndexStore(int cacheId) { - return !cachesWithoutIdx.contains(cacheId); + @Override public boolean hasIndexStore(int grpId) { + return !grpsWithoutIdx.contains(grpId); } /** @@ -488,11 +551,14 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** - * @param cacheName Cache name. + * @param ccfg Cache configuration. * @return Store dir for given cache. */ - public File cacheWorkDir(String cacheName) { - return new File(storeWorkDir, "cache-" + cacheName); + public File cacheWorkDir(CacheConfiguration ccfg) { + String dirName = ccfg.getGroupName() == null ? + CACHE_DIR_PREFIX + ccfg.getName() : CACHE_GRP_DIR_PREFIX + ccfg.getGroupName(); + + return new File(storeWorkDir, dirName); } /** @@ -547,19 +613,19 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param partId Partition ID. * @return Page store for the corresponding parameters. * @throws IgniteCheckedException If cache or partition with the given ID was not created. * * Note: visible for testing. 
*/ - public PageStore getStore(int cacheId, int partId) throws IgniteCheckedException { - CacheStoreHolder holder = idxCacheStores.get(cacheId); + public PageStore getStore(int grpId, int partId) throws IgniteCheckedException { + CacheStoreHolder holder = idxCacheStores.get(grpId); if (holder == null) throw new IgniteCheckedException("Failed to get page store for the given cache ID " + - "(cache has not been started): " + cacheId); + "(cache has not been started): " + grpId); if (partId == PageIdAllocator.INDEX_PARTITION) return holder.idxStore; @@ -571,7 +637,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen if (store == null) throw new IgniteCheckedException("Failed to get page store for the given partition ID " + - "(partition has not been created) [cacheId=" + cacheId + ", partId=" + partId + ']'); + "(partition has not been created) [grpId=" + grpId + ", partId=" + partId + ']'); return store; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java index e8ae554..ef84d83 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java @@ -131,16 +131,16 @@ public interface PageMemoryEx extends PageMemory { public int invalidate(int cacheId, int partId); /** - * Clears internal metadata of destroyed cache. + * Clears internal metadata of destroyed cache group. * - * @param cacheId Cache ID. + * @param grpId Cache group ID. 
*/ - public void onCacheDestroyed(int cacheId); + public void onCacheGroupDestroyed(int grpId); /** * Asynchronously clears pages satisfying the given predicate. * - * @param pred Predicate for cacheId, pageId and partition tag. + * @param pred Predicate for cache group id, pageId and partition tag. * @param cleanDirty Flag indicating that dirty pages collection should be cleaned. * @return Future that will be completed when all pages are cleared. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java index c05af57..f807229 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java @@ -960,12 +960,12 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ - @Override public void onCacheDestroyed(int cacheId) { + @Override public void onCacheGroupDestroyed(int grpId) { for (Segment seg : segments) { seg.writeLock().lock(); try { - seg.resetPartTags(cacheId); + seg.resetPartTags(grpId); } finally { seg.writeLock().unlock(); @@ -1892,9 +1892,9 @@ public class PageMemoryImpl implements PageMemoryEx { } /** - * @param cacheId Cache id. + * @param grpId Cache group id. 
*/ - private void resetPartTags(int cacheId) { + private void resetPartTags(int grpId) { assert getWriteHoldCount() > 0; Iterator> iter = partTagMap.keySet().iterator(); @@ -1902,7 +1902,7 @@ public class PageMemoryImpl implements PageMemoryEx { while (iter.hasNext()) { T2 t = iter.next(); - if (t.get1() == cacheId) + if (t.get1() == grpId) iter.remove(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java index f67f617..4f532ae 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java @@ -197,6 +197,7 @@ public class RecordV1Serializer implements RecordSerializer { buf.putLong(partDataRec.updateCounter()); buf.putLong(partDataRec.globalRemoveId()); buf.putInt(partDataRec.partitionSize()); + buf.putLong(partDataRec.countersPageId()); buf.put(partDataRec.state()); buf.putInt(partDataRec.allocatedIndexCandidate()); @@ -222,7 +223,7 @@ public class RecordV1Serializer implements RecordSerializer { buf.putInt(walPtr.length()); } - putCacheStates(buf, cpRec.cacheStates()); + putCacheStates(buf, cpRec.cacheGroupStates()); buf.put(cpRec.end() ? 
(byte)1 : 0); @@ -730,7 +731,7 @@ public class RecordV1Serializer implements RecordSerializer { CheckpointRecord cpRec = new CheckpointRecord(new UUID(msb, lsb), walPtr, end); - cpRec.cacheStates(states); + cpRec.cacheGroupStates(states); res = cpRec; @@ -756,10 +757,11 @@ public class RecordV1Serializer implements RecordSerializer { long updCntr = in.readLong(); long rmvId = in.readLong(); int partSize = in.readInt(); + long countersPageId = in.readLong(); byte state = in.readByte(); int allocatedIdxCandidate = in.readInt(); - res = new MetaPageUpdatePartitionDataRecord(cacheId, pageId, updCntr, rmvId, partSize, state, allocatedIdxCandidate); + res = new MetaPageUpdatePartitionDataRecord(cacheId, pageId, updCntr, rmvId, partSize, countersPageId, state, allocatedIdxCandidate); break; @@ -1234,7 +1236,7 @@ public class RecordV1Serializer implements RecordSerializer { assert cpRec.checkpointMark() == null || cpRec.checkpointMark() instanceof FileWALPointer : "Invalid WAL record: " + cpRec; - int cacheStatesSize = cacheStatesSize(cpRec.cacheStates()); + int cacheStatesSize = cacheStatesSize(cpRec.cacheGroupStates()); FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark(); @@ -1244,7 +1246,7 @@ public class RecordV1Serializer implements RecordSerializer { return 1 + /*cache ID*/4 + /*page ID*/8 + /*ioType*/2 + /*ioVer*/2 + /*tree root*/8 + /*reuse root*/8 + /*CRC*/4; case PARTITION_META_PAGE_UPDATE_COUNTERS: - return 1 + /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*state*/ 1 + return 1 + /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*counters page id*/8 + /*state*/ 1 + /*allocatedIdxCandidate*/ 4 + /*CRC*/4; case MEMORY_RECOVERY: http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java ---------------------------------------------------------------------- diff --git 
a/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java new file mode 100644 index 0000000..7a6b34c --- /dev/null +++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.database; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import javax.cache.Cache; +import javax.cache.expiry.ExpiryPolicy; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.BinaryConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.internal.processors.cache.database.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER 
= new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String GROUP1 = "grp1"; + + /** */ + private static final String GROUP2 = "grp2"; + + /** */ + private CacheConfiguration[] ccfgs; + + /** */ + private boolean activeOnStart = true; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + System.setProperty(FileWriteAheadLogManager.IGNITE_PDS_WAL_MODE, "LOG_ONLY"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.clearProperty(FileWriteAheadLogManager.IGNITE_PDS_WAL_MODE); + + GridTestUtils.deleteDbFiles(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + GridTestUtils.deleteDbFiles(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setConsistentId(gridName); + + cfg.setActiveOnStart(activeOnStart); + + MemoryConfiguration memCfg = new MemoryConfiguration(); + memCfg.setPageSize(1024); + memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024); + + cfg.setMemoryConfiguration(memCfg); + + cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration()); + + cfg.setBinaryConfiguration(new BinaryConfiguration().setCompactFooter(false)); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + if (ccfgs != null) { + cfg.setCacheConfiguration(ccfgs); + + ccfgs = null; + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + GridTestUtils.deleteDbFiles(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testClusterRestartStaticCaches1() throws Exception { + clusterRestart(1, true); + } + + /** + * @throws Exception If failed. 
+ */ + public void testClusterRestartStaticCaches2() throws Exception { + clusterRestart(3, true); + } + + /** + * @throws Exception If failed. + */ + public void testClusterRestartDynamicCaches1() throws Exception { + clusterRestart(1, false); + } + + /** + * @throws Exception If failed. + */ + public void testClusterRestartDynamicCaches2() throws Exception { + clusterRestart(3, false); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testClusterRestartCachesWithH2Indexes() throws Exception { + CacheConfiguration[] ccfgs1 = new CacheConfiguration[5]; + + // Several caches with the same indexed type (and index names). + ccfgs1[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1). + setIndexedTypes(Integer.class, Person.class); + ccfgs1[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, TRANSACTIONAL, 1). + setIndexedTypes(Integer.class, Person.class); + ccfgs1[2] = cacheConfiguration(GROUP2, "c3", PARTITIONED, ATOMIC, 1). + setIndexedTypes(Integer.class, Person.class); + ccfgs1[3] = cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 1). + setIndexedTypes(Integer.class, Person.class); + ccfgs1[4] = cacheConfiguration(null, "c5", PARTITIONED, ATOMIC, 1). 
+ setIndexedTypes(Integer.class, Person.class); + + String[] caches = {"c1", "c2", "c3", "c4", "c5"}; + + startGrids(3); + + Ignite node = ignite(0); + + node.createCaches(Arrays.asList(ccfgs1)); + + putPersons(caches, node); + + checkPersons(caches, node); + checkPersonsQuery(caches, node); + + stopAllGrids(); + + startGrids(3); + + awaitPartitionMapExchange(); + + node = ignite(0); + + checkPersons(caches, node); + checkPersonsQuery(caches, node); + + Random rnd = ThreadLocalRandom.current(); + + int idx = rnd.nextInt(caches.length); + + String cacheName = caches[idx]; + CacheConfiguration cacheCfg = ccfgs1[idx]; + + node.destroyCache(cacheName); + + node.createCache(cacheCfg); + + putPersons(new String[]{cacheName}, node); + + checkPersons(caches, node); + checkPersonsQuery(caches, node); + } + + /** + * @throws Exception If failed. + */ + public void _testExpiryPolicy() throws Exception { + long ttl = 10000; + + activeOnStart = false; + + CacheConfiguration[] ccfgs1 = new CacheConfiguration[5]; + + ccfgs1[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1); + ccfgs1[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, TRANSACTIONAL, 1); + ccfgs1[2] = cacheConfiguration(GROUP2, "c3", PARTITIONED, ATOMIC, 1); + ccfgs1[3] = cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 1); + ccfgs1[4] = cacheConfiguration(null, "c5", PARTITIONED, ATOMIC, 1); + + String[] caches = {"c1", "c2", "c3", "c4", "c5"}; + + startGrids(3); + + Ignite node = ignite(0); + + node.active(true); + + node.createCaches(Arrays.asList(ccfgs1)); + + ExpiryPolicy plc = new PlatformExpiryPolicy(ttl, -2, -2); + + for (String cacheName : caches) { + IgniteCache cache = node.cache(cacheName).withExpiryPolicy(plc); + + for (int i = 0; i < 10; i++) + cache.put(i, cacheName + i); + } + + long deadline = System.currentTimeMillis() + (long)(ttl * 1.2); + + stopAllGrids(); + + startGrids(3); + + node = ignite(0); + + node.active(true); + + for (String cacheName : caches) { + IgniteCache 
cache = node.cache(cacheName); + + for (int i = 0; i < 10; i++) + assertEquals(cacheName + i, cache.get(i)); + + assertEquals(10, cache.size()); + } + + // Wait for expiration. + Thread.sleep(Math.max(deadline - System.currentTimeMillis(), 0)); + + for (String cacheName : caches) { + IgniteCache cache = node.cache(cacheName); + + assertEquals(0, cache.size()); + } + } + + /** + * @throws Exception If failed. + */ + public void testCreateDropCache() throws Exception { + ccfgs = new CacheConfiguration[]{cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1) + .setIndexedTypes(Integer.class, Person.class)}; + + Ignite ignite = startGrid(); + + ignite.cache("c1").destroy(); + + stopGrid(); + } + + /** + * @throws Exception If failed. + */ + public void testCreateDropCache1() throws Exception { + CacheConfiguration ccfg1 = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1); + + CacheConfiguration ccfg2 = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 1); + + Ignite ignite = startGrid(); + + ignite.createCaches(Arrays.asList(ccfg1, ccfg2)); + + + ignite.cache("c1").destroy(); + + ignite.cache("c2").destroy(); + + ignite.createCache(ccfg1); + ignite.createCache(ccfg2); + + stopGrid(); + } + + /** + * @throws Exception If failed. + */ + public void testCreateDropCache2() throws Exception { + CacheConfiguration ccfg1 = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1) + .setIndexedTypes(Integer.class, Person.class); + + CacheConfiguration ccfg2 = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 1) + .setIndexedTypes(Integer.class, Person.class); + + Ignite ignite = startGrid(); + + ignite.createCaches(Arrays.asList(ccfg1, ccfg2)); + + ignite.cache("c1").destroy(); + + ignite.createCache(ccfg1); + + stopGrid(); + } + + /** + * @param caches Cache names to put data into. + * @param node Ignite node. 
+ */ + private void putPersons(String[] caches, Ignite node) { + for (String cacheName : caches) { + IgniteCache cache = node.cache(cacheName); + + for (int i = 0; i < 10; i++) + cache.put(i, new Person("" + i, cacheName)); + } + } + + /** + * @param caches Cache names to invoke a query against to. + * @param node Ignite node. + */ + private void checkPersons(String[] caches, Ignite node) { + for (String cacheName : caches) { + IgniteCache cache = node.cache(cacheName); + + for (int i = 0; i < 10; i++) + assertEquals(new Person("" + i, cacheName), cache.get(i)); + + assertEquals(10, cache.size()); + } + } + + /** + * @param caches Cache names to invoke a query against to. + * @param node Ignite node. + */ + private void checkPersonsQuery(String[] caches, Ignite node) { + SqlQuery qry = new SqlQuery<>( + Person.class, "SELECT p.* FROM Person p WHERE p.lname=? ORDER BY p.fname"); + + for (String cacheName : caches) { + IgniteCache cache = node.cache(cacheName); + + List> persons = cache.query(qry.setArgs(cacheName)).getAll(); + + for (int i = 0; i < 10; i++) + assertEquals(new Person("" + i, cacheName), persons.get(i).getValue()); + + assertEquals(10, persons.size()); + } + } + + /** + * @param nodes Nodes number. + * @param staticCaches {@code True} if caches should be statically configured. + * @throws Exception If failed. 
+     */
+    private void clusterRestart(int nodes, boolean staticCaches) throws Exception {
+        // Two caches in each of two groups plus one cache without a group.
+        CacheConfiguration[] cfgs = {
+            cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1),
+            cacheConfiguration(GROUP1, "c2", PARTITIONED, TRANSACTIONAL, 1),
+            cacheConfiguration(GROUP2, "c3", PARTITIONED, ATOMIC, 1),
+            cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 1),
+            cacheConfiguration(null, "c5", PARTITIONED, ATOMIC, 1)
+        };
+
+        String[] cacheNames = {"c1", "c2", "c3", "c4", "c5"};
+
+        for (int idx = 0; idx < nodes; idx++) {
+            // Assigned again before every node start.
+            // NOTE(review): presumably the field is consumed when a node configuration is built - confirm.
+            if (staticCaches)
+                this.ccfgs = cfgs;
+
+            startGrid(idx);
+        }
+
+        Ignite node = ignite(0);
+
+        // Dynamic mode: caches are created only after all nodes are up.
+        if (!staticCaches)
+            node.createCaches(Arrays.asList(cfgs));
+
+        // Fill every cache with ten entries and read each one back immediately.
+        for (String cacheName : cacheNames) {
+            IgniteCache cache = node.cache(cacheName);
+
+            for (int key = 0; key < 10; key++) {
+                String val = cacheName + key;
+
+                cache.put(key, val);
+
+                assertEquals(val, cache.get(key));
+            }
+
+            assertEquals(10, cache.size());
+        }
+
+        // Full cluster restart.
+        stopAllGrids();
+
+        node = startGrids(nodes);
+
+        awaitPartitionMapExchange();
+
+        // The same ten entries must be readable from every cache after the restart.
+        for (String cacheName : cacheNames) {
+            IgniteCache cache = node.cache(cacheName);
+
+            for (int key = 0; key < 10; key++) {
+                String expVal = cacheName + key;
+
+                assertEquals(expVal, cache.get(key));
+            }
+
+            assertEquals(10, cache.size());
+        }
+    }
+
+    /**
+     * @param grpName Cache group name.
+     * @param name Cache name.
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Atomicity mode.
+     * @param backups Backups number.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(
+        String grpName,
+        String name,
+        CacheMode cacheMode,
+        CacheAtomicityMode atomicityMode,
+        int backups
+    ) {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        // Cache name and the group it is assigned to.
+        cfg.setName(name);
+        cfg.setGroupName(grpName);
+
+        // Modes and backups come from the caller; write synchronization is always FULL_SYNC.
+        cfg.setCacheMode(cacheMode);
+        cfg.setAtomicityMode(atomicityMode);
+        cfg.setBackups(backups);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return cfg;
+    }
+
+    /**
+     * Person value with a first and a last name; both fields are declared as indexed SQL fields
+     * in the {@code full_name} group.
+     */
+    static class Person implements Serializable {
+        /** First name. */
+        @GridToStringInclude
+        @QuerySqlField(index = true, groups = "full_name")
+        String fName;
+
+        /** Last name. */
+        @GridToStringInclude
+        @QuerySqlField(index = true, groups = "full_name")
+        String lName;
+
+        /**
+         * @param fName First name.
+         * @param lName Last name.
+         */
+        public Person(String fName, String lName) {
+            this.fName = fName;
+            this.lName = lName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            // Exact class match is required: null and instances of any other class are never equal.
+            if (o != null && o.getClass() == getClass()) {
+                Person other = (Person)o;
+
+                boolean sameFirst = Objects.equals(fName, other.fName);
+                boolean sameLast = Objects.equals(lName, other.lName);
+
+                return sameFirst && sameLast;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            // Same value as Objects.hash(fName, lName): 31-based combination starting from 1, null hashes to 0.
+            int res = 31 + Objects.hashCode(fName);
+
+            return 31 * res + Objects.hashCode(lName);
+        }
+    }
+}