ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [3/4] incubator-ignite git commit: ignite-921 Create scan query able to iterate over single partition
Date Wed, 27 May 2015 16:40:26 GMT
ignite-921 Create scan query able to iterate over single partition


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b58bb122
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b58bb122
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b58bb122

Branch: refs/heads/ignite-389
Commit: b58bb122da1d4c196b8125a28a4c1df33a9fc82f
Parents: 982235b
Author: agura <agura@gridgain.com>
Authored: Mon May 25 20:26:04 2015 +0300
Committer: agura <agura@gridgain.com>
Committed: Tue May 26 21:18:49 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  15 +-
 .../processors/cache/GridCacheSwapManager.java  |  55 ++++-
 .../processors/cache/IgniteCacheProxy.java      |   2 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   7 +
 .../processors/cache/query/CacheQuery.java      |   2 +-
 .../query/GridCacheDistributedQueryManager.java |   3 +
 .../cache/query/GridCacheQueryAdapter.java      | 228 ++++++++++++++++++-
 .../cache/query/GridCacheQueryManager.java      | 200 +++++++++-------
 .../cache/query/GridCacheQueryRequest.java      |  31 ++-
 .../datastructures/GridCacheSetImpl.java        |   4 +-
 ...achePartitionedPreloadLifecycleSelfTest.java |   2 +-
 ...CacheReplicatedPreloadLifecycleSelfTest.java |   6 +-
 .../GridCacheSwapScanQueryAbstractSelfTest.java | 112 ++++++---
 ...CacheScanPartitionQueryFallbackSelfTest.java | 213 +++++++++++++++++
 .../cache/IgniteCacheAbstractQuerySelfTest.java |  53 ++++-
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 16 files changed, 795 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index d390037..d7cec9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1317,8 +1317,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
             final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>();
 
-            IgniteInternalFuture<Object> readFut =
-                readThroughAllAsync(absentKeys, true, skipVals, null, subjId, taskName, new CI2<KeyCacheObject, Object>() {
+            IgniteInternalFuture<Object> readFut = readThroughAllAsync(absentKeys, true, skipVals, null,
+                subjId, taskName, new CI2<KeyCacheObject, Object>() {
                     /** Version for all loaded entries. */
                     private GridCacheVersion nextVer = ctx.versions().next();
 
@@ -1948,7 +1948,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @param filter Optional filter.
      * @return Put operation future.
      */
-    public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val, @Nullable final CacheEntryPredicate... filter) {
+    public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val,
+        @Nullable final CacheEntryPredicate... filter) {
         A.notNull(key, "key", val, "val");
 
         if (keyCheck)
@@ -3117,7 +3118,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout) throws IgniteCheckedException {
+    @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout)
+        throws IgniteCheckedException {
         if (F.isEmpty(keys))
             return true;
 
@@ -3689,7 +3691,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1)
             return localIteratorHonorExpirePolicy(opCtx);
 
-        CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, ctx.keepPortable())
+        CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, null, ctx.keepPortable())
             .keepAll(false)
             .execute();
 
@@ -3918,7 +3920,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
                 return t;
             }
-            catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | IgniteTxRollbackCheckedException e) {
+            catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
+                IgniteTxRollbackCheckedException e) {
                 throw e;
             }
             catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index eb82218..e4b1cbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1211,7 +1211,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         checkIteratorQueue();
 
         if (offHeapEnabled() && !swapEnabled())
-            return rawOffHeapIterator(true, true);
+            return rawOffHeapIterator(null, true, true);
 
         if (swapEnabled() && !offHeapEnabled())
             return rawSwapIterator(true, true);
@@ -1227,7 +1227,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             private Map.Entry<byte[], byte[]> cur;
 
             {
-                it = rawOffHeapIterator(true, true);
+                it = rawOffHeapIterator(null, true, true);
 
                 advance();
             }
@@ -1554,11 +1554,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param c Key/value closure.
+     * @param part Partition.
      * @param primary Include primaries.
      * @param backup Include backups.
      * @return Off-heap iterator.
      */
     public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c,
+        Integer part,
         boolean primary,
         boolean backup)
     {
@@ -1574,24 +1576,31 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
 
-        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
-            cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+        Set<Integer> parts;
+
+        if (part == null)
+            parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+                cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+        else
+            parts = Collections.singleton(part);
 
         return new CloseablePartitionsIterator<T, T>(parts) {
             @Override protected GridCloseableIterator<T> partitionIterator(int part)
-                throws IgniteCheckedException
-            {
+                throws IgniteCheckedException {
                 return offheap.iterator(spaceName, c, part);
             }
         };
     }
 
     /**
+     *
+     * @param part Partition.
      * @param primary Include primaries.
      * @param backup Include backups.
      * @return Raw off-heap iterator.
      */
-    public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(final boolean primary,
+    public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(@Nullable Integer part,
+        final boolean primary,
         final boolean backup)
     {
         if (!offheapEnabled || (!primary && !backup))
@@ -1626,8 +1635,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
 
-        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
-            cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+        Set<Integer> parts;
+
+        if (part == null)
+            parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+                cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+        else
+            parts = Collections.singleton(part);
 
         return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, IgniteBiTuple<byte[], byte[]>>(parts) {
             private Map.Entry<byte[], byte[]> cur;
@@ -1701,6 +1715,29 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param part Partition to iterate over.
+     * @return Raw swap iterator over the given partition (empty iterator if swap is disabled).
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(int part)
+        throws IgniteCheckedException
+    {
+        if (!swapEnabled)
+            return new GridEmptyCloseableIterator<>();
+
+        checkIteratorQueue();
+
+        return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>(
+            Collections.singleton(part)) {
+            @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part)
+                throws IgniteCheckedException
+            {
+                return swapMgr.rawIterator(spaceName, part);
+            }
+        };
+    }
+
+    /**
      * @param primary If {@code true} includes primary entries.
      * @param backup If {@code true} includes backup entries.
      * @param topVer Topology version.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index f840015..0009127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -353,7 +353,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         if (filter instanceof ScanQuery) {
             IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
 
-            qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, isKeepPortable);
+            qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, null, isKeepPortable);
 
             if (grp != null)
                 qry.projection(grp);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 0749f66..8ac3809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -149,6 +149,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>
     }
 
     /**
+     * @return Key set of the map backing this partition.
+     */
+    public Set<KeyCacheObject> keySet() {
+        return map.keySet();
+    }
+
+    /**
      * @return Entries belonging to partition.
      */
     public Collection<GridDhtCacheEntry> entries() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 0658828..2d2db1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -76,7 +76,7 @@ import org.jetbrains.annotations.*;
  *     </li>
  *     <li>
  *         Joins will work correctly only if joined objects are stored in
- *         collocated mode or at least one side of the join is stored in
+ *         colocated mode or at least one side of the join is stored in
  *         {@link org.apache.ignite.cache.CacheMode#REPLICATED} cache. Refer to
  *         {@link AffinityKey} javadoc for more information about colocation.
  *     </li>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index a579aab..2b93144 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -229,6 +229,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 false,
                 null,
                 req.keyValueFilter(),
+                req.partition(),
                 req.className(),
                 req.clause(),
                 req.includeMetaData(),
@@ -518,6 +519,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 qry.query().clause(),
                 clsName,
                 qry.query().scanFilter(),
+                qry.query().partition(),
                 qry.reducer(),
                 qry.transform(),
                 qry.query().pageSize(),
@@ -626,6 +628,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 qry.query().clause(),
                 null,
                 null,
+                null,
                 qry.reducer(),
                 qry.transform(),
                 qry.query().pageSize(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 4b1fc87..05198a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -21,7 +21,9 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -31,6 +33,7 @@ import org.apache.ignite.plugin.security.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*;
 
@@ -56,6 +59,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     /** */
     private final IgniteBiPredicate<Object, Object> filter;
 
+    /** Partition. */
+    private Integer part;
+
     /** */
     private final boolean incMeta;
 
@@ -95,6 +101,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
      * @param clsName Class name.
      * @param clause Clause.
      * @param filter Scan filter.
+     * @param part Partition.
      * @param incMeta Include metadata flag.
      * @param keepPortable Keep portable flag.
      */
@@ -103,6 +110,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         @Nullable String clsName,
         @Nullable String clause,
         @Nullable IgniteBiPredicate<Object, Object> filter,
+        @Nullable Integer part,
         boolean incMeta,
         boolean keepPortable) {
         assert cctx != null;
@@ -113,6 +121,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         this.clsName = clsName;
         this.clause = clause;
         this.filter = filter;
+        this.part = part;
         this.incMeta = incMeta;
         this.keepPortable = keepPortable;
 
@@ -132,6 +141,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
      * @param dedup Enable dedup flag.
      * @param prj Grid projection.
      * @param filter Key-value filter.
+     * @param part Partition.
      * @param clsName Class name.
      * @param clause Clause.
      * @param incMeta Include metadata flag.
@@ -149,6 +159,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         boolean dedup,
         ClusterGroup prj,
         IgniteBiPredicate<Object, Object> filter,
+        @Nullable Integer part,
         @Nullable String clsName,
         String clause,
         boolean incMeta,
@@ -165,6 +176,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         this.dedup = dedup;
         this.prj = prj;
         this.filter = filter;
+        this.part = part;
         this.clsName = clsName;
         this.clause = clause;
         this.incMeta = incMeta;
@@ -334,6 +346,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     }
 
     /**
+     * @return Partition the query is restricted to, or {@code null} if no partition was set.
+     */
+    @Nullable public Integer partition() {
+        return part;
+    }
+
+    /**
      * @throws IgniteCheckedException If query is invalid.
      */
     public void validate() throws IgniteCheckedException {
@@ -376,10 +395,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         return execute(null, rmtTransform, args);
     }
 
+    /** {@inheritDoc} */
     @Override public QueryMetrics metrics() {
         return metrics.copy();
     }
 
+    /** {@inheritDoc} */
     @Override public void resetMetrics() {
         metrics = new GridCacheQueryMetricsAdapter();
     }
@@ -418,18 +439,34 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
         taskHash = cctx.kernalContext().job().currentTaskNameHash();
 
-        GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer,
+        final GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer,
             (IgniteClosure<Object, Object>)rmtTransform, args);
 
-        GridCacheQueryManager qryMgr = cctx.queries();
+        final GridCacheQueryManager qryMgr = cctx.queries();
 
         boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
 
         if (type == SQL_FIELDS || type == SPI)
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
                 qryMgr.queryFieldsDistributed(bean, nodes));
-        else
-            return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
+        else {
+            final CacheQueryFuture<R> fut =
+                (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
+
+            if (type == SCAN && part != null) {
+                assert nodes.size() == 1;
+
+                final Queue<ClusterNode> backups = new LinkedList<>(
+                    cctx.affinity().backups(part, cctx.affinity().affinityTopologyVersion()));
+
+                if (F.isEmpty(backups))
+                    return fut;
+
+                return new CacheQueryFallbackFuture<>(backups, bean, qryMgr, fut);
+            }
+
+            return fut;
+        }
     }
 
     /**
@@ -448,14 +485,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
             case REPLICATED:
                 if (prj != null)
-                    return nodes(cctx, prj);
+                    return nodes(cctx, prj, partition());
 
                 return cctx.affinityNode() ?
                     Collections.singletonList(cctx.localNode()) :
-                    Collections.singletonList(F.rand(nodes(cctx, null)));
+                    Collections.singletonList(F.rand(nodes(cctx, null, partition())));
 
             case PARTITIONED:
-                return nodes(cctx, prj);
+                return nodes(cctx, prj, partition());
 
             default:
                 throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode);
@@ -467,13 +504,17 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
      * @param prj Projection (optional).
      * @return Collection of data nodes in provided projection (if any).
      */
-    private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) {
+    private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
+        @Nullable final ClusterGroup prj, @Nullable final Integer part) {
         assert cctx != null;
 
         return F.view(CU.allNodes(cctx), new P1<ClusterNode>() {
             @Override public boolean apply(ClusterNode n) {
+                AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+
                 return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
-                    (prj == null || prj.node(n.id()) != null);
+                    (prj == null || prj.node(n.id()) != null) &&
+                    (part == null || cctx.affinity().primary(n, part.intValue(), topVer));
             }
         });
     }
@@ -482,4 +523,173 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     @Override public String toString() {
         return S.toString(GridCacheQueryAdapter.class, this);
     }
+
+    /**
+     * Query future wrapper that re-runs the query on the next backup node when the current target fails.
+     */
+    private static class CacheQueryFallbackFuture<R> extends GridCacheQueryFutureAdapter<Object, Object, R> {
+        /** Future currently delegated to; reassigned in {@link #init()} when falling back to a backup. */
+        private GridCacheQueryFutureAdapter<?, ?, R> target;
+
+        /** Backup nodes not yet tried; one is polled on each failure. */
+        private final Queue<ClusterNode> backups;
+
+        /** Query bean passed to {@code queryDistributed} on each retry. */
+        private final GridCacheQueryBean bean;
+
+        /** Query manager used to start the retry queries. */
+        private final GridCacheQueryManager qryMgr;
+
+        /**
+         * @param backups Backup nodes to fall back to.
+         * @param bean Query bean.
+         * @param qryMgr Query manager.
+         * @param fut Future of the initial query; must be a {@link GridCacheQueryFutureAdapter}.
+         */
+        public CacheQueryFallbackFuture(Queue<ClusterNode> backups, GridCacheQueryBean bean,
+            GridCacheQueryManager qryMgr, CacheQueryFuture<R> fut) {
+            this.backups = backups;
+            this.bean = bean;
+            this.qryMgr = qryMgr;
+            this.target = (GridCacheQueryFutureAdapter<?, ?, R>)fut;
+
+            init();
+        }
+
+        /**
+         * Listens on the current target; on {@link IgniteCheckedException} switches to the next backup, if any.
+         */
+        private void init() {
+            target.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
+                @Override public void apply(IgniteInternalFuture<Collection<R>> fut) {
+                    try {
+                        onDone(fut.get());
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (F.isEmpty(backups))
+                            onDone(e);
+                        else {
+                            Set<ClusterNode> backup = Collections.singleton(backups.poll());
+
+                            target =
+                                (GridCacheQueryFutureAdapter<?, ?, R>)qryMgr.queryDistributed(bean, backup);
+
+                            init();
+                        }
+                    }
+                }
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override protected boolean onPage(UUID nodeId, boolean last) {
+            return target.onPage(nodeId, last);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void loadPage() {
+            target.loadPage();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
+            target.loadAllPages();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void cancelQuery() throws IgniteCheckedException {
+            target.cancelQuery();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int available() {
+            return target.available();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean cancel() throws IgniteCheckedException {
+            return target.cancel();
+        }
+
+        /** {@inheritDoc} */
+        @Override void clear() {
+            target.clear();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return target.endTime();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void enqueue(Collection<?> col) {
+            target.enqueue(col);
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean fields() {
+            return target.fields();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<R> get() throws IgniteCheckedException {
+            return target.get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException {
+            return target.get(timeout, unit);
+        }
+
+        /** {@inheritDoc} */
+        @Override public R next() {
+            return target.next();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<R> nextPage() throws IgniteCheckedException {
+            return target.nextPage();
+        }
+
+        /** {@inheritDoc} NOTE(review): forwards to {@code target}, not to this wrapper's own state - confirm intent. */
+        @Override public boolean onDone(Collection<R> res, Throwable err) {
+            return target.onDone(res, err);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<R> nextPage(long timeout) throws IgniteCheckedException {
+            return target.nextPage(timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onNodeLeft(UUID evtNodeId) {
+            target.onNodeLeft(evtNodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data,
+            @Nullable Throwable err, boolean finished) {
+            target.onPage(nodeId, data, err, finished);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            target.onTimeout();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void printMemoryStats() {
+            target.printMemoryStats();
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheQueryBean query() {
+            return target.query();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return target.timeoutId();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 16a8028..fac3d8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -52,6 +52,8 @@ import java.util.concurrent.*;
 import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*;
 
 /**
@@ -111,8 +113,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                         final Object recipient = recipient(nodeId, entry.getKey());
 
                         entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() {
-                            @Override
-                            public void applyx(IgniteInternalFuture<QueryResult<K, V>> f)
+                            @Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f)
                                 throws IgniteCheckedException {
                                 f.get().closeIfNotShared(recipient);
                             }
@@ -768,98 +769,139 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         final boolean backups = qry.includeBackups() || cctx.isReplicated();
 
-        final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
-            private IgniteBiTuple<K, V> next;
+        final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
+            new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
+                private IgniteBiTuple<K, V> next;
 
-            private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
+                private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
 
-            private Iterator<K> iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator();
+                private Iterator<K> iter;
 
-            {
-                advance();
-            }
+                private GridDhtLocalPartition locPart;
 
-            @Override public boolean onHasNext() {
-                return next != null;
-            }
+                {
+                    Integer part = qry.partition();
 
-            @Override public IgniteBiTuple<K, V> onNext() {
-                if (next == null)
-                    throw new NoSuchElementException();
+                    if (part == null || dht == null)
+                        iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator();
+                    else if (part < 0 || part >= cctx.affinity().partitions())
+                        iter = F.emptyIterator();
+                    else {
+                        AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
 
-                IgniteBiTuple<K, V> next0 = next;
+                        locPart = dht.topology().localPartition(part, topVer, false);
 
-                advance();
+                        if (locPart == null || (locPart.state() != OWNING && locPart.state() != RENTING) ||
+                            !locPart.reserve())
+                            throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved");
 
-                return next0;
-            }
 
-            private void advance() {
-                IgniteBiTuple<K, V> next0 = null;
+                        iter = new Iterator<K>() {
+                            private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
 
-                while (iter.hasNext()) {
-                    next0 = null;
+                            @Override public boolean hasNext() {
+                                return iter0.hasNext();
+                            }
 
-                    K key = iter.next();
+                            @Override public K next() {
+                                KeyCacheObject key = iter0.next();
 
-                    V val;
+                                return key.value(cctx.cacheObjectContext(), false);
+                            }
 
-                    try {
-                        val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc);
+                            @Override public void remove() {
+                                iter0.remove();
+                            }
+                        };
                     }
-                    catch (IgniteCheckedException e) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to peek value: " + e);
 
-                        val = null;
-                    }
+                    advance();
+                }
 
-                    if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
-                        dht.sendTtlUpdateRequest(expiryPlc);
+                @Override public boolean onHasNext() {
+                    return next != null;
+                }
 
-                        expiryPlc = cctx.cache().expiryPolicy(plc);
-                    }
+                @Override public IgniteBiTuple<K, V> onNext() {
+                    if (next == null)
+                        throw new NoSuchElementException();
 
-                    if (val != null) {
-                        next0 = F.t(key, val);
+                    IgniteBiTuple<K, V> next0 = next;
 
-                        if (checkPredicate(next0))
-                            break;
-                        else
-                            next0 = null;
-                    }
+                    advance();
+
+                    return next0;
                 }
 
-                next = next0 != null ?
-                    new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
-                    null;
+                private void advance() {
+                    IgniteBiTuple<K, V> next0 = null;
 
-                if (next == null)
-                    sendTtlUpdate();
-            }
+                    while (iter.hasNext()) {
+                        next0 = null;
 
-            @Override protected void onClose() {
-                sendTtlUpdate();
-            }
+                        K key = iter.next();
+
+                        V val;
+
+                        try {
+                            val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc);
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to peek value: " + e);
+
+                            val = null;
+                        }
+
+                        if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
+                            dht.sendTtlUpdateRequest(expiryPlc);
+
+                            expiryPlc = cctx.cache().expiryPolicy(plc);
+                        }
+
+                        if (val != null) {
+                            next0 = F.t(key, val);
+
+                            if (checkPredicate(next0))
+                                break;
+                            else
+                                next0 = null;
+                        }
+                    }
 
-            private void sendTtlUpdate() {
-                if (dht != null && expiryPlc != null) {
-                    dht.sendTtlUpdateRequest(expiryPlc);
+                    next = next0 != null ?
+                        new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
+                        null;
 
-                    expiryPlc = null;
+                    if (next == null)
+                        sendTtlUpdate();
                 }
-            }
 
-            private boolean checkPredicate(Map.Entry<K, V> e) {
-                if (keyValFilter != null) {
-                    Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
+                @Override protected void onClose() {
+                    sendTtlUpdate();
 
-                    return keyValFilter.apply(e0.getKey(), e0.getValue());
+                    if (locPart != null)
+                        locPart.release();
                 }
 
-                return true;
-            }
-        };
+                private void sendTtlUpdate() {
+                    if (dht != null && expiryPlc != null) {
+                        dht.sendTtlUpdateRequest(expiryPlc);
+
+                        expiryPlc = null;
+                    }
+                }
+
+                private boolean checkPredicate(Map.Entry<K, V> e) {
+                    if (keyValFilter != null) {
+                        Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable());
+
+                        return keyValFilter.apply(e0.getKey(), e0.getValue());
+                    }
+
+                    return true;
+                }
+            };
 
         final GridIterator<IgniteBiTuple<K, V>> it;
 
@@ -914,7 +956,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         throws IgniteCheckedException {
         IgniteBiPredicate<K, V> filter = qry.scanFilter();
 
-        Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator(true, backups);
+        Integer part = qry.partition();
+
+        Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) :
+            cctx.swap().rawSwapIterator(part);
 
         return scanIterator(it, filter, qry.keepPortable());
     }
@@ -930,10 +975,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         if (cctx.offheapTiered() && filter != null) {
             OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepPortable());
 
-            return cctx.swap().rawOffHeapIterator(c, true, backups);
+            return cctx.swap().rawOffHeapIterator(c, qry.partition(), true, backups);
         }
         else {
-            Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(true, backups);
+            Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(qry.partition(), true, backups);
 
             return scanIterator(it, filter, qry.keepPortable());
         }
@@ -1222,7 +1267,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             try {
                 // Preparing query closures.
-                IgniteClosure<Map.Entry<K, V>, Object> trans = (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+                IgniteClosure<Map.Entry<K, V>, Object> trans =
+                    (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+
                 IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
 
                 injectResources(trans);
@@ -1529,11 +1576,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), false,
                     qryInfo.query().subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId())));
             }
-            catch (Error e) {
-                fut.onDone(e);
-
-                throw e;
-            }
             catch (Throwable e) {
                 fut.onDone(e);
 
@@ -1843,7 +1885,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 return new IgniteBiPredicate<K, V>() {
                     @Override public boolean apply(K k, V v) {
-                        return cache.context().affinity().primary(ctx.discovery().localNode(), k, AffinityTopologyVersion.NONE);
+                        return cache.context().affinity().primary(ctx.discovery().localNode(), k, NONE);
                     }
                 };
             }
@@ -2920,6 +2962,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             null,
             null,
             null,
+            null,
             false,
             keepPortable);
     }
@@ -2928,17 +2971,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * Creates user's predicate based scan query.
      *
      * @param filter Scan filter.
+     * @param part Partition.
      * @param keepPortable Keep portable flag.
      * @return Created query.
      */
-    @SuppressWarnings("unchecked")
     public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
-        boolean keepPortable) {
+        @Nullable Integer part, boolean keepPortable) {
+
         return new GridCacheQueryAdapter<>(cctx,
             SCAN,
             null,
             null,
             (IgniteBiPredicate<Object, Object>)filter,
+            part,
             false,
             keepPortable);
     }
@@ -2962,6 +3007,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             clsName,
             search,
             null,
+            null,
             false,
             keepPortable);
     }
@@ -2982,6 +3028,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             null,
             qry,
             null,
+            null,
             false,
             keepPortable);
     }
@@ -3002,6 +3049,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             null,
             qry,
             null,
+            null,
             incMeta,
             keepPortable);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 845077f..7577954 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -26,6 +26,8 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 
+import org.jetbrains.annotations.*;
+
 import java.io.*;
 import java.nio.*;
 import java.util.*;
@@ -109,6 +111,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
     /** */
     private int taskHash;
 
+    /** Partition. */
+    private Integer part;
+
     /**
      * Required by {@link Externalizable}
      */
@@ -173,6 +178,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
      * @param clause Query clause.
      * @param clsName Query class name.
      * @param keyValFilter Key-value filter.
+     * @param part Partition.
      * @param rdc Reducer.
      * @param trans Transformer.
      * @param pageSize Page size.
@@ -189,6 +195,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
         String clause,
         String clsName,
         IgniteBiPredicate<Object, Object> keyValFilter,
+        @Nullable Integer part,
         IgniteReducer<Object, Object> rdc,
         IgniteClosure<Object, Object> trans,
         int pageSize,
@@ -211,6 +218,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
         this.clause = clause;
         this.clsName = clsName;
         this.keyValFilter = keyValFilter;
+        this.part = part;
         this.rdc = rdc;
         this.trans = trans;
         this.pageSize = pageSize;
@@ -414,6 +422,13 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
         return taskHash;
     }
 
+    /**
+     * @return partition.
+     */
+    @Nullable public Integer partition() {
+        return part;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
@@ -537,6 +552,11 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
 
                 writer.incrementState();
 
+            case 21:
+                if (!writer.writeInt("part", part != null ? part : -1))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -701,6 +721,15 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
 
                 reader.incrementState();
 
+            case 21:
+                int part0 = reader.readInt("part");
+
+                part = part0 == -1 ? null : part0;
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return true;
@@ -713,7 +742,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 21;
+        return 22;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index f516968..c0e763f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -114,7 +114,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
             }
 
             CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
-                new GridSetQueryPredicate<>(id, collocated), false, false);
+                new GridSetQueryPredicate<>(id, collocated), -1, false, false);
 
             Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
 
@@ -345,7 +345,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     private GridCloseableIterator<T> iterator0() {
         try {
             CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
-                new GridSetQueryPredicate<>(id, collocated), false, false);
+                new GridSetQueryPredicate<>(id, collocated), null, false, false);
 
             Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
index 9d41074..4601586 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java
@@ -176,7 +176,7 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo
             for (int j = 0; j < G.allGrids().size(); j++) {
                 GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
 
-                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false);
+                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
 
                 int totalCnt = F.sumInt(qry.execute(new IgniteReducer<Map.Entry<Object, MyValue>, Integer>() {
                     @IgniteInstanceResource

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
index 62bf3f7..cc8217d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java
@@ -179,7 +179,7 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
             for (int j = 0; j < G.allGrids().size(); j++) {
                 GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two");
 
-                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false);
+                CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false);
 
                 final int i0 = j;
                 final int j0 = i;
@@ -207,8 +207,8 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa
                             Object v1 = e.getValue();
                             Object v2 = ((IgniteKernal)grid).getCache("one").get(key);
 
-                            assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 +
-                                ", missedKey=" + key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
+                            assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + ", missedKey=" +
+                                key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2);
                             assertEquals(v1, v2);
                         }
                         catch (IgniteCheckedException e1) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
index 068a46c..6ccfbc2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java
@@ -115,49 +115,81 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
      * @throws Exception If failed.
      */
     public void testQuery() throws Exception {
-        checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME));
+        checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), false);
 
-        checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME));
+        checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), false);
+
+        checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), true);
+
+        checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), true);
     }
 
     /**
      * @param cache Cache.
+     * @param scanPartitions Scan partitions.
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    private void checkQuery(GridCacheAdapter cache) throws Exception {
+    private void checkQuery(GridCacheAdapter cache, boolean scanPartitions) throws Exception {
         final int ENTRY_CNT = 500;
 
-        for (int i = 0; i < ENTRY_CNT; i++)
-            cache.getAndPut(new Key(i), new Person("p-" + i, i));
+        Map<Integer, Map<Key, Person>> entries = new HashMap<>();
+
+        for (int i = 0; i < ENTRY_CNT; i++) {
+            Key key = new Key(i);
+            Person val = new Person("p-" + i, i);
+
+            int part = cache.context().affinity().partition(key);
+
+            cache.getAndPut(key, val);
+
+            Map<Key, Person> partEntries = entries.get(part);
+
+            if (partEntries == null)
+                entries.put(part, partEntries = new HashMap<>());
+
+            partEntries.put(key, val);
+        }
 
         try {
-            CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
-                new IgniteBiPredicate<Key, Person>() {
-                    @Override public boolean apply(Key key, Person p) {
-                        assertEquals(key.id, (Integer)p.salary);
+            int partitions = scanPartitions ? cache.context().affinity().partitions() : 1;
 
-                        return key.id % 2 == 0;
-                    }
-                }, false);
+            for (int i = 0; i < partitions; i++) {
+                CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery(
+                    new IgniteBiPredicate<Key, Person>() {
+                        @Override public boolean apply(Key key, Person p) {
+                            assertEquals(key.id, (Integer)p.salary);
 
-            Collection<Map.Entry<Key, Person>> res = qry.execute().get();
+                            return key.id % 2 == 0;
+                        }
+                    }, (scanPartitions ? i : null), false);
 
-            assertEquals(ENTRY_CNT / 2, res.size());
+                Collection<Map.Entry<Key, Person>> res = qry.execute().get();
 
-            for (Map.Entry<Key, Person> e : res) {
-                Key k = e.getKey();
-                Person p = e.getValue();
+                if (!scanPartitions)
+                    assertEquals(ENTRY_CNT / 2, res.size());
 
-                assertEquals(k.id, (Integer)p.salary);
-                assertEquals(0, k.id % 2);
-            }
+                for (Map.Entry<Key, Person> e : res) {
+                    Key k = e.getKey();
+                    Person p = e.getValue();
 
-            qry = cache.context().queries().createScanQuery(null, false);
+                    assertEquals(k.id, (Integer)p.salary);
+                    assertEquals(0, k.id % 2);
 
-            res = qry.execute().get();
+                    if (scanPartitions) {
+                        Map<Key, Person> partEntries = entries.get(i);
 
-            assertEquals(ENTRY_CNT, res.size());
+                        assertEquals(p, partEntries.get(k));
+                    }
+                }
+
+                qry = cache.context().queries().createScanQuery(null, (scanPartitions ? i : null), false);
+
+                res = qry.execute().get();
+
+                if (!scanPartitions)
+                    assertEquals(ENTRY_CNT, res.size());
+            }
 
             testMultithreaded(cache, ENTRY_CNT / 2);
         }
@@ -185,7 +217,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
 
                             return key.id % 2 == 0;
                         }
-                    }, false);
+                    }, null, false);
 
                 for (int i = 0; i < 250; i++) {
                     Collection<Map.Entry<Key, Person>> res = qry.execute().get();
@@ -229,7 +261,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
 
                         return val % 2 == 0;
                     }
-                }, false);
+                }, null, false);
 
             Collection<Map.Entry<String, Long>> res = qry.execute().get();
 
@@ -244,7 +276,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
                 assertEquals(0, val % 2);
             }
 
-            qry = cache.context().queries().createScanQuery(null, false);
+            qry = cache.context().queries().createScanQuery(null, null, false);
 
             res = qry.execute().get();
 
@@ -284,7 +316,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
 
                         return key % 2 == 0;
                     }
-                }, false);
+                }, null, false);
 
             Collection<Map.Entry<Integer, byte[]>> res = qry.execute().get();
 
@@ -299,7 +331,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
                 assertEquals(0, key % 2);
             }
 
-            qry = cache.context().queries().createScanQuery(null, false);
+            qry = cache.context().queries().createScanQuery(null, null, false);
 
             res = qry.execute().get();
 
@@ -367,5 +399,29 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA
             this.name = name;
             this.salary = salary;
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            Person person = (Person)o;
+
+            if (salary != person.salary)
+                return false;
+
+            return !(name != null ? !name.equals(person.name) : person.name != null);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int result = name != null ? name.hashCode() : 0;
+
+            return 31 * result + salary;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
new file mode 100644
index 0000000..3b1b842
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+/**
+ * Tests partition scan query fallback.
+ */
+public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 5;
+
+    /** Keys count. */
+    private static final int KEYS_CNT = 1;
+
+    /** Backups. */
+    private int backups;
+
+    /** Cache mode. */
+    private CacheMode cacheMode;
+
+    /** Fallback. */
+    private boolean fallback;
+
+    /** Expected node id (the node a query request must be sent to). */
+    private static volatile UUID expNodeId;
+
+    /** Fail node id. */
+    private static volatile UUID failNodeId;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        ccfg.setBackups(backups);
+        ccfg.setNearConfiguration(null);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPrimary() throws Exception {
+        cacheMode = CacheMode.PARTITIONED;
+        backups = 0;
+        failNodeId = null;
+        fallback = false;
+
+        doTestScanPartition();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFallbackToBackup() throws Exception {
+        cacheMode = CacheMode.PARTITIONED;
+        backups = 1;
+        failNodeId = null;
+        fallback = true;
+
+        doTestScanPartition();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    protected void doTestScanPartition() throws Exception {
+        try {
+            Ignite ignite = startGrids(GRID_CNT);
+
+            IgniteCacheProxy<Integer, Integer> cache =
+                (IgniteCacheProxy<Integer, Integer>)ignite.<Integer, Integer>cache(null);
+
+            Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
+
+            for (int i = 0; i < KEYS_CNT; i++) {
+                cache.put(i, i);
+
+                int part = cache.context().affinity().partition(i);
+
+                Map<Integer, Integer> partEntries = entries.get(part);
+
+                if (partEntries == null)
+                    entries.put(part, partEntries = new HashMap<>());
+
+                partEntries.put(i, i);
+            }
+
+            IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context(), true);
+
+            int part = tup.get1();
+
+            if (fallback)
+                failNodeId = tup.get2();
+            else
+                expNodeId = tup.get2();
+
+            if (fallback)
+                expNodeId = remoteBackup(part, cache.context());
+
+            CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false);
+
+            CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+
+            Collection<Map.Entry<Integer, Integer>> expEntries = fut.get();
+
+            for (Map.Entry<Integer, Integer> e : expEntries) {
+                Map<Integer, Integer> map = entries.get(part);
+
+                if(map == null)
+                    assertTrue(expEntries.isEmpty());
+                else
+                    assertEquals(map.get(e.getKey()), e.getValue());
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param primary {@code true} to pick a primary partition of the remote node, {@code false} for a backup one.
+     */
+    private IgniteBiTuple<Integer, UUID> remotePartition(final GridCacheContext cctx, boolean primary) {
+        ClusterNode node = F.first(cctx.kernalContext().grid().cluster().forRemotes().nodes());
+
+        GridCacheAffinityManager affMgr = cctx.affinity();
+
+        AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion();
+
+        Set<Integer> parts = primary ?
+            affMgr.primaryPartitions(node.id(), topVer) : affMgr.backupPartitions(node.id(), topVer);
+
+        return new IgniteBiTuple<>(F.first(parts), node.id());
+    }
+
+    /**
+     * @param part Partition.
+     * @param cctx Cache context.
+     */
+    private UUID remoteBackup(int part, final GridCacheContext cctx) {
+        final UUID locUuid = cctx.localNodeId();
+
+        GridCacheAffinityManager affMgr = cctx.affinity();
+
+        AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion();
+
+        return F.first(F.view(affMgr.backups(part, topVer), new IgnitePredicate<ClusterNode>() {
+            @Override public boolean apply(ClusterNode node) {
+                return !node.id().equals(locUuid);
+            }
+        })).id();
+    }
+
+    /**
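+     * Communication SPI that fails query requests sent to the fail node and asserts the target of all others.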
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg)
+            throws IgniteSpiException {
+            Object origMsg = ((GridIoMessage)msg).message();
+
+            if (origMsg instanceof GridCacheQueryRequest) {
+                if (node.id().equals(failNodeId))
+                    throw new IgniteSpiException("");
+                else
+                    assertEquals(expNodeId, node.id());
+            }
+
+            super.sendMessage(node, msg);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 1a60bbd..eb5027c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -61,6 +61,9 @@ import static org.junit.Assert.*;
  * Various tests for cache queries.
  */
 public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstractTest {
+    /** Key count. */
+    private static final int KEY_CNT = 5000;
+
     /** Cache store. */
     private static TestStore store = new TestStore();
 
@@ -643,6 +646,47 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
     }
 
     /**
+     * @throws Exception In case of error.
+     */
+    public void testScanPartitionQuery() throws Exception {
+        IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+        GridCacheContext cctx = ((IgniteCacheProxy)cache).context();
+
+        Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
+
+        for (int i = 0; i < KEY_CNT; i++) {
+            cache.put(i, i);
+
+            int part = cctx.affinity().partition(i);
+
+            Map<Integer, Integer> partEntries = entries.get(part);
+
+            if (partEntries == null)
+                entries.put(part, partEntries = new HashMap<>());
+
+            partEntries.put(i, i);
+        }
+
+        for (int i = 0; i < cctx.affinity().partitions(); i++) {
+            CacheQuery<Map.Entry<Integer, Integer>> qry =
+                ((IgniteCacheProxy<Integer, Integer>)cache).context().queries().createScanQuery(null, i, false);
+
+            CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+
+            Map<Integer, Integer> exp = entries.get(i);
+
+            Collection<Map.Entry<Integer, Integer>> actual = fut.get();
+
+            if (exp == null)
+                assertTrue(actual.isEmpty());
+            else
+                for (Map.Entry<Integer, Integer> entry : actual)
+                    assertTrue(entry.getValue().equals(exp.get(entry.getKey())));
+        }
+    }
+
+    /**
      * JUnit.
      *
      * @throws Exception In case of error.
@@ -1048,11 +1092,13 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
         for (int i = 0; i < 20; i++)
             cache.put(i, i);
 
-        QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(new ScanQuery<>(new IgniteBiPredicate<Integer,Integer>() {
+        IgniteBiPredicate<Integer, Integer> filter = new IgniteBiPredicate<Integer, Integer>() {
             @Override public boolean apply(Integer k, Integer v) {
                 return k >= 10;
             }
-        }));
+        };
+
+        QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(new ScanQuery<>(filter));
 
         q.getAll();
 
@@ -1187,7 +1233,8 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
      * @return {@code true} if index has a table for given class.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr) throws IgniteCheckedException {
+    private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr)
+        throws IgniteCheckedException {
         return qryMgr.size(cls) != -1;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index f42963a..713cf84 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -70,6 +70,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class);
         suite.addTestSuite(GridCacheQuerySerializationSelfTest.class);
 
+        // Scan queries.
+        suite.addTestSuite(CacheScanPartitionQueryFallbackSelfTest.class);
 
         // Fields queries.
         suite.addTestSuite(IgniteCacheLocalFieldsQuerySelfTest.class);


Mime
View raw message