Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B74AC17E94 for ; Wed, 27 May 2015 19:16:58 +0000 (UTC) Received: (qmail 61164 invoked by uid 500); 27 May 2015 19:16:58 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 61131 invoked by uid 500); 27 May 2015 19:16:58 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 61118 invoked by uid 99); 27 May 2015 19:16:58 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 May 2015 19:16:58 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 36E7D1822A3 for ; Wed, 27 May 2015 19:16:58 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.229 X-Spam-Level: X-Spam-Status: No, score=-3.229 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 6ioIn61TarZW for ; Wed, 27 May 2015 19:16:50 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 8853F454E4 for ; Wed, 27 May 2015 19:16:49 +0000 (UTC) Received: (qmail 61098 invoked by uid 99); 27 May 2015 19:16:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 May 2015 19:16:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C72C1DFFF0; Wed, 27 May 2015 19:16:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Message-Id: <42eaf636dab14797a06e5ed8d5fb3874@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-ignite git commit: ignite-389 Change fallback nodes order: local first Date: Wed, 27 May 2015 19:16:48 +0000 (UTC) Repository: incubator-ignite Updated Branches: refs/heads/ignite-389 5a7dd02f5 -> d151244ee ignite-389 Change fallback nodes order: local first Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d151244e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d151244e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d151244e Branch: refs/heads/ignite-389 Commit: d151244ee95ad2bba986136561e2326a434c3b5b Parents: 5a7dd02 Author: agura Authored: Wed May 27 22:16:28 2015 +0300 Committer: agura Committed: Wed May 27 22:16:28 2015 +0300 ---------------------------------------------------------------------- .../cache/query/GridCacheQueryAdapter.java | 132 ++++++++++--------- ...CacheScanPartitionQueryFallbackSelfTest.java | 2 +- 2 files changed, 72 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d151244e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 1f7b736..6574f0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -41,6 +41,13 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy * Query adapter. */ public class GridCacheQueryAdapter implements CacheQuery { + /** Is local node predicate. */ + private static final IgnitePredicate IS_LOC_NODE = new IgnitePredicate() { + @Override public boolean apply(ClusterNode n) { + return n.isLocal(); + } + }; + /** */ private final GridCacheContext cctx; @@ -449,24 +456,10 @@ public class GridCacheQueryAdapter implements CacheQuery { if (type == SQL_FIELDS || type == SPI) return (CacheQueryFuture)(loc ? qryMgr.queryFieldsLocal(bean) : qryMgr.queryFieldsDistributed(bean, nodes)); - else { - final CacheQueryFuture fut = - (CacheQueryFuture)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); - - if (type == SCAN && part != null) { - assert nodes.size() == 1; - - final Queue backups = new LinkedList<>( - cctx.affinity().backups(part, cctx.affinity().affinityTopologyVersion())); - - if (F.isEmpty(backups)) - return fut; - - return new CacheQueryFallbackFuture<>(backups, bean, qryMgr, fut); - } - - return fut; - } + else if (type == SCAN && part != null && nodes.size() > 1) + return new CacheQueryFallbackFuture(nodes, bean, qryMgr); + else + return (CacheQueryFuture)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); } /** @@ -484,7 +477,7 @@ public class GridCacheQueryAdapter implements CacheQuery { return Collections.singletonList(cctx.localNode()); case REPLICATED: - if (prj != null) + if (prj != null || partition() != null) return nodes(cctx, prj, partition()); return cctx.affinityNode() ? @@ -508,12 +501,13 @@ public class GridCacheQueryAdapter implements CacheQuery { @Nullable final ClusterGroup prj, @Nullable final Integer part) { assert cctx != null; - final List owners = part == null ? null : - cctx.topology().owners(part, cctx.affinity().affinityTopologyVersion()); + final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); + + final Set owners = + part == null ? Collections.emptySet() : new HashSet<>(cctx.topology().owners(part, topVer)); return F.view(CU.allNodes(cctx), new P1() { @Override public boolean apply(ClusterNode n) { - AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); return cctx.discovery().cacheAffinityNode(n, cctx.name()) && (prj == null || prj.node(n.id()) != null) && @@ -532,10 +526,10 @@ public class GridCacheQueryAdapter implements CacheQuery { */ private static class CacheQueryFallbackFuture extends GridCacheQueryFutureAdapter { /** Target. */ - private GridCacheQueryFutureAdapter target; + private GridCacheQueryFutureAdapter fut; /** Backups. */ - private final Queue backups; + private final Queue nodes; /** Bean. */ private final GridCacheQueryBean bean; @@ -544,41 +538,57 @@ public class GridCacheQueryAdapter implements CacheQuery { private final GridCacheQueryManager qryMgr; /** - * @param backups Backups. + * @param nodes Backups. * @param bean Bean. * @param qryMgr Query manager. - * @param fut Future. */ - public CacheQueryFallbackFuture(Queue backups, GridCacheQueryBean bean, - GridCacheQueryManager qryMgr, CacheQueryFuture fut) { - this.backups = backups; + public CacheQueryFallbackFuture(Collection nodes, GridCacheQueryBean bean, + GridCacheQueryManager qryMgr) { + this.nodes = fallbacks(nodes); this.bean = bean; this.qryMgr = qryMgr; - this.target = (GridCacheQueryFutureAdapter)fut; init(); } /** + * @param nodes Nodes. + */ + private Queue fallbacks(Collection nodes) { + Queue fallbacks = new LinkedList<>(); + + ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE)); + + if (node != null) { + fallbacks.add(node); + + fallbacks.addAll(F.view(nodes, F.not(IS_LOC_NODE))); + } + else + fallbacks.addAll(nodes); + + return fallbacks; + } + + /** * */ private void init() { - target.listen(new IgniteInClosure>>() { + ClusterNode node = nodes.poll(); + + fut = (GridCacheQueryFutureAdapter)(node.isLocal() ? qryMgr.queryLocal(bean) : + qryMgr.queryDistributed(bean, Collections.singleton(node))); + + fut.listen(new IgniteInClosure>>() { @Override public void apply(IgniteInternalFuture> fut) { try { onDone(fut.get()); } catch (IgniteCheckedException e) { - if (F.isEmpty(backups)) + if (F.isEmpty(nodes)) onDone(e); - else { - Set backup = Collections.singleton(backups.poll()); - - target = - (GridCacheQueryFutureAdapter)qryMgr.queryDistributed(bean, backup); - + else init(); - } } } }); @@ -586,113 +596,113 @@ public class GridCacheQueryAdapter implements CacheQuery { /** {@inheritDoc} */ @Override protected boolean onPage(UUID nodeId, boolean last) { - return target.onPage(nodeId, last); + return fut.onPage(nodeId, last); } /** {@inheritDoc} */ @Override protected void loadPage() { - target.loadPage(); + fut.loadPage(); } /** {@inheritDoc} */ @Override protected void loadAllPages() throws IgniteInterruptedCheckedException { - target.loadAllPages(); + fut.loadAllPages(); } /** {@inheritDoc} */ @Override protected void cancelQuery() throws IgniteCheckedException { - target.cancelQuery(); + fut.cancelQuery(); } /** {@inheritDoc} */ @Override public int available() { - return target.available(); + return fut.available(); } /** {@inheritDoc} */ @Override public boolean cancel() throws IgniteCheckedException { - return target.cancel(); + return fut.cancel(); } /** {@inheritDoc} */ @Override void clear() { - target.clear(); + fut.clear(); } /** {@inheritDoc} */ @Override public long endTime() { - return target.endTime(); + return fut.endTime(); } /** {@inheritDoc} */ @Override protected void enqueue(Collection col) { - target.enqueue(col); + fut.enqueue(col); } /** {@inheritDoc} */ @Override boolean fields() { - return target.fields(); + return fut.fields(); } /** {@inheritDoc} */ @Override public Collection get() throws IgniteCheckedException { - return target.get(); + return fut.get(); } /** {@inheritDoc} */ @Override public Collection get(long timeout, TimeUnit unit) throws IgniteCheckedException { - return target.get(timeout, unit); + return fut.get(timeout, unit); } /** {@inheritDoc} */ @Override public R next() { - return target.next(); + return fut.next(); } /** {@inheritDoc} */ @Override public Collection nextPage() throws IgniteCheckedException { - return target.nextPage(); + return fut.nextPage(); } /** {@inheritDoc} */ @Override public boolean onDone(Collection res, Throwable err) { - return target.onDone(res, err); + return fut.onDone(res, err); } /** {@inheritDoc} */ @Override public Collection nextPage(long timeout) throws IgniteCheckedException { - return target.nextPage(timeout); + return fut.nextPage(timeout); } /** {@inheritDoc} */ @Override protected void onNodeLeft(UUID evtNodeId) { - target.onNodeLeft(evtNodeId); + fut.onNodeLeft(evtNodeId); } /** {@inheritDoc} */ @Override public void onPage(@Nullable UUID nodeId, @Nullable Collection data, @Nullable Throwable err, boolean finished) { - target.onPage(nodeId, data, err, finished); + fut.onPage(nodeId, data, err, finished); } /** {@inheritDoc} */ @Override public void onTimeout() { - target.onTimeout(); + fut.onTimeout(); } /** {@inheritDoc} */ @Override public void printMemoryStats() { - target.printMemoryStats(); + fut.printMemoryStats(); } /** {@inheritDoc} */ @Override public GridCacheQueryBean query() { - return target.query(); + return fut.query(); } /** {@inheritDoc} */ @Override public IgniteUuid timeoutId() { - return target.timeoutId(); + return fut.timeoutId(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d151244e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java index 3b1b842..31336e6 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java @@ -41,7 +41,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT private static final int GRID_CNT = 5; /** Kys count. */ - private static final int KEYS_CNT = 1; + private static final int KEYS_CNT = 5000; /** Backups. */ private int backups;