ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [13/15] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-5578
Date Thu, 27 Jul 2017 16:39:26 GMT
Merge remote-tracking branch 'remotes/origin/master' into ignite-5578

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b3f44078
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b3f44078
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b3f44078

Branch: refs/heads/ignite-5578
Commit: b3f44078eac07425a828b9fff8a184e5dc503412
Parents: 18f4929 e9a0d69
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Jul 27 15:27:43 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Jul 27 15:27:43 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  5 +-
 .../processors/cache/GridCacheIoManager.java    |  2 +-
 .../GridCachePartitionExchangeManager.java      | 44 ++++++++++-----
 .../dht/GridClientPartitionTopology.java        | 12 +++-
 .../dht/GridDhtPartitionTopology.java           |  9 ++-
 .../dht/GridDhtPartitionTopologyImpl.java       | 33 ++++++-----
 .../preloader/GridDhtPartitionExchangeId.java   |  2 +-
 .../GridDhtPartitionsExchangeFuture.java        | 36 ++++++++----
 .../preloader/GridDhtPartitionsFullMessage.java |  4 +-
 ...arOptimisticSerializableTxPrepareFuture.java |  2 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |  2 +-
 .../GridNearPessimisticTxPrepareFuture.java     |  2 +-
 .../cache/transactions/IgniteTxHandler.java     | 59 +++++++++++---------
 ...cingDelayedPartitionMapExchangeSelfTest.java | 58 +++++++++++++++----
 .../junits/common/GridCommonAbstractTest.java   |  6 +-
 15 files changed, 185 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4fdad7c,6a7258f..c0e6a11
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -130,10 -129,7 +130,10 @@@ public class GridCachePartitionExchange
      private static final int EXCHANGE_HISTORY_SIZE =
          IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE, 1_000);
  
 +    /** TODO IGNITE-5578. */
 +    private static final IgniteProductVersion EXCHANGE_PROTOCOL_2_SINCE = IgniteProductVersion.fromString("2.1.0");
 +
-     /** Atomic reference for pending timeout object. */
+     /** Atomic reference for pending partition resend timeout object. */
      private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
  
      /** Partition resend timeout after eviction. */
@@@ -1102,11 -1072,12 +1110,11 @@@
      }
  
      /**
-      * @param node Node.
-      * @param id ID.
+      * @param node Destination cluster node.
+      * @param id Exchange ID.
       */
      private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
 -        GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
 -            id,
 +        GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
              cctx.kernalContext().clientNode(),
              false);
  
@@@ -1127,7 -1098,8 +1135,7 @@@
      }
  
      /**
-      * @param exchangeId ID.
 -     * @param targetNode Target node.
+      * @param exchangeId Exchange ID.
       * @param clientOnlyExchange Client exchange flag.
       * @param sndCounters {@code True} if need send partition update counters.
       * @return Message.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 0dae4d2,a8e13a0..1b4bbcc
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -435,18 -433,14 +435,17 @@@ public class GridDhtPartitionTopologyIm
                      if (stopping)
                          return;
  
 -                    GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 +                    ExchangeDiscoveryEvents evts = exchFut.context().events();
  
 -                    assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer +
 -                        ", exchId=" + exchId + ']';
 +                    assert topVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + topVer +
 +                        ", exchId=" + exchFut.exchangeId() + ']';
  
 -                    if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent())
 -                        removeNode(exchId.nodeId());
 +                    topVer = evts.topologyVersion();
  
 +                    for (DiscoveryEvent evt : evts.events()) {
 +                        if ((evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) && !CU.clientNode(evt.eventNode()))
 +                            removeNode(evt.eventNode().id());
 +                    }
-     
                      ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
  
                      if (log.isDebugEnabled()) {
@@@ -1118,21 -1113,12 +1117,21 @@@
              if (stopping)
                  return false;
  
 +            if (exchangeVer == null && (topReadyFut == null || !topReadyFut.isDone()))
 +                return false;
 +
 +            if (exchangeVer != null) {
 +                assert exchangeVer.compareTo(topVer) >= 0 : exchangeVer;
 +
 +                topVer = exchangeVer;
 +            }
 +
-             if (cntrMap != null) {
+             if (incomeCntrMap != null) {
                  // update local map partition counters
-                 for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-                     T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+                 for (Map.Entry<Integer, T2<Long, Long>> e : incomeCntrMap.entrySet()) {
+                     T2<Long, Long> existCntr = this.cntrMap.get(e.getKey());
  
-                     if (cntr == null || cntr.get2() < e.getValue().get2())
+                     if (existCntr == null || existCntr.get2() < e.getValue().get2())
                          this.cntrMap.put(e.getKey(), e.getValue());
                  }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5888f77,71e41b0..f749833
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -723,10 -613,11 +726,11 @@@ public class GridDhtPartitionsExchangeF
                  boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion());
  
                  if (updateTop && clientTop != null) {
 -                    top.update(topologyVersion(),
 +                    top.update(initialVersion(),
                          clientTop.partitionMap(true),
                          clientTop.updateCounters(false),
-                         Collections.<Integer>emptySet());
+                         Collections.<Integer>emptySet(),
+                         null);
                  }
              }
  
@@@ -1223,14 -1096,7 +1227,14 @@@
      }
  
      /**
 +     * @return {@code True} if exchange for local node join.
 +     */
 +    boolean localJoinExchange() {
 +        return discoEvt.type() == EVT_NODE_JOINED && discoEvt.eventNode().isLocal();
 +    }
 +
 +    /**
-      * @param node Node.
+      * @param node Target Node.
       * @throws IgniteCheckedException If failed.
       */
      private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
@@@ -1692,291 -1507,39 +1696,297 @@@
      }
  
      /**
 -     * @param top Topology to assign.
++     * Processing of received single message. Actual processing in future may be delayed if init method was not
++     * completed, see {@link #initDone()}
++     *
 +     * @param node Sender node.
 +     * @param msg Single partition info.
       */
 -    private void assignPartitionStates(GridDhtPartitionTopology top) {
 -        Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
 -        Map<Integer, Long> minCntrs = new HashMap<>();
 +    public void onReceiveSingleMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
 +        assert !node.isDaemon() : node;
 +        assert msg != null;
 +        assert exchId.equals(msg.exchangeId()) : msg;
 +        assert !cctx.kernalContext().clientNode();
  
 -        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
 -            assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
 +        if (msg.restoreState()) {
 +            InitNewCoordinatorFuture newCrdFut0;
  
 -            for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
 -                int p = e0.getKey();
 +            synchronized (this) {
 +                assert newCrdFut != null;
  
 -                UUID uuid = e.getKey();
 +                newCrdFut0 = newCrdFut;
 +            }
  
 -                GridDhtPartitionState state = top.partitionState(uuid, p);
 +            newCrdFut0.onMessage(node, msg);
  
 -                if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
 -                    continue;
 +            return;
 +        }
  
 -                Long cntr = state == GridDhtPartitionState.MOVING ? e0.getValue().get1() : e0.getValue().get2();
 +        if (!msg.client()) {
 +            assert msg.lastVersion() != null : msg;
  
 -                if (cntr == null)
 -                    cntr = 0L;
 +            updateLastVersion(msg.lastVersion());
 +        }
  
 -                Long minCntr = minCntrs.get(p);
 +        GridDhtPartitionsExchangeFuture mergedWith0 = null;
  
 -                if (minCntr == null || minCntr > cntr)
 -                    minCntrs.put(p, cntr);
 +        synchronized (this) {
 +            if (state == ExchangeLocalState.MERGED) {
 +                assert mergedWith != null;
  
 -                if (state != GridDhtPartitionState.OWNING)
 -                    continue;
 +                mergedWith0 = mergedWith;
 +            }
 +            else {
 +                assert state != ExchangeLocalState.CLIENT;
  
 -                CounterWithNodes maxCntr = maxCntrs.get(p);
 +                if (exchangeId().isJoined() && node.id().equals(exchId.nodeId()))
 +                    pendingJoinMsg = msg;
 +            }
 +        }
 +
 +        if (mergedWith0 != null) {
 +            mergedWith0.processMergedMessage(node, msg);
 +
 +            return;
 +        }
 +
 +        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
 +            @Override public void apply(IgniteInternalFuture<Boolean> f) {
 +                try {
 +                    if (!f.get())
 +                        return;
 +                }
 +                catch (IgniteCheckedException e) {
 +                    U.error(log, "Failed to initialize exchange future: " + this, e);
 +
 +                    return;
 +                }
 +
 +                processSingleMessage(node.id(), msg);
 +            }
 +        });
 +    }
 +
 +    public void waitAndReplyToNode(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
 +        listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 +            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
 +                FinishState finishState0;
 +
 +                synchronized (GridDhtPartitionsExchangeFuture.this) {
 +                    finishState0 = finishState;
 +                }
 +
 +                assert finishState0 != null;
 +
 +                sendAllPartitionsToNode(finishState0, msg, node.id());
 +            }
 +        });
 +    }
 +
 +    /**
++     * Note this method performs heavy updatePartitionSingleMap operation, this operation is moved out from the
++     * synchronized block. Only count of such updates {@link #pendingSingleUpdates} is managed under critical section.
++     *
 +     * @param nodeId Node ID.
 +     * @param msg Client's message.
 +     */
 +    private void waitAndReplyToClient(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) {
 +        assert msg.client();
 +
 +        listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 +            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
 +                FinishState finishState0;
 +
 +                synchronized (GridDhtPartitionsExchangeFuture.this) {
 +                    finishState0 = finishState;
 +                }
 +
 +                if (finishState0 == null) {
 +                    assert discoEvt.type() == EVT_NODE_JOINED && CU.clientNode(discoEvt.eventNode()) : discoEvt;
 +
 +                    finishState0 = new FinishState(cctx.localNodeId(),
 +                        initialVersion(),
 +                        createPartitionsMessage(false));
 +                }
 +
 +                sendAllPartitionsToNode(finishState0, msg, nodeId);
 +            }
 +        });
 +    }
 +
 +    /**
 +     * @param nodeId Sender node.
-      * @param msg Message.
++     * @param msg Partition single message.
 +     */
 +    void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
 +        if (msg.client()) {
 +            waitAndReplyToClient(nodeId, msg);
 +
 +            return;
 +        }
 +
-         boolean allReceived = false;
++        boolean allReceived = false; // Received all expected messages.
 +        boolean updateSingleMap = false;
 +
 +        FinishState finishState0 = null;
 +
 +        synchronized (this) {
 +            assert crd != null;
 +
 +            switch (state) {
 +                case DONE: {
 +                    log.info("Received single message, already done [ver=" + initialVersion() +
 +                        ", node=" + nodeId + ']');
 +
 +                    assert finishState != null;
 +
 +                    finishState0 = finishState;
 +
 +                    break;
 +                }
 +
 +                case CRD: {
 +                    assert crd.isLocal() : crd;
 +
 +                    if (remaining.remove(nodeId)) {
 +                        updateSingleMap = true;
 +
 +                        pendingSingleUpdates++;
 +
 +                        if (stateChangeExchange() && msg.getError() != null)
 +                            changeGlobalStateExceptions.put(nodeId, msg.getError());
 +
 +                        allReceived = remaining.isEmpty();
 +
 +                        log.info("Coordinator received single message [ver=" + initialVersion() +
 +                            ", node=" + nodeId +
 +                            ", allReceived=" + allReceived + ']');
 +                    }
 +
 +                    break;
 +                }
 +
 +                case SRV:
 +                case BECOME_CRD: {
 +                    log.info("Non-coordinator received single message [ver=" + initialVersion() +
 +                        ", node=" + nodeId + ", state=" + state + ']');
 +
 +                    pendingSingleMsgs.put(nodeId, msg);
 +
 +                    break;
 +                }
 +
 +                default:
 +                    assert false : state;
 +            }
 +        }
 +
 +        if (finishState0 != null) {
 +            sendAllPartitionsToNode(finishState0, msg, nodeId);
 +
 +            return;
 +        }
 +
 +        if (updateSingleMap) {
 +            try {
 +                // Do not update partition map, in case cluster transitioning to inactive state.
 +                if (!deactivateCluster())
 +                    updatePartitionSingleMap(nodeId, msg);
 +            }
 +            finally {
 +                synchronized (this) {
 +                    assert pendingSingleUpdates > 0;
 +
 +                    pendingSingleUpdates--;
 +
 +                    if (pendingSingleUpdates == 0)
 +                        notifyAll();
 +                }
 +            }
 +        }
 +
 +        if (allReceived) {
 +            if (!awaitSingleMapUpdates())
 +                return;
 +
 +            onAllReceived();
 +        }
 +    }
 +
 +    /**
 +     * @return {@code False} if interrupted.
 +     */
 +    private synchronized boolean awaitSingleMapUpdates() {
 +        try {
 +            while (pendingSingleUpdates > 0)
 +                U.wait(this);
 +
 +            return true;
 +        }
 +        catch (IgniteInterruptedCheckedException e) {
 +            U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
 +
 +            return false;
 +        }
 +    }
 +
 +    /**
 +     * @param fut Affinity future.
 +     */
 +    private void onAffinityInitialized(IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut) {
 +        try {
 +            assert fut.isDone();
 +
 +            Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
 +
 +            GridDhtPartitionsFullMessage m = createPartitionsMessage(false);
 +
 +            CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
 +
 +            if (log.isDebugEnabled())
 +                log.debug("Centralized affinity exchange, send affinity change message: " + msg);
 +
 +            cctx.discovery().sendCustomEvent(msg);
 +        }
 +        catch (IgniteCheckedException e) {
 +            onDone(e);
 +        }
 +    }
 +
 +    /**
 +     * @param top Topology to assign.
 +     */
 +    private void assignPartitionStates(GridDhtPartitionTopology top) {
 +        Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
 +        Map<Integer, Long> minCntrs = new HashMap<>();
 +
 +        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
 +            assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
 +
 +            for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
 +                int p = e0.getKey();
 +
 +                UUID uuid = e.getKey();
 +
 +                GridDhtPartitionState state = top.partitionState(uuid, p);
 +
 +                if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
 +                    continue;
 +
 +                Long cntr = state == GridDhtPartitionState.MOVING ? e0.getValue().get1() : e0.getValue().get2();
 +
 +                if (cntr == null)
 +                    cntr = 0L;
 +
 +                Long minCntr = minCntrs.get(p);
 +
 +                if (minCntr == null || minCntr > cntr)
 +                    minCntrs.put(p, cntr);
 +
 +                if (state != GridDhtPartitionState.OWNING)
 +                    continue;
 +
 +                CounterWithNodes maxCntr = maxCntrs.get(p);
  
                  if (maxCntr == null || cntr > maxCntr.cnt)
                      maxCntrs.put(p, new CounterWithNodes(cntr, uuid));
@@@ -2363,235 -1786,41 +2373,235 @@@
  
                          err = new IgniteCheckedException("Cluster state change failed.");
  
 -                        cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, req);
 +                        cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, req);
 +                    }
 +
 +                    boolean active = !stateChangeErr && req.activate();
 +
 +                    ChangeGlobalStateFinishMessage stateFinishMsg = new ChangeGlobalStateFinishMessage(
 +                        req.requestId(),
 +                        active);
 +
 +                    cctx.discovery().sendCustomEvent(stateFinishMsg);
 +                }
 +
 +                if (!nodes.isEmpty())
 +                    sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, joinedNodeAff);
 +
 +                onDone(exchCtx.events().topologyVersion(), err);
 +
 +                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : pendingSingleMsgs.entrySet())
 +                    processSingleMessage(e.getKey(), e.getValue());
 +            }
 +        }
 +        catch (IgniteCheckedException e) {
 +            if (reconnectOnError(e))
 +                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
 +            else
 +                onDone(e);
 +        }
 +    }
 +
 +    /**
 +     *
 +     */
 +    private void assignPartitionsStates() {
 +        if (cctx.database().persistenceEnabled()) {
 +            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
 +                if (grp.isLocal())
 +                    continue;
 +
 +                assignPartitionStates(grp.topology());
 +            }
 +        }
 +    }
 +
 +    /**
 +     * @param msg Request.
 +     * @param nodeId Node ID.
 +     */
 +    private void sendAllPartitionsToNode(FinishState finishState, GridDhtPartitionsSingleMessage msg, UUID nodeId) {
 +        ClusterNode node = cctx.node(nodeId);
 +
 +        if (node != null) {
 +            GridDhtPartitionsFullMessage fullMsg = finishState.msg.copy();
 +            fullMsg.exchangeId(msg.exchangeId());
 +
 +            Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 +
 +            if (affReq != null) {
 +                Map<Integer, CacheGroupAffinityMessage> aff = CacheGroupAffinityMessage.createAffinityMessages(
 +                    cctx,
 +                    finishState.resTopVer,
 +                    affReq,
 +                    null);
 +
 +                fullMsg.joinedNodeAffinity(aff);
 +            }
 +
 +            if (!fullMsg.exchangeId().equals(msg.exchangeId()))
 +                fullMsg.exchangeId(msg.exchangeId());
 +
 +            try {
 +                cctx.io().send(node, fullMsg, SYSTEM_POOL);
 +            }
 +            catch (ClusterTopologyCheckedException e) {
 +                if (log.isDebugEnabled())
 +                    log.debug("Failed to send partitions, node failed: " + node);
 +            }
 +            catch (IgniteCheckedException e) {
 +                U.error(log, "Failed to send partitions [node=" + node + ']', e);
 +            }
 +        }
 +        else if (log.isDebugEnabled())
 +            log.debug("Failed to send partitions, node failed: " + nodeId);
 +
 +    }
 +
 +    /**
 +     * @param node Sender node.
 +     * @param msg Full partition info.
 +     */
 +    public void onReceiveFullMessage(final ClusterNode node, final GridDhtPartitionsFullMessage msg) {
 +        assert msg != null;
 +        assert msg.exchangeId() != null : msg;
 +        assert !node.isDaemon() : node;
 +
 +        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
 +            @Override public void apply(IgniteInternalFuture<Boolean> f) {
 +                try {
 +                    if (!f.get())
 +                        return;
 +                }
 +                catch (IgniteCheckedException e) {
 +                    U.error(log, "Failed to initialize exchange future: " + this, e);
 +
 +                    return;
 +                }
 +
 +                processFullMessage(true, node, msg);
 +            }
 +        });
 +    }
 +
 +    /**
 +     * @param node Sender node.
-      * @param msg Message.
++     * @param msg Message with full partition info.
 +     */
 +    public void onReceivePartitionRequest(final ClusterNode node, final GridDhtPartitionsSingleRequest msg) {
 +        assert !cctx.kernalContext().clientNode() || msg.restoreState();
 +        assert !node.isDaemon() && !CU.clientNode(node) : node;
 +
 +        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
 +            @Override public void apply(IgniteInternalFuture<Boolean> fut) {
 +                processSinglePartitionRequest(node, msg);
 +            }
 +        });
 +    }
 +
 +    /**
 +     * @param node Sender node.
 +     * @param msg Message.
 +     */
 +    private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) {
 +        FinishState finishState0 = null;
 +
 +        synchronized (this) {
 +            if (crd == null) {
 +                log.info("Ignore partitions request, no coordinator [node=" + node.id() + ']');
 +
 +                return;
 +            }
 +
 +            switch (state) {
 +                case DONE: {
 +                    assert finishState != null;
 +
 +                    if (node.id().equals(finishState.crdId)) {
 +                        log.info("Ignore partitions request, finished exchange with this coordinator: " + msg);
 +
 +                        return;
 +                    }
 +
 +                    finishState0 = finishState;
 +
 +                    break;
 +                }
 +
 +                case CRD:
 +                case BECOME_CRD: {
 +                    log.info("Ignore partitions request, node is coordinator: " + msg);
 +
 +                    return;
 +                }
 +
 +                case CLIENT:
 +                case SRV: {
 +                    if (!cctx.discovery().alive(node)) {
 +                        log.info("Ignore restore state request, node is not alive [node=" + node.id() + ']');
 +
 +                        return;
                      }
  
 -                    boolean active = !stateChangeErr && req.activate();
 +                    if (msg.restoreState()) {
 +                        if (!node.equals(crd)) {
 +                            if (node.order() > crd.order()) {
 +                                log.info("Received restore state request, change coordinator [oldCrd=" + crd.id() +
 +                                    "newCrd=" + node.id() + ']');
  
 -                    ChangeGlobalStateFinishMessage msg = new ChangeGlobalStateFinishMessage(req.requestId(), active);
 +                                crd = node; // Do not allow to process FullMessage from old coordinator.
 +                            }
 +                            else {
 +                                log.info("Ignore restore state request, coordinator changed [oldCrd=" + crd.id() +
 +                                    "newCrd=" + node.id() + ']');
  
 -                    cctx.discovery().sendCustomEvent(msg);
 -                }
 +                                return;
 +                            }
 +                        }
 +                    }
  
 -                if (!nodes.isEmpty())
 -                    sendAllPartitions(nodes);
 +                    break;
 +                }
  
 -                onDone(exchangeId().topologyVersion(), err);
 +                default:
 +                    assert false : state;
              }
          }
 -        catch (IgniteCheckedException e) {
 -            if (reconnectOnError(e))
 -                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
 -            else
 -                onDone(e);
 -        }
 -    }
  
 -    /**
 -     *
 -     */
 -    private void assignPartitionsStates() {
 -        if (cctx.database().persistenceEnabled()) {
 -            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
 -                if (grp.isLocal())
 -                    continue;
 +        if (msg.restoreState()) {
 +            try {
 +                assert msg.restoreExchangeId() != null : msg;
  
 -                assignPartitionStates(grp.topology());
 +                GridDhtPartitionsSingleMessage res = cctx.exchange().createPartitionsSingleMessage(
 +                    msg.restoreExchangeId(),
 +                    cctx.kernalContext().clientNode(),
 +                    true);
 +
 +                if (localJoinExchange() && finishState0 == null)
 +                    res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
 +
 +                res.restoreState(true);
 +
 +                res.finishMessage(finishState0 != null ? finishState0.msg : null);
 +
 +                cctx.io().send(node, res, SYSTEM_POOL);
 +            }
 +            catch (ClusterTopologyCheckedException ignored) {
 +                if (log.isDebugEnabled())
 +                    log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']');
              }
 +            catch (IgniteCheckedException e) {
 +                U.error(log, "Failed to send partitions message [node=" + node + ", msg=" + msg + ']', e);
 +            }
 +
 +            return;
 +        }
 +
 +        try {
 +            sendLocalPartitions(node);
 +        }
 +        catch (IgniteCheckedException e) {
 +            U.error(log, "Failed to send message to coordinator: " + e);
          }
      }
  
@@@ -2726,19 -1960,21 +2736,21 @@@
              CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
  
              if (grp != null) {
 -                grp.topology().update(topologyVersion(),
 +                grp.topology().update(exchCtx.events().topologyVersion(),
                      entry.getValue(),
                      cntrMap,
-                     msg.partsToReload(cctx.localNodeId(), grpId));
+                     msg.partsToReload(cctx.localNodeId(), grpId),
+                     null);
              }
              else {
                  ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
  
                  if (oldest != null && oldest.isLocal()) {
 -                    cctx.exchange().clientTopology(grpId, this).update(topologyVersion(),
 +                    cctx.exchange().clientTopology(grpId, this).update(exchCtx.events().topologyVersion(),
                          entry.getValue(),
                          cntrMap,
-                         Collections.<Integer>emptySet());
+                         Collections.<Integer>emptySet(),
+                         null);
                  }
              }
          }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------


Mime
View raw message