ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5075
Date Mon, 29 May 2017 13:22:57 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 63fa2bd19 -> 3733f6aaa


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3733f6aa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3733f6aa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3733f6aa

Branch: refs/heads/ignite-5075
Commit: 3733f6aaa464bcf0474e69615c92d8ddc1f72ac3
Parents: 63fa2bd
Author: sboikov <sboikov@gridgain.com>
Authored: Mon May 29 13:42:07 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon May 29 16:22:50 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtLocalPartition.java  | 217 +++++----
 .../processors/cache/IgniteCacheGroupsTest.java | 436 +++++++++++++++++--
 2 files changed, 531 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3733f6aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index f2c0206..4a501a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -56,7 +54,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -130,11 +127,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
 
     /** */
     @GridToStringExclude
-    private final ConcurrentMap<Integer, ConcurrentMap<KeyCacheObject, GridCacheMapEntry>> cachesEntryMaps;
-
-    /** */
-    @GridToStringExclude
-    private final ConcurrentMap<Integer, AtomicInteger> cacheSizes;
+    private final ConcurrentMap<Integer, CacheMapHolder> cacheMaps;
 
     /** */
     @GridToStringExclude
@@ -184,13 +177,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
 
         if (grp.sharedGroup()) {
             singleCacheEntryMap = null;
-            cachesEntryMaps = new ConcurrentHashMap<>();
-            cacheSizes = new ConcurrentHashMap<>();
+            cacheMaps = new ConcurrentHashMap<>();
         }
         else {
             singleCacheEntryMap = createEntriesMap();
-            cachesEntryMaps = null;
-            cacheSizes = null;
+            cacheMaps = null;
         }
 
         rent = new GridFutureAdapter<Object>() {
@@ -231,17 +222,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     private AtomicInteger cacheSizeCounter(int cacheId) {
         assert grp.sharedGroup();
 
-        AtomicInteger cntr = cacheSizes.get(cacheId);
-
-        if (cntr != null)
-            return cntr;
-
-        AtomicInteger old = cacheSizes.putIfAbsent(cacheId, cntr = new AtomicInteger());
-
-        if (old != null)
-            cntr = old;
+        CacheMapHolder hld = cacheMapHolder(cacheId, true);
 
-        return cntr;
+        return hld.size;
     }
 
     /** {@inheritDoc} */
@@ -249,8 +232,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
         if (grp.sharedGroup()) {
             int size = 0;
 
-            for (ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map : cachesEntryMaps.values())
-                size += map.size();
+            for (CacheMapHolder hld : cacheMaps.values())
+                size += hld.map.size();
 
             return size;
         }
@@ -261,24 +244,36 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     /** {@inheritDoc} */
     @Override protected ConcurrentMap<KeyCacheObject, GridCacheMapEntry> entriesMap(int cacheId, boolean create) {
         if (grp.sharedGroup()) {
-            ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map = cachesEntryMaps.get(cacheId);
+            CacheMapHolder hld = cacheMapHolder(cacheId, create);
 
-            if (map != null)
-                return map;
+            return hld != null ? hld.map : null;
+        }
 
-            if (!create)
-                return null;
+        return singleCacheEntryMap;
+    }
 
-            ConcurrentMap<KeyCacheObject, GridCacheMapEntry> old =
-                cachesEntryMaps.putIfAbsent(cacheId, map = createEntriesMap());
+    /**
+     * @param cacheId Cache ID.
+     * @param create Create flag.
+     * @return Map holder.
+     */
+    private CacheMapHolder cacheMapHolder(int cacheId, boolean create) {
+        assert grp.sharedGroup();
 
-            if (old != null)
-                map = old;
+        CacheMapHolder hld = cacheMaps.get(cacheId);
 
-            return map;
-        }
+        if (hld != null)
+            return hld;
 
-        return singleCacheEntryMap;
+        if (!create)
+            return null;
+
+        CacheMapHolder  old = cacheMaps.putIfAbsent(cacheId, hld = new CacheMapHolder(createEntriesMap()));
+
+        if (old != null)
+            hld = old;
+
+        return hld;
     }
 
     /**
@@ -968,63 +963,16 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     public void clearAll() throws NodeStoppingException {
         GridCacheVersion clearVer = ctx.versions().next();
 
-        boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
-
-        Collection<ConcurrentMap<KeyCacheObject, GridCacheMapEntry>> maps =
-            grp.sharedGroup() ? cachesEntryMaps.values() : Collections.singleton(singleCacheEntryMap);
-
         GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer);
 
-        for (ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map : maps) {
-            Iterator<GridCacheMapEntry> it = map.values().iterator();
-
-            while (it.hasNext()) {
-                GridCacheMapEntry cached = null;
-
-                ctx.database().checkpointReadLock();
-
-                try {
-                    cached = it.next();
-
-                    if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
-                        removeEntry(cached);
-
-                        if (!cached.isInternal()) {
-                            if (rec) {
-                                grp.addCacheEvent(cached.partition(),
-                                    cached.key(),
-                                    ctx.localNodeId(),
-                                    EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
-                                    null,
-                                    false,
-                                    cached.rawGet(),
-                                    cached.hasValue(),
-                                    false);
-                            }
-                        }
-                    }
-                }
-                catch (GridDhtInvalidPartitionException e) {
-                    assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
-
-                    break; // Partition is already concurrently cleared and evicted.
-                }
-                catch (NodeStoppingException e) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to clear cache entry for evicted partition: " + cached.partition());
-
-                    rent.onDone(e);
+        boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
 
-                    throw e;
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e);
-                }
-                finally {
-                    ctx.database().checkpointReadUnlock();
-                }
-            }
+        if (grp.sharedGroup()) {
+            for (CacheMapHolder hld : cacheMaps.values())
+                clear(hld.map, extras, rec);
         }
+        else
+            clear(singleCacheEntryMap, extras, rec);
 
         if (!grp.allowFastEviction()) {
             GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext().dhtCache().context();
@@ -1091,6 +1039,65 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @param map Map to clear.
+     * @param extras Obsolete extras.
+     * @param evt Unload event flag.
+     * @throws NodeStoppingException
+     */
+    private void clear(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map,
+        GridCacheObsoleteEntryExtras extras,
+        boolean evt) throws NodeStoppingException {
+        Iterator<GridCacheMapEntry> it = map.values().iterator();
+
+        while (it.hasNext()) {
+            GridCacheMapEntry cached = null;
+
+            ctx.database().checkpointReadLock();
+
+            try {
+                cached = it.next();
+
+                if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(extras.obsoleteVersion(), extras)) {
+                    removeEntry(cached);
+
+                    if (!cached.isInternal()) {
+                        if (evt) {
+                            grp.addCacheEvent(cached.partition(),
+                                cached.key(),
+                                ctx.localNodeId(),
+                                EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+                                null,
+                                false,
+                                cached.rawGet(),
+                                cached.hasValue(),
+                                false);
+                        }
+                    }
+                }
+            }
+            catch (GridDhtInvalidPartitionException e) {
+                assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
+
+                break; // Partition is already concurrently cleared and evicted.
+            }
+            catch (NodeStoppingException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to clear cache entry for evicted partition: " + cached.partition());
+
+                rent.onDone(e);
+
+                throw e;
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e);
+            }
+            finally {
+                ctx.database().checkpointReadUnlock();
+            }
+        }
+    }
+
+    /**
      *
      */
     private void clearDeferredDeletes() {
@@ -1129,8 +1136,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
 
     /** {@inheritDoc} */
     @Override public int publicSize(int cacheId) {
-        if (grp.sharedGroup())
-            return cacheSizeCounter(cacheId).get();
+        if (grp.sharedGroup()) {
+            CacheMapHolder hld = cacheMaps.get(cacheId);
+
+            return hld != null ? hld.size.get() : 0;
+        }
 
         return getSize(state.get());
     }
@@ -1176,7 +1186,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
                 it.remove();
         }
 
-        cachesEntryMaps.remove(cacheId);
+        cacheMaps.remove(cacheId);
     }
 
     /**
@@ -1293,4 +1303,27 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
             return S.toString(RemovedEntryHolder.class, this);
         }
     }
+
+    /**
+     *
+     */
+    static class CacheMapHolder {
+        /** */
+        final AtomicInteger size = new AtomicInteger();
+
+        /** */
+        final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map;
+
+        /**
+         * @param map Map.
+         */
+        CacheMapHolder(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map) {
+            this.map = map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CacheMapHolder.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3733f6aa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index 1cc8999..a6e009d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -32,6 +34,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.concurrent.locks.Lock;
 import javax.cache.Cache;
 import javax.cache.CacheException;
@@ -68,8 +72,11 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -1152,64 +1159,179 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
         Ignite srv0 = startGrid(0);
 
         IgniteCache<Object, Object> srv0Cache1 =
-            srv0.createCache(cacheConfiguration(GROUP1, CACHE1, PARTITIONED, ATOMIC, 2, false));
+            srv0.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false));
         IgniteCache<Object, Object> srv0Cache2 =
-            srv0.createCache(cacheConfiguration(GROUP1, CACHE2, PARTITIONED, ATOMIC, 2, false));
+            srv0.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false));
+        IgniteCache<Object, Object> srv0Cache3 =
+            srv0.createCache(cacheConfiguration(GROUP2, "c3", PARTITIONED, TRANSACTIONAL, 2, false));
+        IgniteCache<Object, Object> srv0Cache4 =
+            srv0.createCache(cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 2, false));
+
+        final int ITEMS = 1_000;
 
-        for (int i = 0; i < 10; i++)
+        for (int i = 0; i < ITEMS; i++) {
             srv0Cache1.put(new Key1(i), i);
 
-        assertEquals(10, srv0Cache1.size());
-        assertEquals(10, srv0Cache1.localSize());
+            srv0Cache3.put(new Key1(i), i);
+            srv0Cache4.put(new Key1(i), -i);
+        }
+
+        assertEquals(ITEMS, srv0Cache1.size());
+        assertEquals(ITEMS, srv0Cache1.localSize());
         assertEquals(0, srv0Cache2.size());
+        assertEquals(ITEMS, srv0Cache3.size());
+        assertEquals(ITEMS, srv0Cache4.localSize());
 
-        Ignite srv1 = startGrid(1);
+        startGrid(1);
 
         awaitPartitionMapExchange();
 
-        IgniteCache<Object, Object> srv1Cache1 = srv1.cache("cache1");
-        IgniteCache<Object, Object> srv1Cache2 = srv1.cache("cache2");
+        for (int i = 0; i < 2; i++) {
+            Ignite node = ignite(i);
 
-        assertEquals(20, srv0Cache1.size(CachePeekMode.ALL));
-        assertEquals(10, srv0Cache1.localSize(CachePeekMode.ALL));
-        assertEquals(0, srv0Cache2.size(CachePeekMode.ALL));
-        assertEquals(0, srv0Cache2.localSize(CachePeekMode.ALL));
+            IgniteCache<Object, Object> cache1 = node.cache("c1");
+            IgniteCache<Object, Object> cache2 = node.cache("c2");
+            IgniteCache<Object, Object> cache3 = node.cache("c3");
+            IgniteCache<Object, Object> cache4 = node.cache("c4");
 
-        assertEquals(20, srv1Cache1.size(CachePeekMode.ALL));
-        assertEquals(10, srv1Cache1.localSize(CachePeekMode.ALL));
-        assertEquals(0, srv1Cache2.size(CachePeekMode.ALL));
-        assertEquals(0, srv1Cache2.localSize(CachePeekMode.ALL));
+            assertEquals(ITEMS * 2, cache1.size(CachePeekMode.ALL));
+            assertEquals(ITEMS, cache1.localSize(CachePeekMode.ALL));
+            assertEquals(0, cache2.size(CachePeekMode.ALL));
+            assertEquals(0, cache2.localSize(CachePeekMode.ALL));
 
-        for (int i = 0; i < 10; i++) {
-            assertEquals(i, srv0Cache1.localPeek(new Key1(i)));
-            assertEquals(i, srv1Cache1.localPeek(new Key1(i)));
+            assertEquals(ITEMS * 2, cache3.size(CachePeekMode.ALL));
+            assertEquals(ITEMS, cache3.localSize(CachePeekMode.ALL));
+
+            assertEquals(ITEMS * 2, cache4.size(CachePeekMode.ALL));
+            assertEquals(ITEMS, cache4.localSize(CachePeekMode.ALL));
+
+            for (int k = 0; k < ITEMS; k++) {
+                assertEquals(i, cache1.localPeek(new Key1(i)));
+                assertNull(cache2.localPeek(new Key1(i)));
+                assertEquals(i, cache3.localPeek(new Key1(i)));
+                assertEquals(-i, cache4.localPeek(new Key1(i)));
+            }
         }
 
-        for (int i = 0; i < 20; i++)
+        for (int i = 0; i < ITEMS * 2; i++)
             srv0Cache2.put(new Key1(i), i + 1);
 
         Ignite srv2 = startGrid(2);
 
         awaitPartitionMapExchange();
 
-        IgniteCache<Object, Object> srv2Cache1 = srv2.cache("cache1");
-        IgniteCache<Object, Object> srv2Cache2 = srv2.cache("cache2");
+        for (int i = 0; i < 3; i++) {
+            Ignite node = ignite(i);
+
+            IgniteCache<Object, Object> cache1 = node.cache("c1");
+            IgniteCache<Object, Object> cache2 = node.cache("c2");
+            IgniteCache<Object, Object> cache3 = node.cache("c3");
+            IgniteCache<Object, Object> cache4 = node.cache("c4");
+
+            assertEquals(ITEMS * 3, cache1.size(CachePeekMode.ALL));
+            assertEquals(ITEMS, cache1.localSize(CachePeekMode.ALL));
+            assertEquals(ITEMS * 6, cache2.size(CachePeekMode.ALL));
+            assertEquals(ITEMS * 2, cache2.localSize(CachePeekMode.ALL));
+            assertEquals(ITEMS, cache3.localSize(CachePeekMode.ALL));
+            assertEquals(ITEMS, cache4.localSize(CachePeekMode.ALL));
+        }
 
-        assertEquals(30, srv2Cache1.size(CachePeekMode.ALL));
-        assertEquals(10, srv2Cache1.localSize(CachePeekMode.ALL));
-        assertEquals(60, srv2Cache2.size(CachePeekMode.ALL));
-        assertEquals(20, srv1Cache2.localSize(CachePeekMode.ALL));
+        IgniteCache<Object, Object> srv2Cache1 = srv2.cache("c1");
+        IgniteCache<Object, Object> srv2Cache2 = srv2.cache("c2");
 
-        for (int i = 0; i < 10; i++)
+        for (int i = 0; i < ITEMS; i++)
             assertEquals(i, srv2Cache1.localPeek(new Key1(i)));
 
-        for (int i = 0; i < 20; i++)
+        for (int i = 0; i < ITEMS * 2; i++)
             assertEquals(i + 1, srv2Cache2.localPeek(new Key1(i)));
     }
 
     /**
      * @throws Exception If failed.
      */
+    public void testRebalance2() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        IgniteCache<Object, Object> srv0Cache1 =
+            srv0.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 0, false));
+        IgniteCache<Object, Object> srv0Cache2 =
+            srv0.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 0, false));
+
+        Affinity aff = srv0.affinity("c1");
+
+        final int ITEMS = 2_000;
+
+        Map<Integer, Integer> c1Data = new HashMap<>();
+        Map<Integer, Integer> c2Data = new HashMap<>();
+
+        for (int i = 0; i < ITEMS; i++) {
+            srv0Cache1.put(i, i);
+            c1Data.put(i, i);
+
+            if (i % 2 == 0) {
+                srv0Cache2.put(i, i);
+                c2Data.put(i, i);
+            }
+        }
+
+        assertEquals(ITEMS, srv0Cache1.size());
+        assertEquals(ITEMS / 2, srv0Cache2.size());
+
+        Ignite srv1 = startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        assertEquals(ITEMS, srv0Cache1.size());
+        assertEquals(ITEMS / 2, srv0Cache2.size());
+
+        checkCacheData(c1Data, "c1");
+        checkCacheData(c2Data, "c2");
+
+        Set<Integer> srv1Parts = new HashSet<>();
+
+        for (Integer p : aff.primaryPartitions(srv1.cluster().localNode()))
+            srv1Parts.add(p);
+
+        CacheGroupInfrastructure grpSrv0 = cacheGroup(srv0, GROUP1);
+        CacheGroupInfrastructure grpSrv1 = cacheGroup(srv1, GROUP1);
+
+        for (int p = 0; p < aff.partitions(); p++) {
+            if (srv1Parts.contains(p)) {
+                GridIterator<CacheDataRow> it = grpSrv0.offheap().partitionIterator(p);
+                assertFalse(it.hasNext());
+
+                it = grpSrv1.offheap().partitionIterator(p);
+                assertTrue(it.hasNext());
+            }
+            else {
+                GridIterator<CacheDataRow> it = grpSrv0.offheap().partitionIterator(p);
+                assertTrue(it.hasNext());
+
+                it = grpSrv1.offheap().partitionIterator(p);
+                assertFalse(it.hasNext());
+            }
+        }
+
+        c1Data = new HashMap<>();
+        c2Data = new HashMap<>();
+
+        for (int i = 0; i < ITEMS; i++) {
+            srv0Cache1.put(i, i + 1);
+            c1Data.put(i, i + 1);
+
+            if (i % 2 == 0) {
+                srv0Cache2.put(i, i + 1);
+                c2Data.put(i, i + 1);
+            }
+        }
+
+        checkCacheData(c1Data, "c1");
+        checkCacheData(c2Data, "c2");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testNoKeyIntersectTx() throws Exception {
         testNoKeyIntersect(TRANSACTIONAL);
     }
@@ -2472,6 +2594,260 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testCacheIdSort() throws Exception {
+        Ignite node = startGrid(0);
+
+        final List<IgniteCache> caches = new ArrayList<>(3);
+
+        caches.add(node.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, false)
+            .setAffinity(new RendezvousAffinityFunction(false, 8))));
+        caches.add(node.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 1, false)
+            .setAffinity(new RendezvousAffinityFunction(false, 8))));
+        caches.add(node.createCache(cacheConfiguration(GROUP1, "c3", PARTITIONED, ATOMIC, 1, false)
+            .setAffinity(new RendezvousAffinityFunction(false, 8))));
+
+        Affinity aff = node.affinity("c1");
+
+        final List<Integer> keys = new ArrayList<>();
+
+        for (int i = 0; i < 1_000_000; i++) {
+            if (aff.partition(i) == 0) {
+                keys.add(i);
+
+                if (keys.size() >= 10_000)
+                    break;
+            }
+        }
+
+        assertEquals(10_000, keys.size());
+
+        final long stopTime = System.currentTimeMillis() + 10_000;
+
+        GridTestUtils.runMultiThreaded(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                while (System.currentTimeMillis() < stopTime) {
+                    for (int i = 0; i < 100; i++) {
+                        IgniteCache cache = caches.get(rnd.nextInt(3));
+
+                        Integer key = keys.get(rnd.nextInt(10_000));
+
+                        if (rnd.nextFloat() > 0.8f)
+                            cache.remove(key);
+                        else
+                            cache.put(key, key);
+                    }
+                }
+
+                return null;
+            }
+        }, 5, "update-thread");
+
+        CacheGroupInfrastructure grp = cacheGroup(node, GROUP1);
+
+        Integer cacheId = null;
+
+        GridIterator<CacheDataRow> it = grp.offheap().partitionIterator(0);
+
+        int c = 0;
+
+        while (it.hasNext()) {
+            CacheDataRow row = it.next();
+
+            if (cacheId == null || cacheId != row.cacheId()) {
+                cacheId = row.cacheId();
+
+                c++;
+            }
+        }
+
+        assertEquals(3, c);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartsAndCacheCreateDestroy() throws Exception {
+        final int SRVS = 5;
+
+        startGrids(SRVS);
+
+        client = true;
+
+        final Ignite clientNode = startGrid(SRVS);
+
+        client = false;
+
+        final int CACHES = 10;
+
+        final AtomicReferenceArray<IgniteCache> caches = new AtomicReferenceArray<>(CACHES);
+
+        for (int i = 0; i < 10; i++) {
+            CacheAtomicityMode atomicityMode = i % 2 == 0 ? ATOMIC : TRANSACTIONAL;
+
+            caches.set(i,
+                clientNode.createCache(cacheConfiguration(GROUP1, "c" + i, PARTITIONED, atomicityMode, 0, false)));
+        }
+
+        final AtomicBoolean stop = new AtomicBoolean();
+        final AtomicInteger cacheCntr = new AtomicInteger();
+
+        try {
+            for (int i = 0; i < 10; i++) {
+                stop.set(false);
+
+                final AtomicReference<Exception> err = new AtomicReference<>();
+
+                log.info("Iteration: " + i);
+
+                IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                            while (!stop.get()) {
+                                int node = rnd.nextInt(SRVS);
+
+                                log.info("Stop node: " + node);
+
+                                stopGrid(node);
+
+                                U.sleep(500);
+
+                                log.info("Start node: " + node);
+
+                                startGrid(node);
+
+                                try {
+                                    if (rnd.nextBoolean())
+                                        awaitPartitionMapExchange();
+                                }
+                                catch (Exception ignore) {
+                                    // No-op.
+                                }
+                            }
+                        }
+                        catch (Exception e){
+                            log.error("Unexpected error: " + e, e);
+
+                            err.set(e);
+
+                            stop.set(true);
+                        }
+                    }
+                });
+
+                IgniteInternalFuture<?> cacheFut = GridTestUtils.runAsync(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                            while (!stop.get()) {
+                                int idx = rnd.nextInt(CACHES);
+
+                                IgniteCache cache = caches.get(idx);
+
+                                if (cache != null && caches.compareAndSet(idx, cache, null)) {
+                                    log.info("Destroy cache: " + cache.getName());
+
+                                    clientNode.destroyCache(cache.getName());
+
+                                    CacheAtomicityMode atomicityMode = rnd.nextBoolean() ? ATOMIC : TRANSACTIONAL;
+
+                                    String name = "newName-" + cacheCntr.incrementAndGet();
+
+                                    cache = clientNode.createCache(
+                                        cacheConfiguration(GROUP1, name, PARTITIONED, atomicityMode, 0, false));
+
+                                    caches.set(idx, cache);
+                                }
+                            }
+                        }
+                        catch (Exception e){
+                            log.error("Unexpected error: " + e, e);
+
+                            err.set(e);
+
+                            stop.set(true);
+                        }
+                    }
+                });
+
+                IgniteInternalFuture opFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                            while (!stop.get()) {
+                                int idx = rnd.nextInt(CACHES);
+
+                                IgniteCache cache = caches.get(idx);
+
+                                if (cache != null && caches.compareAndSet(idx, cache, null)) {
+                                    for (int i = 0; i < 10; i++)
+                                        cacheOperation(rnd, cache);
+
+                                    caches.set(idx, cache);
+                                }
+                            }
+                        }
+                        catch (Exception e) {
+                            err.set(e);
+
+                            log.error("Unexpected error: " + e, e);
+
+                            stop.set(true);
+                        }
+                    }
+                }, 8, "op-thread");
+
+                Thread.sleep(10_000);
+
+                stop.set(true);
+
+                restartFut.get();
+                cacheFut.get();
+                opFut.get();
+
+                assertNull("Unexpected error during test, see log for details", err.get());
+
+                awaitPartitionMapExchange();
+
+                Set<Integer> cacheIds = new HashSet<>();
+
+                for (int c = 0; c < CACHES; c++) {
+                    IgniteCache cache = caches.get(c);
+
+                    assertNotNull(cache);
+
+                    assertTrue(cacheIds.add(CU.cacheId(cache.getName())));
+                }
+
+                for (int n = 0; n < SRVS; n++) {
+                    CacheGroupInfrastructure grp = cacheGroup(ignite(n), GROUP1);
+
+                    assertNotNull(grp);
+
+                    for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
+                        Map<Integer, Object> cachesMap = GridTestUtils.getFieldValue(part, "cacheMaps");
+
+                        assertTrue(cachesMap.size() <= cacheIds.size());
+
+                        for (Integer cacheId : cachesMap.keySet())
+                            assertTrue(cachesMap.containsKey(cacheId));
+                    }
+                }
+            }
+        }
+        finally {
+            stop.set(true);
+        }
+    }
+
+    /**
      * @param cntr Counter.
      * @param expEvts Expected events number.
      * @throws Exception If failed.
@@ -2893,8 +3269,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
      * @param grpName Cache group name.
      * @return Cache group.
      */
-    private CacheGroupInfrastructure cacheGroup(IgniteKernal node, String grpName) {
-        for (CacheGroupInfrastructure grp : node.context().cache().cacheGroups()) {
+    private CacheGroupInfrastructure cacheGroup(Ignite node, String grpName) {
+        for (CacheGroupInfrastructure grp : ((IgniteKernal)node).context().cache().cacheGroups()) {
             if (grpName.equals(grp.name()))
                 return grp;
         }


Mime
View raw message