ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [21/50] ignite git commit: ignite-5446 Only lateAffinity logic in CacheAffinitySharedManager.
Date Thu, 13 Jul 2017 14:35:50 GMT
ignite-5446 Only lateAffinity logic in CacheAffinitySharedManager.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7a54832e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7a54832e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7a54832e

Branch: refs/heads/ignite-5578
Commit: 7a54832e61e1696e96a9d323ec724ddc9ddd9761
Parents: f439649
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Jul 11 17:42:31 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Jul 11 17:42:31 2017 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   3 +-
 .../cache/CacheAffinitySharedManager.java       |  12 +-
 .../processors/cache/ExchangeContext.java       | 100 +++++++++
 .../cache/ExchangeDiscoveryEvents.java          |  80 +++++++
 .../processors/cache/ExchangeEvents.java        |  80 -------
 .../GridCachePartitionExchangeManager.java      |   6 +-
 .../GridDhtPartitionsExchangeFuture.java        | 216 +++++++++++++------
 .../preloader/GridDhtPartitionsFullMessage.java |  63 ++++--
 .../GridDhtPartitionsSingleMessage.java         |  14 ++
 9 files changed, 415 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7a54832e/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 99ec08a..70672b0 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -170,7 +171,7 @@ public class MessageCodeGenerator {
 
 //        gen.generateAll(true);
 
-//        gen.generateAndWrite(GridChangeGlobalStateMessageResponse.class);
+        gen.generateAndWrite(GridDhtPartitionsFullMessage.class);
 
 //        gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a54832e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index f57a33a..a8cf249 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1223,6 +1223,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             (affNodes.isEmpty() || (affNodes.size() == 1 && affNodes.contains(cctx.localNode())));
     }
 
+    public List<List<ClusterNode>> affinity(AffinityTopologyVersion topVer, Integer
grpId) {
+        CacheGroupHolder grpHolder = grpHolders.get(grpId);
+
+        assert grpHolder != null : grpId;
+
+        return grpHolder.affinity().assignments(topVer);
+    }
+
     /**
      * @param crd Coordinator flag.
      * @throws IgniteCheckedException If failed.
@@ -1231,7 +1239,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
     }
 
-    public void processDiscoveryEvents(ExchangeEvents evts) {
+    public void processDiscoveryEvents(ExchangeDiscoveryEvents evts) {
         AffinityTopologyVersion topVer = evts.topologyVersion();
 
         if (evts.serverLeft()) {
@@ -1302,6 +1310,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
     }
 
+
+
     /**
      * @param grpIds Cache group IDs.
      * @return Cache names.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a54832e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
new file mode 100644
index 0000000..8d880a6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class ExchangeContext {
+    /** */
+    private final boolean coalescing;
+
+    /** */
+    private AffinityTopologyVersion resTopVer;
+
+    /** */
+    private final Map<Integer, List<List<ClusterNode>>> affMap = new HashMap<>();
+
+    /** */
+    private Set<Integer> cacheGrpsOnLocStart;
+//
+//    private Set<UUID> joinedNodes;
+//
+//    public boolean nodeJoined(UUID nodeId) {
+//        return joinedNodes != null && joinedNodes.contains(nodeId);
+//    }
+
+    /**
+     * @param coalescing
+     */
+    public ExchangeContext(AffinityTopologyVersion resTopVer, boolean coalescing) {
+        this.coalescing = coalescing;
+        this.resTopVer = resTopVer;
+    }
+
+    public AffinityTopologyVersion resultTopologyVersion() {
+        return resTopVer;
+    }
+
+    public boolean coalescing() {
+        return coalescing;
+    }
+
+    public void addCacheGroupOnLocalStart(Integer grpId) {
+        if (cacheGrpsOnLocStart == null)
+            cacheGrpsOnLocStart = new HashSet<>();
+
+        cacheGrpsOnLocStart.add(grpId);
+    }
+
+    @Nullable public Set<Integer> cacheGroupsOnLocalStart() {
+        return cacheGrpsOnLocStart;
+    }
+
+    public List<List<ClusterNode>> activeAffinity(GridCacheSharedContext cctx,
GridAffinityAssignmentCache aff) {
+        List<List<ClusterNode>> assignment = affMap.get(aff.groupId());
+
+        if (assignment != null)
+            return assignment;
+
+        AffinityTopologyVersion affTopVer = aff.lastVersion();
+
+        assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" +
aff.cacheOrGroupName() +
+            ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']';
+
+        List<List<ClusterNode>> curAff = aff.assignments(affTopVer);
+
+        assert aff.idealAssignment() != null : "Previous assignment is not available.";
+
+        affMap.put(aff.groupId(), curAff);
+
+        return curAff;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a54832e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
new file mode 100644
index 0000000..fced92e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
+/**
+ *
+ */
+public class ExchangeDiscoveryEvents {
+    /** */
+    private AffinityTopologyVersion topVer;
+
+    /** */
+    private DiscoCache discoCache;
+
+    /** */
+    private boolean srvJoin;
+
+    /** */
+    private boolean srvLeft;
+
+    /**
+     * @param fut Future.
+     */
+    void init(GridDhtPartitionsExchangeFuture fut) {
+        topVer = fut.topologyVersion();
+        discoCache = fut.discoCache();
+
+        ClusterNode node = fut.discoveryEvent().eventNode();
+
+        if (fut.discoveryEvent().type()== EVT_NODE_JOINED)
+            srvJoin = !CU.clientNode(node);
+        else {
+            assert fut.discoveryEvent().type() == EVT_NODE_LEFT || fut.discoveryEvent().type()
== EVT_NODE_FAILED;
+
+            srvLeft = !CU.clientNode(node);
+        }
+    }
+
+    DiscoCache discoveryCache() {
+        return discoCache;
+    }
+
+    AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    boolean serverJoin() {
+        return srvJoin;
+    }
+
+    boolean serverLeft() {
+        return srvLeft;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a54832e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeEvents.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeEvents.java
deleted file mode 100644
index 6928d85..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeEvents.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-
-/**
- *
- */
-public class ExchangeEvents {
-    /** */
-    private AffinityTopologyVersion topVer;
-
-    /** */
-    private DiscoCache discoCache;
-
-    /** */
-    private boolean srvJoin;
-
-    /** */
-    private boolean srvLeft;
-
-    /**
-     * @param fut Future.
-     */
-    void init(GridDhtPartitionsExchangeFuture fut) {
-        topVer = fut.topologyVersion();
-        discoCache = fut.discoCache();
-
-        ClusterNode node = fut.discoveryEvent().eventNode();
-
-        if (fut.discoveryEvent().type()== EVT_NODE_JOINED)
-            srvJoin = !CU.clientNode(node);
-        else {
-            assert fut.discoveryEvent().type() == EVT_NODE_LEFT || fut.discoveryEvent().type()
== EVT_NODE_FAILED;
-
-            srvLeft = !CU.clientNode(node);
-        }
-    }
-
-    DiscoCache discoveryCache() {
-        return discoCache;
-    }
-
-    AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
-    boolean serverJoin() {
-        return srvJoin;
-    }
-
-    boolean serverLeft() {
-        return srvLeft;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a54832e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index aebd0ea..fa32b5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1738,8 +1738,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         this.coalesceTestWaitVer = coalesceTestWaitVer;
     }
 
-    public ExchangeEvents coalesceExchanges(GridDhtPartitionsExchangeFuture curFut) {
-        ExchangeEvents evts = null;
+    public ExchangeDiscoveryEvents coalesceExchanges(GridDhtPartitionsExchangeFuture curFut)
{
+        ExchangeDiscoveryEvents evts = null;
 
         AffinityTopologyVersion coalesceTestWaitVer = this.coalesceTestWaitVer;
 
@@ -1796,7 +1796,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     fut.mergeWithFuture(curFut);
 
                     if (evts == null)
-                        evts = new ExchangeEvents();
+                        evts = new ExchangeDiscoveryEvents();
 
                     evts.init(fut);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a54832e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 907a032..d51e537 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.CacheEvent;
@@ -61,7 +62,8 @@ import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerT
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
-import org.apache.ignite.internal.processors.cache.ExchangeEvents;
+import org.apache.ignite.internal.processors.cache.ExchangeContext;
+import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -401,6 +403,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         return false;
     }
 
+    /** */
+    @GridToStringExclude
+    private ExchangeContext exchCtx;
+
     /**
      *
      */
@@ -426,7 +432,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         assert discoEvt != null : this;
         assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
 
-        boolean allowCoalescing = discoCache.minimumNodeVersion().compareTo(EXCHANGE_COALESCING_SINCE)
>= 0;
+        exchCtx = new ExchangeContext(topologyVersion(),
+            discoCache.minimumNodeVersion().compareTo(EXCHANGE_COALESCING_SINCE) >= 0);
 
         try {
             discoCache.updateAlives(cctx.discovery());
@@ -449,7 +456,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 ", customEvt=" + (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent)discoEvt).customMessage()
: null) +
                 ']');
 
-            ExchangeType exchange;
+            ExchangeType exchange = null;
 
             if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
                 DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage();
@@ -476,72 +483,89 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
             else {
-                if (discoEvt.type() == EVT_NODE_JOINED) {
-                    if (!discoEvt.eventNode().isLocal()) {
-                        Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
-                            discoEvt.eventNode().id(),
-                            topVer);
-
-                        cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
-                    }
-                    else {
-                        cctx.activate();
-
-                        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>>
caches =
-                            cctx.cache().cachesToStartOnLocalJoin();
+                if (exchCtx.coalescing()) {
+                    if (discoEvt.type() == EVT_NODE_JOINED) {
+                        if (discoEvt.eventNode().isLocal()) {
+                            localJoin();
 
-                        if (cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode())
{
-                            List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
+                            if (crdNode) {
+                                exchange = ExchangeType.NONE;
+                            }
+                            else
+                                sendLocalJoinMessage(crd);
+                        }
+                        else {
+                            if (CU.clientNode(discoEvt.eventNode())) {
+                                onClientNodeEvent(crdNode);
 
-                            if (caches != null) {
-                                for (T2<DynamicCacheDescriptor, NearCacheConfiguration>
c : caches)
-                                    startDescs.add(c.get1());
+                                exchange = ExchangeType.NONE;
                             }
+                            else {
+                                if (cctx.kernalContext().clientNode())
+                                    exchange = ExchangeType.CLIENT;
+                                else {
 
-                            cctx.database().readCheckpointAndRestoreMemory(startDescs);
+                                }
+                            }
                         }
+                    }
+                    else {
 
-                        cctx.cache().startCachesOnLocalJoin(caches, topVer);
                     }
                 }
+                else {
+                    if (discoEvt.type() == EVT_NODE_JOINED) {
+                        if (!discoEvt.eventNode().isLocal()) {
+                            Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
+                                discoEvt.eventNode().id(),
+                                topVer);
+
+                            cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
+                        }
+                        else
+                            localJoin();
+                    }
 
-                exchange = CU.clientNode(discoEvt.eventNode()) ?
-                    onClientNodeEvent(crdNode) :
-                    onServerNodeEvent(crdNode);
+                    exchange = CU.clientNode(discoEvt.eventNode()) ?
+                        onClientNodeEvent(crdNode) :
+                        onServerNodeEvent(crdNode);
+                }
             }
 
             updateTopologies(crdNode);
 
-            switch (exchange) {
-                case ALL: {
-                    distributedExchange();
+            if (exchange != null) {
+                switch (exchange) {
+                    case ALL: {
+                        distributedExchange();
 
-                    break;
-                }
+                        break;
+                    }
 
-                case CLIENT: {
-                    initTopologies();
+                    case CLIENT: {
+                        initTopologies();
 
-                    clientOnlyExchange();
+                        clientOnlyExchange();
 
-                    break;
-                }
+                        break;
+                    }
 
-                case NONE: {
-                    initTopologies();
+                    case NONE: {
+                        initTopologies();
 
-                    onDone(topVer);
+                        onDone(topVer);
 
-                    break;
+                        break;
+                    }
+
+                    default:
+                        assert false;
                 }
 
-                default:
-                    assert false;
+                if (cctx.localNode().isClient())
+                    tryToPerformLocalSnapshotOperation();
             }
 
-            if (cctx.localNode().isClient())
-                tryToPerformLocalSnapshotOperation();
-
             exchLog.info("Finished exchange init [topVer=" + topVer + ", crd=" + crdNode
+ ']');
         }
         catch (IgniteInterruptedCheckedException e) {
@@ -566,6 +590,35 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         }
     }
 
+    private void localJoin() throws IgniteCheckedException {
+        cctx.activate();
+
+        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches =
+            cctx.cache().cachesToStartOnLocalJoin();
+
+        if (caches != null) {
+            for (T2<DynamicCacheDescriptor, NearCacheConfiguration> cache : caches)
{
+                DynamicCacheDescriptor desc = cache.get1();
+
+                if (desc.cacheConfiguration().getCacheMode() != CacheMode.LOCAL)
+                    exchCtx.addCacheGroupOnLocalStart(cache.get1().groupId());
+            }
+        }
+
+        if (cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode())
{
+            List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
+
+            if (caches != null) {
+                for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches)
+                    startDescs.add(c.get1());
+            }
+
+            cctx.database().readCheckpointAndRestoreMemory(startDescs);
+        }
+
+        cctx.cache().startCachesOnLocalJoin(caches, topologyVersion());
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
@@ -877,11 +930,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         }
 
         cctx.database().beforeExchange(this);
-
-        ExchangeEvents mergedEvts = null;
-
-        if (crd.isLocal())
-            mergedEvts = cctx.exchange().coalesceExchanges(this);
+//
+//        ExchangeDiscoveryEvents mergedEvts = null;
+//
+//        if (crd.isLocal())
+//            mergedEvts = cctx.exchange().coalesceExchanges(this);
 
         if (crd.isLocal()) {
             if (remaining.isEmpty())
@@ -1105,6 +1158,34 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @param crd Current coordinator node
+     * @throws IgniteCheckedException If failed.
+     */
+    private void sendLocalJoinMessage(ClusterNode crd) throws IgniteCheckedException {
+        assert crd != null;
+
+        GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage(exchangeId(),
+            CU.clientNode(cctx.localNode()),
+            cctx.versions().last(),
+            true);
+
+        msg.cacheGroupsOnJoin(exchCtx.cacheGroupsOnLocalStart());
+
+        if (log.isDebugEnabled()) {
+            log.debug("Sending local partitions on local join [nodeId=" + crd.id() +
+                ", exchId=" + exchId + ", msg=" + msg + ']');
+        }
+
+        try {
+            cctx.io().send(crd, msg, SYSTEM_POOL);
+        }
+        catch (ClusterTopologyCheckedException ignored) {
+            if (log.isDebugEnabled())
+                log.debug("Node left during partition exchange [nodeId=" + crd.id() + ",
exchId=" + exchId + ']');
+        }
+    }
+
+    /**
      * @param node Node.
      * @throws IgniteCheckedException If failed.
      */
@@ -1775,21 +1856,34 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            for (GridDhtPartitionsAbstractMessage msg : msgs.values()) {
-                if (msg instanceof GridDhtPartitionsSingleMessage) {
-                    GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg;
+            Map<Integer, List<List<ClusterNode>>> affMap = null;
 
-                    for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet())
{
-                        Integer grpId = entry.getKey();
-                        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+            for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet())
{
+                GridDhtPartitionsSingleMessage msg = e.getValue();
 
-                        GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                            cctx.exchange().clientTopology(grpId, this);
+                for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet())
{
+                    Integer grpId = entry.getKey();
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                        Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(grpId);
+                    GridDhtPartitionTopology top = grp != null ? grp.topology() :
+                        cctx.exchange().clientTopology(grpId, this);
 
-                        if (cntrs != null)
-                            top.applyUpdateCounters(cntrs);
+                    Map<Integer, T2<Long, Long>> cntrs = msg.partitionUpdateCounters(grpId);
+
+                    if (cntrs != null)
+                        top.applyUpdateCounters(cntrs);
+                }
+
+                Collection<Integer> grpsOnJoin = msg.cacheGroupsOnJoin();
+
+                if (grpsOnJoin != null) {
+                    if (affMap == null)
+                        affMap = new HashMap<>();
+
+                    for (Integer grpId : grpsOnJoin) {
+                        if (!affMap.containsKey(grpId)) {
+                            //cctx.affinity().affinity()
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a54832e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 75609b8..6930c28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -25,12 +25,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -99,6 +101,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     @GridDirectTransient
     private transient boolean compress;
 
+    /** */
+    private AffinityTopologyVersion resTopVer;
+
+    /** */
+    @GridDirectMap(keyType = Integer.class, valueType = GridLongList.class)
+    private Map<Integer, GridLongList> cachesAff;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -406,42 +415,54 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         switch (writer.state()) {
             case 5:
-                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT,
MessageCollectionItemType.INT))
+                if (!writer.writeMap("cachesAff", cachesAff, MessageCollectionItemType.INT,
MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeByteArray("errsBytes", errsBytes))
+                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT,
MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                if (!writer.writeByteArray("errsBytes", errsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeByteArray("partsBytes", partsBytes))
+                if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+                if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 11:
+                if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeMessage("resTopVer", resTopVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -464,7 +485,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         switch (reader.state()) {
             case 5:
-                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT,
MessageCollectionItemType.INT, false);
+                cachesAff = reader.readMap("cachesAff", MessageCollectionItemType.INT, MessageCollectionItemType.MSG,
false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -472,7 +493,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 6:
-                errsBytes = reader.readByteArray("errsBytes");
+                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT,
MessageCollectionItemType.INT, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -480,7 +501,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 7:
-                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+                errsBytes = reader.readByteArray("errsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -488,7 +509,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 8:
-                partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -496,7 +517,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 9:
-                partsBytes = reader.readByteArray("partsBytes");
+                partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -504,7 +525,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 10:
-                partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+                partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -512,6 +533,22 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 11:
+                partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                resTopVer = reader.readMessage("resTopVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -531,7 +568,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 12;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a54832e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index b4d25c4..ae01be7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.HashMap;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.io.Externalizable;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -88,6 +90,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     @GridDirectTransient
     private transient boolean compress;
 
+    /** */
+    @GridDirectCollection(Integer.class)
+    private Collection<Integer> cacheGrpsOnJoin;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -111,6 +117,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         this.compress = compress;
     }
 
+    public void cacheGroupsOnJoin(Collection<Integer> cacheGrpsOnJoin) {
+        this.cacheGrpsOnJoin = cacheGrpsOnJoin;
+    }
+
+    @Nullable public Collection<Integer> cacheGroupsOnJoin() {
+        return cacheGrpsOnJoin;
+    }
+
     /** {@inheritDoc} */
     @Override public int handlerId() {
         return 0;


Mime
View raw message