ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1232 Fixed issue when cancel request processed before query request
Date Wed, 17 Feb 2016 14:57:39 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1232 8bb68cf53 -> 86bddb94a


ignite-1232 Fixed issue when cancel request processed before query request


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86bddb94
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86bddb94
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86bddb94

Branch: refs/heads/ignite-1232
Commit: 86bddb94afd661d03183eb10f01683168838947d
Parents: 8bb68cf
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Feb 17 17:57:30 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Feb 17 17:57:30 2016 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryProcessor.java    |  6 --
 .../query/h2/twostep/GridMapQueryExecutor.java  | 80 +++++++++++++++++++-
 .../IgniteCacheQuerySelfTestSuite.java          |  2 +
 3 files changed, 79 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/86bddb94/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 3572e12..7e5ef94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -289,12 +289,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         altTypeId = new TypeId(ccfg.getName(), ctx.cacheObjects().typeId(qryEntity.getValueType()));
                     }
 
-                    if (desc.affinityKey() != null) {
-                        // TODO: IGNITE-1232
-                        if (!desc.fields().containsKey(desc.affinityKey()))
-                            desc.affinityKey(null);
-                    }
-
                     addTypeByName(ccfg, desc);
                     types.put(typeId, desc);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86bddb94/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 7b7711a..b45d962 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -64,11 +64,13 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.jdbc.JdbcResultSet;
@@ -85,6 +87,7 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
 import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
 
 /**
  * Map query executor.
@@ -126,6 +129,10 @@ public class GridMapQueryExecutor {
     private final ConcurrentMap<T2<String, AffinityTopologyVersion>, GridReservable>
reservations =
         new ConcurrentHashMap8<>();
 
+    /** */
+    private final GridBoundedConcurrentLinkedHashMap<QueryKey, Boolean> qryHist =
+        new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
+
     /**
      * @param busyLock Busy lock.
      */
@@ -219,10 +226,15 @@ public class GridMapQueryExecutor {
      * @param msg Message.
      */
     private void onCancel(ClusterNode node, GridQueryCancelRequest msg) {
-        ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id());
-
         long qryReqId = msg.queryRequestId();
 
+        Boolean old = qryHist.putIfAbsent(new QueryKey(node.id(), qryReqId), Boolean.FALSE);
+
+        if (old == null || !old)
+            return;
+
+        ConcurrentMap<Long, QueryResults> nodeRess = resultsForNode(node.id());
+
         GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
 
         QueryResults results = nodeRess.remove(qryReqId);
@@ -457,7 +469,7 @@ public class GridMapQueryExecutor {
         int pageSize,
         boolean distributedJoins
     ) {
-        ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id());
+        ConcurrentMap<Long, QueryResults> nodeRess = resultsForNode(node.id());
 
         QueryResults qr = null;
 
@@ -523,6 +535,18 @@ public class GridMapQueryExecutor {
             reserved = null;
 
             try {
+                Boolean old = qryHist.putIfAbsent(new QueryKey(node.id(), reqId), Boolean.TRUE);
+
+                if (old != null) {
+                    assert !old;
+
+                    GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, MAP);
+
+                    nodeRess.remove(reqId);
+
+                    return;
+                }
+
                 // Run queries.
                 int i = 0;
 
@@ -780,6 +804,9 @@ public class GridMapQueryExecutor {
             return true;
         }
 
+        /**
+         *
+         */
         void cancel() {
             if (canceled)
                 return;
@@ -937,4 +964,51 @@ public class GridMapQueryExecutor {
             throw new IllegalStateException();
         }
     }
+
+    /**
+     *
+     */
+    private static class QueryKey {
+        /** */
+        private final UUID nodeId;
+
+        /** */
+        private final long qryId;
+
+        /**
+         * @param nodeId Node ID.
+         * @param qryId Query ID.
+         */
+        public QueryKey(UUID nodeId, long qryId) {
+            this.nodeId = nodeId;
+            this.qryId = qryId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryKey key = (QueryKey)o;
+
+            return qryId == key.qryId && nodeId.equals(key.nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = nodeId.hashCode();
+
+            res = 31 * res + (int) (qryId ^ (qryId >>> 32));
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            return S.toString(QueryKey.class, this);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86bddb94/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 42e05a7..b2b1c61 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheP
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedSnapshotEnabledQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeFailTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartDistributedJoinSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest2;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest;
@@ -161,6 +162,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheQuerySerializationSelfTest.class);
         suite.addTestSuite(IgniteBinaryObjectFieldsQuerySelfTest.class);
         suite.addTestSuite(IgniteBinaryWrappedObjectFieldsQuerySelfTest.class);
+        suite.addTestSuite(IgniteCacheQueryNodeRestartDistributedJoinSelfTest.class);
 
         // Scan queries.
         suite.addTestSuite(CacheScanPartitionQueryFallbackSelfTest.class);


Mime
View raw message