ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [5/6] ignite git commit: Merge branch 'ignite-5578-locJoin' into ignite-5578
Date Wed, 19 Jul 2017 10:13:34 GMT
Merge branch 'ignite-5578-locJoin' into ignite-5578

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
#	modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5978213e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5978213e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5978213e

Branch: refs/heads/ignite-5578
Commit: 5978213e1e8c8d81d4b33ced4e38bec1edd5cee5
Parents: a7cb829 ba46cbd
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Jul 19 13:13:15 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Jul 19 13:13:15 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java |  3 +-
 .../discovery/GridDiscoveryManager.java         |  5 +-
 .../cache/CacheAffinitySharedManager.java       | 52 ++++++++++++++++++--
 .../GridCachePartitionExchangeManager.java      | 17 -------
 .../preloader/CacheGroupAffinityMessage.java    |  2 +-
 .../GridDhtPartitionsExchangeFuture.java        |  7 ++-
 .../preloader/GridDhtPartitionsFullMessage.java |  4 +-
 7 files changed, 62 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 6ec9b73,1e34f0c..8d309ed
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@@ -697,7 -697,11 +697,7 @@@ public class GridDiscoveryManager exten
                  // event notifications, since SPI notifies manager about all events from this listener.
                  if (verChanged) {
                      if (discoCache == null)
-                         discoCache = createDiscoCache(nextTopVer, ctx.state().clusterState(),
locNode, topSnapshot);
 -                        discoCache = createDiscoCache(
 -                            nextTopVer,
 -                            ctx.state().clusterState(),
 -                            locNode,
 -                            topSnapshot);
++                        discoCache = createDiscoCache(nextTopVer,ctx.state().clusterState(),
locNode, topSnapshot);
  
                      discoCacheHist.put(nextTopVer, discoCache);
  
@@@ -767,8 -771,11 +767,8 @@@
  
                      topHist.clear();
  
 -                    topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, createDiscoCache(
 -                        AffinityTopologyVersion.ZERO,
 -                        ctx.state().clusterState(),
 -                        locNode,
 -                        Collections.<ClusterNode>emptySet())));
 +                    topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
-                         createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(),
locNode, Collections.<ClusterNode>emptySet())));
++                        createDiscoCache(AffinityTopologyVersion.ZERO,ctx.state().clusterState(),
locNode, Collections.<ClusterNode>emptySet())));
                  }
                  else if (type == EVT_CLIENT_NODE_RECONNECTED) {
                      assert locNode.isClient() : locNode;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index ba6a22b,bb27613..1fc59bb
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@@ -1244,130 -1247,51 +1246,174 @@@ public class CacheAffinitySharedManager
          return grpHolder.affinity();
      }
  
 +    public void mergeExchangesOnServerLeft(final GridDhtPartitionsExchangeFuture fut, final
GridDhtPartitionsFullMessage msg) {
 +        final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
 +
 +        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 +
 +        log.info("mergeExchangesOnServerLeft [topVer=" + fut.context().events().discoveryCache().version() + ']');
 +
 +        forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>()
{
 +            @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException
{
 +                ExchangeDiscoveryEvents evts = fut.context().events();
 +
 +                Map<Integer, CacheGroupAffinityMessage> idealAffDiff = msg.idealAffinityDiff();
 +
 +                List<List<ClusterNode>> idealAssignment =
 +                    aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache());
 +
 +                CacheGroupAffinityMessage affMsg = idealAffDiff != null ? idealAffDiff.get(aff.groupId())
: null;
 +
 +                List<List<ClusterNode>> newAssignment;
 +
 +                if (affMsg != null) {
 +                    Map<Integer, GridLongList> diff = affMsg.assignmentsDiff();
 +
 +                    assert !F.isEmpty(diff);
 +
 +                    newAssignment = new ArrayList<>(idealAssignment);
 +
 +                    for (Map.Entry<Integer, GridLongList> e : diff.entrySet()) {
 +                        GridLongList assign = e.getValue();
 +
 +                        newAssignment.set(e.getKey(), CacheGroupAffinityMessage.toNodes(assign,
 +                            nodesByOrder,
 +                            evts.discoveryCache()));
 +                    }
 +                }
 +                else
 +                    newAssignment = idealAssignment;
 +
 +                aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment,
affCache));
 +            }
 +        });
 +    }
 +
 +    public void onJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage
msg)
 +        throws IgniteCheckedException {
 +        final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
 +
 +        final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
 +
 +        final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity();
 +
 +        assert !F.isEmpty(joinedNodeAff) : msg;
 +        assert joinedNodeAff.size() >= affReq.size();
 +
 +        forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>()
{
 +            @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException
{
 +                if (affReq.contains(aff.groupId())) {
 +                    assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
 +
 +                    CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId());
 +
 +                    assert affMsg != null;
 +
 +                    ExchangeDiscoveryEvents evts = fut.context().events();
 +
 +                    List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder,
evts.discoveryCache());
 +
 +                    // Calculate ideal assignments.
 +                    if (!aff.centralizedAffinityFunction())
 +                        aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache());
 +
 +                    aff.initialize(evts.topologyVersion(), assignments);
 +
 +                    CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
 +
 +                    assert grp != null;
 +
 +                    grp.topology().initPartitions(fut);
 +                }
 +            }
 +        });
 +    }
 +
 +    public void mergeExchangesOnServerJoin(GridDhtPartitionsExchangeFuture fut, boolean
crd)
 +        throws IgniteCheckedException {
 +        final ExchangeDiscoveryEvents evts = fut.context().events();
 +
 +        log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + evts.discoveryCache().version() + ']');
 +
 +        assert evts.serverJoin() && !evts.serverLeft();
 +
 +        WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(evts, crd);
 +
 +        setWaitRebalanceInfo(waitRebalanceInfo, evts.waitRebalanceEventVersion(), crd);
 +    }
 +
 +    public  Map<Integer, CacheGroupAffinityMessage> mergeExchangesInitAffinityOnServerLeft(
 +        GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException
 +    {
 +        final ExchangeDiscoveryEvents evts = fut.context().events();
 +
 +        assert evts.serverLeft();
 +
 +        log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + evts.discoveryCache().version() + ']');
 +
 +        forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
 +            @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException
{
 +                AffinityTopologyVersion topVer = evts.topologyVersion();
 +
 +                CacheGroupHolder cache = groupHolder(topVer, desc);
 +
 +                cache.affinity().calculate(topVer, evts.lastEvent(), evts.discoveryCache());
 +            }
 +        });
 +
 +        Map<Integer, Map<Integer, List<Long>>> diff = initAffinityOnNodeLeft0(evts.topologyVersion(),
 +            fut,
 +            NODE_TO_ORDER,
 +            true);
 +
 +        return CacheGroupAffinityMessage.createAffinityDiffMessages(diff);
 +    }
 +
      /**
+      * @param fut Exchange future.
+      * @param msg Message.
+      */
+     public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage
msg) {
+         final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
+ 
+         if (F.isEmpty(affReq))
+             return;
+ 
+         final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+ 
+         final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity();
+ 
+         assert !F.isEmpty(joinedNodeAff) : msg;
+         assert joinedNodeAff.size() >= affReq.size();
+ 
+         forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>()
{
+             @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException
{
+                 if (affReq.contains(aff.groupId())) {
+                     assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
+ 
+                     CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId());
+ 
+                     assert affMsg != null;
+ 
+                     List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder,
fut.discoCache());
+ 
+                     // Calculate ideal assignments.
+                     if (!aff.centralizedAffinityFunction())
+                         aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+ 
+                     aff.initialize(fut.topologyVersion(), assignments);
+ 
+                     CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
+ 
+                     assert grp != null;
+ 
+                     grp.topology().initPartitions(fut);
+                 }
+             }
+         });
+     }
+ 
+     /**
       * Called on exchange initiated by server node join.
       *
       * @param fut Exchange future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 9ae4032,4f572df..17bea14
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1264,12 -1201,10 +1266,13 @@@ public class GridDhtPartitionsExchangeF
  
      /**
       * @param nodes Nodes.
+      * @param joinedNodeAff Affinity if was requested by some nodes.
       * @throws IgniteCheckedException If failed.
       */
 -    private void sendAllPartitions(Collection<ClusterNode> nodes, Map<Integer,
CacheGroupAffinityMessage> joinedNodeAff)
 +    private void sendAllPartitions(Collection<ClusterNode> nodes,
 +        Map<Integer, CacheGroupAffinityMessage> joinedNodeAff,
 +        Map<Integer, CacheGroupAffinityMessage> idealAffDiff,
 +        @Nullable GridDhtPartitionExchangeId msgExchId)
          throws IgniteCheckedException {
          boolean singleNode = nodes.size() == 1;
  
@@@ -2345,68 -2045,43 +2348,68 @@@
       * @param msg Message.
       */
      private void processFullMessage(ClusterNode node, GridDhtPartitionsFullMessage msg)
{
 -        assert exchId.equals(msg.exchangeId()) : msg;
 -        assert msg.lastVersion() != null : msg;
 +        try {
 +            assert exchId.equals(msg.exchangeId()) : msg;
 +            assert msg.lastVersion() != null : msg;
  
 -        synchronized (this) {
 -            if (crd == null || finishState != null)
 -                return;
 +            synchronized (this) {
 +                if (crd == null || finishState != null)
 +                    return;
  
 -            if (!crd.equals(node)) {
 -                if (log.isDebugEnabled())
 -                    log.debug("Received full partition map from unexpected node [oldest=" + crd.id() +
 -                        ", nodeId=" + node.id() + ']');
 +                if (!crd.equals(node)) {
 +                    if (log.isDebugEnabled())
 +                        log.debug("Received full partition map from unexpected node [oldest=" + crd.id() +
 +                            ", nodeId=" + node.id() + ']');
  
 -                if (node.order() > crd.order())
 -                    fullMsgs.put(node, msg);
 +                    if (node.order() > crd.order())
 +                        fullMsgs.put(node, msg);
  
 -                return;
 +                    return;
 +                }
 +
 +                finishState = new FinishState(crd.id(), msg.resultTopologyVersion());
              }
  
 -            finishState = new FinishState(crd.id());
 -        }
 +            if (exchCtx.mergeExchanges()) {
 +                if (msg.resultTopologyVersion() != null && !initialVersion().equals(msg.resultTopologyVersion()))
{
 +                    log.info("Received full message, need merge [curFut=" + topologyVersion() +
 +                        ", resVer=" + msg.resultTopologyVersion() + ']');
 +
 +                    cctx.exchange().mergeExchanges(this, msg.resultTopologyVersion());
 +                }
  
 -        Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
 +                if (localJoinExchange())
-                     cctx.affinity().onJoin(this, msg);
++                    cctx.affinity().onLocalJoin(this, msg);
 +                else {
 +                    if (exchCtx.events().serverLeft())
 +                        cctx.affinity().mergeExchangesOnServerLeft(this, msg);
 +                    else
 +                        cctx.affinity().mergeExchangesOnServerJoin(this, false);
  
 -        if (localJoinExchange() && affReq != null)
 -            cctx.affinity().onLocalJoin(this, msg);
 +                    for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
 +                        if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
 +                            continue;
  
 -        updatePartitionFullMap(msg);
 +                        grp.topology().beforeExchange(this, true);
 +                    }
 +                }
 +            }
  
 -        IgniteCheckedException err = null;
 +            updatePartitionFullMap(msg);
  
 -        if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
 -            err = new IgniteCheckedException("Cluster state change failed");
 +            IgniteCheckedException err = null;
  
 -            cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
 -        }
 +            if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
 +                err = new IgniteCheckedException("Cluster state change failed");
  
 -        onDone(exchId.topologyVersion(), err);
 +                cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
 +            }
 +
 +            onDone(exchCtx.events().topologyVersion(), err);
 +        }
 +        catch (IgniteCheckedException e) {
 +            onDone(e);
 +        }
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------


Mime
View raw message