Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AFB1117A8A for ; Thu, 11 Jun 2015 13:12:32 +0000 (UTC) Received: (qmail 11729 invoked by uid 500); 11 Jun 2015 13:12:32 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 11697 invoked by uid 500); 11 Jun 2015 13:12:32 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 11688 invoked by uid 99); 11 Jun 2015 13:12:32 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Jun 2015 13:12:32 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 2D797CD44C for ; Thu, 11 Jun 2015 13:12:32 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id nBFDszrpwLJf for ; Thu, 11 Jun 2015 13:12:28 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id EA87F2644D for ; Thu, 11 Jun 2015 13:12:27 +0000 (UTC) Received: (qmail 11447 invoked by uid 99); 11 Jun 2015 13:12:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Jun 2015 13:12:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B75CFDFFD5; Thu, 11 Jun 2015 13:12:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 11 Jun 2015 13:12:27 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [01/50] incubator-ignite git commit: ignite-389 Avoid backups filtering in case of partition scan query Repository: incubator-ignite Updated Branches: refs/heads/ignite-745 ed2360877 -> 01bcfd8a7 (forced update) ignite-389 Avoid backups filtering in case of partition scan query Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5d6bb532 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5d6bb532 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5d6bb532 Branch: refs/heads/ignite-745 Commit: 5d6bb532c7de35cfea7674b5fc1446e72a5fa985 Parents: f00a9e9 Author: agura Authored: Thu May 28 18:30:08 2015 +0300 Committer: agura Committed: Thu May 28 18:30:08 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/cache/query/ScanQuery.java | 12 +- .../cache/query/GridCacheQueryAdapter.java | 122 +++---------------- .../cache/query/GridCacheQueryManager.java | 9 +- 3 files changed, 28 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java index f56b0c7..e6b69bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java @@ -46,6 +46,11 @@ public final class ScanQuery extends Query> { this(null, null); } + /** + * Creates partition scan query returning all entries for given partition. + * + * @param part Partition. + */ public ScanQuery(int part) { this(part, null); } @@ -62,9 +67,10 @@ public final class ScanQuery extends Query> { /** * Create scan query with filter. * + * @param part Partition. * @param filter Filter. If {@code null} then all entries will be returned. */ - public ScanQuery(Integer part, @Nullable IgniteBiPredicate filter) { + public ScanQuery(@Nullable Integer part, @Nullable IgniteBiPredicate filter) { setPartition(part); setFilter(filter); } @@ -96,7 +102,7 @@ public final class ScanQuery extends Query> { * * @return Partition number or {@code null}. */ - public Integer getPartition() { + @Nullable public Integer getPartition() { return part; } @@ -106,7 +112,7 @@ public final class ScanQuery extends Query> { * * @param part Partition number over which this query should iterate. */ - public void setPartition(Integer part) { + public void setPartition(@Nullable Integer part) { this.part = part; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 6574f0a..2f32faa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -26,14 +26,15 @@ import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.security.*; + import org.jetbrains.annotations.*; import java.util.*; -import java.util.concurrent.*; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*; @@ -457,7 +458,7 @@ public class GridCacheQueryAdapter implements CacheQuery { return (CacheQueryFuture)(loc ? qryMgr.queryFieldsLocal(bean) : qryMgr.queryFieldsDistributed(bean, nodes)); else if (type == SCAN && part != null && nodes.size() > 1) - return new CacheQueryFallbackFuture(nodes, bean, qryMgr); + return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr); else return (CacheQueryFuture)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); } @@ -524,9 +525,10 @@ public class GridCacheQueryAdapter implements CacheQuery { /** * Wrapper for queries with fallback. */ - private static class CacheQueryFallbackFuture extends GridCacheQueryFutureAdapter { - /** Target. */ - private GridCacheQueryFutureAdapter fut; + private static class CacheQueryFallbackFuture extends GridFutureAdapter> + implements CacheQueryFuture { + /** Query future. */ + private volatile GridCacheQueryFutureAdapter fut; /** Backups. */ private final Queue nodes; @@ -559,13 +561,10 @@ public class GridCacheQueryAdapter implements CacheQuery { ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE)); - if (node != null) { + if (node != null) fallbacks.add(node); - fallbacks.addAll(F.view(nodes, F.not(IS_LOC_NODE))); - } - else - fallbacks.addAll(nodes); + fallbacks.addAll(node != null ? F.view(nodes, F.not(IS_LOC_NODE)) : nodes); return fallbacks; } @@ -576,10 +575,11 @@ public class GridCacheQueryAdapter implements CacheQuery { private void init() { ClusterNode node = nodes.poll(); - fut = (GridCacheQueryFutureAdapter)(node.isLocal() ? qryMgr.queryLocal(bean) : - qryMgr.queryDistributed(bean, Collections.singleton(node))); + GridCacheQueryFutureAdapter fut0 = + (GridCacheQueryFutureAdapter)(node.isLocal() ? qryMgr.queryLocal(bean) : + qryMgr.queryDistributed(bean, Collections.singleton(node))); - fut.listen(new IgniteInClosure>>() { + fut0.listen(new IgniteInClosure>>() { @Override public void apply(IgniteInternalFuture> fut) { try { onDone(fut.get()); @@ -592,26 +592,8 @@ public class GridCacheQueryAdapter implements CacheQuery { } } }); - } - - /** {@inheritDoc} */ - @Override protected boolean onPage(UUID nodeId, boolean last) { - return fut.onPage(nodeId, last); - } - /** {@inheritDoc} */ - @Override protected void loadPage() { - fut.loadPage(); - } - - /** {@inheritDoc} */ - @Override protected void loadAllPages() throws IgniteInterruptedCheckedException { - fut.loadAllPages(); - } - - /** {@inheritDoc} */ - @Override protected void cancelQuery() throws IgniteCheckedException { - fut.cancelQuery(); + fut = fut0; } /** {@inheritDoc} */ @@ -625,84 +607,8 @@ public class GridCacheQueryAdapter implements CacheQuery { } /** {@inheritDoc} */ - @Override void clear() { - fut.clear(); - } - - /** {@inheritDoc} */ - @Override public long endTime() { - return fut.endTime(); - } - - /** {@inheritDoc} */ - @Override protected void enqueue(Collection col) { - fut.enqueue(col); - } - - /** {@inheritDoc} */ - @Override boolean fields() { - return fut.fields(); - } - - /** {@inheritDoc} */ - @Override public Collection get() throws IgniteCheckedException { - return fut.get(); - } - - /** {@inheritDoc} */ - @Override public Collection get(long timeout, TimeUnit unit) throws IgniteCheckedException { - return fut.get(timeout, unit); - } - - /** {@inheritDoc} */ @Override public R next() { return fut.next(); } - - /** {@inheritDoc} */ - @Override public Collection nextPage() throws IgniteCheckedException { - return fut.nextPage(); - } - - /** {@inheritDoc} */ - @Override public boolean onDone(Collection res, Throwable err) { - return fut.onDone(res, err); - } - - /** {@inheritDoc} */ - @Override public Collection nextPage(long timeout) throws IgniteCheckedException { - return fut.nextPage(timeout); - } - - /** {@inheritDoc} */ - @Override protected void onNodeLeft(UUID evtNodeId) { - fut.onNodeLeft(evtNodeId); - } - - /** {@inheritDoc} */ - @Override public void onPage(@Nullable UUID nodeId, @Nullable Collection data, - @Nullable Throwable err, boolean finished) { - fut.onPage(nodeId, data, err, finished); - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - fut.onTimeout(); - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats() { - fut.printMemoryStats(); - } - - /** {@inheritDoc} */ - @Override public GridCacheQueryBean query() { - return fut.query(); - } - - /** {@inheritDoc} */ - @Override public IgniteUuid timeoutId() { - return fut.timeoutId(); - } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index fac3d8f..652d62e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -795,7 +795,6 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte !locPart.reserve()) throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved"); - iter = new Iterator() { private Iterator iter0 = locPart.keySet().iterator(); @@ -1329,9 +1328,11 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte K key = row.getKey(); - // Filter backups for SCAN queries. Other types are filtered in indexing manager. - if (!cctx.isReplicated() && cctx.config().getCacheMode() != LOCAL && qry.type() == SCAN && - !incBackups && !cctx.affinity().primary(cctx.localNode(), key, topVer)) { + // Filter backups for SCAN queries, if it isn't partition scan. + // Other types are filtered in indexing manager. + if (!cctx.isReplicated() && qry.type() == SCAN && qry.partition() == null && + cctx.config().getCacheMode() != LOCAL && !incBackups && + !cctx.affinity().primary(cctx.localNode(), key, topVer)) { if (log.isDebugEnabled()) log.debug("Ignoring backup element [row=" + row + ", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups +