ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1239 Reverted changes in localPeek, instead in scan iterator use 'cache.peekEx'. Removed in CacheQueryFallbackFuture internal future listener assuming CacheQueryFallbackFuture.get is never called.
Date Wed, 16 Sep 2015 09:27:04 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1239 471b75e70 -> f39577a25


ignite-1239 Reverted changes in localPeek, instead in scan iterator use 'cache.peekEx'.  Removed
in CacheQueryFallbackFuture internal future listener assuming CacheQueryFallbackFuture.get
is never called.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f39577a2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f39577a2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f39577a2

Branch: refs/heads/ignite-1239
Commit: f39577a257e74d5ef6858a31ce6f75842f264598
Parents: 471b75e
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Sep 16 12:16:49 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Sep 16 12:23:48 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  45 +++--
 .../query/GridCacheDistributedQueryFuture.java  |   2 -
 .../cache/query/GridCacheQueryAdapter.java      |  56 +-----
 .../cache/query/GridCacheQueryManager.java      |  19 +-
 ...CacheScanPartitionQueryFallbackSelfTest.java | 192 +++++--------------
 5 files changed, 92 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f39577a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 4c24a51..1fc94ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -745,16 +745,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
                 int part = ctx.affinity().partition(cacheKey);
 
-                boolean dhtKey = false;
+                boolean nearKey;
 
-                if (modes.primary || modes.backup) {
+                if (!(modes.near && modes.primary && modes.backup)) {
                     boolean keyPrimary = ctx.affinity().primary(ctx.localNode(), part, topVer);
 
                     if (keyPrimary) {
                         if (!modes.primary)
                             return null;
 
-                        dhtKey = true;
+                        nearKey = false;
                     }
                     else {
                         boolean keyBackup = ctx.affinity().belongs(ctx.localNode(), part,
topVer);
@@ -763,22 +763,36 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                             if (!modes.backup)
                                 return null;
 
-                            dhtKey = true;
+                            nearKey = false;
+                        }
+                        else {
+                            if (!modes.near)
+                                return null;
+
+                            nearKey = true;
+
+                            // Swap and offheap are disabled for near cache.
+                            modes.offheap = false;
+                            modes.swap = false;
                         }
                     }
+                }
+                else {
+                    nearKey = !ctx.affinity().belongs(ctx.localNode(), part, topVer);
 
-                    // We will always peek DHT entry if both primary and backup flags are
set, regardless of
-                    // affinity calculation.
-                    // This is required because there are scenarios when neither primary
nor backup node is an owner,
-                    // but we need to be able to peek cache value.
-                    dhtKey |= (modes.primary && modes.backup);
+                    if (nearKey) {
+                        // Swap and offheap are disabled for near cache.
+                        modes.offheap = false;
+                        modes.swap = false;
+                    }
                 }
 
-                if (!dhtKey && !ctx.isNear())
+                if (nearKey && !ctx.isNear())
                     return null;
 
                 if (modes.heap) {
-                    GridCacheEntryEx e = (ctx.isNear() ? ctx.near().dht().peekEx(cacheKey)
: peekEx(cacheKey));
+                    GridCacheEntryEx e = nearKey ? peekEx(cacheKey) :
+                        (ctx.isNear() ? ctx.near().dht().peekEx(cacheKey) : peekEx(cacheKey));
 
                     if (e != null) {
                         cacheVal = e.peek(modes.heap, modes.offheap, modes.swap, topVer,
plc);
@@ -786,15 +800,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                         modes.offheap = false;
                         modes.swap = false;
                     }
-
-                    if (cacheVal == null && modes.near && ctx.isNear()) {
-                        e = peekEx(cacheKey);
-
-                        cacheVal = e.peek(modes.heap, false, false, topVer, plc);
-
-                        modes.offheap = false;
-                        modes.swap = false;
-                    }
                 }
 
                 if (modes.offheap || modes.swap) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f39577a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 06d0625..e745e30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -143,8 +143,6 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
         }
 
         if (callOnPage)
-            // We consider node departure as a reception of last empty
-            // page from this node.
             onPage(nodeId, Collections.emptyList(),
                 new ClusterTopologyCheckedException("Remote node has left topology: " + nodeId),
true);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f39577a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 9b1fb7e..db67bbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -50,8 +50,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.jetbrains.annotations.Nullable;
@@ -599,6 +597,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T>
{
         }
 
         /**
+         * @param topVer Topology version.
          * @return Nodes for query execution.
          */
         private Queue<ClusterNode> fallbacks(AffinityTopologyVersion topVer) {
@@ -629,53 +628,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T>
{
         private void init() {
             final ClusterNode node = nodes.poll();
 
-            GridCacheQueryFutureAdapter<?, ?, R> fut0 = (GridCacheQueryFutureAdapter<?,
?, R>)(node.isLocal() ?
+            fut = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ?
                 qryMgr.queryLocal(bean) :
                 qryMgr.queryDistributed(bean, Collections.singleton(node)));
-
-            fut0.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>()
{
-                @Override public void apply(IgniteInternalFuture<Collection<R>>
fut) {
-                    try {
-                        onDone(fut.get());
-                    }
-                    catch (IgniteClientDisconnectedCheckedException e) {
-                        onDone(e);
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (e.hasCause(GridDhtUnreservedPartitionException.class)) {
-                            unreservedTopVer = ((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion();
-
-                            assert unreservedTopVer != null;
-                        }
-
-                        if (F.isEmpty(nodes)) {
-                            final AffinityTopologyVersion topVer = unreservedTopVer;
-
-                            if (topVer != null && --unreservedNodesRetryCnt >
0) {
-                                cctx.affinity().affinityReadyFuture(topVer).listen(
-                                    new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>()
{
-                                        @Override public void apply(
-                                            IgniteInternalFuture<AffinityTopologyVersion>
future) {
-
-                                            nodes = fallbacks(topVer);
-
-                                            // Race is impossible here because query retries
are executed one by one.
-                                            unreservedTopVer = null;
-
-                                            init();
-                                        }
-                                    });
-                            }
-                            else
-                                onDone(e);
-                        }
-                        else
-                            init();
-                    }
-                }
-            });
-
-            fut = fut0;
         }
 
         /** {@inheritDoc} */
@@ -689,6 +644,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T>
{
         }
 
         /** {@inheritDoc} */
+        @Override public Collection<R> get() throws IgniteCheckedException {
+            assert false;
+
+            return super.get();
+        }
+
+        /** {@inheritDoc} */
         @Override public R next() {
             if (firstItemReturned)
                 return fut.next();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f39577a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index a7b55eb..25ace1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -33,8 +33,9 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CachePeekModes;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheInternal;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheOffheapSwapEntry;
@@ -819,8 +820,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht()
: cctx.dht());
 
+        final GridCacheAdapter cache = dht != null ? dht : cctx.cache();
+
         final ExpiryPolicy plc = cctx.expiry();
 
+        final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+
         final boolean backups = qry.includeBackups() || cctx.isReplicated();
 
         final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
@@ -841,8 +846,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     else if (part < 0 || part >= cctx.affinity().partitions())
                         iter = F.emptyIterator();
                     else {
-                        AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
-
                         locPart = dht.topology().localPartition(part, topVer, false);
 
                         // double check for owning state
@@ -899,7 +902,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                         V val;
 
                         try {
-                            val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc);
+                            GridCacheEntryEx entry = cache.peekEx(key);
+
+                            CacheObject cacheVal =
+                                entry != null ? entry.peek(true, false, false, topVer, expiryPlc)
: null;
+
+                            val = cacheVal != null ? (V)cacheVal.value(cctx.cacheObjectContext(),
false) : null;
+                        }
+                        catch (GridCacheEntryRemovedException e) {
+                            val = null;
                         }
                         catch (IgniteCheckedException e) {
                             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/f39577a2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index c087ffc..02b213e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -22,17 +22,14 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
@@ -43,14 +40,11 @@ import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -90,15 +84,9 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     /** Expected first node ID. */
     private static UUID expNodeId;
 
-    /** Expected fallback node ID. */
-    private static UUID expFallbackNodeId;
-
     /** Communication SPI factory. */
     private CommunicationSpiFactory commSpiFactory;
 
-    /** Latch. */
-    private static CountDownLatch latch;
-
     /** Test entries. */
     private Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
 
@@ -150,7 +138,8 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
             int part = anyLocalPartition(cache.context());
 
-            CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null,
part, false);
+            QueryCursor<Cache.Entry<Integer, Integer>> qry =
+                cache.query(new ScanQuery<Integer, Integer>().setPartition(part));
 
             doTestScanQuery(qry, part);
         }
@@ -180,7 +169,8 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
             expNodeId = tup.get2();
 
-            CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null,
part, false);
+            QueryCursor<Cache.Entry<Integer, Integer>> qry =
+                cache.query(new ScanQuery<Integer, Integer>().setPartition(part));
 
             doTestScanQuery(qry, part);
         }
@@ -190,12 +180,17 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     }
 
     /**
-     * Scan should activate fallback mechanism when new nodes join topology and rebalancing
happens in parallel with
-     * scan query.
-     *
      * @throws Exception In case of error.
      */
     public void testScanFallbackOnRebalancing() throws Exception {
+        scanFallbackOnRebalancing(false);
+    }
+
+    /**
+     * @param cur If {@code true} tests query cursor.
+     * @throws Exception In case of error.
+     */
+    private void scanFallbackOnRebalancing(final boolean cur) throws Exception {
         cacheMode = CacheMode.PARTITIONED;
         clientMode = false;
         backups = 2;
@@ -244,15 +239,21 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
                         IgniteCache<Integer, Integer> cache = grid(nodeId).cache(null);
 
+                        int cntr = 0;
+
                         while (!done.get()) {
                             int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions());
 
-                            info("Running query [node=" + nodeId + ", part=" + part + ']');
+                            if (cntr++ % 100 == 0)
+                                info("Running query [node=" + nodeId + ", part=" + part +
']');
 
-                            try (QueryCursor<Cache.Entry<Integer, Integer>> cur
=
+                            try (QueryCursor<Cache.Entry<Integer, Integer>> cur0
=
                                      cache.query(new ScanQuery<Integer, Integer>(part).setPageSize(5)))
{
 
-                                doTestScanQueryCursor(cur, part);
+                                if (cur)
+                                    doTestScanQueryCursor(cur0, part);
+                                else
+                                    doTestScanQuery(cur0, part);
                             }
                         }
 
@@ -278,7 +279,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
      *
      * @throws Exception In case of error.
      */
-    public void testScanFallbackOnRebalancingCursor() throws Exception {
+    public void testScanFallbackOnRebalancingCursor1() throws Exception {
         cacheMode = CacheMode.PARTITIONED;
         clientMode = false;
         backups = 1;
@@ -315,13 +316,16 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
                         IgniteCache<Integer, Integer> cache = grid(nodeId).cache(null);
 
+                        int cntr = 0;
+
                         while (!done.get()) {
                             int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions());
 
-                            info("Running query [node=" + nodeId + ", part=" + part + ']');
+                            if (cntr++ % 100 == 0)
+                                info("Running query [node=" + nodeId + ", part=" + part +
']');
 
                             try (QueryCursor<Cache.Entry<Integer, Integer>> cur
=
-                                cache.query(new ScanQuery<Integer, Integer>(part).setPageSize(5)))
{
+                                     cache.query(new ScanQuery<Integer, Integer>(part).setPageSize(5)))
{
 
                                 doTestScanQueryCursor(cur, part);
                             }
@@ -340,95 +344,15 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     }
 
     /**
-     * Scan should try first remote node and fallbacks to second remote node.
-     *
      * @throws Exception If failed.
      */
-    public void testScanFallback() throws Exception {
-        cacheMode = CacheMode.PARTITIONED;
-        backups = 1;
-        commSpiFactory = new TestFallbackCommunicationSpiFactory();
-
-        final Set<Integer> candidates = new TreeSet<>();
-
-        final AtomicBoolean test = new AtomicBoolean(false);
-
-        for(int j = 0; j < 2; j++) {
-            clientMode = true;
-
-            latch = new CountDownLatch(1);
-
-            try {
-                final Ignite ignite0 = startGrid(0);
-
-                clientMode = false;
-
-                final IgniteEx ignite1 = startGrid(1);
-                final IgniteEx ignite2 = startGrid(2);
-                startGrid(3);
-
-                if (test.get()) {
-                    expNodeId = ignite1.localNode().id();
-                    expFallbackNodeId = ignite2.localNode().id();
-                }
-
-                final IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite0);
-
-                if (!test.get()) {
-                    candidates.addAll(localPartitions(ignite1));
-
-                    candidates.retainAll(localPartitions(ignite2));
-                }
-
-                Runnable run = new Runnable() {
-                    @Override public void run() {
-                        try {
-                            startGrid(4);
-                            startGrid(5);
-
-                            awaitPartitionMapExchange();
-
-                            if (!test.get()) {
-                                candidates.removeAll(localPartitions(ignite1));
-
-                                F.retain(candidates, false, localPartitions(ignite2));
-                            }
-
-                            latch.countDown();
-                        }
-                        catch (Exception e) {
-                            e.printStackTrace();
-                        }
-
-                    }
-                };
-
-                Integer part = null;
-                CacheQuery<Map.Entry<Integer, Integer>> qry = null;
-
-                if (test.get()) {
-                    part = F.first(candidates);
-
-                    qry = cache.context().queries().createScanQuery(null, part, false);
-                }
-
-                new Thread(run).start();
-
-                if (test.get())
-                    doTestScanQuery(qry, part);
-                else
-                    latch.await();
-            }
-            finally {
-                test.set(true);
-
-                stopAllGrids();
-            }
-        }
+    public void testScanFallbackOnRebalancingCursor2() throws Exception {
+        scanFallbackOnRebalancing(true);
     }
 
     /**
      * @param ignite Ignite.
+     * @return Cache.
      */
     protected IgniteCacheProxy<Integer, Integer> fillCache(Ignite ignite) {
         IgniteCacheProxy<Integer, Integer> cache =
@@ -452,16 +376,14 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
     /**
      * @param qry Query.
+     * @param part Partition.
      */
-    protected void doTestScanQuery(CacheQuery<Map.Entry<Integer, Integer>> qry,
int part)
-        throws IgniteCheckedException {
-        CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
-
-        Collection<Map.Entry<Integer, Integer>> qryEntries = fut.get();
+    protected void doTestScanQuery(QueryCursor<Cache.Entry<Integer, Integer>>
qry, int part) {
+        Collection<Cache.Entry<Integer, Integer>> qryEntries = qry.getAll();
 
         Map<Integer, Integer> map = entries.get(part);
 
-        for (Map.Entry<Integer, Integer> e : qryEntries)
+        for (Cache.Entry<Integer, Integer> e : qryEntries)
             assertEquals(map.get(e.getKey()), e.getValue());
 
         assertEquals("Invalid number of entries for partition: " + part, map.size(), qryEntries.size());
@@ -472,7 +394,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
      * @param part Partition number.
      */
     protected void doTestScanQueryCursor(
-        QueryCursor<Cache.Entry<Integer, Integer>> cur, int part) throws IgniteCheckedException
{
+        QueryCursor<Cache.Entry<Integer, Integer>> cur, int part) {
 
         Map<Integer, Integer> map = entries.get(part);
 
@@ -491,6 +413,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
     /**
      * @param cctx Cctx.
+     * @return Local partition.
      */
     private static int anyLocalPartition(GridCacheContext<?, ?> cctx) {
         return F.first(cctx.topology().localPartitions()).id();
@@ -498,6 +421,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
     /**
      * @param cctx Cctx.
+     * @return Remote partition.
      */
     private IgniteBiTuple<Integer, UUID> remotePartition(final GridCacheContext cctx)
{
         ClusterNode node = F.first(cctx.kernalContext().grid().cluster().forRemotes().nodes());
@@ -513,6 +437,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
     /**
      * @param ignite Ignite.
+     * @return Local partitions.
      */
     private Set<Integer> localPartitions(Ignite ignite) {
         GridCacheContext cctx = ((IgniteCacheProxy)ignite.cache(null)).context();
@@ -536,7 +461,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
      */
     private interface CommunicationSpiFactory {
         /**
-         * Creates communication SPI instance.
+         * @return Communication SPI instance.
          */
         TcpCommunicationSpi create();
     }
@@ -549,13 +474,13 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
         @Override public TcpCommunicationSpi create() {
             return new TcpCommunicationSpi() {
                 @Override public void sendMessage(ClusterNode node, Message msg,
-                    IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException
{
+                    IgniteInClosure<IgniteException> ackC) throws IgniteSpiException
{
                     Object origMsg = ((GridIoMessage)msg).message();
 
                     if (origMsg instanceof GridCacheQueryRequest)
                         fail(); //should use local node
 
-                    super.sendMessage(node, msg, ackClosure);
+                    super.sendMessage(node, msg, ackC);
                 }
             };
         }
@@ -569,44 +494,13 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
         @Override public TcpCommunicationSpi create() {
             return new TcpCommunicationSpi() {
                 @Override public void sendMessage(ClusterNode node, Message msg,
-                    IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException
{
+                    IgniteInClosure<IgniteException> ackC) throws IgniteSpiException
{
                     Object origMsg = ((GridIoMessage)msg).message();
 
                     if (origMsg instanceof GridCacheQueryRequest)
                         assertEquals(expNodeId, node.id());
 
-                    super.sendMessage(node, msg, ackClosure);
-                }
-            };
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestFallbackCommunicationSpiFactory implements CommunicationSpiFactory
{
-        /** {@inheritDoc} */
-        @Override public TcpCommunicationSpi create() {
-            return new TcpCommunicationSpi() {
-                @Override public void sendMessage(ClusterNode node, Message msg,
-                    IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException
{
-                    Object origMsg = ((GridIoMessage)msg).message();
-
-                    if (origMsg instanceof GridCacheQueryRequest) {
-                        if (latch.getCount() > 0)
-                            assertEquals(expNodeId, node.id());
-                        else
-                            assertEquals(expFallbackNodeId, node.id());
-
-                        try {
-                            latch.await();
-                        }
-                        catch (InterruptedException e) {
-                            throw new IgniteSpiException(e);
-                        }
-                    }
-
-                    super.sendMessage(node, msg, ackClosure);
+                    super.sendMessage(node, msg, ackC);
                 }
             };
         }


Mime
View raw message