ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [10/15] ignite git commit: IGNITE-1239 - Fixed scan query failover on changing topology.
Date Thu, 17 Sep 2015 07:48:24 GMT
IGNITE-1239 - Fixed scan query failover on changing topology.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f5220af
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f5220af
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f5220af

Branch: refs/heads/ignite-1282
Commit: 5f5220af0449fd3957d011fcef954b19da852e18
Parents: d928ef4
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Sep 16 18:06:40 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Sep 16 18:06:40 2015 +0300

----------------------------------------------------------------------
 .../query/GridCacheDistributedQueryFuture.java  |  27 ++-
 .../cache/query/GridCacheLocalQueryFuture.java  |   5 +
 .../cache/query/GridCacheQueryAdapter.java      | 170 ++++++++------
 .../query/GridCacheQueryFutureAdapter.java      |  11 +-
 .../cache/query/GridCacheQueryManager.java      |  30 ++-
 .../GridCacheSwapScanQueryAbstractSelfTest.java | 118 +++++-----
 ...CacheScanPartitionQueryFallbackSelfTest.java | 224 ++++++-------------
 7 files changed, 287 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5f5220af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 1d547c5..e745e30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
@@ -142,9 +143,24 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
         }
 
         if (callOnPage)
-            // We consider node departure as a reception of last empty
-            // page from this node.
-            onPage(nodeId, Collections.emptyList(), null, true);
+            onPage(nodeId, Collections.emptyList(),
+                new ClusterTopologyCheckedException("Remote node has left topology: " + nodeId), true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void awaitFirstPage() throws IgniteCheckedException {
+        try {
+            firstPageLatch.await();
+
+            if (isDone() && error() != null)
+                // Throw the exception if future failed.
+                get();
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -229,9 +245,12 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
 
     /** {@inheritDoc} */
     @Override public boolean onDone(Collection<R> res, Throwable err) {
+        boolean done = super.onDone(res, err);
+
+        // Must release the lath after onDone() in order for a waiting thread to see an exception, if any.
         firstPageLatch.countDown();
 
-        return super.onDone(res, err);
+        return done;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f5220af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
index 46af18a..248dfa8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
@@ -77,6 +77,11 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public void awaitFirstPage() throws IgniteCheckedException {
+        get();
+    }
+
     /** */
     private class LocalQueryRunnable implements GridPlainRunnable {
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f5220af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 3ac5746..855e239 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.query;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Queue;
@@ -34,6 +35,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
@@ -41,14 +43,13 @@ import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.jetbrains.annotations.Nullable;
@@ -63,13 +64,6 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
  * Query adapter.
  */
 public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
-    /** Is local node predicate. */
-    private static final IgnitePredicate<ClusterNode> IS_LOC_NODE = new IgnitePredicate<ClusterNode>() {
-        @Override public boolean apply(ClusterNode n) {
-            return n.isLocal();
-        }
-    };
-
     /** */
     private final GridCacheContext<?, ?> cctx;
 
@@ -446,7 +440,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
-        if (nodes.isEmpty())
+        if (nodes.isEmpty() && (type != SCAN || part == null))
             return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new ClusterGroupEmptyCheckedException());
 
         if (log.isDebugEnabled())
@@ -477,8 +471,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         if (type == SQL_FIELDS || type == SPI)
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
                 qryMgr.queryFieldsDistributed(bean, nodes));
-        else if (type == SCAN && part != null && nodes.size() > 1)
-            return new CacheQueryFallbackFuture<>(nodes, part, bean, qryMgr, cctx);
+        else if (type == SCAN && part != null && !cctx.isLocal())
+            return new CacheQueryFallbackFuture<>(part, bean, qryMgr, cctx);
         else
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
     }
@@ -581,37 +575,48 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         /** Partition. */
         private final int part;
 
+        /** Flag indicating that a first item has been returned to a user. */
+        private boolean firstItemReturned;
+
         /**
-         * @param nodes Backups.
          * @param part Partition.
          * @param bean Bean.
          * @param qryMgr Query manager.
          * @param cctx Cache context.
          */
-        public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, int part, GridCacheQueryBean bean,
+        private CacheQueryFallbackFuture(int part, GridCacheQueryBean bean,
             GridCacheQueryManager qryMgr, GridCacheContext cctx) {
-            this.nodes = fallbacks(nodes);
             this.bean = bean;
             this.qryMgr = qryMgr;
             this.cctx = cctx;
             this.part = part;
 
+            nodes = fallbacks(cctx.discovery().topologyVersionEx());
+
             init();
         }
 
         /**
-         * @param nodes Nodes.
+         * @param topVer Topology version.
          * @return Nodes for query execution.
          */
-        private Queue<ClusterNode> fallbacks(Collection<ClusterNode> nodes) {
-            Queue<ClusterNode> fallbacks = new LinkedList<>();
+        private Queue<ClusterNode> fallbacks(AffinityTopologyVersion topVer) {
+            Deque<ClusterNode> fallbacks = new LinkedList<>();
+            Collection<ClusterNode> owners = new HashSet<>();
 
-            ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE));
+            for (ClusterNode node : cctx.topology().owners(part, topVer)) {
+                if (node.isLocal())
+                    fallbacks.addFirst(node);
+                else
+                    fallbacks.add(node);
 
-            if (node != null)
-                fallbacks.add(node);
+                owners.add(node);
+            }
 
-            fallbacks.addAll(node != null ? F.view(nodes, F.not(IS_LOC_NODE)) : nodes);
+            for (ClusterNode node : cctx.topology().moving(part)) {
+                if (!owners.contains(node))
+                    fallbacks.add(node);
+            }
 
             return fallbacks;
         }
@@ -623,53 +628,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         private void init() {
             final ClusterNode node = nodes.poll();
 
-            GridCacheQueryFutureAdapter<?, ?, R> fut0 = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ?
+            fut = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ?
                 qryMgr.queryLocal(bean) :
                 qryMgr.queryDistributed(bean, Collections.singleton(node)));
-
-            fut0.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
-                @Override public void apply(IgniteInternalFuture<Collection<R>> fut) {
-                    try {
-                        onDone(fut.get());
-                    }
-                    catch (IgniteClientDisconnectedCheckedException e) {
-                        onDone(e);
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (e.hasCause(GridDhtUnreservedPartitionException.class)) {
-                            unreservedTopVer = ((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion();
-
-                            assert unreservedTopVer != null;
-                        }
-
-                        if (F.isEmpty(nodes)) {
-                            final AffinityTopologyVersion topVer = unreservedTopVer;
-
-                            if (topVer != null && --unreservedNodesRetryCnt > 0) {
-                                cctx.affinity().affinityReadyFuture(topVer).listen(
-                                    new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                                        @Override public void apply(
-                                            IgniteInternalFuture<AffinityTopologyVersion> future) {
-
-                                            nodes = fallbacks(cctx.topology().owners(part, topVer));
-
-                                            // Race is impossible here because query retries are executed one by one.
-                                            unreservedTopVer = null;
-
-                                            init();
-                                        }
-                                    });
-                            }
-                            else
-                                onDone(e);
-                        }
-                        else
-                            init();
-                    }
-                }
-            });
-
-            fut = fut0;
         }
 
         /** {@inheritDoc} */
@@ -683,8 +644,81 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         }
 
         /** {@inheritDoc} */
+        @Override public Collection<R> get() throws IgniteCheckedException {
+            assert false;
+
+            return super.get();
+        }
+
+        /** {@inheritDoc} */
         @Override public R next() {
-            return fut.next();
+            if (firstItemReturned)
+                return fut.next();
+
+            while (true) {
+                try {
+                    fut.awaitFirstPage();
+
+                    firstItemReturned = true;
+
+                    return fut.next();
+                }
+                catch (IgniteClientDisconnectedCheckedException e) {
+                    throw CU.convertToCacheException(e);
+                }
+                catch (IgniteCheckedException e) {
+                    retryIfPossible(e);
+                }
+            }
+        }
+
+        /**
+         * @param e Exception for query run.
+         */
+        private void retryIfPossible(IgniteCheckedException e) {
+            try {
+                IgniteInternalFuture<?> retryFut;
+
+                if (e.hasCause(GridDhtUnreservedPartitionException.class)) {
+                    AffinityTopologyVersion waitVer = ((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion();
+
+                    assert waitVer != null;
+
+                    retryFut = cctx.affinity().affinityReadyFuture(waitVer);
+                }
+                else if (e.hasCause(ClusterTopologyCheckedException.class)) {
+                    ClusterTopologyCheckedException topEx = X.cause(e, ClusterTopologyCheckedException.class);
+
+                    retryFut = topEx.retryReadyFuture();
+                }
+                else if (e.hasCause(ClusterGroupEmptyCheckedException.class)) {
+                    ClusterGroupEmptyCheckedException ex = X.cause(e, ClusterGroupEmptyCheckedException.class);
+
+                    retryFut = ex.retryReadyFuture();
+                }
+                else
+                    throw CU.convertToCacheException(e);
+
+                if (F.isEmpty(nodes)) {
+                    if (--unreservedNodesRetryCnt > 0) {
+                        if (retryFut != null)
+                            retryFut.get();
+
+                        nodes = fallbacks(unreservedTopVer == null ? cctx.discovery().topologyVersionEx() : unreservedTopVer);
+
+                        unreservedTopVer = null;
+
+                        init();
+                    }
+                    else
+                        throw CU.convertToCacheException(e);
+                }
+                else
+                    init();
+            }
+            catch (IgniteCheckedException ex) {
+                throw CU.convertToCacheException(ex);
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f5220af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index ad9ee39..2a4fbda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -183,6 +183,13 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     }
 
     /**
+     * Waits for the first page to be received from remote node(s), if any.
+     *
+     * @throws IgniteCheckedException If query execution failed with an error.
+     */
+    public abstract void awaitFirstPage() throws IgniteCheckedException;
+
+    /**
      * Returns next page for the query.
      *
      * @return Next page or {@code null} if no more pages available.
@@ -380,13 +387,13 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
                 synchronized (mux) {
                     enqueue(Collections.emptyList());
 
-                    onPage(nodeId, true);
-
                     onDone(nodeId != null ?
                         new IgniteCheckedException("Failed to execute query on node [query=" + qry +
                             ", nodeId=" + nodeId + "]", err) :
                         new IgniteCheckedException("Failed to execute query locally: " + qry, err));
 
+                    onPage(nodeId, true);
+
                     mux.notifyAll();
                 }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f5220af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 1d934d8..25ace1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -33,8 +33,9 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CachePeekModes;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheInternal;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheOffheapSwapEntry;
@@ -819,8 +820,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
 
+        final GridCacheAdapter cache = dht != null ? dht : cctx.cache();
+
         final ExpiryPolicy plc = cctx.expiry();
 
+        final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+
         final boolean backups = qry.includeBackups() || cctx.isReplicated();
 
         final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
@@ -841,8 +846,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     else if (part < 0 || part >= cctx.affinity().partitions())
                         iter = F.emptyIterator();
                     else {
-                        AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
-
                         locPart = dht.topology().localPartition(part, topVer, false);
 
                         // double check for owning state
@@ -899,7 +902,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                         V val;
 
                         try {
-                            val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc);
+                            GridCacheEntryEx entry = cache.peekEx(key);
+
+                            CacheObject cacheVal =
+                                entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null;
+
+                            val = cacheVal != null ? (V)cacheVal.value(cctx.cacheObjectContext(), false) : null;
+                        }
+                        catch (GridCacheEntryRemovedException e) {
+                            val = null;
                         }
                         catch (IgniteCheckedException e) {
                             if (log.isDebugEnabled())
@@ -1557,7 +1568,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 }
             }
             catch (Throwable e) {
-                U.error(log, "Failed to run query [qry=" + qryInfo + ", node=" + cctx.nodeId() + "]", e);
+                if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
+                    U.error(log, "Failed to run query [qry=" + qryInfo + ", node=" + cctx.nodeId() + "]", e);
 
                 onPageReady(loc, qryInfo, null, true, e);
 
@@ -1572,8 +1584,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId()));
                         }
                         catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" +
-                                cctx.nodeId() + "]", e);
+                            if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
+                                U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" +
+                                    cctx.nodeId() + "]", e);
                         }
                     }
                 }
@@ -1694,7 +1707,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     fut.get().closeIfNotShared(recipient(sndId, reqId));
                 }
                 catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to close iterator.", e);
+                    if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
+                        U.error(log, "Failed to close iterator.", e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f5220af/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
index 6f4a4ab..94c8d00 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
@@ -19,13 +19,16 @@ package org.apache.ignite.internal.processors.cache.query;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -115,13 +118,13 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
      * @throws Exception If failed.
      */
     public void testQuery() throws Exception {
-        checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), false);
+        checkQuery(grid(0).cache(ATOMIC_CACHE_NAME), false);
 
-        checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), false);
+        checkQuery(grid(0).cache(TRANSACTIONAL_CACHE_NAME), false);
 
-        checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), true);
+        checkQuery(grid(0).cache(ATOMIC_CACHE_NAME), true);
 
-        checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), true);
+        checkQuery(grid(0).cache(TRANSACTIONAL_CACHE_NAME), true);
     }
 
     /**
@@ -130,16 +133,18 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void checkQuery(GridCacheAdapter cache, boolean scanPartitions) throws Exception {
+    private void checkQuery(IgniteCache cache, boolean scanPartitions) throws Exception {
         final int ENTRY_CNT = 500;
 
         Map<Integer, Map<Key, Person>> entries = new HashMap<>();
 
+        Affinity<Object> aff = ignite(0).affinity(cache.getName());
+
         for (int i = 0; i < ENTRY_CNT; i++) {
             Key key = new Key(i);
             Person val = new Person("p-" + i, i);
 
-            int part = cache.context().affinity().partition(key);
+            int part = aff.partition(key);
 
             cache.getAndPut(key, val);
 
@@ -152,24 +157,26 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
         }
 
         try {
-            int partitions = scanPartitions ? cache.context().affinity().partitions() : 1;
+            int partitions = scanPartitions ? aff.partitions() : 1;
 
             for (int i = 0; i < partitions; i++) {
-                CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
-                    new IgniteBiPredicate<Key, Person>() {
-                        @Override public boolean apply(Key key, Person p) {
-                            assertEquals(key.id, (Integer)p.salary);
+                ScanQuery<Key, Person> qry = new ScanQuery<>(new IgniteBiPredicate<Key, Person>() {
+                    @Override public boolean apply(Key key, Person p) {
+                        assertEquals(key.id, (Integer)p.salary);
+
+                        return key.id % 2 == 0;
+                    }
+                });
 
-                            return key.id % 2 == 0;
-                        }
-                    }, (scanPartitions ? i : null), false);
+                if (scanPartitions)
+                    qry.setPartition(i);
 
-                Collection<Map.Entry<Key, Person>> res = qry.execute().get();
+                List<Cache.Entry<Key, Person>> res = cache.query(qry).getAll();
 
                 if (!scanPartitions)
                     assertEquals(ENTRY_CNT / 2, res.size());
 
-                for (Map.Entry<Key, Person> e : res) {
+                for (Cache.Entry<Key, Person> e : res) {
                     Key k = e.getKey();
                     Person p = e.getValue();
 
@@ -183,9 +190,12 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
                     }
                 }
 
-                qry = cache.context().queries().createScanQuery(null, (scanPartitions ? i : null), false);
+                qry = new ScanQuery<>();
+
+                if (scanPartitions)
+                    qry.setPartition(i);
 
-                res = qry.execute().get();
+                res = cache.query(qry).getAll();
 
                 if (!scanPartitions)
                     assertEquals(ENTRY_CNT, res.size());
@@ -204,23 +214,22 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
      * @param expCnt Expected entries in query result.
      * @throws Exception If failed.
      */
-    private void testMultithreaded(final GridCacheAdapter cache, final int expCnt) throws Exception {
+    private void testMultithreaded(final IgniteCache cache, final int expCnt) throws Exception {
         log.info("Starting multithreaded queries.");
 
         GridTestUtils.runMultiThreaded(new Callable<Void>() {
             @SuppressWarnings("unchecked")
             @Override public Void call() throws Exception {
-                CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
-                    new IgniteBiPredicate<Key, Person>() {
-                        @Override public boolean apply(Key key, Person p) {
-                            assertEquals(key.id, (Integer)p.salary);
+                ScanQuery<Key, Person> qry = new ScanQuery<>(new IgniteBiPredicate<Key, Person>() {
+                    @Override public boolean apply(Key key, Person p) {
+                        assertEquals(key.id, (Integer)p.salary);
 
-                            return key.id % 2 == 0;
-                        }
-                    }, null, false);
+                        return key.id % 2 == 0;
+                    }
+                });
 
                 for (int i = 0; i < 250; i++) {
-                    Collection<Map.Entry<Key, Person>> res = qry.execute().get();
+                    List<Cache.Entry<Key, Person>> res = cache.query(qry).getAll();
 
                     assertEquals(expCnt, res.size());
 
@@ -237,9 +246,9 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
      * @throws Exception If failed.
      */
     public void testQueryPrimitives() throws Exception {
-        checkQueryPrimitives(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME));
+        checkQueryPrimitives(grid(0).cache(ATOMIC_CACHE_NAME));
 
-        checkQueryPrimitives(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME));
+        checkQueryPrimitives(grid(0).cache(TRANSACTIONAL_CACHE_NAME));
     }
 
     /**
@@ -247,27 +256,26 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void checkQueryPrimitives(GridCacheAdapter cache) throws Exception {
+    private void checkQueryPrimitives(IgniteCache cache) throws Exception {
         final int ENTRY_CNT = 500;
 
         for (int i = 0; i < ENTRY_CNT; i++)
-            cache.getAndPut(String.valueOf(i), (long) i);
+            cache.getAndPut(String.valueOf(i), (long)i);
 
         try {
-            CacheQuery<Map.Entry<String, Long>> qry = cache.context().queries().createScanQuery(
-                new IgniteBiPredicate<String, Long>() {
-                    @Override public boolean apply(String key, Long val) {
-                        assertEquals(key, String.valueOf(val));
+            ScanQuery<String, Long> qry = new ScanQuery<>(new IgniteBiPredicate<String, Long>() {
+                @Override public boolean apply(String key, Long val) {
+                    assertEquals(key, String.valueOf(val));
 
-                        return val % 2 == 0;
-                    }
-                }, null, false);
+                    return val % 2 == 0;
+                }
+            });
 
-            Collection<Map.Entry<String, Long>> res = qry.execute().get();
+            Collection<Cache.Entry<String, Long>> res = cache.query(qry).getAll();
 
             assertEquals(ENTRY_CNT / 2, res.size());
 
-            for (Map.Entry<String, Long> e : res) {
+            for (Cache.Entry<String, Long> e : res) {
                 String key = e.getKey();
                 Long val = e.getValue();
 
@@ -276,9 +284,9 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
                 assertEquals(0, val % 2);
             }
 
-            qry = cache.context().queries().createScanQuery(null, null, false);
+            qry = new ScanQuery<>();
 
-            res = qry.execute().get();
+            res = cache.query(qry).getAll();
 
             assertEquals(ENTRY_CNT, res.size());
         }
@@ -292,9 +300,9 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
      * @throws Exception If failed.
      */
     public void testQueryValueByteArray() throws Exception {
-        checkQueryValueByteArray(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME));
+        checkQueryValueByteArray(grid(0).cache(ATOMIC_CACHE_NAME));
 
-        checkQueryValueByteArray(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME));
+        checkQueryValueByteArray(grid(0).cache(TRANSACTIONAL_CACHE_NAME));
     }
 
     /**
@@ -302,27 +310,27 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void checkQueryValueByteArray(GridCacheAdapter cache) throws Exception {
+    private void checkQueryValueByteArray(IgniteCache cache) throws Exception {
         final int ENTRY_CNT = 100;
 
         for (int i = 0; i < ENTRY_CNT; i++)
             cache.getAndPut(i, new byte[i]);
 
         try {
-            CacheQuery<Map.Entry<Integer, byte[]>> qry = cache.context().queries().createScanQuery(
+            ScanQuery<Integer, byte[]> qry = new ScanQuery<>(
                 new IgniteBiPredicate<Integer, byte[]>() {
                     @Override public boolean apply(Integer key, byte[] val) {
                         assertEquals(key, (Integer)val.length);
 
                         return key % 2 == 0;
                     }
-                }, null, false);
+                });
 
-            Collection<Map.Entry<Integer, byte[]>> res = qry.execute().get();
+            Collection<Cache.Entry<Integer, byte[]>> res = cache.query(qry).getAll();
 
             assertEquals(ENTRY_CNT / 2, res.size());
 
-            for (Map.Entry<Integer, byte[]> e : res) {
+            for (Cache.Entry<Integer, byte[]> e : res) {
                 Integer key = e.getKey();
                 byte[] val = e.getValue();
 
@@ -331,9 +339,9 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
                 assertEquals(0, key % 2);
             }
 
-            qry = cache.context().queries().createScanQuery(null, null, false);
+            qry = new ScanQuery<>();
 
-            res = qry.execute().get();
+            res = cache.query(qry).getAll();
 
             assertEquals(ENTRY_CNT, res.size());
         }
@@ -419,9 +427,9 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
 
         /** {@inheritDoc} */
         @Override public int hashCode() {
-            int result = name != null ? name.hashCode() : 0;
+            int res = name != null ? name.hashCode() : 0;
 
-            return 31 * result + salary;
+            return 31 * res + salary;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f5220af/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index 1ef470a..02b213e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -22,36 +22,29 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -91,18 +84,15 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     /** Expected first node ID. */
     private static UUID expNodeId;
 
-    /** Expected fallback node ID. */
-    private static UUID expFallbackNodeId;
-
     /** Communication SPI factory. */
     private CommunicationSpiFactory commSpiFactory;
 
-    /** Latch. */
-    private static CountDownLatch latch;
-
     /** Test entries. */
     private Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
 
+    /** */
+    private boolean syncRebalance;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -120,6 +110,10 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
         ccfg.setCacheMode(cacheMode);
         ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
         ccfg.setBackups(backups);
+
+        if (syncRebalance)
+            ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
         ccfg.setNearConfiguration(null);
 
         cfg.setCacheConfiguration(ccfg);
@@ -144,7 +138,8 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
             int part = anyLocalPartition(cache.context());
 
-            CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false);
+            QueryCursor<Cache.Entry<Integer, Integer>> qry =
+                cache.query(new ScanQuery<Integer, Integer>().setPartition(part));
 
             doTestScanQuery(qry, part);
         }
@@ -174,7 +169,8 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
             expNodeId = tup.get2();
 
-            CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false);
+            QueryCursor<Cache.Entry<Integer, Integer>> qry =
+                cache.query(new ScanQuery<Integer, Integer>().setPartition(part));
 
             doTestScanQuery(qry, part);
         }
@@ -184,16 +180,22 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     }
 
     /**
-     * Scan should activate fallback mechanism when new nodes join topology and rebalancing happens in parallel with
-     * scan query.
-     *
      * @throws Exception In case of error.
      */
     public void testScanFallbackOnRebalancing() throws Exception {
+        scanFallbackOnRebalancing(false);
+    }
+
+    /**
+     * @param cur If {@code true} tests query cursor.
+     * @throws Exception In case of error.
+     */
+    private void scanFallbackOnRebalancing(final boolean cur) throws Exception {
         cacheMode = CacheMode.PARTITIONED;
         clientMode = false;
-        backups = 1;
+        backups = 2;
         commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory();
+        syncRebalance = true;
 
         try {
             Ignite ignite = startGrids(GRID_CNT);
@@ -214,6 +216,8 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
                             Thread.sleep(3000);
 
+                            info("Will stop grid: " + getTestGridName(id));
+
                             stopGrid(id);
 
                             if (done.get())
@@ -224,7 +228,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
                         return null;
                     }
-                }, GRID_CNT);
+                }, 2);
 
             final AtomicInteger nodeIdx = new AtomicInteger();
 
@@ -233,18 +237,24 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
                     @Override public Object call() throws Exception {
                         int nodeId = nodeIdx.getAndIncrement();
 
-                        IgniteCacheProxy<Integer, Integer> cache = (IgniteCacheProxy<Integer, Integer>)
-                            grid(nodeId).<Integer, Integer>cache(null);
+                        IgniteCache<Integer, Integer> cache = grid(nodeId).cache(null);
+
+                        int cntr = 0;
 
                         while (!done.get()) {
-                            IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context());
+                            int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions());
 
-                            int part = tup.get1();
+                            if (cntr++ % 100 == 0)
+                                info("Running query [node=" + nodeId + ", part=" + part + ']');
 
-                            CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(
-                                null, part, false);
+                            try (QueryCursor<Cache.Entry<Integer, Integer>> cur0 =
+                                     cache.query(new ScanQuery<Integer, Integer>(part).setPageSize(5))) {
 
-                            doTestScanQuery(qry, part);
+                                if (cur)
+                                    doTestScanQueryCursor(cur0, part);
+                                else
+                                    doTestScanQuery(cur0, part);
+                            }
                         }
 
                         return null;
@@ -269,9 +279,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
      *
      * @throws Exception In case of error.
      */
-    public void testScanFallbackOnRebalancingCursor() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1239");
-
+    public void testScanFallbackOnRebalancingCursor1() throws Exception {
         cacheMode = CacheMode.PARTITIONED;
         clientMode = false;
         backups = 1;
@@ -308,15 +316,19 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
                         IgniteCache<Integer, Integer> cache = grid(nodeId).cache(null);
 
+                        int cntr = 0;
+
                         while (!done.get()) {
                             int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions());
 
-                            QueryCursor<Cache.Entry<Integer, Integer>> cur =
-                                cache.query(new ScanQuery<Integer, Integer>(part));
+                            if (cntr++ % 100 == 0)
+                                info("Running query [node=" + nodeId + ", part=" + part + ']');
 
-                            U.debug(log, "Running query [node=" + nodeId + ", part=" + part + ']');
+                            try (QueryCursor<Cache.Entry<Integer, Integer>> cur =
+                                     cache.query(new ScanQuery<Integer, Integer>(part).setPageSize(5))) {
 
-                            doTestScanQueryCursor(cur, part);
+                                doTestScanQueryCursor(cur, part);
+                            }
                         }
 
                         return null;
@@ -332,95 +344,15 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     }
 
     /**
-     * Scan should try first remote node and fallbacks to second remote node.
-     *
      * @throws Exception If failed.
      */
-    public void testScanFallback() throws Exception {
-        cacheMode = CacheMode.PARTITIONED;
-        backups = 1;
-        commSpiFactory = new TestFallbackCommunicationSpiFactory();
-
-        final Set<Integer> candidates = new TreeSet<>();
-
-        final AtomicBoolean test = new AtomicBoolean(false);
-
-        for(int j = 0; j < 2; j++) {
-            clientMode = true;
-
-            latch = new CountDownLatch(1);
-
-            try {
-                final Ignite ignite0 = startGrid(0);
-
-                clientMode = false;
-
-                final IgniteEx ignite1 = startGrid(1);
-                final IgniteEx ignite2 = startGrid(2);
-                startGrid(3);
-
-                if (test.get()) {
-                    expNodeId = ignite1.localNode().id();
-                    expFallbackNodeId = ignite2.localNode().id();
-                }
-
-                final IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite0);
-
-                if (!test.get()) {
-                    candidates.addAll(localPartitions(ignite1));
-
-                    candidates.retainAll(localPartitions(ignite2));
-                }
-
-                Runnable run = new Runnable() {
-                    @Override public void run() {
-                        try {
-                            startGrid(4);
-                            startGrid(5);
-
-                            awaitPartitionMapExchange();
-
-                            if (!test.get()) {
-                                candidates.removeAll(localPartitions(ignite1));
-
-                                F.retain(candidates, false, localPartitions(ignite2));
-                            }
-
-                            latch.countDown();
-                        }
-                        catch (Exception e) {
-                            e.printStackTrace();
-                        }
-
-                    }
-                };
-
-                Integer part = null;
-                CacheQuery<Map.Entry<Integer, Integer>> qry = null;
-
-                if (test.get()) {
-                    part = F.first(candidates);
-
-                    qry = cache.context().queries().createScanQuery(null, part, false);
-                }
-
-                new Thread(run).start();
-
-                if (test.get())
-                    doTestScanQuery(qry, part);
-                else
-                    latch.await();
-            }
-            finally {
-                test.set(true);
-
-                stopAllGrids();
-            }
-        }
+    public void testScanFallbackOnRebalancingCursor2() throws Exception {
+        scanFallbackOnRebalancing(true);
     }
 
     /**
      * @param ignite Ignite.
+     * @return Cache.
      */
     protected IgniteCacheProxy<Integer, Integer> fillCache(Ignite ignite) {
         IgniteCacheProxy<Integer, Integer> cache =
@@ -444,16 +376,14 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
     /**
      * @param qry Query.
+     * @param part Partition.
      */
-    protected void doTestScanQuery(CacheQuery<Map.Entry<Integer, Integer>> qry, int part)
-        throws IgniteCheckedException {
-        CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
-
-        Collection<Map.Entry<Integer, Integer>> qryEntries = fut.get();
+    protected void doTestScanQuery(QueryCursor<Cache.Entry<Integer, Integer>> qry, int part) {
+        Collection<Cache.Entry<Integer, Integer>> qryEntries = qry.getAll();
 
         Map<Integer, Integer> map = entries.get(part);
 
-        for (Map.Entry<Integer, Integer> e : qryEntries)
+        for (Cache.Entry<Integer, Integer> e : qryEntries)
             assertEquals(map.get(e.getKey()), e.getValue());
 
         assertEquals("Invalid number of entries for partition: " + part, map.size(), qryEntries.size());
@@ -464,7 +394,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
      * @param part Partition number.
      */
     protected void doTestScanQueryCursor(
-        QueryCursor<Cache.Entry<Integer, Integer>> cur, int part) throws IgniteCheckedException {
+        QueryCursor<Cache.Entry<Integer, Integer>> cur, int part) {
 
         Map<Integer, Integer> map = entries.get(part);
 
@@ -483,6 +413,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
     /**
      * @param cctx Cctx.
+     * @return Local partition.
      */
     private static int anyLocalPartition(GridCacheContext<?, ?> cctx) {
         return F.first(cctx.topology().localPartitions()).id();
@@ -490,6 +421,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
     /**
      * @param cctx Cctx.
+     * @return Remote partition.
      */
     private IgniteBiTuple<Integer, UUID> remotePartition(final GridCacheContext cctx) {
         ClusterNode node = F.first(cctx.kernalContext().grid().cluster().forRemotes().nodes());
@@ -505,6 +437,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
     /**
      * @param ignite Ignite.
+     * @return Local partitions.
      */
     private Set<Integer> localPartitions(Ignite ignite) {
         GridCacheContext cctx = ((IgniteCacheProxy)ignite.cache(null)).context();
@@ -528,7 +461,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
      */
     private interface CommunicationSpiFactory {
         /**
-         * Creates communication SPI instance.
+         * @return Communication SPI instance.
          */
         TcpCommunicationSpi create();
     }
@@ -541,13 +474,13 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
         @Override public TcpCommunicationSpi create() {
             return new TcpCommunicationSpi() {
                 @Override public void sendMessage(ClusterNode node, Message msg,
-                    IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
+                    IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
                     Object origMsg = ((GridIoMessage)msg).message();
 
                     if (origMsg instanceof GridCacheQueryRequest)
                         fail(); //should use local node
 
-                    super.sendMessage(node, msg, ackClosure);
+                    super.sendMessage(node, msg, ackC);
                 }
             };
         }
@@ -561,44 +494,13 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
         @Override public TcpCommunicationSpi create() {
             return new TcpCommunicationSpi() {
                 @Override public void sendMessage(ClusterNode node, Message msg,
-                    IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
+                    IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
                     Object origMsg = ((GridIoMessage)msg).message();
 
                     if (origMsg instanceof GridCacheQueryRequest)
                         assertEquals(expNodeId, node.id());
 
-                    super.sendMessage(node, msg, ackClosure);
-                }
-            };
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestFallbackCommunicationSpiFactory implements CommunicationSpiFactory {
-        /** {@inheritDoc} */
-        @Override public TcpCommunicationSpi create() {
-            return new TcpCommunicationSpi() {
-                @Override public void sendMessage(ClusterNode node, Message msg,
-                    IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
-                    Object origMsg = ((GridIoMessage)msg).message();
-
-                    if (origMsg instanceof GridCacheQueryRequest) {
-                        if (latch.getCount() > 0)
-                            assertEquals(expNodeId, node.id());
-                        else
-                            assertEquals(expFallbackNodeId, node.id());
-
-                        try {
-                            latch.await();
-                        }
-                        catch (InterruptedException e) {
-                            throw new IgniteSpiException(e);
-                        }
-                    }
-
-                    super.sendMessage(node, msg, ackClosure);
+                    super.sendMessage(node, msg, ackC);
                 }
             };
         }


Mime
View raw message