Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7FC10200C84 for ; Mon, 29 May 2017 15:23:00 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 733ED160BCE; Mon, 29 May 2017 13:23:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E7D4C160BC2 for ; Mon, 29 May 2017 15:22:58 +0200 (CEST) Received: (qmail 72120 invoked by uid 500); 29 May 2017 13:22:58 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 72109 invoked by uid 99); 29 May 2017 13:22:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 May 2017 13:22:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C6586DFC2E; Mon, 29 May 2017 13:22:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-5075 Date: Mon, 29 May 2017 13:22:57 +0000 (UTC) archived-at: Mon, 29 May 2017 13:23:00 -0000 Repository: ignite Updated Branches: refs/heads/ignite-5075 63fa2bd19 -> 3733f6aaa ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3733f6aa Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3733f6aa Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3733f6aa Branch: refs/heads/ignite-5075 Commit: 3733f6aaa464bcf0474e69615c92d8ddc1f72ac3 Parents: 63fa2bd Author: sboikov Authored: Mon May 29 13:42:07 2017 +0300 Committer: sboikov Committed: Mon May 29 16:22:50 2017 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtLocalPartition.java | 217 +++++---- .../processors/cache/IgniteCacheGroupsTest.java | 436 +++++++++++++++++-- 2 files changed, 531 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3733f6aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index f2c0206..4a501a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -56,7 +54,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -130,11 +127,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** */ @GridToStringExclude - private final ConcurrentMap> cachesEntryMaps; - - /** */ - @GridToStringExclude - private final ConcurrentMap cacheSizes; + private final ConcurrentMap cacheMaps; /** */ @GridToStringExclude @@ -184,13 +177,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (grp.sharedGroup()) { singleCacheEntryMap = null; - cachesEntryMaps = new ConcurrentHashMap<>(); - cacheSizes = new ConcurrentHashMap<>(); + cacheMaps = new ConcurrentHashMap<>(); } else { singleCacheEntryMap = createEntriesMap(); - cachesEntryMaps = null; - cacheSizes = null; + cacheMaps = null; } rent = new GridFutureAdapter() { @@ -231,17 +222,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements private AtomicInteger cacheSizeCounter(int cacheId) { assert grp.sharedGroup(); - AtomicInteger cntr = cacheSizes.get(cacheId); - - if (cntr != null) - return cntr; - - AtomicInteger old = cacheSizes.putIfAbsent(cacheId, cntr = new AtomicInteger()); - - if (old != null) - cntr = old; + CacheMapHolder hld = cacheMapHolder(cacheId, true); - return cntr; + return hld.size; } /** {@inheritDoc} */ @@ -249,8 +232,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (grp.sharedGroup()) { int size = 0; - for (ConcurrentMap map : cachesEntryMaps.values()) - size += map.size(); + for (CacheMapHolder hld : cacheMaps.values()) + size += hld.map.size(); return size; } @@ -261,24 +244,36 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** {@inheritDoc} */ @Override protected ConcurrentMap entriesMap(int cacheId, boolean create) { if (grp.sharedGroup()) { - ConcurrentMap map = cachesEntryMaps.get(cacheId); + CacheMapHolder hld = cacheMapHolder(cacheId, create); - if (map != null) - return map; + return hld != null ? hld.map : null; + } - if (!create) - return null; + return singleCacheEntryMap; + } - ConcurrentMap old = - cachesEntryMaps.putIfAbsent(cacheId, map = createEntriesMap()); + /** + * @param cacheId Cache ID. + * @param create Create flag. + * @return Map holder. + */ + private CacheMapHolder cacheMapHolder(int cacheId, boolean create) { + assert grp.sharedGroup(); - if (old != null) - map = old; + CacheMapHolder hld = cacheMaps.get(cacheId); - return map; - } + if (hld != null) + return hld; - return singleCacheEntryMap; + if (!create) + return null; + + CacheMapHolder old = cacheMaps.putIfAbsent(cacheId, hld = new CacheMapHolder(createEntriesMap())); + + if (old != null) + hld = old; + + return hld; } /** @@ -968,63 +963,16 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements public void clearAll() throws NodeStoppingException { GridCacheVersion clearVer = ctx.versions().next(); - boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED); - - Collection> maps = - grp.sharedGroup() ? cachesEntryMaps.values() : Collections.singleton(singleCacheEntryMap); - GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer); - for (ConcurrentMap map : maps) { - Iterator it = map.values().iterator(); - - while (it.hasNext()) { - GridCacheMapEntry cached = null; - - ctx.database().checkpointReadLock(); - - try { - cached = it.next(); - - if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { - removeEntry(cached); - - if (!cached.isInternal()) { - if (rec) { - grp.addCacheEvent(cached.partition(), - cached.key(), - ctx.localNodeId(), - EVT_CACHE_REBALANCE_OBJECT_UNLOADED, - null, - false, - cached.rawGet(), - cached.hasValue(), - false); - } - } - } - } - catch (GridDhtInvalidPartitionException e) { - assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; - - break; // Partition is already concurrently cleared and evicted. - } - catch (NodeStoppingException e) { - if (log.isDebugEnabled()) - log.debug("Failed to clear cache entry for evicted partition: " + cached.partition()); - - rent.onDone(e); + boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED); - throw e; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e); - } - finally { - ctx.database().checkpointReadUnlock(); - } - } + if (grp.sharedGroup()) { + for (CacheMapHolder hld : cacheMaps.values()) + clear(hld.map, extras, rec); } + else + clear(singleCacheEntryMap, extras, rec); if (!grp.allowFastEviction()) { GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext().dhtCache().context(); @@ -1091,6 +1039,65 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** + * @param map Map to clear. + * @param extras Obsolete extras. + * @param evt Unload event flag. + * @throws NodeStoppingException + */ + private void clear(ConcurrentMap map, + GridCacheObsoleteEntryExtras extras, + boolean evt) throws NodeStoppingException { + Iterator it = map.values().iterator(); + + while (it.hasNext()) { + GridCacheMapEntry cached = null; + + ctx.database().checkpointReadLock(); + + try { + cached = it.next(); + + if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(extras.obsoleteVersion(), extras)) { + removeEntry(cached); + + if (!cached.isInternal()) { + if (evt) { + grp.addCacheEvent(cached.partition(), + cached.key(), + ctx.localNodeId(), + EVT_CACHE_REBALANCE_OBJECT_UNLOADED, + null, + false, + cached.rawGet(), + cached.hasValue(), + false); + } + } + } + } + catch (GridDhtInvalidPartitionException e) { + assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; + + break; // Partition is already concurrently cleared and evicted. + } + catch (NodeStoppingException e) { + if (log.isDebugEnabled()) + log.debug("Failed to clear cache entry for evicted partition: " + cached.partition()); + + rent.onDone(e); + + throw e; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e); + } + finally { + ctx.database().checkpointReadUnlock(); + } + } + } + + /** * */ private void clearDeferredDeletes() { @@ -1129,8 +1136,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** {@inheritDoc} */ @Override public int publicSize(int cacheId) { - if (grp.sharedGroup()) - return cacheSizeCounter(cacheId).get(); + if (grp.sharedGroup()) { + CacheMapHolder hld = cacheMaps.get(cacheId); + + return hld != null ? hld.size.get() : 0; + } return getSize(state.get()); } @@ -1176,7 +1186,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements it.remove(); } - cachesEntryMaps.remove(cacheId); + cacheMaps.remove(cacheId); } /** @@ -1293,4 +1303,27 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements return S.toString(RemovedEntryHolder.class, this); } } + + /** + * + */ + static class CacheMapHolder { + /** */ + final AtomicInteger size = new AtomicInteger(); + + /** */ + final ConcurrentMap map; + + /** + * @param map Map. + */ + CacheMapHolder(ConcurrentMap map) { + this.map = map; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheMapHolder.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3733f6aa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index 1cc8999..a6e009d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -32,6 +34,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.locks.Lock; import javax.cache.Cache; import javax.cache.CacheException; @@ -68,8 +72,11 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; @@ -1152,64 +1159,179 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { Ignite srv0 = startGrid(0); IgniteCache srv0Cache1 = - srv0.createCache(cacheConfiguration(GROUP1, CACHE1, PARTITIONED, ATOMIC, 2, false)); + srv0.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false)); IgniteCache srv0Cache2 = - srv0.createCache(cacheConfiguration(GROUP1, CACHE2, PARTITIONED, ATOMIC, 2, false)); + srv0.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false)); + IgniteCache srv0Cache3 = + srv0.createCache(cacheConfiguration(GROUP2, "c3", PARTITIONED, TRANSACTIONAL, 2, false)); + IgniteCache srv0Cache4 = + srv0.createCache(cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 2, false)); + + final int ITEMS = 1_000; - for (int i = 0; i < 10; i++) + for (int i = 0; i < ITEMS; i++) { srv0Cache1.put(new Key1(i), i); - assertEquals(10, srv0Cache1.size()); - assertEquals(10, srv0Cache1.localSize()); + srv0Cache3.put(new Key1(i), i); + srv0Cache4.put(new Key1(i), -i); + } + + assertEquals(ITEMS, srv0Cache1.size()); + assertEquals(ITEMS, srv0Cache1.localSize()); assertEquals(0, srv0Cache2.size()); + assertEquals(ITEMS, srv0Cache3.size()); + assertEquals(ITEMS, srv0Cache4.localSize()); - Ignite srv1 = startGrid(1); + startGrid(1); awaitPartitionMapExchange(); - IgniteCache srv1Cache1 = srv1.cache("cache1"); - IgniteCache srv1Cache2 = srv1.cache("cache2"); + for (int i = 0; i < 2; i++) { + Ignite node = ignite(i); - assertEquals(20, srv0Cache1.size(CachePeekMode.ALL)); - assertEquals(10, srv0Cache1.localSize(CachePeekMode.ALL)); - assertEquals(0, srv0Cache2.size(CachePeekMode.ALL)); - assertEquals(0, srv0Cache2.localSize(CachePeekMode.ALL)); + IgniteCache cache1 = node.cache("c1"); + IgniteCache cache2 = node.cache("c2"); + IgniteCache cache3 = node.cache("c3"); + IgniteCache cache4 = node.cache("c4"); - assertEquals(20, srv1Cache1.size(CachePeekMode.ALL)); - assertEquals(10, srv1Cache1.localSize(CachePeekMode.ALL)); - assertEquals(0, srv1Cache2.size(CachePeekMode.ALL)); - assertEquals(0, srv1Cache2.localSize(CachePeekMode.ALL)); + assertEquals(ITEMS * 2, cache1.size(CachePeekMode.ALL)); + assertEquals(ITEMS, cache1.localSize(CachePeekMode.ALL)); + assertEquals(0, cache2.size(CachePeekMode.ALL)); + assertEquals(0, cache2.localSize(CachePeekMode.ALL)); - for (int i = 0; i < 10; i++) { - assertEquals(i, srv0Cache1.localPeek(new Key1(i))); - assertEquals(i, srv1Cache1.localPeek(new Key1(i))); + assertEquals(ITEMS * 2, cache3.size(CachePeekMode.ALL)); + assertEquals(ITEMS, cache3.localSize(CachePeekMode.ALL)); + + assertEquals(ITEMS * 2, cache4.size(CachePeekMode.ALL)); + assertEquals(ITEMS, cache4.localSize(CachePeekMode.ALL)); + + for (int k = 0; k < ITEMS; k++) { + assertEquals(i, cache1.localPeek(new Key1(i))); + assertNull(cache2.localPeek(new Key1(i))); + assertEquals(i, cache3.localPeek(new Key1(i))); + assertEquals(-i, cache4.localPeek(new Key1(i))); + } } - for (int i = 0; i < 20; i++) + for (int i = 0; i < ITEMS * 2; i++) srv0Cache2.put(new Key1(i), i + 1); Ignite srv2 = startGrid(2); awaitPartitionMapExchange(); - IgniteCache srv2Cache1 = srv2.cache("cache1"); - IgniteCache srv2Cache2 = srv2.cache("cache2"); + for (int i = 0; i < 3; i++) { + Ignite node = ignite(i); + + IgniteCache cache1 = node.cache("c1"); + IgniteCache cache2 = node.cache("c2"); + IgniteCache cache3 = node.cache("c3"); + IgniteCache cache4 = node.cache("c4"); + + assertEquals(ITEMS * 3, cache1.size(CachePeekMode.ALL)); + assertEquals(ITEMS, cache1.localSize(CachePeekMode.ALL)); + assertEquals(ITEMS * 6, cache2.size(CachePeekMode.ALL)); + assertEquals(ITEMS * 2, cache2.localSize(CachePeekMode.ALL)); + assertEquals(ITEMS, cache3.localSize(CachePeekMode.ALL)); + assertEquals(ITEMS, cache4.localSize(CachePeekMode.ALL)); + } - assertEquals(30, srv2Cache1.size(CachePeekMode.ALL)); - assertEquals(10, srv2Cache1.localSize(CachePeekMode.ALL)); - assertEquals(60, srv2Cache2.size(CachePeekMode.ALL)); - assertEquals(20, srv1Cache2.localSize(CachePeekMode.ALL)); + IgniteCache srv2Cache1 = srv2.cache("c1"); + IgniteCache srv2Cache2 = srv2.cache("c2"); - for (int i = 0; i < 10; i++) + for (int i = 0; i < ITEMS; i++) assertEquals(i, srv2Cache1.localPeek(new Key1(i))); - for (int i = 0; i < 20; i++) + for (int i = 0; i < ITEMS * 2; i++) assertEquals(i + 1, srv2Cache2.localPeek(new Key1(i))); } /** * @throws Exception If failed. */ + public void testRebalance2() throws Exception { + Ignite srv0 = startGrid(0); + + IgniteCache srv0Cache1 = + srv0.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 0, false)); + IgniteCache srv0Cache2 = + srv0.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 0, false)); + + Affinity aff = srv0.affinity("c1"); + + final int ITEMS = 2_000; + + Map c1Data = new HashMap<>(); + Map c2Data = new HashMap<>(); + + for (int i = 0; i < ITEMS; i++) { + srv0Cache1.put(i, i); + c1Data.put(i, i); + + if (i % 2 == 0) { + srv0Cache2.put(i, i); + c2Data.put(i, i); + } + } + + assertEquals(ITEMS, srv0Cache1.size()); + assertEquals(ITEMS / 2, srv0Cache2.size()); + + Ignite srv1 = startGrid(1); + + awaitPartitionMapExchange(); + + assertEquals(ITEMS, srv0Cache1.size()); + assertEquals(ITEMS / 2, srv0Cache2.size()); + + checkCacheData(c1Data, "c1"); + checkCacheData(c2Data, "c2"); + + Set srv1Parts = new HashSet<>(); + + for (Integer p : aff.primaryPartitions(srv1.cluster().localNode())) + srv1Parts.add(p); + + CacheGroupInfrastructure grpSrv0 = cacheGroup(srv0, GROUP1); + CacheGroupInfrastructure grpSrv1 = cacheGroup(srv1, GROUP1); + + for (int p = 0; p < aff.partitions(); p++) { + if (srv1Parts.contains(p)) { + GridIterator it = grpSrv0.offheap().partitionIterator(p); + assertFalse(it.hasNext()); + + it = grpSrv1.offheap().partitionIterator(p); + assertTrue(it.hasNext()); + } + else { + GridIterator it = grpSrv0.offheap().partitionIterator(p); + assertTrue(it.hasNext()); + + it = grpSrv1.offheap().partitionIterator(p); + assertFalse(it.hasNext()); + } + } + + c1Data = new HashMap<>(); + c2Data = new HashMap<>(); + + for (int i = 0; i < ITEMS; i++) { + srv0Cache1.put(i, i + 1); + c1Data.put(i, i + 1); + + if (i % 2 == 0) { + srv0Cache2.put(i, i + 1); + c2Data.put(i, i + 1); + } + } + + checkCacheData(c1Data, "c1"); + checkCacheData(c2Data, "c2"); + } + + /** + * @throws Exception If failed. + */ public void testNoKeyIntersectTx() throws Exception { testNoKeyIntersect(TRANSACTIONAL); } @@ -2472,6 +2594,260 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testCacheIdSort() throws Exception { + Ignite node = startGrid(0); + + final List caches = new ArrayList<>(3); + + caches.add(node.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, false) + .setAffinity(new RendezvousAffinityFunction(false, 8)))); + caches.add(node.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 1, false) + .setAffinity(new RendezvousAffinityFunction(false, 8)))); + caches.add(node.createCache(cacheConfiguration(GROUP1, "c3", PARTITIONED, ATOMIC, 1, false) + .setAffinity(new RendezvousAffinityFunction(false, 8)))); + + Affinity aff = node.affinity("c1"); + + final List keys = new ArrayList<>(); + + for (int i = 0; i < 1_000_000; i++) { + if (aff.partition(i) == 0) { + keys.add(i); + + if (keys.size() >= 10_000) + break; + } + } + + assertEquals(10_000, keys.size()); + + final long stopTime = System.currentTimeMillis() + 10_000; + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (System.currentTimeMillis() < stopTime) { + for (int i = 0; i < 100; i++) { + IgniteCache cache = caches.get(rnd.nextInt(3)); + + Integer key = keys.get(rnd.nextInt(10_000)); + + if (rnd.nextFloat() > 0.8f) + cache.remove(key); + else + cache.put(key, key); + } + } + + return null; + } + }, 5, "update-thread"); + + CacheGroupInfrastructure grp = cacheGroup(node, GROUP1); + + Integer cacheId = null; + + GridIterator it = grp.offheap().partitionIterator(0); + + int c = 0; + + while (it.hasNext()) { + CacheDataRow row = it.next(); + + if (cacheId == null || cacheId != row.cacheId()) { + cacheId = row.cacheId(); + + c++; + } + } + + assertEquals(3, c); + } + + /** + * @throws Exception If failed. + */ + public void testRestartsAndCacheCreateDestroy() throws Exception { + final int SRVS = 5; + + startGrids(SRVS); + + client = true; + + final Ignite clientNode = startGrid(SRVS); + + client = false; + + final int CACHES = 10; + + final AtomicReferenceArray caches = new AtomicReferenceArray<>(CACHES); + + for (int i = 0; i < 10; i++) { + CacheAtomicityMode atomicityMode = i % 2 == 0 ? ATOMIC : TRANSACTIONAL; + + caches.set(i, + clientNode.createCache(cacheConfiguration(GROUP1, "c" + i, PARTITIONED, atomicityMode, 0, false))); + } + + final AtomicBoolean stop = new AtomicBoolean(); + final AtomicInteger cacheCntr = new AtomicInteger(); + + try { + for (int i = 0; i < 10; i++) { + stop.set(false); + + final AtomicReference err = new AtomicReference<>(); + + log.info("Iteration: " + i); + + IgniteInternalFuture restartFut = GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + int node = rnd.nextInt(SRVS); + + log.info("Stop node: " + node); + + stopGrid(node); + + U.sleep(500); + + log.info("Start node: " + node); + + startGrid(node); + + try { + if (rnd.nextBoolean()) + awaitPartitionMapExchange(); + } + catch (Exception ignore) { + // No-op. + } + } + } + catch (Exception e){ + log.error("Unexpected error: " + e, e); + + err.set(e); + + stop.set(true); + } + } + }); + + IgniteInternalFuture cacheFut = GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + int idx = rnd.nextInt(CACHES); + + IgniteCache cache = caches.get(idx); + + if (cache != null && caches.compareAndSet(idx, cache, null)) { + log.info("Destroy cache: " + cache.getName()); + + clientNode.destroyCache(cache.getName()); + + CacheAtomicityMode atomicityMode = rnd.nextBoolean() ? ATOMIC : TRANSACTIONAL; + + String name = "newName-" + cacheCntr.incrementAndGet(); + + cache = clientNode.createCache( + cacheConfiguration(GROUP1, name, PARTITIONED, atomicityMode, 0, false)); + + caches.set(idx, cache); + } + } + } + catch (Exception e){ + log.error("Unexpected error: " + e, e); + + err.set(e); + + stop.set(true); + } + } + }); + + IgniteInternalFuture opFut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + int idx = rnd.nextInt(CACHES); + + IgniteCache cache = caches.get(idx); + + if (cache != null && caches.compareAndSet(idx, cache, null)) { + for (int i = 0; i < 10; i++) + cacheOperation(rnd, cache); + + caches.set(idx, cache); + } + } + } + catch (Exception e) { + err.set(e); + + log.error("Unexpected error: " + e, e); + + stop.set(true); + } + } + }, 8, "op-thread"); + + Thread.sleep(10_000); + + stop.set(true); + + restartFut.get(); + cacheFut.get(); + opFut.get(); + + assertNull("Unexpected error during test, see log for details", err.get()); + + awaitPartitionMapExchange(); + + Set cacheIds = new HashSet<>(); + + for (int c = 0; c < CACHES; c++) { + IgniteCache cache = caches.get(c); + + assertNotNull(cache); + + assertTrue(cacheIds.add(CU.cacheId(cache.getName()))); + } + + for (int n = 0; n < SRVS; n++) { + CacheGroupInfrastructure grp = cacheGroup(ignite(n), GROUP1); + + assertNotNull(grp); + + for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) { + Map cachesMap = GridTestUtils.getFieldValue(part, "cacheMaps"); + + assertTrue(cachesMap.size() <= cacheIds.size()); + + for (Integer cacheId : cachesMap.keySet()) + assertTrue(cachesMap.containsKey(cacheId)); + } + } + } + } + finally { + stop.set(true); + } + } + + /** * @param cntr Counter. * @param expEvts Expected events number. * @throws Exception If failed. @@ -2893,8 +3269,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { * @param grpName Cache group name. * @return Cache group. */ - private CacheGroupInfrastructure cacheGroup(IgniteKernal node, String grpName) { - for (CacheGroupInfrastructure grp : node.context().cache().cacheGroups()) { + private CacheGroupInfrastructure cacheGroup(Ignite node, String grpName) { + for (CacheGroupInfrastructure grp : ((IgniteKernal)node).context().cache().cacheGroups()) { if (grpName.equals(grp.name())) return grp; }