ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [17/50] [abbrv] ignite git commit: IGNITE-1239 - Added test for reopened ticket.
Date Tue, 15 Sep 2015 17:25:49 GMT
IGNITE-1239 - Added test for reopened ticket.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/866fb415
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/866fb415
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/866fb415

Branch: refs/heads/ignite-1093-2
Commit: 866fb41525957555231fca11c5853731b9473170
Parents: 06fdd7d
Author: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Authored: Mon Sep 14 16:09:37 2015 -0700
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Mon Sep 14 16:09:37 2015 -0700

----------------------------------------------------------------------
 ...CacheScanPartitionQueryFallbackSelfTest.java | 105 ++++++++++++++++++-
 1 file changed, 104 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/866fb415/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index cb3a3bf..df310b4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -26,13 +26,19 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -48,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -67,7 +74,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     private static final int GRID_CNT = 3;
 
     /** Keys count. */
-    private static final int KEYS_CNT = 5000;
+    private static final int KEYS_CNT = 50 * RendezvousAffinityFunction.DFLT_PARTITION_COUNT;
 
     /** Ip finder. */
     private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
@@ -261,6 +268,79 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     }
 
     /**
+     * Scan should activate fallback mechanism when new nodes join topology and rebalancing
happens in parallel with
+     * scan query.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testScanFallbackOnRebalancingCursor() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-1239");
+
+        cacheMode = CacheMode.PARTITIONED;
+        clientMode = false;
+        backups = 1;
+        commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory();
+
+        try {
+            Ignite ignite = startGrids(GRID_CNT);
+
+            fillCache(ignite);
+
+            final AtomicBoolean done = new AtomicBoolean(false);
+
+            IgniteInternalFuture fut1 = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        for (int i = 0; i < 5; i++) {
+                            startGrid(GRID_CNT + i);
+
+                            U.sleep(500);
+                        }
+
+                        done.set(true);
+
+                        return null;
+                    }
+                }, 1);
+
+            final AtomicInteger nodeIdx = new AtomicInteger();
+
+            IgniteInternalFuture fut2 = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        int nodeId = nodeIdx.getAndIncrement();
+
+                        IgniteCache<Integer, Integer> cache = grid(nodeId).cache(null);
+
+                        while (!done.get()) {
+                            int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions());
+
+                            try {
+                                QueryCursor<Cache.Entry<Integer, Integer>> cur
=
+                                    cache.query(new ScanQuery<Integer, Integer>(part));
+
+                                U.debug(log, "Running query [node=" + nodeId + ", part="
+ part + ']');
+
+                                doTestScanQueryCursor(cur, part);
+                            }
+                            catch (ClusterGroupEmptyCheckedException e) {
+                                log.warning("Invalid partition: " + part, e);
+                            }
+                        }
+
+                        return null;
+                    }
+                }, GRID_CNT);
+
+            fut1.get();
+            fut2.get();
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      * Scan should try first remote node and fallbacks to second remote node.
      *
      * @throws Exception If failed.
@@ -391,6 +471,29 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     }
 
     /**
+     * @param cur Query cursor.
+     * @param part Partition number.
+     */
+    protected void doTestScanQueryCursor(
+        QueryCursor<Cache.Entry<Integer, Integer>> cur, int part) throws IgniteCheckedException
{
+
+        Map<Integer, Integer> map = entries.get(part);
+
+        assert map != null;
+
+        int cnt = 0;
+
+        for (Cache.Entry<Integer, Integer> e : cur) {
+
+            assertEquals(map.get(e.getKey()), e.getValue());
+
+            cnt++;
+        }
+
+        assertEquals("Invalid number of entries for partition: " + part, map.size(), cnt);
+    }
+
+    /**
      * @param cctx Cctx.
      */
     private static int anyLocalPartition(GridCacheContext<?, ?> cctx) {


Mime
View raw message