ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5578 Affinity for local join
Date Tue, 11 Jul 2017 16:56:31 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578-locJoin [created] 6c52ee107


ignite-5578 Affinity for local join


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c52ee10
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c52ee10
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c52ee10

Branch: refs/heads/ignite-5578-locJoin
Commit: 6c52ee107cde481f43bba1772267ab83361c9497
Parents: e93b284
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Jul 11 18:04:49 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Jul 11 19:56:23 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   9 +-
 .../internal/managers/discovery/DiscoCache.java |  39 +++
 .../cache/CacheAffinitySharedManager.java       |  33 ++-
 .../processors/cache/ExchangeContext.java       |  59 +++++
 .../GridCachePartitionExchangeManager.java      |   7 +-
 .../dht/preloader/CacheGroupAffinity.java       | 156 +++++++++++
 .../GridDhtPartitionsAbstractMessage.java       |   9 +
 .../GridDhtPartitionsExchangeFuture.java        | 259 +++++++++++++------
 .../preloader/GridDhtPartitionsFullMessage.java |  90 ++++++-
 .../GridDhtPartitionsSingleMessage.java         |  44 +++-
 .../GridCacheDatabaseSharedManager.java         |   1 +
 11 files changed, 593 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 3dac18e..261a619 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinity;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -869,7 +870,13 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..119] [124..127] [-23..-27] [-36..-55]- this
+            case 128:
+                msg = new CacheGroupAffinity();
+
+                break;
+
+
+            // [-3..119] [124..128] [-23..-27] [-36..-55]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
             // [-54..-60] - Snapshots

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 4c1077b..72f482a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -280,6 +280,45 @@ public class DiscoCache {
     }
 
     /**
+     * @param order Node order to look up among the server nodes.
+     * @return Server node with the given order, or {@code null} if no server node with that order is found.
+     */
+    @Nullable public ClusterNode serverNodeByOrder(long order) {
+        int idx = serverNodeBinarySearch(order);
+
+        if (idx >= 0)
+            return srvNodes.get(idx);
+
+        return null;
+    }
+
+    /**
+     * @param order Node order to search for ({@code srvNodes} is presumably sorted by order; verify).
+     * @return Index of the node with this order, or the negative value {@code -(low + 1)} if there is none.
+     */
+    private int serverNodeBinarySearch(long order) {
+        int low = 0;
+        int high = srvNodes.size() - 1;
+
+        while (low <= high) {
+            int mid = (low + high) >>> 1;
+
+            ClusterNode midVal = srvNodes.get(mid);
+
+            int cmp = Long.compare(midVal.order(), order);
+
+            if (cmp < 0)
+                low = mid + 1;
+            else if (cmp > 0)
+                high = mid - 1;
+            else
+                return mid;
+        }
+
+        return -(low + 1);
+    }
+
+    /**
      * @param nodes Cluster nodes.
      * @return Empty collection if nodes list is {@code null}
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 79ab183..f72d0e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1224,6 +1224,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param topVer Topology version to get assignments for.
+     * @param grpId Cache group ID; a holder for it must be present in {@code grpHolders} (asserted).
+     * @return Affinity assignments of the group for the given topology version.
+     */
+    public List<List<ClusterNode>> affinity(AffinityTopologyVersion topVer, Integer grpId) {
+        CacheGroupHolder grpHolder = grpHolders.get(grpId);
+
+        assert grpHolder != null : grpId;
+
+        return grpHolder.affinity().assignments(topVer);
+    }
+
+    /**
      * Called on exchange initiated by server node join.
      *
      * @param fut Exchange future.
@@ -1319,18 +1332,22 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 grp.affinity().initialize(fut.topologyVersion(), assignment);
             }
             else {
-                CacheGroupDescriptor grpDesc = caches.group(grp.groupId());
+                if (fut.context().fetchAffinityOnJoin()) {
+                    CacheGroupDescriptor grpDesc = caches.group(grp.groupId());
 
-                assert grpDesc != null : grp.cacheOrGroupName();
+                    assert grpDesc != null : grp.cacheOrGroupName();
 
-                GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                    grpDesc.groupId(),
-                    topVer,
-                    fut.discoCache());
+                    GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
+                        grpDesc.groupId(),
+                        topVer,
+                        fut.discoCache());
 
-                fetchFut.init(false);
+                    fetchFut.init(false);
 
-                fetchFuts.add(fetchFut);
+                    fetchFuts.add(fetchFut);
+                }
+                else
+                    fut.context().addGroupAffinityRequestOnJoin(grp.groupId());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
new file mode 100644
index 0000000..167ec4b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * State attached to a partition exchange; collects IDs of cache groups to request affinity for on local join.
+ */
+public class ExchangeContext {
+    /** IDs of cache groups to request affinity for on join; {@code null} until the first ID is added. */
+    private Set<Integer> requestGrpsAffOnJoin;
+
+    /** NOTE(review): never read or written in this class; {@link #fetchAffinityOnJoin()} returns a constant. */
+    private boolean fetchAffOnJoin;
+
+    /**
+     * @return {@code True} if affinity must be fetched per group on local join (old protocol), otherwise
+     * affinity is sent in {@link GridDhtPartitionsFullMessage}. Currently always returns {@code false}.
+     */
+    public boolean fetchAffinityOnJoin() {
+        return false;
+    }
+
+    /**
+     * @param grpId ID of a cache group to request affinity for on join (the backing set is created lazily).
+     */
+    public void addGroupAffinityRequestOnJoin(Integer grpId) {
+        if (requestGrpsAffOnJoin == null)
+            requestGrpsAffOnJoin = new HashSet<>();
+
+        requestGrpsAffOnJoin.add(grpId);
+    }
+
+    /**
+     * @return IDs of groups to request affinity for, or {@code null} if no group was added.
+     */
+    @Nullable public Set<Integer> groupsAffinityRequestOnJoin() {
+        return requestGrpsAffOnJoin;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d4fe93f..69653a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1068,8 +1068,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param id ID.
      */
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
-        GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
-            id,
+        GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
             cctx.kernalContext().clientNode(),
             false);
 
@@ -1090,14 +1089,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param targetNode Target node.
      * @param exchangeId ID.
      * @param clientOnlyExchange Client exchange flag.
      * @param sndCounters {@code True} if need send partition update counters.
      * @return Message.
      */
-    public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode,
-        @Nullable GridDhtPartitionExchangeId exchangeId,
+    public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(@Nullable GridDhtPartitionExchangeId exchangeId,
         boolean clientOnlyExchange,
         boolean sndCounters)
     {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
new file mode 100644
index 0000000..e29ee06
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message carrying the affinity assignment of one cache group, with each node encoded by its order.
+ */
+public class CacheGroupAffinity implements Message {
+    /** Cache group ID. */
+    private int grpId;
+
+    /** Assignment: one list of node orders per entry of the source assignment list. */
+    @GridDirectCollection(GridLongList.class)
+    private List<GridLongList> assign;
+
+    /**
+     * No-arg constructor; fields keep their defaults until {@code readFrom} fills them in.
+     */
+    public CacheGroupAffinity() {
+        // No-op.
+    }
+
+    /**
+     * @param grpId Group ID.
+     * @param assign0 Assignment as lists of nodes; each node is stored as its {@code order()} value.
+     */
+    CacheGroupAffinity(int grpId, List<List<ClusterNode>> assign0) {
+        this.grpId = grpId;
+
+        assign = new ArrayList<>(assign0.size());
+
+        for (int i = 0; i < assign0.size(); i++) {
+            List<ClusterNode> nodes = assign0.get(i);
+
+            GridLongList l = new GridLongList(nodes.size());
+
+            for (int n = 0; n < nodes.size(); n++)
+                l.add(nodes.get(n).order());
+
+            assign.add(l);
+        }
+    }
+
+    /**
+     * @return Cache group ID.
+     */
+    int groupId() {
+        return grpId;
+    }
+
+    /**
+     * @return Assignments as lists of node orders, in the order they were added.
+     */
+    List<GridLongList> assignments() {
+        return assign;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeCollection("assign", assign, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState(); // No break: falls through to write the next field.
+
+            case 1:
+                if (!writer.writeInt("grpId", grpId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                assign = reader.readCollection("assign", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState(); // No break: falls through to read the next field.
+
+            case 1:
+                grpId = reader.readInt("grpId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CacheGroupAffinity.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 128;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 441952d..20b33e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -64,6 +64,15 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
         this.lastVer = lastVer;
     }
 
+    /**
+     * @param msg Message to copy this message's {@code exchId}, {@code lastVer} and {@code flags} into.
+     */
+    void copyStateTo(GridDhtPartitionsAbstractMessage msg) {
+        msg.exchId = exchId;
+        msg.lastVer = lastVer;
+        msg.flags = flags;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean cacheGroupMessage() {
         return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 8989aaa..72a3fe5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerT
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
+import org.apache.ignite.internal.processors.cache.ExchangeContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -75,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -187,9 +189,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** */
     private CacheAffinityChangeMessage affChangeMsg;
 
-    /** */
-    private boolean clientOnlyExchange;
-
     /** Init timestamp. Used to track the amount of time spent to complete the future. */
     private long initTs;
 
@@ -219,6 +218,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** */
     private final AtomicBoolean done = new AtomicBoolean();
 
+    /** */
+    @GridToStringExclude
+    private ExchangeContext exchCtx;
+
     /**
      * @param cctx Cache context.
      * @param busyLock Busy lock.
@@ -254,6 +257,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @return Exchange context; asserted to be non-null, so it must not be called before the context is set.
+     */
+    public ExchangeContext context() {
+        assert exchCtx != null : this;
+
+        return exchCtx;
+    }
+
+    /**
      * @param exchActions Exchange actions.
      */
     public void exchangeActions(ExchangeActions exchActions) {
@@ -423,6 +435,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         assert discoEvt != null : this;
         assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
 
+        exchCtx = new ExchangeContext();
+
         try {
             discoCache.updateAlives(cctx.discovery());
 
@@ -479,26 +493,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                         cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
                     }
-                    else {
-                        cctx.activate();
-
-                        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches =
-                            cctx.cache().cachesToStartOnLocalJoin();
-
-                        if (cctx.database().persistenceEnabled() &&
-                            !cctx.kernalContext().clientNode()) {
-                            List<DynamicCacheDescriptor> startDescs = new ArrayList<>();
-
-                            if (caches != null) {
-                                for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches)
-                                    startDescs.add(c.get1());
-                            }
-
-                            cctx.database().readCheckpointAndRestoreMemory(startDescs);
-                        }
-
-                        cctx.cache().startCachesOnLocalJoin(caches, topVer);
-                    }
+                    else
+                        initCachesOnLocalJoin();
                 }
 
                 exchange = CU.clientNode(discoEvt.eventNode()) ?
@@ -565,6 +561,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /**
      * @throws IgniteCheckedException If failed.
      */
+    private void initCachesOnLocalJoin() throws IgniteCheckedException {
+        cctx.activate();
+
+        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches =
+            cctx.cache().cachesToStartOnLocalJoin();
+
+        if (cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode()) {
+            List<DynamicCacheDescriptor> startDescs = new ArrayList<>(); // Descriptors of the caches to start.
+
+            if (caches != null) {
+                for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches)
+                    startDescs.add(c.get1());
+            }
+
+            cctx.database().readCheckpointAndRestoreMemory(startDescs); // Done before the caches are started below.
+        }
+
+        cctx.cache().startCachesOnLocalJoin(caches, topologyVersion());
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
     private void initTopologies() throws IgniteCheckedException {
         cctx.database().checkpointReadLock();
 
@@ -783,40 +802,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @throws IgniteCheckedException If failed.
      */
     private void clientOnlyExchange() throws IgniteCheckedException {
-        clientOnlyExchange = true;
-
         if (crd != null) {
-            if (crd.isLocal()) {
-                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                    boolean updateTop = !grp.isLocal() &&
-                        exchId.topologyVersion().equals(grp.localStartVersion());
-
-                    if (updateTop) {
-                        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
-                            if (top.groupId() == grp.groupId()) {
-                                GridDhtPartitionFullMap fullMap = top.partitionMap(true);
+            assert !crd.isLocal() : crd;
 
-                                assert fullMap != null;
-
-                                grp.topology().update(topologyVersion(),
-                                    fullMap,
-                                    top.updateCounters(false),
-                                    Collections.<Integer>emptySet());
-
-                                break;
-                            }
-                        }
-                    }
-                }
-            }
-            else {
-                if (!centralizedAff)
-                    sendLocalPartitions(crd);
+            if (!centralizedAff)
+                sendLocalPartitions(crd);
 
-                initDone();
+            initDone();
 
-                return;
-            }
+            return;
         }
         else {
             if (centralizedAff) { // Last server node failed.
@@ -869,7 +863,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
                 continue;
 
-            grp.topology().beforeExchange(this, !centralizedAff);
+            if (!localJoinExchange() || cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))
+                grp.topology().beforeExchange(this, !centralizedAff);
         }
 
         cctx.database().beforeExchange(this);
@@ -889,8 +884,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      */
     private void tryToPerformLocalSnapshotOperation() {
         try {
-            IgniteInternalFuture fut = cctx.snapshot()
-                .tryStartLocalSnapshotOperation(discoEvt);
+            IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt);
 
             if (fut != null)
                 fut.get();
@@ -1096,12 +1090,21 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @return {@code True} if this exchange's discovery event is {@code EVT_NODE_JOINED} for the local node.
+     */
+    private boolean localJoinExchange() {
+        return discoEvt.type() == EVT_NODE_JOINED && discoEvt.eventNode().isLocal();
+    }
+
+    /**
      * @param node Node.
      * @throws IgniteCheckedException If failed.
      */
     private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
         assert node != null;
 
+        GridDhtPartitionsSingleMessage msg;
+
         // Reset lost partition before send local partition to coordinator.
         if (exchActions != null) {
             Set<String> caches = exchActions.cachesToResetLostPartitions();
@@ -1110,22 +1113,33 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 resetLostPartitions(caches);
         }
 
-        GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(
-            node, exchangeId(), clientOnlyExchange, true);
+        if (cctx.kernalContext().clientNode() || (localJoinExchange() && !cctx.database().persistenceEnabled())) {
+            msg = new GridDhtPartitionsSingleMessage(exchangeId(),
+                cctx.kernalContext().clientNode(),
+                cctx.versions().last(),
+                true);
+        }
+        else {
+            msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
+                false,
+                true);
 
-        Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
+            Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
 
-        if (partHistReserved0 != null)
-            m.partitionHistoryCounters(partHistReserved0);
+            if (partHistReserved0 != null)
+                msg.partitionHistoryCounters(partHistReserved0);
+        }
 
         if (stateChangeExchange() && changeGlobalStateE != null)
-            m.setError(changeGlobalStateE);
+            msg.setError(changeGlobalStateE);
+        else if (localJoinExchange())
+            msg.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
 
         if (log.isDebugEnabled())
-            log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
+            log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']');
 
         try {
-            cctx.io().send(node, m, SYSTEM_POOL);
+            cctx.io().send(node, msg, SYSTEM_POOL);
         }
         catch (ClusterTopologyCheckedException ignored) {
             if (log.isDebugEnabled())
@@ -1155,21 +1169,40 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /**
      * @param nodes Nodes.
+     * @param cachesAff Affinity to send to nodes that requested it, or {@code null} if no node did.
      * @throws IgniteCheckedException If failed.
      */
-    private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
-        GridDhtPartitionsFullMessage m = createPartitionsMessage(true);
+    private void sendAllPartitions(Collection<ClusterNode> nodes, Collection<CacheGroupAffinity> cachesAff)
+        throws IgniteCheckedException {
+        GridDhtPartitionsFullMessage msg = createPartitionsMessage(true);
+
+        GridDhtPartitionsFullMessage msgWithAff = null;
 
         assert !nodes.contains(cctx.localNode());
 
         if (log.isDebugEnabled()) {
             log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
-                ", exchId=" + exchId + ", msg=" + m + ']');
+                ", exchId=" + exchId + ", msg=" + msg + ']');
         }
 
+        msg.prepareMarshal(cctx);
+
         for (ClusterNode node : nodes) {
+            GridDhtPartitionsFullMessage sndMsg = msg;
+
+            if (cachesAff != null) {
+                GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id());
+
+                if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) {
+                    if (msgWithAff == null)
+                        msgWithAff = msg.copyWithAffinity(cachesAff);
+
+                    sndMsg = msgWithAff;
+                }
+            }
+
             try {
-                cctx.io().send(node, m, SYSTEM_POOL);
+                cctx.io().send(node, sndMsg, SYSTEM_POOL);
             }
             catch (IgniteCheckedException e) {
                 if (cctx.io().checkNodeLeft(node.id(), e, false)) {
@@ -1687,21 +1720,32 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            for (GridDhtPartitionsAbstractMessage msg : msgs.values()) {
-                if (msg instanceof GridDhtPartitionsSingleMessage) {
-                    GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg;
+            Map<Integer, CacheGroupAffinity> cachesAff = null;
+
+            for (GridDhtPartitionsSingleMessage msg : msgs.values()) {
+                for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
+                    Integer grpId = entry.getKey();
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                    for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet()) {
-                        Integer grpId = entry.getKey();
-                        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+                    GridDhtPartitionTopology top = grp != null ? grp.topology() :
+                        cctx.exchange().clientTopology(grpId, this);
 
-                        GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                            cctx.exchange().clientTopology(grpId, this);
+                    Map<Integer, T2<Long, Long>> cntrs = msg.partitionUpdateCounters(grpId);
 
-                        Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(grpId);
+                    if (cntrs != null)
+                        top.applyUpdateCounters(cntrs);
+                }
+
+                Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+
+                if (affReq != null) {
+                    if (cachesAff == null)
+                        cachesAff = U.newHashMap(affReq.size());
 
-                        if (cntrs != null)
-                            top.applyUpdateCounters(cntrs);
+                    for (Integer grpId : affReq) {
+                        List<List<ClusterNode>> assign = cctx.affinity().affinity(topologyVersion(), grpId);
+
+                        cachesAff.put(grpId, new CacheGroupAffinity(grpId, assign));
                     }
                 }
             }
@@ -1777,7 +1821,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
 
                 if (!nodes.isEmpty())
-                    sendAllPartitions(nodes);
+                    sendAllPartitions(nodes, cachesAff != null ? cachesAff.values() : null);
 
                 onDone(exchangeId().topologyVersion(), err);
             }
@@ -1813,7 +1857,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         try {
             if (n != null)
-                sendAllPartitions(F.asList(n));
+                sendAllPartitions(F.asList(n), null);
         }
         catch (IgniteCheckedException e) {
             if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) {
@@ -1907,6 +1951,59 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             }
         }
 
+        Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
+
+        if (localJoinExchange() && affReq != null) {
+            Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+
+            Collection<CacheGroupAffinity> cachesAff = msg.cachesAffinity();
+
+            assert !F.isEmpty(cachesAff) : cachesAff;
+            assert cachesAff.size() >= affReq.size();
+
+            int cnt = 0;
+
+            for (CacheGroupAffinity aff : cachesAff) {
+                CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
+
+                if (grp != null) {
+                    assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion());
+
+                    List<GridLongList> assignments = aff.assignments();
+                    List<List<ClusterNode>> assignments0 = new ArrayList<>(assignments.size());
+
+                    for (int p = 0; p < assignments.size(); p++) {
+                        GridLongList assign = assignments.get(p);
+                        List<ClusterNode> assign0 = new ArrayList<>(assign.size());
+
+                        for (int n = 0; n < assign.size(); n++) {
+                            long order = assign.get(n);
+
+                            ClusterNode affNode = nodesByOrder.get(order);
+
+                            if (affNode == null) {
+                                affNode = discoCache.serverNodeByOrder(order);
+
+                                assert affNode != null : order;
+
+                                nodesByOrder.put(order, affNode);
+                            }
+
+                            assign0.add(affNode);
+                        }
+
+                        assignments0.add(assign0);
+                    }
+
+                    grp.affinity().initialize(topologyVersion(), assignments0);
+
+                    cnt++;
+                }
+            }
+
+            assert affReq.size() == cnt : cnt;
+        }
+
         updatePartitionFullMap(msg);
 
         IgniteCheckedException err = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 75609b8..0a723f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -19,19 +19,23 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -99,6 +103,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     @GridDirectTransient
     private transient boolean compress;
 
+    /** Cache groups affinity (see {@link #copyWithAffinity(Collection)}), may be {@code null}. */
+    @GridDirectCollection(CacheGroupAffinity.class)
+    private Collection<CacheGroupAffinity> cachesAff;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -126,6 +134,48 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     }
 
     /** {@inheritDoc} */
+    @Override void copyStateTo(GridDhtPartitionsAbstractMessage msg) {
+        super.copyStateTo(msg);
+
+        GridDhtPartitionsFullMessage cp = (GridDhtPartitionsFullMessage)msg; // Target must be a full message.
+
+        cp.parts = parts; // State is shared by reference, nothing is cloned.
+        cp.dupPartsData = dupPartsData;
+        cp.partsBytes = partsBytes;
+        cp.partCntrs = partCntrs;
+        cp.partCntrsBytes = partCntrsBytes;
+        cp.partHistSuppliers = partHistSuppliers;
+        cp.partHistSuppliersBytes = partHistSuppliersBytes;
+        cp.partsToReload = partsToReload;
+        cp.partsToReloadBytes = partsToReloadBytes;
+        cp.topVer = topVer;
+        cp.cachesAff = cachesAff;
+    }
+
+    /**
+     * @param cachesAff Affinity to set on the copy, must not be empty.
+     * @return Copy of this message carrying the given affinity.
+     */
+    public GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinity> cachesAff) {
+        assert !F.isEmpty(cachesAff) : cachesAff;
+
+        GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
+
+        copyStateTo(cp);
+
+        cp.cachesAff = cachesAff; // Replaces the value just copied by copyStateTo().
+
+        return cp;
+    }
+
+    /**
+     * @return Cache groups affinity or {@code null} if none is set.
+     */
+    @Nullable public Collection<CacheGroupAffinity> cachesAffinity() {
+        return cachesAff;
+    }
+
+    /** {@inheritDoc} */
     @Override public int handlerId() {
         return 0;
     }
@@ -406,42 +456,48 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         switch (writer.state()) {
             case 5:
-                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+                if (!writer.writeCollection("cachesAff", cachesAff, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeByteArray("errsBytes", errsBytes))
+                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                if (!writer.writeByteArray("errsBytes", errsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeByteArray("partsBytes", partsBytes))
+                if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+                if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 11:
+                if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -464,7 +520,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         switch (reader.state()) {
             case 5:
-                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+                cachesAff = reader.readCollection("cachesAff", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -472,7 +528,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 6:
-                errsBytes = reader.readByteArray("errsBytes");
+                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -480,7 +536,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 7:
-                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+                errsBytes = reader.readByteArray("errsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -488,7 +544,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 8:
-                partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -496,7 +552,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 9:
-                partsBytes = reader.readByteArray("partsBytes");
+                partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -504,7 +560,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 10:
-                partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+                partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -512,6 +568,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 11:
+                partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -531,7 +595,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 12;
+        return 13;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index b4d25c4..4c98742 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.HashMap;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.io.Externalizable;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -88,6 +90,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     @GridDirectTransient
     private transient boolean compress;
 
+    /** IDs of cache groups affinity is requested for, may be {@code null}. */
+    @GridDirectCollection(Integer.class)
+    private Collection<Integer> grpsAffRequest;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -111,6 +117,20 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         this.compress = compress;
     }
 
+    /**
+     * @param grpsAffRequest IDs of cache groups to request affinity for.
+     */
+    public void cacheGroupsAffinityRequest(Collection<Integer> grpsAffRequest) {
+        this.grpsAffRequest = grpsAffRequest;
+    }
+
+    /**
+     * @return IDs of cache groups affinity is requested for or {@code null} if there is no request.
+     */
+    @Nullable public Collection<Integer> cacheGroupsAffinityRequest() {
+        return grpsAffRequest;
+    }
+
     /** {@inheritDoc} */
     @Override public int handlerId() {
         return 0;
@@ -374,18 +394,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes))
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 10:
+                if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -432,7 +458,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 8:
-                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+                grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -440,7 +466,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 9:
-                partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes");
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -448,6 +474,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 10:
+                partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -467,7 +501,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 12;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 5136731..1f56cc0 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1540,6 +1540,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                     long partMetaId = pageMem.partitionMetaPageId(grpId, i);
                     long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
+
                     try {
                         long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
 


Mime
View raw message