From: sboikov@apache.org
To: commits@ignite.incubator.apache.org
Reply-To: dev@ignite.incubator.apache.org
Date: Fri, 19 Jun 2015 14:45:12 -0000
Message-Id: <038aa67d55f546b5b306a5aa08b62cee@git.apache.org>
Subject: [17/50] incubator-ignite git commit: Merge branches 'ignite-484-1' and 'ignite-sprint-6' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484-1

Merge branches 'ignite-484-1' and 'ignite-sprint-6' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484-1

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/efb42447
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/efb42447
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/efb42447

Branch: refs/heads/ignite-917
Commit: efb4244779b94c9c9f35c63708e4a41da2430bce
Parents: 94060c9 4298238 af829d0
Author: S.Vladykin
Authored: Wed Jun 17 19:50:12 2015 +0300
Committer: S.Vladykin
Committed: Wed Jun 17 19:50:12 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   4 +
 .../processors/cache/IgniteCacheProxy.java      |   7 +
 .../dht/GridDhtTransactionalCacheAdapter.java   |   2 +-
 .../cache/transactions/IgniteTxHandler.java     |   2 +-
 .../transactions/IgniteTxLocalAdapter.java      |  12 +-
 .../dr/IgniteDrDataStreamerCacheUpdater.java    |   7 +-
 .../CacheStoreUsageMultinodeAbstractTest.java   | 305 +++++++++++++++++++
 ...eUsageMultinodeDynamicStartAbstractTest.java | 169 ++++++++++
 ...oreUsageMultinodeDynamicStartAtomicTest.java |  32 ++
 ...heStoreUsageMultinodeDynamicStartTxTest.java |  32 ++
 ...reUsageMultinodeStaticStartAbstractTest.java | 158 ++++++++++
 ...toreUsageMultinodeStaticStartAtomicTest.java |  32 ++
 ...cheStoreUsageMultinodeStaticStartTxTest.java |  32 ++
 .../testsuites/IgniteCacheTestSuite4.java       |   4 +
 .../h2/twostep/GridReduceQueryExecutor.java     |   3 +-
 15 files changed, 793 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efb42447/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 6635dde,6c407d9,11054b7..b956167
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@@@ -273,514 -273,477 -265,113 +273,515 @@@@ public class GridReduceQueryExecutor
 }
 /**
+ * @param r Query run.
+ * @param retryVer Retry version.
+ * @param nodeId Node ID.
+ */
+ private void retry(QueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
+ r.state(retryVer, nodeId);
+ }
+
+ /**
+ * @param cctx Cache context for main space.
+ * @param extraSpaces Extra spaces.
+ * @return {@code true} If preloading is active.
+ */
+ private boolean isPreloadingActive(final GridCacheContext cctx, List extraSpaces) {
+ if (hasMovingPartitions(cctx))
+ return true;
+
+ if (extraSpaces != null) {
+ for (String extraSpace : extraSpaces) {
+ if (hasMovingPartitions(cacheContext(extraSpace)))
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @return {@code true} If cache context
+ */
+ private boolean hasMovingPartitions(GridCacheContext cctx) {
+ GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false);
+
+ for (GridDhtPartitionMap map : fullMap.values()) {
+ if (map.hasMovingPartitions())
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param name Cache name.
+ * @return Cache context.
+ */
+ private GridCacheContext cacheContext(String name) {
+ return ctx.cache().internalCache(name).context();
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @param cctx Cache context for main space.
+ * @param extraSpaces Extra spaces.
+ * @return Data nodes or {@code null} if repartitioning started and we need to retry..
+ */ + private Collection stableDataNodes( + AffinityTopologyVersion topVer, + final GridCacheContext cctx, + List extraSpaces + ) { + String space = cctx.name(); + - Set nodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, topVer)); ++ Set nodes = new HashSet<>(dataNodes(space, topVer)); + + if (F.isEmpty(nodes)) + throw new CacheException("No data nodes found for cache: " + space); + + if (!F.isEmpty(extraSpaces)) { + for (String extraSpace : extraSpaces) { + GridCacheContext extraCctx = cacheContext(extraSpace); + + if (extraCctx.isLocal()) + continue; // No consistency guaranties for local caches. + + if (cctx.isReplicated() && !extraCctx.isReplicated()) + throw new CacheException("Queries running on replicated cache should not contain JOINs " + + "with partitioned tables."); + - Collection extraNodes = ctx.discovery().cacheAffinityNodes(extraSpace, topVer); ++ Collection extraNodes = dataNodes(extraSpace, topVer); + + if (F.isEmpty(extraNodes)) + throw new CacheException("No data nodes found for cache: " + extraSpace); + + if (cctx.isReplicated() && extraCctx.isReplicated()) { + nodes.retainAll(extraNodes); + + if (nodes.isEmpty()) { + if (isPreloadingActive(cctx, extraSpaces)) + return null; // Retry. + else + throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + + "' have distinct set of data nodes."); + } + } + else if (!cctx.isReplicated() && extraCctx.isReplicated()) { + if (!extraNodes.containsAll(nodes)) + if (isPreloadingActive(cctx, extraSpaces)) + return null; // Retry. + else + throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + + "' have distinct set of data nodes."); + } + else if (!cctx.isReplicated() && !extraCctx.isReplicated()) { + if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) + if (isPreloadingActive(cctx, extraSpaces)) + return null; // Retry. + else + throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + + "' have distinct set of data nodes."); + } + else + throw new IllegalStateException(); + } + } + + return nodes; + } + + /** * @param cctx Cache context. * @param qry Query. + * @param keepPortable Keep portable. * @return Cursor. */ public Iterator> query(GridCacheContext cctx, GridCacheTwoStepQuery qry, boolean keepPortable) { - for (;;) { - long qryReqId = reqIdGen.incrementAndGet(); ++ for (int attempt = 0;; attempt++) { ++ if (attempt != 0) { ++ try { ++ Thread.sleep(attempt * 10); // Wait for exchange. ++ } ++ catch (InterruptedException e) { ++ Thread.currentThread().interrupt(); + - QueryRun r = new QueryRun(); ++ throw new CacheException("Query was interrupted.", e); ++ } ++ } + - r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize(); + long qryReqId = reqIdGen.incrementAndGet(); - r.tbls = new ArrayList<>(qry.mapQueries().size()); + QueryRun r = new QueryRun(); - String space = cctx.name(); + r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize(); - r.conn = (JdbcConnection)h2.connectionForSpace(space); + r.tbls = new ArrayList<>(qry.mapQueries().size()); - // TODO Add topology version. 
- ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space); + String space = cctx.name(); - if (cctx.isReplicated() || qry.explain()) { - assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node."; + r.conn = (JdbcConnection)h2.connectionForSpace(space); - // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node. - dataNodes = dataNodes.forRandom(); - } + AffinityTopologyVersion topVer = h2.readyTopologyVersion(); - final Collection nodes = dataNodes.nodes(); + List extraSpaces = extraSpaces(space, qry.spaces()); - for (GridCacheSqlQuery mapQry : qry.mapQueries()) { - GridMergeTable tbl; + Collection nodes; + + // Explicit partition mapping for unstable topology. + Map partsMap = null; + + if (isPreloadingActive(cctx, extraSpaces)) { + if (cctx.isReplicated()) - nodes = replicatedDataNodes(cctx, extraSpaces); ++ nodes = replicatedUnstableDataNodes(cctx, extraSpaces); + else { - partsMap = partitionLocations(cctx, extraSpaces); ++ partsMap = partitionedUnstableDataNodes(cctx, extraSpaces); + + nodes = partsMap == null ? null : partsMap.keySet(); + } + } + else + nodes = stableDataNodes(topVer, cctx, extraSpaces); + + if (nodes == null) + continue; // Retry. + + assert !nodes.isEmpty(); + + if (cctx.isReplicated() || qry.explain()) { - assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node."; + + assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : + + "We must be on a client node."; + + // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node. + nodes = Collections.singleton(F.rand(nodes)); + } + + for (GridCacheSqlQuery mapQry : qry.mapQueries()) { + GridMergeTable tbl; + + try { + tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + GridMergeIndex idx = tbl.getScanIndex(null); + + for (ClusterNode node : nodes) + idx.addSource(node.id()); + + r.tbls.add(tbl); + + curFunTbl.set(tbl); + } + + r.latch = new CountDownLatch(r.tbls.size() * nodes.size()); + + runs.put(qryReqId, r); try { - tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO + Collection mapQrys = qry.mapQueries(); + + if (qry.explain()) { + mapQrys = new ArrayList<>(qry.mapQueries().size()); + + for (GridCacheSqlQuery mapQry : qry.mapQueries()) + mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters())); + } + + if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes. + Marshaller m = ctx.config().getMarshaller(); + + for (GridCacheSqlQuery mapQry : mapQrys) + mapQry.marshallParams(m); + } + + boolean retry = false; + + if (send(nodes, + new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) { + U.await(r.latch); + + Object state = r.state.get(); + + if (state != null) { + if (state instanceof CacheException) + throw new CacheException("Failed to run map query remotely.", (CacheException)state); + + if (state instanceof AffinityTopologyVersion) { + retry = true; + + // If remote node asks us to retry then we have outdated full partition map. + h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state); + } + } + } + else // Send failed. 
+ retry = true; + + ResultSet res = null; + + if (!retry) { + if (qry.explain()) + return explainPlan(r.conn, space, qry); + + GridCacheSqlQuery rdc = qry.reduceQuery(); + + res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters())); + } + + for (GridMergeTable tbl : r.tbls) { + if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes. + send(nodes, new GridQueryCancelRequest(qryReqId), null); + +// dropTable(r.conn, tbl.getName()); TODO + } + + if (retry) { + if (Thread.currentThread().isInterrupted()) + throw new IgniteInterruptedCheckedException("Query was interrupted."); + + continue; + } + + return new GridQueryCacheObjectsIterator(new Iter(res), cctx, keepPortable); } - catch (IgniteCheckedException e) { - throw new IgniteException(e); + catch (IgniteCheckedException | RuntimeException e) { + U.closeQuiet(r.conn); + - if (e instanceof CacheException) - throw (CacheException)e; - + throw new CacheException("Failed to run reduce query locally.", e); + } + finally { + if (!runs.remove(qryReqId, r)) + U.warn(log, "Query run was already removed: " + qryReqId); + + curFunTbl.remove(); } + } + } + + /** + * Calculates data nodes for replicated caches on unstable topology. + * + * @param cctx Cache context for main space. + * @param extraSpaces Extra spaces. + * @return Collection of all data nodes owning all the caches or {@code null} for retry. + */ - private Collection replicatedDataNodes(final GridCacheContext cctx, List extraSpaces) { ++ private Collection replicatedUnstableDataNodes(final GridCacheContext cctx, ++ List extraSpaces) { + assert cctx.isReplicated() : cctx.name() + " must be replicated"; + - Set nodes = owningReplicatedDataNodes(cctx); ++ Set nodes = replicatedUnstableDataNodes(cctx); + - GridMergeIndex idx = tbl.getScanIndex(null); ++ if (F.isEmpty(nodes)) ++ return null; // Retry. - for (ClusterNode node : nodes) - idx.addSource(node.id()); + if (!F.isEmpty(extraSpaces)) { + for (String extraSpace : extraSpaces) { + GridCacheContext extraCctx = cacheContext(extraSpace); - r.tbls.add(tbl); + if (extraCctx.isLocal()) + continue; - curFunTbl.set(tbl); + if (!extraCctx.isReplicated()) + throw new CacheException("Queries running on replicated cache should not contain JOINs " + + "with partitioned tables."); + - nodes.retainAll(owningReplicatedDataNodes(extraCctx)); ++ Set extraOwners = replicatedUnstableDataNodes(extraCctx); ++ ++ if (F.isEmpty(extraOwners)) ++ return null; // Retry. ++ ++ nodes.retainAll(extraOwners); + + if (nodes.isEmpty()) + return null; // Retry. + } } - r.latch = new CountDownLatch(r.tbls.size() * nodes.size()); + return nodes; + } + + /** ++ * @param space Cache name. ++ * @param topVer Topology version. ++ * @return Collection of data nodes. ++ */ ++ private Collection dataNodes(String space, AffinityTopologyVersion topVer) { ++ Collection res = ctx.discovery().cacheAffinityNodes(space, topVer); + - runs.put(qryReqId, r); ++ return res != null ? res : Collections.emptySet(); ++ } + - try { - Collection mapQrys = qry.mapQueries(); ++ /** + * Collects all the nodes owning all the partitions for the given replicated cache. + * + * @param cctx Cache context. - * @return Owning nodes. ++ * @return Owning nodes or {@code null} if we can't find owners for some partitions. 
+ */ - private Set owningReplicatedDataNodes(GridCacheContext cctx) { ++ private Set replicatedUnstableDataNodes(GridCacheContext cctx) { + assert cctx.isReplicated() : cctx.name() + " must be replicated"; + + String space = cctx.name(); + - Set dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, NONE)); ++ Set dataNodes = new HashSet<>(dataNodes(space, NONE)); + + if (dataNodes.isEmpty()) + throw new CacheException("No data nodes found for cache '" + space + "'"); + + // Find all the nodes owning all the partitions for replicated cache. - for (int p = 0, extraParts = cctx.affinity().partitions(); p < extraParts; p++) { ++ for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) { + List owners = cctx.topology().owners(p); + - if (owners.isEmpty()) - throw new CacheException("No data nodes found for cache '" + space + - "' for partition " + p); ++ if (F.isEmpty(owners)) ++ return null; // Retry. + + dataNodes.retainAll(owners); + + if (dataNodes.isEmpty()) - throw new CacheException("No data nodes found for cache '" + space + - "' owning all the partitions."); ++ return null; // Retry. + } + + return dataNodes; + } + + /** + * Calculates partition mapping for partitioned cache on unstable topology. + * + * @param cctx Cache context for main space. + * @param extraSpaces Extra spaces. + * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry. + */ + @SuppressWarnings("unchecked") - private Map partitionLocations(final GridCacheContext cctx, List extraSpaces) { ++ private Map partitionedUnstableDataNodes(final GridCacheContext cctx, ++ List extraSpaces) { + assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned"; + + final int partsCnt = cctx.affinity().partitions(); + + if (extraSpaces != null) { // Check correct number of partitions for partitioned caches. + for (String extraSpace : extraSpaces) { + GridCacheContext extraCctx = cacheContext(extraSpace); - if (qry.explain()) { - mapQrys = new ArrayList<>(qry.mapQueries().size()); + if (extraCctx.isReplicated() || extraCctx.isLocal()) + continue; - for (GridCacheSqlQuery mapQry : qry.mapQueries()) - mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters())); + int parts = extraCctx.affinity().partitions(); + + if (parts != partsCnt) + throw new CacheException("Number of partitions must be the same for correct collocation in " + + "caches " + cctx.name() + " and " + extraSpace + "."); } + } + + Set[] partLocs = new Set[partsCnt]; - if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes. - Marshaller m = ctx.config().getMarshaller(); + // Fill partition locations for main cache. + for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) { + List owners = cctx.topology().owners(p); - if (F.isEmpty(owners)) - for (GridCacheSqlQuery mapQry : mapQrys) - mapQry.marshallParams(m); ++ if (F.isEmpty(owners)) { ++ if (!F.isEmpty(dataNodes(cctx.name(), NONE))) ++ return null; // Retry. ++ + throw new CacheException("No data nodes found for cache '" + cctx.name() + "' for partition " + p); + } - send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys)); + partLocs[p] = new HashSet<>(owners); + } - r.latch.await(); + if (extraSpaces != null) { + // Find owner intersections for each participating partitioned cache partition. + // We need this for logical collocation between different partitioned caches with the same affinity. 
+ for (String extraSpace : extraSpaces) { + GridCacheContext extraCctx = cacheContext(extraSpace); - if (r.rmtErr != null) - throw new CacheException("Failed to run map query remotely.", r.rmtErr); + if (extraCctx.isReplicated() || extraCctx.isLocal()) + continue; - if (qry.explain()) - return explainPlan(r.conn, space, qry); + for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) { + List owners = extraCctx.topology().owners(p); - if (F.isEmpty(owners)) - GridCacheSqlQuery rdc = qry.reduceQuery(); ++ if (F.isEmpty(owners)) { ++ if (!F.isEmpty(dataNodes(extraSpace, NONE))) ++ return null; // Retry. + - final ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters())); + throw new CacheException("No data nodes found for cache '" + extraSpace + + "' for partition " + p); ++ } - for (GridMergeTable tbl : r.tbls) { - if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes. - send(nodes, new GridQueryCancelRequest(qryReqId)); + if (partLocs[p] == null) + partLocs[p] = new HashSet<>(owners); + else { + partLocs[p].retainAll(owners); // Intersection of owners. -// dropTable(r.conn, tbl.getName()); TODO + if (partLocs[p].isEmpty()) + return null; // Intersection is empty -> retry. + } + } } - return new GridQueryCacheObjectsIterator(new Iter(res), cctx, keepPortable); + // Filter nodes where not all the replicated caches loaded. + for (String extraSpace : extraSpaces) { + GridCacheContext extraCctx = cacheContext(extraSpace); + + if (!extraCctx.isReplicated()) + continue; + - Set dataNodes = owningReplicatedDataNodes(extraCctx); ++ Set dataNodes = replicatedUnstableDataNodes(extraCctx); ++ ++ if (F.isEmpty(dataNodes)) ++ return null; // Retry. + + for (Set partLoc : partLocs) { + partLoc.retainAll(dataNodes); + + if (partLoc.isEmpty()) + return null; // Retry. + } + } } - catch (IgniteCheckedException | InterruptedException | RuntimeException e) { - U.closeQuiet(r.conn); - if (e instanceof CacheException) - throw (CacheException)e; + // Collect the final partitions mapping. + Map res = new HashMap<>(); + + // Here partitions in all IntArray's will be sorted in ascending order, this is important. + for (int p = 0; p < partLocs.length; p++) { + Set pl = partLocs[p]; - throw new CacheException("Failed to run reduce query locally.", e); + assert !F.isEmpty(pl) : pl; + + ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl); + + IntArray parts = res.get(n); + + if (parts == null) + res.put(n, parts = new IntArray()); + + parts.add(p); } - finally { - if (!runs.remove(qryReqId, r)) - U.warn(log, "Query run was already removed: " + qryReqId); - curFunTbl.remove(); + return res; + } + + /** + * @param mainSpace Main space. + * @param allSpaces All spaces. + * @return List of all extra spaces or {@code null} if none. + */ + private List extraSpaces(String mainSpace, Set allSpaces) { + if (F.isEmpty(allSpaces) || (allSpaces.size() == 1 && allSpaces.contains(mainSpace))) + return null; + + ArrayList res = new ArrayList<>(allSpaces.size()); + + for (String space : allSpaces) { + if (!F.eq(space, mainSpace)) + res.add(space); } + + return res; } /**
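
----------------------------------------------------------------------

The core of this change is the retry loop added to query(): every attempt takes a
snapshot of the data nodes for all caches involved in the query, and if that
snapshot is invalidated by rebalancing (a mapper replies with a newer
AffinityTopologyVersion, or sending the map requests fails), the whole pass is
repeated after a short, linearly growing pause. The following is a minimal,
self-contained sketch of that pattern only; Attempt and TopologyChangedException
are hypothetical stand-ins for the map/reduce pass and the retry signal, and
javax.cache.CacheException (JCache API) is the only external dependency assumed.

import javax.cache.CacheException;

public final class RetryLoopSketch {
    /** Hypothetical signal that the data-node snapshot became stale mid-query. */
    static final class TopologyChangedException extends Exception { }

    /** One map/reduce pass executed against a fixed snapshot of data nodes. */
    interface Attempt<T> {
        T runOnce() throws TopologyChangedException;
    }

    /** Re-runs the attempt with a linearly growing pause, mirroring query(). */
    static <T> T runWithRetry(Attempt<T> attempt) {
        for (int i = 0; ; i++) {
            if (i != 0) {
                try {
                    Thread.sleep(i * 10L); // Wait for the partition exchange to finish.
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();

                    throw new CacheException("Query was interrupted.", e);
                }
            }

            try {
                return attempt.runOnce();
            }
            catch (TopologyChangedException ignored) {
                // The partition map changed underneath us: loop and take a fresh snapshot.
            }
        }
    }
}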
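
For partitioned caches on an unstable topology, the new partitionedUnstableDataNodes()
pins every partition to a node that currently owns it in all participating caches: it
intersects the per-partition owner sets and returns null (forcing a retry) whenever an
intersection comes up empty. The sketch below shows just that intersection step with
simplified types (UUID node ids instead of ClusterNode, a list of owner sets instead of
the cache topology); it is an illustration under those assumptions, not the Ignite
implementation.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public final class PartitionMappingSketch {
    /**
     * @param ownersPerCache For each participating cache, element p is the set of nodes owning partition p.
     * @return Node -> partitions it should serve, or {@code null} if some partition has no common
     *         owner yet and the caller should retry after rebalancing.
     */
    static Map<UUID, List<Integer>> mapPartitions(List<List<Set<UUID>>> ownersPerCache) {
        int parts = ownersPerCache.get(0).size(); // Assumes at least one participating cache.

        Map<UUID, List<Integer>> res = new HashMap<>();

        for (int p = 0; p < parts; p++) {
            Set<UUID> loc = null;

            for (List<Set<UUID>> owners : ownersPerCache) {
                if (owners.size() != parts)
                    throw new IllegalArgumentException("Partition counts must match for collocation.");

                if (loc == null)
                    loc = new HashSet<>(owners.get(p));
                else
                    loc.retainAll(owners.get(p)); // Intersection of owners across caches.

                if (loc.isEmpty())
                    return null; // No common owner for partition p -> retry later.
            }

            // Pin the partition to one surviving owner (the patch picks a random one to spread load).
            UUID node = loc.iterator().next();

            res.computeIfAbsent(node, n -> new ArrayList<>()).add(p);
        }

        return res;
    }
}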