Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A35E4187A8 for ; Wed, 17 Feb 2016 14:57:40 +0000 (UTC) Received: (qmail 35593 invoked by uid 500); 17 Feb 2016 14:57:40 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 35557 invoked by uid 500); 17 Feb 2016 14:57:40 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 35548 invoked by uid 99); 17 Feb 2016 14:57:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Feb 2016 14:57:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EF7B3E0577; Wed, 17 Feb 2016 14:57:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-1232 Fixed issue when cancel request processed before query request Date: Wed, 17 Feb 2016 14:57:39 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/ignite-1232 8bb68cf53 -> 86bddb94a ignite-1232 Fixed issue when cancel request processed before query request Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86bddb94 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86bddb94 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86bddb94 Branch: refs/heads/ignite-1232 Commit: 86bddb94afd661d03183eb10f01683168838947d Parents: 8bb68cf Author: sboikov Authored: Wed Feb 17 17:57:30 2016 +0300 Committer: sboikov Committed: Wed Feb 17 17:57:30 2016 +0300 ---------------------------------------------------------------------- .../processors/query/GridQueryProcessor.java | 6 -- .../query/h2/twostep/GridMapQueryExecutor.java | 80 +++++++++++++++++++- .../IgniteCacheQuerySelfTestSuite.java | 2 + 3 files changed, 79 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/86bddb94/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 3572e12..7e5ef94 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -289,12 +289,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { altTypeId = new TypeId(ccfg.getName(), ctx.cacheObjects().typeId(qryEntity.getValueType())); } - if (desc.affinityKey() != null) { - // TODO: IGNITE-1232 - if (!desc.fields().containsKey(desc.affinityKey())) - desc.affinityKey(null); - } - addTypeByName(ccfg, desc); types.put(typeId, desc); http://git-wip-us.apache.org/repos/asf/ignite/blob/86bddb94/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 7b7711a..b45d962 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -64,11 +64,13 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; +import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.jdbc.JdbcResultSet; @@ -85,6 +87,7 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED; import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL; import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages; +import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; /** * Map query executor. @@ -126,6 +129,10 @@ public class GridMapQueryExecutor { private final ConcurrentMap, GridReservable> reservations = new ConcurrentHashMap8<>(); + /** */ + private final GridBoundedConcurrentLinkedHashMap qryHist = + new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q); + /** * @param busyLock Busy lock. */ @@ -219,10 +226,15 @@ public class GridMapQueryExecutor { * @param msg Message. */ private void onCancel(ClusterNode node, GridQueryCancelRequest msg) { - ConcurrentMap nodeRess = resultsForNode(node.id()); - long qryReqId = msg.queryRequestId(); + Boolean old = qryHist.putIfAbsent(new QueryKey(node.id(), qryReqId), Boolean.FALSE); + + if (old == null || !old) + return; + + ConcurrentMap nodeRess = resultsForNode(node.id()); + GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP); QueryResults results = nodeRess.remove(qryReqId); @@ -457,7 +469,7 @@ public class GridMapQueryExecutor { int pageSize, boolean distributedJoins ) { - ConcurrentMap nodeRess = resultsForNode(node.id()); + ConcurrentMap nodeRess = resultsForNode(node.id()); QueryResults qr = null; @@ -523,6 +535,18 @@ public class GridMapQueryExecutor { reserved = null; try { + Boolean old = qryHist.putIfAbsent(new QueryKey(node.id(), reqId), Boolean.TRUE); + + if (old != null) { + assert !old; + + GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, MAP); + + nodeRess.remove(reqId); + + return; + } + // Run queries. int i = 0; @@ -780,6 +804,9 @@ public class GridMapQueryExecutor { return true; } + /** + * + */ void cancel() { if (canceled) return; @@ -937,4 +964,51 @@ public class GridMapQueryExecutor { throw new IllegalStateException(); } } + + /** + * + */ + private static class QueryKey { + /** */ + private final UUID nodeId; + + /** */ + private final long qryId; + + /** + * @param nodeId Node ID. + * @param qryId Query ID. + */ + public QueryKey(UUID nodeId, long qryId) { + this.nodeId = nodeId; + this.qryId = qryId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryKey key = (QueryKey)o; + + return qryId == key.qryId && nodeId.equals(key.nodeId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + + res = 31 * res + (int) (qryId ^ (qryId >>> 32)); + + return res; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(QueryKey.class, this); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/86bddb94/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 42e05a7..b2b1c61 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheP import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQuerySelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedSnapshotEnabledQuerySelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeFailTest; +import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartDistributedJoinSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest2; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest; @@ -161,6 +162,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheQuerySerializationSelfTest.class); suite.addTestSuite(IgniteBinaryObjectFieldsQuerySelfTest.class); suite.addTestSuite(IgniteBinaryWrappedObjectFieldsQuerySelfTest.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartDistributedJoinSelfTest.class); // Scan queries. suite.addTestSuite(CacheScanPartitionQueryFallbackSelfTest.class);