ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5075
Date Wed, 17 May 2017 12:15:03 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 c5ef936c5 -> dc8e10259


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc8e1025
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc8e1025
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc8e1025

Branch: refs/heads/ignite-5075
Commit: dc8e102599f966a39f7bdd7bc22274db52512daf
Parents: c5ef936
Author: sboikov <sboikov@gridgain.com>
Authored: Wed May 17 15:14:53 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed May 17 15:14:53 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    |  64 +-
 .../cache/database/tree/BPlusTree.java          |  39 +-
 .../apache/ignite/internal/util/IgniteTree.java |  11 +
 .../processors/cache/IgniteCacheGroupsTest.java | 696 ++++++++++++++++++-
 4 files changed, 762 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8e1025/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 9553491..b922a13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -209,10 +209,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 if (pendingEntries != null) {
                     PendingRow row = new PendingRow(cacheId);
 
-                    boolean removex = pendingEntries.removex(row);
+                    GridCursor<PendingRow> cursor = pendingEntries.find(row, row, PendingEntriesTree.WITHOUT_KEY);
 
-                    while (removex)
-                        removex = pendingEntries.removex(row);
+                    while (cursor.next()) {
+                        boolean res = pendingEntries.removex(cursor.get());
+
+                        assert res;
+                    }
                 }
             }
         }
@@ -848,7 +851,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                 assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
 
-                if (pendingEntries.remove(row) != null) {
+                if (pendingEntries.removex(row)) {
                     if (obsoleteVer == null)
                         obsoleteVer = ctx.versions().next();
 
@@ -1352,17 +1355,23 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
             Exception ex = null;
 
-            SearchRow row = new SearchRow(cacheId);
+            SearchRow bound = new SearchRow(cacheId);
+
+            GridCursor<? extends CacheDataRow> cursor = dataTree.find(bound, bound, CacheDataRowAdapter.RowData.KEY_ONLY);
+
+            while (cursor.next()) {
+                CacheDataRow row = cursor.get();
 
-            CacheDataRow removed = dataTree.remove(row);
+                assert row.link() != 0 : row;
 
-            while (removed != null) {
                 try {
-                    rowStore.removeRow(removed.link());
+                    boolean res = dataTree.removex(row);
 
-                    decrementSize(cacheId);
+                    assert res : row;
+
+                    rowStore.removeRow(row.link());
 
-                    removed = dataTree.remove(row);
+                    decrementSize(cacheId);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Fail remove row [link=" + row.link() + "]");
@@ -2180,22 +2189,16 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
         /**
          * @param grp Cache group.
-         * @param cacheId Cache ID.
-         * @param expireTime Expire time.
-         * @param link Link.
          * @return Row.
          * @throws IgniteCheckedException If failed.
          */
-        static PendingRow createRowWithKey(CacheGroupInfrastructure grp, int cacheId, long expireTime, long link)
-            throws IgniteCheckedException {
-            PendingRow row = new PendingRow(cacheId, expireTime, link);
-
+        PendingRow initKey(CacheGroupInfrastructure grp) throws IgniteCheckedException {
             CacheDataRowAdapter rowData = new CacheDataRowAdapter(link);
             rowData.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY);
 
-            row.key = rowData.key();
+            key = rowData.key();
 
-            return row;
+            return this;
         }
 
         /** {@inheritDoc} */
@@ -2209,6 +2212,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
      */
     protected static class PendingEntriesTree extends BPlusTree<PendingRow, PendingRow> {
         /** */
+        private final static Object WITHOUT_KEY = new Object();
+
+        /** */
         private final CacheGroupInfrastructure grp;
 
         /**
@@ -2284,9 +2290,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx, Object ignore)
+        @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx, Object flag)
             throws IgniteCheckedException {
-            return io.getLookupRow(this, pageAddr, idx);
+            PendingRow row = io.getLookupRow(this, pageAddr, idx);
+
+            return flag == WITHOUT_KEY ? row : row.initKey(grp);
         }
     }
 
@@ -2369,10 +2377,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         /** {@inheritDoc} */
         @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx)
             throws IgniteCheckedException {
-            return PendingRow.createRowWithKey(((PendingEntriesTree)tree).grp,
-                getCacheId(pageAddr, idx),
-                getExpireTime(pageAddr, idx),
-                getLink(pageAddr, idx));
+            return new PendingRow(getCacheId(pageAddr, idx), getExpireTime(pageAddr, idx), getLink(pageAddr, idx));
         }
 
         /** {@inheritDoc} */
@@ -2443,10 +2448,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         /** {@inheritDoc} */
         @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx)
             throws IgniteCheckedException {
-            return PendingRow.createRowWithKey(((PendingEntriesTree)tree).grp,
-                getCacheId(pageAddr, idx),
-                getExpireTime(pageAddr, idx),
-                getLink(pageAddr, idx));
+            return new PendingRow(getCacheId(pageAddr, idx), getExpireTime(pageAddr, idx), getLink(pageAddr, idx));
         }
 
         /** {@inheritDoc} */
@@ -2532,7 +2534,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
          * @param ver Page format version.
          */
         CacheIdAwarePendingEntryInnerIO(int ver) {
-            super(T_PENDING_REF_INNER, ver, true, 20);
+            super(T_CACHE_ID_AWARE_PENDING_REF_INNER, ver, true, 20);
         }
 
         /** {@inheritDoc} */
@@ -2559,7 +2561,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
          * @param ver Page format version.
          */
         CacheIdAwarePendingEntryLeafIO(int ver) {
-            super(T_PENDING_REF_LEAF, ver, 20);
+            super(T_CACHE_ID_AWARE_PENDING_REF_LEAF, ver, 20);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8e1025/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
index a4c09d5..d7a4f7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
@@ -883,11 +883,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
     /**
      * @param upper Upper bound.
+     * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
      * @return Cursor.
      * @throws IgniteCheckedException If failed.
      */
-    private GridCursor<T> findLowerUnbounded(L upper) throws IgniteCheckedException {
+    private GridCursor<T> findLowerUnbounded(L upper, Object x) throws IgniteCheckedException {
+        ForwardCursor cursor = new ForwardCursor(null, upper, x);
 
         long firstPageId;
 
@@ -932,14 +933,25 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      * @return Cursor.
      * @throws IgniteCheckedException If failed.
      */
-    @Override public final GridCursor<T> find(L lower, L upper) throws IgniteCheckedException {
+    @Override public GridCursor<T> find(L lower, L upper) throws IgniteCheckedException {
+        return find(lower, upper, null);
+    }
+
+    /**
+     * @param lower Lower bound inclusive or {@code null} if unbounded.
+     * @param upper Upper bound inclusive or {@code null} if unbounded.
+     * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+     * @return Cursor.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Override public final GridCursor<T> find(L lower, L upper, Object x) throws IgniteCheckedException {
         checkDestroyed();
 
         try {
             if (lower == null)
-                return findLowerUnbounded(upper);
+                return findLowerUnbounded(upper, x);
 
-            ForwardCursor cursor = new ForwardCursor(lower, upper);
+            ForwardCursor cursor = new ForwardCursor(lower, upper, x);
 
             cursor.find();
 
@@ -4376,6 +4388,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         /** */
         private final L upperBound;
 
+        /** */
+        private final Object x;
+
         /**
          * @param lowerBound Lower bound.
          * @param upperBound Upper bound.
@@ -4383,6 +4398,18 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         ForwardCursor(L lowerBound, L upperBound) {
             this.lowerBound = lowerBound;
             this.upperBound = upperBound;
+            this.x = null;
+        }
+
+        /**
+         * @param lowerBound Lower bound.
+         * @param upperBound Upper bound.
+         * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+         */
+        ForwardCursor(L lowerBound, L upperBound, Object x) {
+            this.lowerBound = lowerBound;
+            this.upperBound = upperBound;
+            this.x = x;
         }
 
         /**
@@ -4495,7 +4522,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                 rows = (T[])new Object[cnt];
 
             for (int i = 0; i < cnt; i++) {
-                T r = getRow(io, pageAddr, startIdx + i);
+                T r = getRow(io, pageAddr, startIdx + i, x);
 
                 rows = GridArrays.set(rows, i, r);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8e1025/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
index 396b8a4..3198191 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
@@ -64,6 +64,17 @@ public interface IgniteTree<L, T> {
     public GridCursor<T> find(L lower, L upper) throws IgniteCheckedException;
 
     /**
+     * Returns a cursor from lower to upper bounds inclusive.
+     *
+     * @param lower Lower bound or {@code null} if unbounded.
+     * @param upper Upper bound or {@code null} if unbounded.
+     * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+     * @return Cursor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridCursor<T> find(L lower, L upper, Object x) throws IgniteCheckedException;
+
+    /**
      * Returns a value mapped to the lowest key, or {@code null} if tree is empty
      * @return Value.
      * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8e1025/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index aa1fcff..a2eecf2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -18,17 +18,20 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.io.Serializable;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
 import java.util.concurrent.locks.Lock;
 import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
@@ -36,13 +39,20 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheExistsException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -54,6 +64,7 @@ import org.apache.ignite.transactions.Transaction;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -77,6 +88,12 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     private static final String GROUP2 = "grp2";
 
     /** */
+    private static final String CACHE1 = "cache1";
+
+    /** */
+    private static final String CACHE2 = "cache2";
+
+    /** */
     private boolean client;
 
     /** */
@@ -159,6 +176,663 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testCreateCacheWithSameNameInAnotherGroup() throws Exception {
+        startGridsMultiThreaded(2);
+
+        final Ignite ignite = ignite(0);
+
+        ignite.createCache(cacheConfiguration(GROUP1, CACHE1, PARTITIONED, ATOMIC, 2, false));
+
+        GridTestUtils.assertThrows(null, new GridPlainCallable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite(1).createCache(cacheConfiguration(GROUP2, CACHE1, PARTITIONED, ATOMIC, 2, false));
+                return null;
+            }
+        }, CacheExistsException.class, "a cache with the same name is already started");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCreateDestroyCachesAtomicPartitioned() throws Exception {
+        createDestroyCaches(PARTITIONED, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCreateDestroyCachesTxPartitioned() throws Exception {
+        createDestroyCaches(PARTITIONED, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCreateDestroyCachesAtomicReplicated() throws Exception {
+        createDestroyCaches(REPLICATED, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCreateDestroyCachesTxReplicated() throws Exception {
+        createDestroyCaches(REPLICATED, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryAtomicPartitioned() throws Exception {
+        scanQuery(PARTITIONED, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryTxPartitioned() throws Exception {
+        scanQuery(PARTITIONED, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryAtomicReplicated() throws Exception {
+        scanQuery(REPLICATED, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryTxReplicated() throws Exception {
+        scanQuery(REPLICATED, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryAtomicLocal() throws Exception {
+        scanQuery(LOCAL, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryTxLocal() throws Exception {
+        scanQuery(LOCAL, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEntriesTtlAtomicPartitioned() throws Exception {
+        entriesTtl(PARTITIONED, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEntriesTtlTxPartitioned() throws Exception {
+        entriesTtl(PARTITIONED, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEntriesTtlAtomicReplicated() throws Exception {
+        entriesTtl(REPLICATED, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEntriesTtlTxReplicated() throws Exception {
+        entriesTtl(REPLICATED, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEntriesTtlAtomicLocal() throws Exception {
+        entriesTtl(LOCAL, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEntriesTtlTxLocal() throws Exception {
+        entriesTtl(LOCAL, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheIteratorAtomicPartitioned() throws Exception {
+        cacheIterator(PARTITIONED, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheIteratorTxPartitioned() throws Exception {
+        cacheIterator(PARTITIONED, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheIteratorAtomicReplicated() throws Exception {
+        cacheIterator(REPLICATED, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheIteratorTxReplicated() throws Exception {
+        cacheIterator(REPLICATED, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheIteratorAtomicLocal() throws Exception {
+        cacheIterator(LOCAL, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheIteratorTxLocal() throws Exception {
+        cacheIterator(LOCAL, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryMultiplePartitionsAtomicPartitioned() throws Exception {
+        scanQueryMultiplePartitions(PARTITIONED, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryMultiplePartitionsTxPartitioned() throws Exception {
+        scanQueryMultiplePartitions(PARTITIONED, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryMultiplePartitionsAtomicReplicated() throws Exception {
+        scanQueryMultiplePartitions(REPLICATED, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryMultiplePartitionsTxReplicated() throws Exception {
+        scanQueryMultiplePartitions(REPLICATED, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void scanQuery(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
+        int keys = 10000;
+
+        Integer[] data1 = generateData(keys);
+        Integer[] data2 = generateData(keys);
+
+        boolean local = cacheMode == LOCAL;
+
+        if (local)
+            startGrid(0);
+        else
+            startGridsMultiThreaded(4);
+
+        Ignite srv0 = ignite(0);
+
+        srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false));
+        srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));
+
+        if(!local)
+            awaitPartitionMapExchange();
+
+        IgniteCache<Integer, Integer> cache1;
+        IgniteCache<Integer, Integer> cache2;
+
+        if (atomicityMode == TRANSACTIONAL) {
+            Ignite ignite = ignite(local ? 0 : 1);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                cache1 = ignite.cache(CACHE1);
+                cache2 = ignite.cache(CACHE2);
+
+                for (int i = 0; i < keys ; i++) {
+                    cache1.put(i, data1[i]);
+                    cache2.put(i, data2[i]);
+                }
+
+                tx.commit();
+            }
+        }
+        else {
+            cache1 = ignite(local ? 0 : 1).cache(CACHE1);
+            cache2 = ignite(local ? 0 : 2).cache(CACHE2);
+
+            for (int i = 0; i < keys ; i++) {
+                cache1.put(i, data1[i]);
+                cache2.put(i, data2[i]);
+            }
+        }
+
+        ScanQuery<Integer, Integer> qry = new ScanQuery<>();
+
+        Set<Integer> keysSet = sequence(keys);
+
+        for (Cache.Entry<Integer, Integer> entry : ignite(local ? 0 : 3).cache(CACHE1).query(qry)) {
+            assertTrue(keysSet.remove(entry.getKey()));
+            assertEquals(data1[entry.getKey()], entry.getValue());
+        }
+
+        assertTrue(keysSet.isEmpty());
+
+        srv0.destroyCache(CACHE1);
+
+        keysSet = sequence(keys);
+
+        for (Cache.Entry<Integer, Integer> entry : ignite(local ? 0 : 3).cache(CACHE2).query(qry)) {
+            assertTrue(keysSet.remove(entry.getKey()));
+            assertEquals(data2[entry.getKey()], entry.getValue());
+        }
+
+        assertTrue(keysSet.isEmpty());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void scanQueryMultiplePartitions(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
+        int keys = 10000;
+
+        Integer[] data1 = generateData(keys);
+        Integer[] data2 = generateData(keys);
+
+        startGridsMultiThreaded(4);
+
+        Ignite srv0 = ignite(0);
+
+        srv0.createCache(
+            cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false)
+                .setAffinity(new RendezvousAffinityFunction().setPartitions(32)));
+        srv0.createCache(
+            cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false)
+                .setAffinity(new RendezvousAffinityFunction().setPartitions(32)));
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<Integer, Integer> cache1;
+        IgniteCache<Integer, Integer> cache2;
+
+        if (atomicityMode == TRANSACTIONAL) {
+            Ignite ignite = ignite(1);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                cache1 = ignite.cache(CACHE1);
+                cache2 = ignite.cache(CACHE2);
+
+                for (int i = 0; i < keys ; i++) {
+                    cache1.put(i, data1[i]);
+                    cache2.put(i, data2[i]);
+                }
+
+                tx.commit();
+            }
+        }
+        else {
+            cache1 = ignite(1).cache(CACHE1);
+            cache2 = ignite(2).cache(CACHE2);
+
+            for (int i = 0; i < keys ; i++) {
+                cache1.put(i, data1[i]);
+                cache2.put(i, data2[i]);
+            }
+        }
+
+
+        int p = ThreadLocalRandom.current().nextInt(32);
+
+        ScanQuery<Integer, Integer> qry = new ScanQuery().setPartition(p);
+
+        Set<Integer> keysSet = new TreeSet<>();
+
+        cache1 = ignite(3).cache(CACHE1);
+
+        Affinity<Integer> aff = affinity(cache1);
+
+        for(int i = 0; i < keys; i++) {
+            if (aff.partition(i) == p) {
+                keysSet.add(i);
+            }
+        }
+
+        for (Cache.Entry<Integer, Integer> entry : cache1.query(qry)) {
+            assertTrue(keysSet.remove(entry.getKey()));
+            assertEquals(data1[entry.getKey()], entry.getValue());
+        }
+
+        assertTrue(keysSet.isEmpty());
+
+        srv0.destroyCache(CACHE1);
+
+        keysSet = new TreeSet<>();
+
+        cache2 = ignite(3).cache(CACHE2);
+
+        aff = affinity(cache2);
+
+        for(int i = 0; i < keys; i++) {
+            if (aff.partition(i) == p) {
+                keysSet.add(i);
+            }
+        }
+
+        for (Cache.Entry<Integer, Integer> entry : cache2.query(qry)) {
+            assertTrue(keysSet.remove(entry.getKey()));
+            assertEquals(data2[entry.getKey()], entry.getValue());
+        }
+
+        assertTrue(keysSet.isEmpty());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void cacheIterator(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
+        int keys = 10000;
+
+        Integer[] data1 = generateData(keys);
+        Integer[] data2 = generateData(keys);
+
+        boolean local = cacheMode == LOCAL;
+
+        if (local)
+            startGrid(0);
+        else
+            startGridsMultiThreaded(4);
+
+        Ignite srv0 = ignite(0);
+
+        srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false));
+        srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));
+
+        if(!local)
+            awaitPartitionMapExchange();
+
+        if (atomicityMode == TRANSACTIONAL) {
+            Ignite ignite = ignite(local ? 0 : 1);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                IgniteCache cache1 = ignite.cache(CACHE1);
+                IgniteCache cache2 = ignite.cache(CACHE2);
+
+                for (int i = 0; i < keys ; i++) {
+                    cache1.put(i, data1[i]);
+                    cache2.put(i, data2[i]);
+                }
+
+                tx.commit();
+            }
+        }
+        else {
+            IgniteCache cache1 = ignite(local ? 0 : 1).cache(CACHE1);
+            IgniteCache cache2 = ignite(local ? 0 : 2).cache(CACHE2);
+
+            for (int i = 0; i < keys ; i++) {
+                cache1.put(i, data1[i]);
+                cache2.put(i, data2[i]);
+            }
+        }
+
+
+        Set<Integer> keysSet = sequence(keys);
+
+        for (Cache.Entry<Integer, Integer> entry : ignite(local ? 0 : 3).<Integer, Integer>cache(CACHE1)) {
+            assertTrue(keysSet.remove(entry.getKey()));
+            assertEquals(data1[entry.getKey()], entry.getValue());
+        }
+
+        assertTrue(keysSet.isEmpty());
+
+        srv0.destroyCache(CACHE1);
+
+        keysSet = sequence(keys);
+
+        for (Cache.Entry<Integer, Integer> entry : ignite(local ? 0 : 3).<Integer, Integer>cache(CACHE2)) {
+            assertTrue(keysSet.remove(entry.getKey()));
+            assertEquals(data2[entry.getKey()], entry.getValue());
+        }
+
+        assertTrue(keysSet.isEmpty());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void entriesTtl(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
+        int keys = 10000;
+
+        final int ttl = 10000;
+
+        Integer[] data1 = generateData(keys);
+        Integer[] data2 = generateData(keys);
+
+        boolean local = cacheMode == LOCAL;
+
+        if (local)
+            startGrid(0);
+        else
+            startGridsMultiThreaded(4);
+
+
+        Ignite srv0 = ignite(0);
+
+        srv0.createCache(
+            cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false)
+                // -1 = ETERNAL       just created entries are not expiring
+                // -2 = NOT_CHANGED   not to change ttl on entry update
+                .setExpiryPolicyFactory(new PlatformExpiryPolicyFactory(-1, -2, ttl)).setEagerTtl(true)
+        );
+
+        srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));
+
+        if (!local)
+            awaitPartitionMapExchange();
+
+        if (atomicityMode == TRANSACTIONAL) {
+            Ignite ignite = ignite(local ? 0 : 1);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                IgniteCache cache1 = ignite.cache(CACHE1);
+                IgniteCache cache2 = ignite.cache(CACHE2);
+
+                for (int i = 0; i < keys ; i++) {
+                    cache1.put(i, data1[i]);
+                    cache2.put(i, data2[i]);
+                }
+
+                tx.commit();
+            }
+        }
+        else {
+            IgniteCache cache1 = ignite(local ? 0 : 1).cache(CACHE1);
+            IgniteCache cache2 = ignite(local ? 0 : 2).cache(CACHE2);
+
+            for (int i = 0; i < keys ; i++) {
+                cache1.put(i, data1[i]);
+                cache2.put(i, data2[i]);
+            }
+        }
+
+        checkData(local ? 0 : 3, CACHE1, data1);
+        checkData(local ? 0 : 3, CACHE2, data2);
+
+        srv0.destroyCache(CACHE2);
+
+        checkData(local ? 0 : 3, CACHE1, data1);
+
+        // Wait for expiration
+
+        Thread.sleep((long)(ttl * 1.2));
+
+        assertEquals(0, ignite(local ? 0 : 3).cache(CACHE1).size());
+    }
+
+    /**
+     * Creates {@code CACHE1} and {@code CACHE2}, both configured with {@code GROUP1}, fills them
+     * with different random data, verifies the data from several nodes, destroys {@code CACHE2},
+     * starts one more node and verifies {@code CACHE1} on it, then destroys {@code CACHE1} and
+     * calls {@code checkCacheGroup(5, GROUP1, false)}.
+     *
+     * @param cacheMode Cache mode passed to both cache configurations.
+     * @param atomicityMode Atomicity mode; for {@code TRANSACTIONAL} all puts are done in one transaction.
+     * @throws Exception If failed.
+     */
+    private void createDestroyCaches(CacheMode cacheMode, CacheAtomicityMode atomicityMode)
throws Exception {
+        int keys = 10000;
+
+        Integer[] data1 = generateData(keys);
+        Integer[] data2 = generateData(keys);
+
+        startGridsMultiThreaded(4);
+
+        Ignite srv0 = ignite(0);
+
+        srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2,
false));
+        srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2,
false));
+
+        awaitPartitionMapExchange();
+
+        if (atomicityMode == TRANSACTIONAL) {
+            Ignite ignite = ignite(1);
+
+            try (Transaction tx = ignite.transactions().txStart()) {
+                IgniteCache cache1 = ignite.cache(CACHE1);
+                IgniteCache cache2 = ignite.cache(CACHE2);
+
+                for (int i = 0; i < keys ; i++) {
+                    cache1.put(i, data1[i]);
+                    cache2.put(i, data2[i]);
+                }
+
+                tx.commit();
+            }
+        }
+        else {
+            IgniteCache cache1 = ignite(1).cache(CACHE1); // CACHE1 is written through node 1.
+            IgniteCache cache2 = ignite(2).cache(CACHE2); // CACHE2 is written through node 2.
+
+            for (int i = 0; i < keys ; i++) {
+                cache1.put(i, data1[i]);
+                cache2.put(i, data2[i]);
+            }
+        }
+
+        checkLocalData(3, CACHE1, data1);
+        checkLocalData(0, CACHE2, data2);
+
+        checkData(0, CACHE1, data1);
+        checkData(3, CACHE2, data2);
+
+        ignite(1).destroyCache(CACHE2);
+
+        startGrid(5); // NOTE(review): index 5, while only indexes 0-3 are used above - confirm that skipping 4 is intended.
+
+        awaitPartitionMapExchange();
+
+        checkData(5, CACHE1, data1); // The newly started node must see all CACHE1 data.
+        checkLocalData(5, CACHE1, data1);
+
+        ignite(1).destroyCache(CACHE1);
+
+        checkCacheGroup(5, GROUP1, false); // Presumably asserts the group is gone once its last cache is destroyed - confirm.
+    }
+
+    /**
+     * Creates an array of random integers, each taken from {@code ThreadLocalRandom.current().nextInt()}.
+     * The values are not seeded, so they differ between calls.
+     *
+     * @param cnt Array length.
+     * @return Array of {@code cnt} random integers.
+     */
+    private Integer[] generateData(int cnt) {
+        Random rnd = ThreadLocalRandom.current();
+
+        Integer[] data = new Integer[cnt];
+
+        for (int i = 0; i < data.length; i++)
+            data[i] = rnd.nextInt();
+
+        return data;
+    }
+
+    /**
+     * Builds a new mutable, sorted set containing the integers from {@code 0} (inclusive)
+     * to {@code cnt} (exclusive).
+     *
+     * @param cnt Sequence length.
+     * @return Sequence of integers; empty if {@code cnt} is not positive.
+     */
+    private Set<Integer> sequence(int cnt) {
+        Set<Integer> res = new TreeSet<>();
+
+        for (int i = 0; i < cnt; i++)
+            res.add(i);
+
+        return res;
+    }
+
+    /**
+     * Reads keys {@code 0 .. data.length - 1} from the cache with a single {@code getAll} on the
+     * given node and asserts that every key is returned with the value {@code data[key]}.
+     *
+     * @param idx Node index.
+     * @param cacheName Cache name.
+     * @param data Expected data.
+     * @throws Exception If failed.
+     */
+    private void checkData(int idx, String cacheName, Integer[] data) throws Exception {
+        Set<Integer> keys = sequence(data.length);
+
+        Set<Map.Entry<Integer, Integer>> entries =
+            ignite(idx).<Integer, Integer>cache(cacheName).getAll(keys).entrySet();
+
+        for (Map.Entry<Integer, Integer> entry : entries) {
+            assertTrue(keys.remove(entry.getKey())); // Each returned key is removed from the expected set.
+            assertEquals(data[entry.getKey()], entry.getValue());
+        }
+
+        assertTrue(keys.isEmpty()); // Nothing left: every requested key was returned.
+    }
+
+    /**
+     * Asserts that the {@code OFFHEAP} local entries of the cache on the given node are exactly
+     * the keys in {@code 0 .. data.length - 1} for which that node is primary or backup, each
+     * with the value {@code data[key]}.
+     *
+     * @param idx Node index.
+     * @param cacheName Cache name.
+     * @param data Expected data.
+     * @throws Exception If failed.
+     */
+    private void checkLocalData(int idx, String cacheName, Integer[] data) throws Exception
{
+        Ignite ignite = ignite(idx);
+        ClusterNode node = ignite.cluster().localNode();
+        IgniteCache cache = ignite.<Integer, Integer>cache(cacheName);
+
+        Affinity aff = affinity(cache);
+
+        Set<Integer> localKeys = new TreeSet<>();
+
+        for (int key = 0; key < data.length; key++) {
+            if(aff.isPrimaryOrBackup(node, key))
+                localKeys.add(key);
+        }
+
+        Iterable<Cache.Entry<Integer, Integer>> localEntries = cache.localEntries(CachePeekMode.OFFHEAP);
+
+        for (Cache.Entry<Integer, Integer> entry : localEntries) {
+            assertTrue(localKeys.remove(entry.getKey())); // Fails on an entry this node should not hold, or on a duplicate.
+            assertEquals(data[entry.getKey()], entry.getValue());
+        }
+
+        assertTrue(localKeys.isEmpty()); // Fails if an expected local key was never returned.
+    }
+
+    /**
      * @param srvs Number of server nodes.
      * @throws Exception If failed.
      */
@@ -175,37 +849,37 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
         for (int iter = 0; iter < 3; iter++) {
             log.info("Iteration: " + iter);
 
-            srv0.createCache(cacheConfiguration(GROUP1, "cache1", PARTITIONED, ATOMIC, 2,
false));
+            srv0.createCache(cacheConfiguration(GROUP1, CACHE1, PARTITIONED, ATOMIC, 2, false));
 
             checkCacheDiscoveryDataConsistent();
 
             for (int i = 0; i < srvs; i++) {
                 checkCacheGroup(i, GROUP1, true);
 
-                checkCache(i, "cache1");
+                checkCache(i, CACHE1);
             }
 
-            srv0.createCache(cacheConfiguration(GROUP1, "cache2", PARTITIONED, ATOMIC, 2,
false));
+            srv0.createCache(cacheConfiguration(GROUP1, CACHE2, PARTITIONED, ATOMIC, 2, false));
 
             checkCacheDiscoveryDataConsistent();
 
             for (int i = 0; i < srvs; i++) {
                 checkCacheGroup(i, GROUP1, true);
 
-                checkCache(i, "cache2");
+                checkCache(i, CACHE2);
             }
 
-            srv0.destroyCache("cache1");
+            srv0.destroyCache(CACHE1);
 
             checkCacheDiscoveryDataConsistent();
 
             for (int i = 0; i < srvs; i++) {
                 checkCacheGroup(i, GROUP1, true);
 
-                checkCache(i, "cache2");
+                checkCache(i, CACHE2);
             }
 
-            srv0.destroyCache("cache2");
+            srv0.destroyCache(CACHE2);
 
             checkCacheDiscoveryDataConsistent();
 
@@ -328,9 +1002,9 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
         Ignite srv0 = startGrid(0);
 
         IgniteCache<Object, Object> srv0Cache1 =
-            srv0.createCache(cacheConfiguration(GROUP1, "cache1", PARTITIONED, ATOMIC, 2,
false));
+            srv0.createCache(cacheConfiguration(GROUP1, CACHE1, PARTITIONED, ATOMIC, 2, false));
         IgniteCache<Object, Object> srv0Cache2 =
-            srv0.createCache(cacheConfiguration(GROUP1, "cache2", PARTITIONED, ATOMIC, 2,
false));
+            srv0.createCache(cacheConfiguration(GROUP1, CACHE2, PARTITIONED, ATOMIC, 2, false));
 
         for (int i = 0; i < 10; i++)
             srv0Cache1.put(new Key1(i), i);


Mime
View raw message