Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 58363200CB5 for ; Wed, 12 Jul 2017 15:06:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 569F2165EE7; Wed, 12 Jul 2017 13:06:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 01611165EE1 for ; Wed, 12 Jul 2017 15:06:20 +0200 (CEST) Received: (qmail 42830 invoked by uid 500); 12 Jul 2017 13:06:19 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 42821 invoked by uid 99); 12 Jul 2017 13:06:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Jul 2017 13:06:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 596B1E1812; Wed, 12 Jul 2017 13:06:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-5578 Affinity for local join Date: Wed, 12 Jul 2017 13:06:19 +0000 (UTC) archived-at: Wed, 12 Jul 2017 13:06:22 -0000 Repository: ignite Updated Branches: refs/heads/ignite-5578-locJoin 62e3e70fc -> 0c7124122 ignite-5578 Affinity for local join Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c712412 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c712412 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c712412 Branch: refs/heads/ignite-5578-locJoin Commit: 0c7124122e8eebaad1a85f844277a5cf1564a8de Parents: 62e3e70 Author: sboikov Authored: Wed Jul 12 11:32:41 2017 +0300 Committer: sboikov Committed: Wed Jul 12 15:49:44 2017 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignmentCache.java | 8 + .../cache/CacheAffinitySharedManager.java | 105 ++++++---- .../GridCachePartitionExchangeManager.java | 19 +- .../GridDhtPartitionsExchangeFuture.java | 191 +++++++++++++------ 4 files changed, 233 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0c712412/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index a8c6c59..a8ac825 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -337,6 +337,14 @@ public class GridAffinityAssignmentCache { } /** + * @param topVer + * @return + */ + public boolean lastVersionEquals(AffinityTopologyVersion topVer) { + return topVer.equals(lastVersion()); + } + + /** * @return Last calculated affinity version. */ public AffinityTopologyVersion lastVersion() { http://git-wip-us.apache.org/repos/asf/ignite/blob/0c712412/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 45586c7..3f24547 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1230,7 +1230,7 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap public GridAffinityAssignmentCache affinity(Integer grpId) { CacheGroupHolder grpHolder = grpHolders.get(grpId); - assert grpHolder != null : grpId; + assert grpHolder != null : debugGroupName(grpId); return grpHolder.affinity(); } @@ -1311,6 +1311,19 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap } /** + * @param grpId Group ID. + * @return Group name for debug purpose. + */ + private String debugGroupName(int grpId) { + CacheGroupDescriptor desc = caches.group(grpId); + + if (desc != null) + return desc.cacheOrGroupName(); + else + return "Unknown group: " + grpId; + } + + /** * @param fut Exchange future. * @throws IgniteCheckedException If failed. */ @@ -1450,8 +1463,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed. * @return Future completed when caches initialization is done. */ - private IgniteInternalFuture initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut) - throws IgniteCheckedException { + public IgniteInternalFuture initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut, + final boolean newAff) throws IgniteCheckedException { final List> futs = new ArrayList<>(); forAllRegisteredCacheGroups(new IgniteInClosureX() { @@ -1483,51 +1496,75 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap final GridAffinityAssignmentCache aff = grpHolder.affinity(); - List exchFuts = cctx.exchange().exchangeFutures(); + if (newAff) { + if (!aff.lastVersionEquals(fut.topologyVersion())) { + List> assign = + aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); - int idx = exchFuts.indexOf(fut); + aff.initialize(fut.topologyVersion(), assign); + } + } + else { + List exchFuts = cctx.exchange().exchangeFutures(); - assert idx >= 0 && idx < exchFuts.size() - 1 : "Invalid exchange futures state [cur=" + idx + - ", total=" + exchFuts.size() + ']'; + int idx = exchFuts.indexOf(fut); - final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1); + assert idx >= 0 && idx < exchFuts.size() - 1 : "Invalid exchange futures state [cur=" + idx + + ", total=" + exchFuts.size() + ']'; - if (log.isDebugEnabled()) { - log.debug("Need initialize affinity on coordinator [" + - "cacheGrp=" + desc.cacheOrGroupName() + - "prevAff=" + prev.topologyVersion() + ']'); - } + final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1); - assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev; + if (log.isDebugEnabled()) { + log.debug("Need initialize affinity on coordinator [" + + "cacheGrp=" + desc.cacheOrGroupName() + + "prevAff=" + prev.topologyVersion() + ']'); + } - GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - desc.groupId(), - prev.topologyVersion(), - prev.discoCache()); + assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev; - fetchFut.init(false); + GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, + desc.groupId(), + prev.topologyVersion(), + prev.discoCache()); - final GridFutureAdapter affFut = new GridFutureAdapter<>(); + fetchFut.init(false); - fetchFut.listen(new IgniteInClosureX>() { - @Override public void applyx(IgniteInternalFuture fetchFut) - throws IgniteCheckedException { - fetchAffinity(prev.topologyVersion(), - prev.discoveryEvent(), - prev.discoCache(), - aff, (GridDhtAssignmentFetchFuture)fetchFut); + final GridFutureAdapter affFut = new GridFutureAdapter<>(); - aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + fetchFut.listen(new IgniteInClosureX>() { + @Override public void applyx(IgniteInternalFuture fetchFut) + throws IgniteCheckedException { + fetchAffinity(prev.topologyVersion(), + prev.discoveryEvent(), + prev.discoCache(), + aff, + (GridDhtAssignmentFetchFuture)fetchFut); - affFut.onDone(fut.topologyVersion()); - } - }); + aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + + affFut.onDone(fut.topologyVersion()); + } + }); - futs.add(affFut); + futs.add(affFut); + } } - else + else { grpHolder = new CacheGroupHolder1(grp, null); + if (newAff) { + GridAffinityAssignmentCache aff = grpHolder.affinity(); + + if (!aff.lastVersionEquals(fut.topologyVersion())) { + List> assign = aff.calculate(fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache()); + + aff.initialize(fut.topologyVersion(), assign); + } + } + } + CacheGroupHolder old = grpHolders.put(grpHolder.groupId(), grpHolder); assert old == null : old; @@ -1757,7 +1794,7 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap */ public IgniteInternalFuture>>> initAffinityOnNodeLeft( final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { - IgniteInternalFuture initFut = initCoordinatorCaches(fut); + IgniteInternalFuture initFut = initCoordinatorCaches(fut, false); if (initFut != null && !initFut.isDone()) { final GridFutureAdapter>>> resFut = new GridFutureAdapter<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/0c712412/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 018537c..51214e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1438,7 +1438,24 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana return; try { - sendLocalPartitions(node, msg.exchangeId()); + List futs = exchangeFutures(); + + GridDhtPartitionsExchangeFuture fut = null; + + for (int i = futs.size() - 1; i >= 0; i++) { + GridDhtPartitionsExchangeFuture fut0 = futs.get(i); + + if (fut0.exchangeId().equals(msg.exchangeId())) { + fut = fut0; + + break; + } + } + + if (fut != null) + fut.processSinglePartitionRequest(node, msg); + else + sendLocalPartitions(node, msg.exchangeId()); } finally { leaveBusy(); http://git-wip-us.apache.org/repos/asf/ignite/blob/0c712412/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 81b288c..4a39bae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -136,6 +136,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private ClusterNode crd; + /** */ + private boolean crdReady; + /** ExchangeFuture id. */ private final GridDhtPartitionExchangeId exchId; @@ -169,7 +172,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * Messages received on non-coordinator are stored in case if this node * becomes coordinator. */ - private final Map singleMsgs = new ConcurrentHashMap8<>(); + private final Map singleMsgs = new ConcurrentHashMap8<>(); /** Messages received from new coordinator. */ private final Map fullMsgs = new ConcurrentHashMap8<>(); @@ -224,6 +227,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte @GridToStringExclude private ExchangeContext exchCtx; + /** */ + private FinishState finishState; + /** * @param cctx Cache context. * @param busyLock Busy lock. @@ -452,6 +458,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean crdNode = crd != null && crd.isLocal(); + if (crdNode) + crdReady = true; + exchLog.info("Started exchange init [topVer=" + topVer + ", crd=" + crdNode + ", evt=" + discoEvt.type() + @@ -1434,44 +1443,44 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return; } - processMessage(node, msg); + processMessage(node.id(), msg); } }); } } /** - * @param node Sender node. + * @param nodeId Sender node. * @param msg Message. */ - private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) { + private void processMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) { boolean allReceived = false; boolean updateSingleMap = false; synchronized (this) { assert crd != null; - if (crd.isLocal()) { - if (remaining.remove(node.id())) { + if (crd.isLocal() && crdReady) { + if (remaining.remove(nodeId)) { updateSingleMap = true; pendingSingleUpdates++; if (stateChangeExchange() && msg.getError() != null) - changeGlobalStateExceptions.put(node.id(), msg.getError()); + changeGlobalStateExceptions.put(nodeId, msg.getError()); allReceived = remaining.isEmpty(); } } else - singleMsgs.put(node, msg); + singleMsgs.put(nodeId, msg); } if (updateSingleMap) { try { // Do not update partition map, in case cluster transitioning to inactive state. if (!deactivateCluster()) - updatePartitionSingleMap(node, msg); + updatePartitionSingleMap(nodeId, msg); } finally { synchronized (this) { @@ -1747,17 +1756,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (!crd.equals(discoCache.serverNodes().get(0))) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (!grp.isLocal()) { - if (localJoinExchange() && grp.affinity().lastVersion().topologyVersion() == -1L) { - List> aff = grp.affinity().calculate(topologyVersion(), - discoEvt, - discoCache); - - grp.affinity().initialize(topologyVersion(), aff); - } - + if (!grp.isLocal()) grp.topology().beforeExchange(this, !centralizedAff); - } } } @@ -2013,12 +2013,37 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param node Sender node. * @param msg Message. */ + public void processSinglePartitionRequest(final ClusterNode node, GridDhtPartitionsSingleRequest msg) { + if (!cctx.discovery().alive(node.id())) + return; + + initFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + synchronized (this) { + if (finishState != null && node.id().equals(finishState.crdId)) + return; + } + + try { + sendLocalPartitions(node); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message to coordinator: " + e); + } + } + }); + } + + /** + * @param node Sender node. + * @param msg Message. + */ private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) { assert exchId.equals(msg.exchangeId()) : msg; assert msg.lastVersion() != null : msg; synchronized (this) { - if (crd == null) + if (crd == null || finishState != null) return; if (!crd.equals(node)) { @@ -2031,6 +2056,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return; } + + finishState = new FinishState(crd.id()); } Set affReq = exchCtx.groupsAffinityRequestOnJoin(); @@ -2040,7 +2067,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Collection cachesAff = msg.cachesAffinity(); - assert !F.isEmpty(cachesAff) : cachesAff; + assert !F.isEmpty(cachesAff) : msg; assert cachesAff.size() >= affReq.size(); int cnt = 0; @@ -2153,11 +2180,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * Updates partition map in all caches. * - * @param node Node sent message. + * @param nodeId Node message received from. * @param msg Partitions single message. */ - private void updatePartitionSingleMap(ClusterNode node, GridDhtPartitionsSingleMessage msg) { - msgs.put(node.id(), msg); + private void updatePartitionSingleMap(UUID nodeId, GridDhtPartitionsSingleMessage msg) { + msgs.put(nodeId, msg); for (Map.Entry entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); @@ -2282,7 +2309,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { boolean crdChanged = false; boolean allReceived = false; - Set reqFrom = null; + Set remaining0 = null; ClusterNode crd0; @@ -2301,11 +2328,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (crd != null && crd.isLocal()) { - if (rmvd) + if (crdChanged) + remaining0 = new HashSet<>(remaining); + else if (crdReady && rmvd) allReceived = remaining.isEmpty(); - - if (crdChanged && !remaining.isEmpty()) - reqFrom = new HashSet<>(remaining); } crd0 = crd; @@ -2334,35 +2360,30 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (stateChangeExchange() && changeGlobalStateE != null) changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE); - if (allReceived) { - awaitSingleMapUpdates(); - - onAllReceived(); + if (crdChanged) { + boolean newAff = localJoinExchange(); + + IgniteInternalFuture fut = cctx.affinity().initCoordinatorCaches( + GridDhtPartitionsExchangeFuture.this, newAff); + + if (fut == null || fut.isDone()) + onBecomeCoordinator(); + else { + fut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + onBecomeCoordinator(); + } + }); + } return; } - if (crdChanged && reqFrom != null) { - GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchId); - - for (UUID nodeId : reqFrom) { - try { - // It is possible that some nodes finished exchange with previous coordinator. - cctx.io().send(nodeId, req, SYSTEM_POOL); - } - catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Node left during partition exchange [nodeId=" + nodeId + - ", exchId=" + exchId + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to request partitions from node: " + nodeId, e); - } - } - } + if (allReceived) { + awaitSingleMapUpdates(); - for (Map.Entry m : singleMsgs.entrySet()) - processMessage(m.getKey(), m.getValue()); + onAllReceived(); + } } else { if (crdChanged) { @@ -2373,11 +2394,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } } - catch (Exception e) { + catch (IgniteCheckedException e) { if (reconnectOnError(e)) onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); else - throw e; + U.error(log, "Failed to process node left event: " + e, e); } finally { leaveBusy(); @@ -2391,6 +2412,51 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * + */ + private void onBecomeCoordinator() { + Set remaining0 = null; + + synchronized (this) { + assert crd != null && crd.isLocal(); + assert !crdReady; + + crdReady = true; + + if (!remaining.isEmpty()) + remaining0 = new HashSet<>(remaining); + } + + if (remaining0 != null) { + // It is possible that some nodes finished exchange with previous coordinator. + GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchId); + + for (UUID nodeId : remaining0) { + try { + if (!singleMsgs.containsKey(nodeId)) + cctx.io().send(nodeId, req, SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException ignored) { + if (log.isDebugEnabled()) + log.debug("Node left during partition exchange [nodeId=" + nodeId + + ", exchId=" + exchId + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to request partitions from node: " + nodeId, e); + } + } + + for (Map.Entry m : singleMsgs.entrySet()) + processMessage(m.getKey(), m.getValue()); + } + else { + awaitSingleMapUpdates(); + + onAllReceived(); + } + } + + /** * @param e Exception. * @return {@code True} if local node should try reconnect in case of error. */ @@ -2529,4 +2595,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return nextTimeout <= limit ? nextTimeout : limit; } + + /** + * + */ + private static class FinishState { + /** */ + private final UUID crdId; + + /** + * @param crdId Coordinator node. + */ + FinishState(UUID crdId) { + this.crdId = crdId; + } + } }