ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [12/17] ignite git commit: Moved logic related to caches discovery data handling to ClusterCachesInfo. Start of statically configured caches in the same way as dynamic ones: from GridDhtPartitionsExchangeFuture.
Date Thu, 18 May 2017 08:54:30 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index e8094e1..5d82171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -44,6 +44,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private long futId;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -69,19 +72,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     }
 
     /**
+     * @param futId Future ID.
      * @param cacheId Cache ID.
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment.
      */
-    public GridDhtAffinityAssignmentResponse(int cacheId,
+    public GridDhtAffinityAssignmentResponse(
+        long futId,
+        int cacheId,
         @NotNull AffinityTopologyVersion topVer,
         List<List<ClusterNode>> affAssignment) {
+        this.futId = futId;
         this.cacheId = cacheId;
         this.topVer = topVer;
 
         affAssignmentIds = ids(affAssignment);
     }
 
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean partitionExchangeMessage() {
         return true;
@@ -181,7 +195,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 
     /**
@@ -239,12 +253,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 5:
+                if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -275,7 +295,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
-                idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -283,6 +303,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 5:
+                idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 4f94ae2..741ca5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -32,12 +33,11 @@ import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -48,9 +48,6 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFF
  * Future that fetches affinity assignment from remote cache nodes.
  */
 public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffinityAssignmentResponse> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
@@ -58,6 +55,9 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     private static IgniteLogger log;
 
     /** */
+    private static final AtomicLong idGen = new AtomicLong();
+
+    /** */
     private final GridCacheSharedContext ctx;
 
     /** List of available nodes this future can fetch data from. */
@@ -68,26 +68,33 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     private ClusterNode pendingNode;
 
     /** */
-    @GridToStringInclude
-    private final T2<Integer, AffinityTopologyVersion> key;
+    private final long id;
+
+    /** */
+    private final AffinityTopologyVersion topVer;
+
+    /** */
+    private final int cacheId;
 
     /**
      * @param ctx Context.
-     * @param cacheName Cache name.
+     * @param cacheDesc Cache descriptor.
      * @param topVer Topology version.
      * @param discoCache Discovery cache.
      */
     public GridDhtAssignmentFetchFuture(
         GridCacheSharedContext ctx,
-        String cacheName,
+        DynamicCacheDescriptor cacheDesc,
         AffinityTopologyVersion topVer,
         DiscoCache discoCache
     ) {
+        this.topVer = topVer;
+        this.cacheId = cacheDesc.cacheId();
         this.ctx = ctx;
-        int cacheId = CU.cacheId(cacheName);
-        this.key = new T2<>(cacheId, topVer);
 
-        Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheId);
+        id = idGen.getAndIncrement();
+
+        Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId());
 
         LinkedList<ClusterNode> tmp = new LinkedList<>();
 
@@ -105,19 +112,26 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     }
 
     /**
-     * Initializes fetch future.
+     * @return Cache ID.
      */
-    public void init() {
-        ctx.affinity().addDhtAssignmentFetchFuture(this);
+    public int cacheId() {
+        return cacheId;
+    }
 
-        requestFromNextNode();
+    /**
+     * @return Future ID.
+     */
+    public long id() {
+        return id;
     }
 
     /**
-     * @return Future key.
+     * Initializes fetch future.
      */
-    public T2<Integer, AffinityTopologyVersion> key() {
-        return key;
+    public void init() {
+        ctx.affinity().addDhtAssignmentFetchFuture(this);
+
+        requestFromNextNode();
     }
 
     /**
@@ -125,14 +139,6 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
      * @param res Response.
      */
     public void onResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-        if (!res.topologyVersion().equals(key.get2())) {
-            if (log.isDebugEnabled())
-                log.debug("Received affinity assignment for wrong topology version (will ignore) " +
-                    "[node=" + nodeId + ", res=" + res + ", topVer=" + key.get2() + ']');
-
-            return;
-        }
-
         GridDhtAffinityAssignmentResponse res0 = null;
 
         synchronized (this) {
@@ -188,7 +194,8 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
                         log0.debug("Sending affinity fetch request to remote node [locNodeId=" + ctx.localNodeId() +
                             ", node=" + node + ']');
 
-                    ctx.io().send(node, new GridDhtAffinityAssignmentRequest(key.get1(), key.get2()),
+                    ctx.io().send(node,
+                        new GridDhtAffinityAssignmentRequest(id, cacheId, topVer),
                         AFFINITY_POOL);
 
                     // Close window for listener notification.

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 4e699b3..8e79eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -393,7 +393,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         int num = cctx.affinity().partitions();
 
         if (cctx.rebalanceEnabled()) {
-            boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
+            boolean added = exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom());
 
             boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
 
@@ -541,7 +541,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 cntrMap.clear();
 
                 // If this is the oldest node.
-                if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
+                if (oldest != null && (loc.equals(oldest) || exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()))) {
                     if (node2part == null) {
                         node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -1156,10 +1156,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     // If for some nodes current partition has a newer map,
                     // then we keep the newer value.
                     if (newPart != null &&
-                        (newPart.updateSequence() < part.updateSequence() || (
-                            cctx.startTopologyVersion() != null &&
-                                newPart.topologyVersion() != null && // Backward compatibility.
-                                cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+                        (newPart.updateSequence() < part.updateSequence() ||
+                        (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
                         ) {
                         if (log.isDebugEnabled())
                             log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
@@ -1169,7 +1167,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     }
                 }
 
-                //remove entry if node left
+                // Remove entry if node left.
                 for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
                     UUID nodeId = it.next();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 6fb7df6..579796d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -122,6 +122,8 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
         boolean keepBinary,
         boolean skipStore
     ) {
+        assert topVer.topologyVersion() > 0 : topVer;
+
         this.cacheId = cacheId;
         this.nodeId = nodeId;
         this.futId = futId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 654d306..b4cb3c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -59,8 +59,8 @@ import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
-import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -110,9 +110,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
         IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
 
-    /** */
-    private static final long serialVersionUID = 0L;
-
     /** Dummy flag. */
     private final boolean dummy;
 
@@ -190,8 +187,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** Logger. */
     private final IgniteLogger log;
 
-    /** Dynamic cache change requests. */
-    private Collection<DynamicCacheChangeRequest> reqs;
+    /** Cache change requests. */
+    private ExchangeActions exchActions;
 
     /** */
     private CacheAffinityChangeMessage affChangeMsg;
@@ -284,19 +281,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param cctx Cache context.
      * @param busyLock Busy lock.
      * @param exchId Exchange ID.
-     * @param reqs Cache change requests.
+     * @param exchActions Cache change requests.
      * @param affChangeMsg Affinity change message.
      */
     public GridDhtPartitionsExchangeFuture(
         GridCacheSharedContext cctx,
         ReadWriteLock busyLock,
         GridDhtPartitionExchangeId exchId,
-        Collection<DynamicCacheChangeRequest> reqs,
+        ExchangeActions exchActions,
         CacheAffinityChangeMessage affChangeMsg
     ) {
         assert busyLock != null;
         assert exchId != null;
         assert exchId.topologyVersion() != null;
+        assert exchActions == null || !exchActions.empty();
 
         dummy = false;
         forcePreload = false;
@@ -305,7 +303,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         this.cctx = cctx;
         this.busyLock = busyLock;
         this.exchId = exchId;
-        this.reqs = reqs;
+        this.exchActions = exchActions;
         this.affChangeMsg = affChangeMsg;
 
         log = cctx.logger(getClass());
@@ -317,10 +315,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @param reqs Cache change requests.
+     * @param exchActions Exchange actions.
      */
-    public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) {
-        this.reqs = reqs;
+    public void exchangeActions(ExchangeActions exchActions) {
+        assert exchActions == null || !exchActions.empty() : exchActions;
+        assert evtLatch != null && evtLatch.getCount() == 1L : this;
+
+        this.exchActions = exchActions;
     }
 
     /**
@@ -379,33 +380,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /**
      * @param cacheId Cache ID to check.
-     * @param topVer Topology version.
+     * @param rcvdFrom ID of the node the cache was received from.
      * @return {@code True} if cache was added during this exchange.
      */
-    public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
-        if (cacheStarted(cacheId))
-            return true;
-
-        GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
-        return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
+    public boolean cacheAddedOnExchange(int cacheId, UUID rcvdFrom) {
+        return dynamicCacheStarted(cacheId) || (exchId.isJoined() && exchId.nodeId().equals(rcvdFrom));
     }
 
     /**
      * @param cacheId Cache ID.
      * @return {@code True} if non-client cache was added during this exchange.
      */
-    public boolean cacheStarted(int cacheId) {
-        if (!F.isEmpty(reqs)) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.start() && !req.clientStartOnly()) {
-                    if (CU.cacheId(req.cacheName()) == cacheId)
-                        return true;
-                }
-            }
-        }
-
-        return false;
+    public boolean dynamicCacheStarted(int cacheId) {
+        return exchActions != null && exchActions.cacheStarted(cacheId);
     }
 
     /**
@@ -435,14 +422,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      *
      */
     public ClusterState newClusterState() {
-        if (!F.isEmpty(reqs)) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.globalStateChange())
-                    return req.state();
-            }
-        }
-
-        return null;
+        return exchActions != null ? exchActions.newClusterState() : null;
     }
 
     /**
@@ -524,7 +504,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage();
 
                 if (msg instanceof DynamicCacheChangeBatch){
-                    assert !F.isEmpty(reqs);
+                    assert exchActions != null && !exchActions.empty();
 
                     exchange = onCacheChangeRequest(crdNode);
                 }
@@ -540,10 +520,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
             else {
                 if (discoEvt.type() == EVT_NODE_JOINED) {
-                    Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(topVer);
+                    if (!discoEvt.eventNode().isLocal()) {
+                        Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
+                            discoEvt.eventNode().id(),
+                            topVer);
 
-                    if (!discoEvt.eventNode().isLocal())
                         cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
+                    }
+                    else
+                        cctx.cache().startCachesOnLocalJoin(topVer);
                 }
 
                 exchange = CU.clientNode(discoEvt.eventNode()) ?
@@ -553,20 +538,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             updateTopologies(crdNode);
 
-            if (!F.isEmpty(reqs)) {
-                boolean hasStop = false;
-
-                for (DynamicCacheChangeRequest req : reqs) {
-                    if (req.stop()) {
-                        hasStop = true;
-
-                        break;
-                    }
-                }
-
-                if (hasStop)
-                    cctx.cache().context().database().beforeCachesStop();
-            }
+            if (exchActions != null && exchActions.hasStop())
+                cctx.cache().context().database().beforeCachesStop();
 
             switch (exchange) {
                 case ALL: {
@@ -654,8 +627,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             GridDhtPartitionTopology top = cacheCtx.topology();
 
             if (crd) {
-                boolean updateTop = !cacheCtx.isLocal() &&
-                    exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+                boolean updateTop = exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
                 if (updateTop && clientTop != null)
                     top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
@@ -674,32 +646,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException {
-        assert !F.isEmpty(reqs) : this;
+        assert exchActions != null && !exchActions.empty() : this;
 
         GridClusterStateProcessor stateProc = cctx.kernalContext().state();
 
-        if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(reqs, topologyVersion())) {
+        if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(exchActions, topologyVersion())) {
             changeGlobalStateE = stateProc.onChangeGlobalState();
 
             if (crd && changeGlobalStateE != null)
                 changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateE);
         }
 
-        boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, reqs);
-
-        if (clientOnly) {
-            boolean clientCacheStarted = false;
-
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.start() && req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId())) {
-                    clientCacheStarted = true;
+        boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
 
-                    break;
-                }
-            }
-
-            return clientCacheStarted ? ExchangeType.CLIENT : ExchangeType.NONE;
-        }
+        if (clientOnly)
+            return exchActions.clientCacheStarted(cctx.localNodeId()) ? ExchangeType.CLIENT : ExchangeType.NONE;
         else
             return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
     }
@@ -768,7 +729,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     private void clientOnlyExchange() throws IgniteCheckedException {
         clientOnlyExchange = true;
 
-        //todo checl invoke on client
         if (crd != null) {
             if (crd.isLocal()) {
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1046,19 +1006,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @return {@code True} if cache is stopping by this exchange.
      */
     public boolean stopping(int cacheId) {
-        boolean stopping = false;
-
-        if (!F.isEmpty(reqs)) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (cacheId == CU.cacheId(req.cacheName())) {
-                    stopping = req.stop();
-
-                    break;
-                }
-            }
-        }
-
-        return stopping;
+        return exchActions != null && exchActions.cacheStopped(cacheId);
     }
 
     /**
@@ -1069,13 +1017,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         assert node != null;
 
         // Reset lost partition before send local partition to coordinator.
-        if (!F.isEmpty(reqs)) {
-            Set<String> caches = new HashSet<>();
-
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.resetLostPartitions())
-                    caches.add(req.cacheName());
-            }
+        if (exchActions != null) {
+            Set<String> caches = exchActions.cachesToResetLostPartitions();
 
             if (!F.isEmpty(caches))
                 resetLostPartitions(caches);
@@ -1203,14 +1146,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             cacheValidRes = m;
         }
 
-        cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
+        cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err);
 
         cctx.exchange().onExchangeDone(this, err);
 
-        if (!F.isEmpty(reqs) && err == null) {
-            for (DynamicCacheChangeRequest req : reqs)
-                cctx.cache().completeStartFuture(req);
-        }
+        if (exchActions != null && err == null)
+            exchActions.completeRequestFutures(cctx);
 
         if (exchangeOnChangeGlobalState && err == null)
             cctx.kernalContext().state().onExchangeDone();
@@ -1227,7 +1168,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
             }
 
-            reqs = null;
+            exchActions = null;
 
             if (discoEvt instanceof DiscoveryCustomEvent)
                 ((DiscoveryCustomEvent)discoEvt).customMessage(null);
@@ -1615,20 +1556,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 assert discoEvt instanceof DiscoveryCustomEvent;
 
                 if (((DiscoveryCustomEvent)discoEvt).customMessage() instanceof DynamicCacheChangeBatch) {
-                    DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)((DiscoveryCustomEvent)discoEvt)
-                        .customMessage();
+                    if (exchActions != null) {
+                        if (exchActions.newClusterState() == ClusterState.ACTIVE)
+                            assignPartitionsStates();
 
-                    Set<String> caches = new HashSet<>();
+                        Set<String> caches = exchActions.cachesToResetLostPartitions();
 
-                    for (DynamicCacheChangeRequest req : batch.requests()) {
-                        if (req.resetLostPartitions())
-                            caches.add(req.cacheName());
-                        else if (req.globalStateChange() && req.state() != ClusterState.INACTIVE)
-                            assignPartitionsStates();
+                        if (!F.isEmpty(caches))
+                            resetLostPartitions(caches);
                     }
-
-                    if (!F.isEmpty(caches))
-                        resetLostPartitions(caches);
                 }
             }
             else if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED)

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 9f1b96e..57616ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -608,7 +608,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
                 AffinityAssignment assignment = cctx.affinity().assignment(topVer);
 
-                GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(),
+                GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
+                    req.futureId(),
+                    cctx.cacheId(),
                     topVer,
                     assignment.assignment());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 5e3dc3f..8b538ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -146,35 +146,8 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
         GridLocalLockFuture<K, V> fut = new GridLocalLockFuture<>(ctx, keys, tx, this, timeout, filter);
 
         try {
-            for (KeyCacheObject key : keys) {
-                while (true) {
-                    GridLocalCacheEntry entry = null;
-
-                    try {
-                        entry = entryExx(key);
-
-                        entry.unswap(false);
-
-                        if (!ctx.isAll(entry, filter)) {
-                            fut.onFailed();
-
-                            return fut;
-                        }
-
-                        // Removed exception may be thrown here.
-                        GridCacheMvccCandidate cand = fut.addEntry(entry);
-
-                        if (cand == null && fut.isDone())
-                            return fut;
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        if (log().isDebugEnabled())
-                            log().debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
-                    }
-                }
-            }
+            if (!fut.addEntries(keys))
+                return fut;
 
             if (!ctx.mvcc().addFuture(fut))
                 fut.onError(new IgniteCheckedException("Duplicate future ID (internal error): " + fut));

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 59d0adb..9641533 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -144,12 +144,51 @@ public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Bool
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridLocalLockFuture.class);
+    }
+
+    /**
+     * @param keys Keys.
+     * @return {@code False} in case of error.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean addEntries(Collection<KeyCacheObject> keys) throws IgniteCheckedException {
+        for (KeyCacheObject key : keys) {
+            while (true) {
+                GridLocalCacheEntry entry = null;
+
+                try {
+                    entry = cache.entryExx(key);
+
+                    entry.unswap(false);
+
+                    if (!cctx.isAll(entry, filter)) {
+                        onFailed();
+
+                        return false;
+                    }
+
+                    // Removed exception may be thrown here.
+                    GridCacheMvccCandidate cand = addEntry(entry);
+
+                    if (cand == null && isDone())
+                        return false;
+
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+                }
+            }
+        }
 
         if (timeout > 0) {
             timeoutObj = new LockTimeoutObject();
 
             cctx.time().addTimeoutObject(timeoutObj);
         }
+
+        return true;
     }
 
     /** {@inheritDoc} */
@@ -216,7 +255,7 @@ public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Bool
      * @return Lock candidate.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    @Nullable GridCacheMvccCandidate addEntry(GridLocalCacheEntry entry)
+    private @Nullable GridCacheMvccCandidate addEntry(GridLocalCacheEntry entry)
         throws GridCacheEntryRemovedException {
         // Add local lock first, as it may throw GridCacheEntryRemovedException.
         GridCacheMvccCandidate c = entry.addLocal(

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 0f7b0df..fcf534c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -272,8 +272,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         qryTopVer = cctx.startTopologyVersion();
 
-        if (qryTopVer == null)
-            qryTopVer = new AffinityTopologyVersion(cctx.localNode().order(), 0);
+        assert qryTopVer != null : cctx.name();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index efb02c6..e7706dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -561,16 +561,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /**
-     * Wait topology.
+     * @param ctx Context.
+     * @throws IgniteCheckedException In case of error.
      */
-    public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
+    void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
         GridCacheContext<K, V> cctx = cacheContext(ctx);
 
         if (!cctx.isLocal()) {
-            cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+            AffinityTopologyVersion topVer = initTopVer;
+
+            cacheContext(ctx).affinity().affinityReadyFuture(topVer).get();
 
             for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++)
-                getOrCreatePartitionRecovery(ctx, partId);
+                getOrCreatePartitionRecovery(ctx, partId, topVer);
         }
     }
 
@@ -736,7 +739,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
         }
 
-        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
+        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());
 
         return rec.collectEntries(e, cctx, cache);
     }
@@ -869,37 +872,40 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /**
      * @param ctx Context.
      * @param partId Partition id.
+     * @param topVer Topology version for current operation.
      * @return Partition recovery.
      */
-    @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
+    @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
+        int partId,
+        AffinityTopologyVersion topVer) {
+        assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
         PartitionRecovery rec = rcvs.get(partId);
 
         if (rec == null) {
             T2<Long, Long> partCntrs = null;
 
-            AffinityTopologyVersion initTopVer0 = initTopVer;
+            Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode = this.initUpdCntrsPerNode;
 
-            if (initTopVer0 != null) {
+            if (initUpdCntrsPerNode != null) {
                 GridCacheContext<K, V> cctx = cacheContext(ctx);
 
                 GridCacheAffinityManager aff = cctx.affinity();
 
-                if (initUpdCntrsPerNode != null) {
-                    for (ClusterNode node : aff.nodesByPartition(partId, initTopVer)) {
-                        Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
+                for (ClusterNode node : aff.nodesByPartition(partId, topVer)) {
+                    Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
 
-                        if (map != null) {
-                            partCntrs = map.get(partId);
+                    if (map != null) {
+                        partCntrs = map.get(partId);
 
-                            break;
-                        }
+                        break;
                     }
                 }
-                else if (initUpdCntrs != null)
-                    partCntrs = initUpdCntrs.get(partId);
             }
+            else if (initUpdCntrs != null)
+                partCntrs = initUpdCntrs.get(partId);
 
-            rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0,
+            rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
                 partCntrs != null ? partCntrs.get2() : null);
 
             PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 8377754..acf351f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
@@ -340,8 +341,14 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 updateCntr,
                 topVer);
 
-            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
-                cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+            IgniteCacheProxy jcache = cctx.kernalContext().cache().jcacheProxy(cctx.name());
+
+            assert jcache != null : "Failed to get cache proxy [name=" + cctx.name() +
+                ", locStart=" + cctx.startTopologyVersion() +
+                ", locNode=" + cctx.localNode() +
+                ", stopping=" + cctx.kernalContext().isStopping();
+
+            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(jcache, cctx, e0);
 
             lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fut);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 1286ba9..b25b229 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
@@ -279,9 +280,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
                 List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
 
                 DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest(
-                    requestId, null, ctx.localNodeId());
-
-                changeGlobalStateReq.state(activate ? ACTIVE : INACTIVE);
+                    requestId, activate ? ACTIVE : INACTIVE, ctx.localNodeId());
 
                 reqs.add(changeGlobalStateReq);
 
@@ -329,26 +328,25 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param reqs Requests.
+     * @param exchActions Exchange actions.
+     * @param topVer Exchange topology version.
      */
     public boolean changeGlobalState(
-        Collection<DynamicCacheChangeRequest> reqs,
+        ExchangeActions exchActions,
         AffinityTopologyVersion topVer
     ) {
-        assert !F.isEmpty(reqs);
+        assert exchActions != null;
         assert topVer != null;
 
-        for (DynamicCacheChangeRequest req : reqs)
-            if (req.globalStateChange()) {
-                ChangeGlobalStateContext cgsCtx = lastCgsCtx;
-
-                assert cgsCtx != null : "reqs: " + Arrays.toString(reqs.toArray());
+        if (exchActions.newClusterState() != null) {
+            ChangeGlobalStateContext cgsCtx = lastCgsCtx;
 
-                cgsCtx.topologyVersion(topVer);
+            assert cgsCtx != null : exchActions;
 
-                return true;
-            }
+            cgsCtx.topologyVersion(topVer);
 
+            return true;
+        }
 
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 81f5c28..59c2656 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -427,7 +427,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
      */
     protected final void unregisterMBean() throws IgniteSpiException {
         // Unregister SPI MBean.
-        if (spiMBean != null) {
+        if (spiMBean != null && ignite != null) {
             MBeanServer jmx = ignite.configuration().getMBeanServer();
 
             assert jmx != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
index 96df255..803beed 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
@@ -188,14 +188,15 @@ public class DiscoveryDataBag {
     }
 
     /**
-     *
+     * @return ID of joining node.
      */
     public UUID joiningNodeId() {
         return joiningNodeId;
     }
 
     /**
-     * @param cmpId component ID.
+     * @param cmpId Component ID.
+     * @return Discovery data for given component.
      */
     public GridDiscoveryData gridDiscoveryData(int cmpId) {
         if (gridData == null)
@@ -207,7 +208,8 @@ public class DiscoveryDataBag {
     }
 
     /**
-     * @param cmpId component ID.
+     * @param cmpId Component ID.
+     * @return Joining node discovery data.
      */
     public JoiningNodeDiscoveryData newJoinerDiscoveryData(int cmpId) {
         if (newJoinerData == null)
@@ -219,7 +221,7 @@ public class DiscoveryDataBag {
     }
 
     /**
-     * @param cmpId component ID.
+     * @param cmpId Component ID.
      * @param data Data.
      */
     public void addJoiningNodeData(Integer cmpId, Serializable data) {
@@ -227,7 +229,7 @@ public class DiscoveryDataBag {
     }
 
     /**
-     * @param cmpId component ID.
+     * @param cmpId Component ID.
      * @param data Data.
      */
     public void addGridCommonData(Integer cmpId, Serializable data) {
@@ -235,7 +237,7 @@ public class DiscoveryDataBag {
     }
 
     /**
-     * @param cmpId component ID.
+     * @param cmpId Component ID.
      * @param data Data.
      */
     public void addNodeSpecificData(Integer cmpId, Serializable data) {
@@ -246,7 +248,8 @@ public class DiscoveryDataBag {
     }
 
     /**
-     * @param cmpId component ID.
+     * @param cmpId Component ID.
+     * @return {@code True} if common data collected for given component.
      */
     public boolean commonDataCollectedFor(Integer cmpId) {
         assert cmnDataInitializedCmps != null;
@@ -295,5 +298,4 @@ public class DiscoveryDataBag {
     @Nullable public Map<Integer, Serializable> localNodeSpecificData() {
         return nodeSpecificData.get(DEFAULT_KEY);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index cc581e1..2a55412 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1981,7 +1981,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
      *
      */
     void printStopInfo() {
-        if (log.isDebugEnabled())
+        IgniteLogger log = this.log;
+
+        if (log != null && log.isDebugEnabled())
             log.debug(stopInfo());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/config/examples.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/examples.properties b/modules/core/src/test/config/examples.properties
index c584f72..ea0d8ed 100644
--- a/modules/core/src/test/config/examples.properties
+++ b/modules/core/src/test/config/examples.properties
@@ -22,3 +22,4 @@ ScalarCacheExample=examples/config/example-ignite.xml
 ScalarCacheQueryExample=examples/config/example-ignite.xml
 ScalarCountGraphTrianglesExample=examples/config/example-ignite.xml
 ScalarPopularNumbersRealTimeExample=examples/config/example-ignite.xml
+MemoryPolicyExample=examples/config/example-memory-policies.xml
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
index 2110c28..99e80ca 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
@@ -78,13 +78,13 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
 
                 Map<String, Integer> backupAssignedAttribute = getAttributeStatistic(assigned);
 
-                String nodeAttributeValue = node.attribute(SPLIT_ATTRIBUTE_NAME);
+                String nodeAttributeVal = node.attribute(SPLIT_ATTRIBUTE_NAME);
 
-                if (FIRST_NODE_GROUP.equals(nodeAttributeValue)
+                if (FIRST_NODE_GROUP.equals(nodeAttributeVal)
                     && backupAssignedAttribute.get(FIRST_NODE_GROUP) < 2)
                     return true;
 
-                return backupAssignedAttribute.get(nodeAttributeValue).equals(0);
+                return backupAssignedAttribute.get(nodeAttributeVal).equals(0);
             }
         };
 
@@ -107,10 +107,11 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
 
             String val = assignedNode.attribute(SPLIT_ATTRIBUTE_NAME);
 
-            Integer count = backupAssignedAttribute.get(val);
+            Integer cnt = backupAssignedAttribute.get(val);
 
-            backupAssignedAttribute.put(val, count + 1);
+            backupAssignedAttribute.put(val, cnt + 1);
         }
+
         return backupAssignedAttribute;
     }
 
@@ -157,6 +158,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
      */
     public void testPartitionDistribution() throws Exception {
         backups = 1;
+
         try {
             for (int i = 0; i < 3; i++) {
                 splitAttrVal = "A";
@@ -205,6 +207,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
      */
     public void testPartitionDistributionWithAffinityBackupFilter() throws Exception {
         backups = 3;
+
         try {
             for (int i = 0; i < 2; i++) {
                 splitAttrVal = FIRST_NODE_GROUP;

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
index 13fae24..b6114d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
@@ -35,12 +35,6 @@ import org.apache.log4j.WriterAppender;
 @SuppressWarnings({"ProhibitedExceptionDeclared"})
 @GridCommonTest(group = "Kernal")
 public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
-    /** */
-
-    public GridNodeMetricsLogSelfTest() {
-        super(false);
-    }
-
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked"})
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -51,6 +45,7 @@ public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
         return cfg;
     }
 
+    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
     }
@@ -80,11 +75,11 @@ public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
 
         cache2.put(2, "two");
 
-        Thread.sleep(10000);
+        Thread.sleep(10_000);
 
         //Check that nodes are alie
-        assert cache1.get(1).equals("one");
-        assert cache2.get(2).equals("two");
+        assertEquals("one", cache1.get(1));
+        assertEquals("two", cache2.get(2));
 
         String fullLog = strWr.toString();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
index 7dce36b..ae9986d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
+import javax.cache.configuration.Factory;
 import javax.cache.expiry.CreatedExpiryPolicy;
 import javax.cache.expiry.Duration;
 import javax.cache.integration.CacheLoaderException;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -169,18 +171,7 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
 
         cacheCfg.setRebalanceMode(SYNC);
 
-        if (igniteInstanceName.endsWith("1"))
-            cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_1));
-        else if (igniteInstanceName.endsWith("2"))
-            cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_2));
-        else if (igniteInstanceName.endsWith("3"))
-            cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_3));
-        else if (igniteInstanceName.endsWith("4"))
-            cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_4));
-        else if (igniteInstanceName.endsWith("5"))
-            cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_5));
-        else
-            cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_6));
+        cacheCfg.setCacheStoreFactory(new StoreFactory());
 
         cacheCfg.setWriteThrough(true);
         cacheCfg.setReadThrough(true);
@@ -840,4 +831,30 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
             map.clear();
         }
     }
+
+    /**
+     * Store factory that picks the local store by the suffix of the node's Ignite instance name.
+     */
+    static class StoreFactory implements Factory<CacheStore> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite node;
+
+        @Override public CacheStore create() {
+            String igniteInstanceName = node.configuration().getIgniteInstanceName();
+
+            if (igniteInstanceName.endsWith("1"))
+                return LOCAL_STORE_1;
+            else if (igniteInstanceName.endsWith("2"))
+                return LOCAL_STORE_2;
+            else if (igniteInstanceName.endsWith("3"))
+                return LOCAL_STORE_3;
+            else if (igniteInstanceName.endsWith("4"))
+                return LOCAL_STORE_4;
+            else if (igniteInstanceName.endsWith("5"))
+                return LOCAL_STORE_5;
+            else
+                return LOCAL_STORE_6;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
index a4a831f..546ec06 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
@@ -44,13 +44,13 @@ public class IgniteCacheP2pUnmarshallingNearErrorTest extends IgniteCacheP2pUnma
 
     /** {@inheritDoc} */
     @Override public void testResponseMessageOnUnmarshallingFailed() throws InterruptedException {
-        //GridCacheEvictionRequest unmarshalling failed test
-        readCnt.set(5); //2 for each put
+        //GridCacheEvictionRequest unmarshalling failed test.
+        readCnt.set(5); //2 for each put.
 
         jcache(0).put(new TestKey(String.valueOf(++key)), "");
         jcache(0).put(new TestKey(String.valueOf(++key)), "");
 
-        //Eviction request unmarshalling failed but ioManager does not hangs up.
+        // Eviction request unmarshalling failed but ioManager does not hang up.
 
         // Wait for eviction complete.
         Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
new file mode 100644
index 0000000..eb8077f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
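+ * Checks on which nodes a cache exists after dynamic create, static start from a joining node, and destroy.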
+ */
+public class IgniteCacheStartTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String CACHE_NAME = "c1";
+
+    /** */
+    private boolean client;
+
+    /** */
+    private CacheConfiguration ccfg;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        if (ccfg != null)
+            cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testStartAndNodeJoin() throws Exception {
+        Ignite node0 = startGrid(0);
+
+        checkCache(0, CACHE_NAME, false);
+
+        node0.createCache(cacheConfiguration(CACHE_NAME));
+
+        checkCache(0, CACHE_NAME, true);
+
+        startGrid(1);
+
+        checkCache(0, CACHE_NAME, true);
+        checkCache(1, CACHE_NAME, true);
+
+        client = true;
+
+        startGrid(2);
+
+        checkCache(0, CACHE_NAME, true);
+        checkCache(1, CACHE_NAME, true);
+        checkCache(2, CACHE_NAME, false);
+
+        ignite(2).destroyCache(CACHE_NAME);
+
+        checkCache(0, CACHE_NAME, false);
+        checkCache(1, CACHE_NAME, false);
+        checkCache(2, CACHE_NAME, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartFromJoiningNode1() throws Exception {
+        checkStartFromJoiningNode(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartFromJoiningNode2() throws Exception {
+        checkStartFromJoiningNode(true);
+    }
+
+    /**
+     * @param joinClient {@code True} if client node joins.
+     * @throws Exception If failed.
+     */
+    private void checkStartFromJoiningNode(boolean joinClient) throws Exception {
+        startGrid(0);
+        startGrid(1);
+
+        client = true;
+
+        startGrid(2);
+
+        ccfg = cacheConfiguration(CACHE_NAME);
+        client = joinClient;
+
+        startGrid(3);
+
+        checkCache(0, CACHE_NAME, true);
+        checkCache(1, CACHE_NAME, true);
+        checkCache(2, CACHE_NAME, false);
+        checkCache(3, CACHE_NAME, true);
+
+        client = false;
+        ccfg = null;
+
+        startGrid(4);
+
+        checkCache(0, CACHE_NAME, true);
+        checkCache(1, CACHE_NAME, true);
+        checkCache(2, CACHE_NAME, false);
+        checkCache(3, CACHE_NAME, true);
+        checkCache(4, CACHE_NAME, true);
+
+        client = true;
+
+        startGrid(5);
+
+        checkCache(0, CACHE_NAME, true);
+        checkCache(1, CACHE_NAME, true);
+        checkCache(2, CACHE_NAME, false);
+        checkCache(3, CACHE_NAME, true);
+        checkCache(4, CACHE_NAME, true);
+        checkCache(5, CACHE_NAME, false);
+
+        ignite(5).destroyCache(CACHE_NAME);
+
+        for (int i = 0; i < 5; i++)
+            checkCache(i, CACHE_NAME, false);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String cacheName) {
+        return new CacheConfiguration(cacheName);
+    }
+
+    /**
+     * @param idx Node index.
+     * @param cacheName Cache name.
+     * @param expCache {@code True} if cache is expected to exist on the node.
+     */
+    private void checkCache(int idx, final String cacheName, final boolean expCache) throws IgniteInterruptedCheckedException {
+        final IgniteKernal node = (IgniteKernal)ignite(idx);
+
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return expCache == (node.context().cache().cache(cacheName) != null);
+            }
+        }, 1000));
+
+        assertNotNull(node.context().cache().cache(CU.UTILITY_CACHE_NAME));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 4a34a1d..e7c5ca5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -1027,7 +1027,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
      * @param nearOnly Near only flag.
      * @throws Exception If failed.
      */
-    public void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
+    private void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
         try {
             final AtomicInteger cnt = new AtomicInteger(nodeCount());
             final AtomicReference<Throwable> err = new AtomicReference<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
index fd77309..057b0d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
@@ -130,7 +130,8 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
 
     /**
      * Tests topology split scenario.
-     * @throws Exception
+     *
+     * @throws Exception If failed.
      */
     public void testTopologyValidator() throws Exception {
         assertTrue(initLatch.await(10, TimeUnit.SECONDS));
@@ -242,12 +243,15 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
         /** */
         private static final long serialVersionUID = 0L;
 
+        /** */
         @CacheNameResource
         private String cacheName;
 
+        /** */
         @IgniteInstanceResource
         private Ignite ignite;
 
+        /** */
         @LoggerResource
         private IgniteLogger log;
 
@@ -263,7 +267,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
             }).isEmpty())
                 return false;
 
-            IgniteKernal kernal = (IgniteKernal)ignite.cache(cacheName).unwrap(Ignite.class);
+            IgniteKernal kernal = (IgniteKernal)ignite;
 
             GridDhtCacheAdapter<Object, Object> dht = kernal.context().cache().internalCache(cacheName).context().dht();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java
new file mode 100644
index 0000000..a80830a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.GridAtomicInteger;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
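+ * Checks that statically configured caches are started on all nodes when several nodes join concurrently.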
+ */
+@SuppressWarnings("unchecked")
+public class CacheDiscoveryDataConcurrentJoinTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of test iterations. */
+    private static final int ITERATIONS = 3;
+
+    /** */
+    private boolean client;
+
+    /** */
+    private ThreadLocal<Integer> staticCaches = new ThreadLocal<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi testSpi = new TcpDiscoverySpi() {
+            /** */
+            private boolean delay = true;
+
+            @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+                if (getTestIgniteInstanceName(0).equals(ignite.name())) {
+                    if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+                        TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
+
+                        if (delay) {
+                            log.info("Delay join processing: " + msg0);
+
+                            delay = false;
+
+                            doSleep(5000);
+                        }
+                    }
+                }
+
+                super.startMessageProcess(msg);
+            }
+        };
+
+        testSpi.setIpFinder(ipFinder);
+        testSpi.setJoinTimeout(60_000);
+
+        cfg.setDiscoverySpi(testSpi);
+
+        cfg.setClientMode(client);
+
+        Integer caches = staticCaches.get();
+
+        if (caches != null) {
+            cfg.setCacheConfiguration(cacheConfigurations(caches).toArray(new CacheConfiguration[caches]));
+
+            staticCaches.remove();
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60 * 1000L;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentJoin() throws Exception {
+        for (int iter = 0; iter < ITERATIONS; iter++) {
+            log.info("Iteration: " + iter);
+
+            final int NODES = 6;
+            final int MAX_CACHES = 10;
+
+            final GridAtomicInteger caches = new GridAtomicInteger();
+
+            startGrid(0);
+
+            final AtomicInteger idx = new AtomicInteger(1);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int c = ThreadLocalRandom.current().nextInt(MAX_CACHES) + 1;
+
+                    staticCaches.set(c);
+
+                    startGrid(idx.getAndIncrement());
+
+                    caches.setIfGreater(c);
+
+                    return null;
+                }
+            }, NODES - 1, "start-node");
+
+            assertTrue(caches.get() > 0);
+
+            for (int i = 0; i < NODES; i++) {
+                Ignite node = ignite(i);
+
+                for (int c = 0; c < caches.get(); c++) {
+                    Collection<ClusterNode> nodes = node.cluster().forCacheNodes("cache-" + c).nodes();
+
+                    assertEquals(NODES, nodes.size());
+
+                    checkCache(node, "cache-" + c);
+                }
+            }
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param caches Number of caches.
+     * @return Cache configurations.
+     */
+    private Collection<CacheConfiguration> cacheConfigurations(int caches) {
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        for (int i = 0; i < caches; i++)
+            ccfgs.add(cacheConfiguration("cache-" + i));
+
+        return ccfgs;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration ccfg = new CacheConfiguration(cacheName);
+
+        ccfg.setName(cacheName);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+
+        return ccfg;
+    }
+    /**
+     * @param node Node.
+     * @param cacheName Cache name.
+     */
+    private void checkCache(Ignite node, final String cacheName) {
+        assertNotNull(((IgniteKernal)node).context().cache().cache(cacheName));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index fed388a..bc435e2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -2165,7 +2165,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
         Map<String, List<List<ClusterNode>>> aff = new HashMap<>();
 
         for (Ignite node : nodes) {
-            log.info("Check node: " + node.name());
+            log.info("Check affinity [node=" + node.name() + ", topVer=" + topVer + ", expIdeal=" + expIdeal + ']');
 
             IgniteKernal node0 = (IgniteKernal)node;
 
@@ -2175,7 +2175,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
                 fut.get();
 
             for (GridCacheContext cctx : node0.context().cache().context().cacheContexts()) {
-                if (cctx.startTopologyVersion() != null && cctx.startTopologyVersion().compareTo(topVer) > 0)
+                if (cctx.startTopologyVersion().compareTo(topVer) > 0)
                     continue;
 
                 List<List<ClusterNode>> aff1 = aff.get(cctx.name());

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
index 321faf8..88df607 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -210,6 +211,8 @@ public class CacheStartOnJoinTest extends GridCommonAbstractTest {
                 Collection<ClusterNode> nodes = node.cluster().forCacheNodes("cache-" + c).nodes();
 
                 assertEquals(NODES, nodes.size());
+
+                checkCache(node, "cache-" + c);
             }
 
             for (int c = 0; c < 5; c++) {
@@ -247,4 +250,11 @@ public class CacheStartOnJoinTest extends GridCommonAbstractTest {
 
         return ccfg;
     }
+    /**
+     * @param node Node.
+     * @param cacheName Cache name.
+     */
+    private void checkCache(Ignite node, final String cacheName) {
+        assertNotNull(((IgniteKernal)node).context().cache().cache(cacheName));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index 4864a67..bf5ba61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -26,6 +26,7 @@ import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreSession;
@@ -35,6 +36,7 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.Nullable;
@@ -357,9 +359,12 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
      *
      */
     private static class FirstStoreFactory implements Factory<CacheStore> {
+        @IgniteInstanceResource
+        private Ignite ignite;
+
         /** {@inheritDoc} */
         @Override public CacheStore create() {
-            String igniteInstanceName = startingIgniteInstanceName.get();
+            String igniteInstanceName = ignite.name();
 
             CacheStore store = firstStores.get(igniteInstanceName);
 
@@ -374,9 +379,12 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
      *
      */
     private static class SecondStoreFactory implements Factory<CacheStore> {
+        @IgniteInstanceResource
+        private Ignite ignite;
+
         /** {@inheritDoc} */
         @Override public CacheStore create() {
-            String igniteInstanceName = startingIgniteInstanceName.get();
+            String igniteInstanceName = ignite.name();
 
             CacheStore store = secondStores.get(igniteInstanceName);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
index 372da32..a7128e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
@@ -22,7 +22,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.cache.configuration.Factory;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
@@ -102,11 +104,7 @@ public class GridCachePartitionedBasicStoreMultiNodeSelfTest extends GridCommonA
         cc.setAtomicityMode(TRANSACTIONAL);
         cc.setBackups(1);
 
-        GridCacheTestStore store = new GridCacheTestStore();
-
-        stores.add(store);
-
-        cc.setCacheStoreFactory(singletonFactory(store));
+        cc.setCacheStoreFactory(new StoreFactory());
         cc.setReadThrough(true);
         cc.setWriteThrough(true);
         cc.setLoadPreviousValue(true);
@@ -269,4 +267,18 @@ public class GridCachePartitionedBasicStoreMultiNodeSelfTest extends GridCommonA
         assertEquals(expPutAll, putAll);
         assertEquals(expTxs, txs);
     }
+
+    /**
+     *
+     */
+    static class StoreFactory implements Factory<CacheStore> {
+        /** {@inheritDoc} */
+        @Override public CacheStore create() {
+            GridCacheTestStore store = new GridCacheTestStore();
+
+            stores.add(store);
+
+            return store;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 0f4aa87..2096179 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.loadtests.hashmap;
 
 import java.util.IdentityHashMap;
+import java.util.UUID;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
 import org.apache.ignite.internal.processors.cache.CacheOsConflictResolutionManager;
 import org.apache.ignite.internal.processors.cache.CacheType;
@@ -78,6 +80,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
             ),
             defaultCacheConfiguration(),
             CacheType.USER,
+            AffinityTopologyVersion.ZERO,
+            UUID.randomUUID(),
             true,
             true,
             null,


Mime
View raw message