ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [05/30] ignite git commit: ignite-1366 Start cache processor before query processor. Initialize topology version for GridCacheQueryRequest so that messages are not missed before the message handler is registered.
Date Fri, 04 Sep 2015 23:45:29 GMT
ignite-1366 Start cache processor before query processor. Initialize topology version for GridCacheQueryRequest
so that messages are not missed before the message handler is registered.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/15f3edb5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/15f3edb5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/15f3edb5

Branch: refs/heads/ignite-264
Commit: 15f3edb546c9f08ed46e3baa51f41250d57b1d98
Parents: f1f6be8
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Sep 4 10:30:18 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Sep 4 10:30:18 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   2 +-
 .../dht/GridPartitionedGetFuture.java           |  14 +-
 .../distributed/near/GridNearGetFuture.java     |  13 ++
 .../query/GridCacheDistributedQueryFuture.java  |   5 +-
 .../query/GridCacheDistributedQueryManager.java |   9 +-
 .../cache/query/GridCacheQueryManager.java      | 177 ++++++-------------
 .../cache/query/GridCacheQueryRequest.java      |  59 ++++++-
 .../IgniteCacheNodeJoinAbstractTest.java        |  42 +++++
 8 files changed, 185 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index ad4940a..7deede7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -876,8 +876,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable
{
             startProcessor(new GridAffinityProcessor(ctx));
             startProcessor(createComponent(GridSegmentationProcessor.class, ctx));
             startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
-            startProcessor(new GridQueryProcessor(ctx));
             startProcessor(new GridCacheProcessor(ctx));
+            startProcessor(new GridQueryProcessor(ctx));
             startProcessor(new GridTaskSessionProcessor(ctx));
             startProcessor(new GridJobProcessor(ctx));
             startProcessor(new GridTaskProcessor(ctx));

http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 2f0de86..3ddf6d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -770,6 +770,18 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                 if (log.isDebugEnabled())
                     log.debug("Remapping mini get future [invalidParts=" + invalidParts +
", fut=" + this + ']');
 
+                if (!canRemap) {
+                    map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
+                        @Override public boolean apply(KeyCacheObject key) {
+                            return invalidParts.contains(cctx.affinity().partition(key));
+                        }
+                    }), F.t(node, keys), topVer);
+
+                    onDone(createResultMap(res.entries()));
+
+                    return;
+                }
+
                 // Need to wait for next topology version to remap.
                 IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
 
@@ -779,7 +791,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                         AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get());
 
                         // This will append new futures to compound list.
-                        map(F.view(keys.keySet(),  new P1<KeyCacheObject>() {
+                        map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
                             @Override public boolean apply(KeyCacheObject key) {
                                 return invalidParts.contains(cctx.affinity().partition(key));
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 9d2113e..a7875f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -904,6 +904,19 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                 if (log.isDebugEnabled())
                     log.debug("Remapping mini get future [invalidParts=" + invalidParts +
", fut=" + this + ']');
 
+                if (!canRemap) {
+                    map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
+                        @Override public boolean apply(KeyCacheObject key) {
+                            return invalidParts.contains(cctx.affinity().partition(key));
+                        }
+                    }), F.t(node, keys), topVer);
+
+                    // It is critical to call onDone after adding futures to compound list.
+                    onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers,
topVer));
+
+                    return;
+                }
+
                 // Need to wait for next topology version to remap.
                 IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 32a4599..1d547c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -98,7 +98,10 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
                 subgrid.clear();
             }
 
-            final GridCacheQueryRequest req = new GridCacheQueryRequest(cctx.cacheId(), reqId,
fields());
+            final GridCacheQueryRequest req = new GridCacheQueryRequest(cctx.cacheId(),
+                reqId,
+                fields(),
+                qryMgr.queryTopologyVersion());
 
             // Process cancel query directly (without sending) for local node,
             cctx.closures().callLocalSafe(new Callable<Object>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index d1fdfcf..4422952 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -566,7 +566,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 false,
                 qry.query().keepPortable(),
                 qry.query().subjectId(),
-                qry.query().taskHash());
+                qry.query().taskHash(),
+                queryTopologyVersion());
 
             addQueryFuture(req.id(), fut);
 
@@ -610,7 +611,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 all,
                 qry.keepPortable(),
                 qry.subjectId(),
-                qry.taskHash());
+                qry.taskHash(),
+                queryTopologyVersion());
 
             sendRequest(fut, req, nodes);
         }
@@ -675,7 +677,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 qry.query().includeMetadata(),
                 qry.query().keepPortable(),
                 qry.query().subjectId(),
-                qry.query().taskHash());
+                qry.query().taskHash(),
+                queryTopologyVersion());
 
             addQueryFuture(req.id(), fut);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index b3f8720..2041464 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -168,6 +168,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     /** */
     private boolean enabled;
 
+    /** */
+    private AffinityTopologyVersion qryTopVer;
+
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
         qryProc = cctx.kernalContext().query();
@@ -182,12 +185,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 if (futs != null) {
                     for (Map.Entry<Long, GridFutureAdapter<QueryResult<K, V>>>
entry : futs.entrySet()) {
-                        final Object recipient = recipient(nodeId, entry.getKey());
+                        final Object rcpt = recipient(nodeId, entry.getKey());
 
                         entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K,
V>>>() {
                             @Override public void applyx(IgniteInternalFuture<QueryResult<K,
V>> f)
                                 throws IgniteCheckedException {
-                                f.get().closeIfNotShared(recipient);
+                                f.get().closeIfNotShared(rcpt);
                             }
                         });
                     }
@@ -197,12 +200,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 if (fieldsFuts != null) {
                     for (Map.Entry<Long, GridFutureAdapter<FieldsResult>> entry
: fieldsFuts.entrySet()) {
-                        final Object recipient = recipient(nodeId, entry.getKey());
+                        final Object rcpt = recipient(nodeId, entry.getKey());
 
                         entry.getValue().listen(new CIX1<IgniteInternalFuture<FieldsResult>>()
{
                             @Override public void applyx(IgniteInternalFuture<FieldsResult>
f)
                                 throws IgniteCheckedException {
-                                f.get().closeIfNotShared(recipient);
+                                f.get().closeIfNotShared(rcpt);
                             }
                         });
                     }
@@ -213,6 +216,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
         enabled = GridQueryProcessor.isEnabled(cctx.config());
+
+        qryTopVer = cctx.startTopologyVersion();
+
+        if (qryTopVer == null)
+            qryTopVer = new AffinityTopologyVersion(cctx.localNode().order(), 0);
     }
 
     /**
@@ -281,16 +289,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     /**
      * Rebuilds all search indexes of given value type.
      *
-     * @param valType Value type.
-     * @return Future that will be completed when rebuilding of all indexes is finished.
-     */
-    public IgniteInternalFuture<?> rebuildIndexes(Class<?> valType) {
-        return rebuildIndexes(valType.getName());
-    }
-
-    /**
-     * Rebuilds all search indexes of given value type.
-     *
      * @param typeName Value type name.
      * @return Future that will be completed when rebuilding of all indexes is finished.
      */
@@ -307,23 +305,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
-     * Rebuilds all search indexes of all types.
-     *
-     * @return Future that will be completed when rebuilding of all indexes is finished.
-     */
-    public IgniteInternalFuture<?> rebuildAllIndexes() {
-        if (!enterBusy())
-            throw new IllegalStateException("Failed to rebuild indexes (grid is stopping).");
-
-        try {
-            return qryProc.rebuildAllIndexes();
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
      * Marks this request as canceled.
      *
      * @param reqId Request id.
@@ -531,12 +512,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @param loc Local query or not.
      * @param subjId Security subject ID.
      * @param taskName Task name.
-     * @param recipient ID of the recipient.
+     * @param rcpt ID of the recipient.
      * @return Collection of found keys.
      * @throws IgniteCheckedException In case of error.
      */
+    @SuppressWarnings("unchecked")
     private QueryResult<K, V> executeQuery(GridCacheQueryAdapter<?> qry,
-        @Nullable Object[] args, boolean loc, @Nullable UUID subjId, @Nullable String taskName,
Object recipient)
+        @Nullable Object[] args, boolean loc, @Nullable UUID subjId, @Nullable String taskName,
Object rcpt)
         throws IgniteCheckedException {
         if (qry.type() == null) {
             assert !loc;
@@ -555,16 +537,16 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             res = (QueryResult<K, V>)qryResCache.get(resKey);
 
-            if (res != null && res.addRecipient(recipient))
+            if (res != null && res.addRecipient(rcpt))
                 return res;
 
-            res = new QueryResult<>(qry.type(), recipient);
+            res = new QueryResult<>(qry.type(), rcpt);
 
             if (qryResCache.putIfAbsent(resKey, res) != null)
                 resKey = null;
         }
         else
-            res = new QueryResult<>(qry.type(), recipient);
+            res = new QueryResult<>(qry.type(), rcpt);
 
         GridCloseableIterator<IgniteBiTuple<K, V>> iter;
 
@@ -667,12 +649,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @param loc Local query or not.
      * @param subjId Security subject ID.
      * @param taskName Task name.
-     * @param recipient ID of the recipient.
+     * @param rcpt ID of the recipient.
      * @return Collection of found keys.
      * @throws IgniteCheckedException In case of error.
      */
     private FieldsResult executeFieldsQuery(GridCacheQueryAdapter<?> qry, @Nullable
Object[] args,
-        boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object recipient)
throws IgniteCheckedException {
+        boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object rcpt) throws
IgniteCheckedException {
         assert qry != null;
 
         FieldsResult res;
@@ -709,10 +691,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             res = (FieldsResult)qryResCache.get(resKey);
 
-            if (res != null && res.addRecipient(recipient))
+            if (res != null && res.addRecipient(rcpt))
                 return res; // Cached result found.
 
-            res = new FieldsResult(recipient);
+            res = new FieldsResult(rcpt);
 
             if (qryResCache.putIfAbsent(resKey, res) != null)
                 resKey = null; // Failed to cache result.
@@ -736,7 +718,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     taskName));
             }
 
-            res = new FieldsResult(recipient);
+            res = new FieldsResult(rcpt);
         }
 
         try {
@@ -1191,7 +1173,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 // If metadata needs to be returned to user and cleaned from internal fields
- copy it.
                 List<GridQueryFieldMetadata> meta = qryInfo.includeMetaData() ?
-                    (res.metaData() != null ? new ArrayList<GridQueryFieldMetadata>(res.metaData())
: null) :
+                    (res.metaData() != null ? new ArrayList<>(res.metaData()) : null)
:
                     res.metaData();
 
                 if (!qryInfo.includeMetaData())
@@ -1996,6 +1978,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
+     * @return Topology version for query requests.
+     */
+    public AffinityTopologyVersion queryTopologyVersion() {
+        return qryTopVer;
+    }
+
+    /**
      * @param qry Query.
      * @return Filter.
      */
@@ -2347,10 +2336,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         /**
          * @param type Query type.
-         * @param recipient ID of the recipient.
+         * @param rcpt ID of the recipient.
          */
-        private QueryResult(GridCacheQueryType type, Object recipient) {
-            super(recipient);
+        private QueryResult(GridCacheQueryType type, Object rcpt) {
+            super(rcpt);
 
             this.type = type;
         }
@@ -2374,10 +2363,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         private List<GridQueryFieldMetadata> meta;
 
         /**
-         * @param recipient ID of the recipient.
+         * @param rcpt ID of the recipient.
          */
-        FieldsResult(Object recipient) {
-            super(recipient);
+        FieldsResult(Object rcpt) {
+            super(rcpt);
         }
 
         /**
@@ -2674,39 +2663,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
-     *
-     */
-    private class GridCacheScanSwapEntry implements Cache.Entry<K, V> {
-        /** */
-        private final AbstractLazySwapEntry e;
-
-        /**
-         * @param e Entry.
-         */
-        private GridCacheScanSwapEntry(AbstractLazySwapEntry e) {
-            this.e = e;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public V getValue() {
-            return e.value();
-        }
-
-        /** {@inheritDoc} */
-        @Override public K getKey() {
-            return e.key();
-        }
-
-        /** {@inheritDoc} */
-        @Override public <T> T unwrap(Class<T> clazz) {
-            if (clazz.isAssignableFrom(getClass()))
-                return clazz.cast(this);
-
-            throw new IllegalArgumentException();
-        }
-    }
-
-    /**
      * Cached result.
      */
     private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>>
{
@@ -2720,10 +2676,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
 
         /**
-         * @param recipient ID of the recipient.
+         * @param rcpt ID of the recipient.
          */
-        protected CachedResult(Object recipient) {
-            boolean res = addRecipient(recipient);
+        protected CachedResult(Object rcpt) {
+            boolean res = addRecipient(rcpt);
 
             assert res;
         }
@@ -2731,17 +2687,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         /**
          * Close if this result does not have any other recipients.
          *
-         * @param recipient ID of the recipient.
+         * @param rcpt ID of the recipient.
          * @throws IgniteCheckedException If failed.
          */
-        public void closeIfNotShared(Object recipient) throws IgniteCheckedException {
+        public void closeIfNotShared(Object rcpt) throws IgniteCheckedException {
             assert isDone();
 
             synchronized (recipients) {
                 if (recipients.isEmpty())
                     return;
 
-                recipients.remove(recipient);
+                recipients.remove(rcpt);
 
                 if (recipients.isEmpty())
                     get().close();
@@ -2749,17 +2705,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         }
 
         /**
-         * @param recipient ID of the recipient.
+         * @param rcpt ID of the recipient.
          * @return {@code true} If the recipient successfully added.
          */
-        public boolean addRecipient(Object recipient) {
+        public boolean addRecipient(Object rcpt) {
             synchronized (recipients) {
                 if (isDone())
                     return false;
 
-                assert !recipients.containsKey(recipient) : recipient + " -> " + recipients;
+                assert !recipients.containsKey(rcpt) : rcpt + " -> " + recipients;
 
-                recipients.put(recipient, new QueueIterator(recipient));
+                recipients.put(rcpt, new QueueIterator(rcpt));
             }
 
             return true;
@@ -2798,18 +2754,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         }
 
         /**
-         * @param recipient ID of the recipient.
+         * @param rcpt ID of the recipient.
          * @throws IgniteCheckedException If failed.
          */
-        public IgniteSpiCloseableIterator<R> iterator(Object recipient) throws IgniteCheckedException
{
-            assert recipient != null;
+        public IgniteSpiCloseableIterator<R> iterator(Object rcpt) throws IgniteCheckedException
{
+            assert rcpt != null;
 
             IgniteSpiCloseableIterator<R> it = get();
 
             assert it != null;
 
             synchronized (recipients) {
-                return queue == null ? it : recipients.get(recipient);
+                return queue == null ? it : recipients.get(rcpt);
             }
         }
 
@@ -2825,7 +2781,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             private static final int NEXT_SIZE = 64;
 
             /** */
-            private final Object recipient;
+            private final Object rcpt;
 
             /** */
             private int pos;
@@ -2834,10 +2790,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             private Queue<R> next;
 
             /**
-             * @param recipient ID of the recipient.
+             * @param rcpt ID of the recipient.
              */
-            private QueueIterator(Object recipient) {
-                this.recipient = recipient;
+            private QueueIterator(Object rcpt) {
+                this.rcpt = rcpt;
             }
 
             /**
@@ -2850,7 +2806,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             /** {@inheritDoc} */
             @Override public void close() throws IgniteCheckedException {
-                closeIfNotShared(recipient);
+                closeIfNotShared(rcpt);
             }
 
             /** {@inheritDoc} */
@@ -3101,25 +3057,4 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             false,
             keepPortable);
     }
-
-    /**
-     * Creates SQL fields query which will include results metadata if needed.
-     *
-     * @param qry SQL query.
-     * @param incMeta Whether to include results metadata.
-     * @param keepPortable Keep portable flag.
-     * @return Created query.
-     */
-    public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean incMeta,
boolean keepPortable) {
-        assert qry != null;
-
-        return new GridCacheQueryAdapter<>(cctx,
-            SQL_FIELDS,
-            null,
-            qry,
-            null,
-            null,
-            incMeta,
-            keepPortable);
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index c21ac66..f7ef76f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
@@ -121,6 +122,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements
GridCache
     /** Partition. */
     private int part;
 
+    /** */
+    private AffinityTopologyVersion topVer;
+
     /**
      * Required by {@link Externalizable}
      */
@@ -129,13 +133,21 @@ public class GridCacheQueryRequest extends GridCacheMessage implements
GridCache
     }
 
     /**
+     * Creates cancel query request.
+     *
+     * @param cacheId Cache ID.
      * @param id Request to cancel.
      * @param fields Fields query flag.
+     * @param topVer Topology version.
      */
-    public GridCacheQueryRequest(int cacheId, long id, boolean fields) {
+    public GridCacheQueryRequest(int cacheId,
+        long id,
+        boolean fields,
+        AffinityTopologyVersion topVer) {
         this.cacheId = cacheId;
         this.id = id;
         this.fields = fields;
+        this.topVer = topVer;
 
         cancel = true;
     }
@@ -151,6 +163,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements
GridCache
      * @param fields Fields query flag.
      * @param all Whether to load all pages.
      * @param keepPortable Whether to keep portables.
+     * @param subjId Subject ID.
+     * @param taskHash Task name hash code.
+     * @param topVer Topology version.
      */
     public GridCacheQueryRequest(
         int cacheId,
@@ -162,7 +177,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements
GridCache
         boolean all,
         boolean keepPortable,
         UUID subjId,
-        int taskHash
+        int taskHash,
+        AffinityTopologyVersion topVer
     ) {
         this.cacheId = cacheId;
         this.id = id;
@@ -174,6 +190,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements
GridCache
         this.keepPortable = keepPortable;
         this.subjId = subjId;
         this.taskHash = taskHash;
+        this.topVer = topVer;
     }
 
     /**
@@ -192,6 +209,10 @@ public class GridCacheQueryRequest extends GridCacheMessage implements
GridCache
      * @param incBackups {@code true} if need to include backups.
      * @param args Query arguments.
      * @param incMeta Include meta data or not.
+     * @param keepPortable Keep portable flag.
+     * @param subjId Subject ID.
+     * @param taskHash Task name hash code.
+     * @param topVer Topology version.
      */
     public GridCacheQueryRequest(
         int cacheId,
@@ -211,7 +232,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements
GridCache
         boolean incMeta,
         boolean keepPortable,
         UUID subjId,
-        int taskHash
+        int taskHash,
+        AffinityTopologyVersion topVer
     ) {
         assert type != null || fields;
         assert clause != null || (type == SCAN || type == SET || type == SPI);
@@ -235,10 +257,15 @@ public class GridCacheQueryRequest extends GridCacheMessage implements
GridCache
         this.keepPortable = keepPortable;
         this.subjId = subjId;
         this.taskHash = taskHash;
+        this.topVer = topVer;
     }
 
-    /** {@inheritDoc}
-     * @param ctx*/
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException
{
         super.prepareMarshal(ctx);
 
@@ -554,12 +581,18 @@ public class GridCacheQueryRequest extends GridCacheMessage implements
GridCache
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeByteArray("transBytes", transBytes))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 21:
+                if (!writer.writeByteArray("transBytes", transBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 22:
                 if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1))
                     return false;
 
@@ -718,7 +751,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements
GridCache
                 reader.incrementState();
 
             case 20:
-                transBytes = reader.readByteArray("transBytes");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -726,6 +759,14 @@ public class GridCacheQueryRequest extends GridCacheMessage implements
GridCache
                 reader.incrementState();
 
             case 21:
+                transBytes = reader.readByteArray("transBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 22:
                 byte typeOrd;
 
                 typeOrd = reader.readByte("type");
@@ -749,11 +790,11 @@ public class GridCacheQueryRequest extends GridCacheMessage implements
GridCache
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 22;
+        return 23;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheQueryRequest.class, this, super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
index 58aa571..2e7f2ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
@@ -106,4 +107,45 @@ public abstract class IgniteCacheNodeJoinAbstractTest extends IgniteCacheAbstrac
             stopGrid(1);
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQuery() throws Exception {
+        final IgniteCache<Integer, Integer> cache = jcache(0);
+
+        for (int i = 0; i < 5; i++) {
+            log.info("Iteration: " + i);
+
+            final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>()
{
+                @Override public Void call() throws Exception {
+                    startGrid(1);
+
+                    return null;
+                }
+            });
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ScanQuery qry = new ScanQuery();
+
+                    while (!stop.get() && !fut.isDone())
+                        cache.query(qry).getAll();
+
+                    return null;
+                }
+            }, 10, "test-qry");
+
+            try {
+                fut.get(60_000);
+            }
+            finally {
+                stop.set(true);
+            }
+
+            stopGrid(1);
+        }
+    }
 }
\ No newline at end of file


Mime
View raw message