ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [6/6] ignite git commit: ignite-5272
Date Wed, 14 Jun 2017 11:46:26 GMT
ignite-5272


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/217f8dbd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/217f8dbd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/217f8dbd

Branch: refs/heads/ignite-5272
Commit: 217f8dbd7e609e7fe7be28a85379daac412e69dc
Parents: 896cb3c
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Jun 14 12:50:42 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Jun 14 14:46:12 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  12 +
 .../cache/CacheAffinitySharedManager.java       |  88 ++-
 .../ClientCacheChangeDiscoveryMessage.java      |  87 ++-
 .../cache/ClientCacheUpdateTimeout.java         |  44 ++
 .../GridCachePartitionExchangeManager.java      |   9 +
 .../processors/cache/GridCacheProcessor.java    |   7 +-
 .../dht/ClientCacheDhtTopologyFuture.java       |  50 +-
 .../dht/GridDhtTopologyFutureAdapter.java       | 195 +++++++
 .../GridDhtPartitionsExchangeFuture.java        | 157 +----
 .../IgniteClientCacheStartFailoverTest.java     | 585 +++++++++++++++++++
 .../testframework/junits/GridAbstractTest.java  |   2 +
 .../testsuites/IgniteCacheTestSuite2.java       |   2 +
 12 files changed, 1057 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/217f8dbd/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 539f288..2121f38 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import javax.net.ssl.HostnameVerifier;
+import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
@@ -623,6 +624,17 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_SECURITY_COMPATIBILITY_MODE = "IGNITE_SECURITY_COMPATIBILITY_MODE";
 
     /**
+     * When client cache is started or closed special discovery message is sent to notify cluster (for example this is
+     * needed for {@link ClusterGroup#forCacheNodes(String)} API). This timeout specifies how long to wait
+     * after client cache start/close before sending this message. If another client cache changes during
+     * this timeout, these events are combined into a single message.
+     * <p>
+     * Default is 10 seconds.
+     */
+    public static final String IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT =
+        "IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/217f8dbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 37a53b9..bf583f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -38,6 +39,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -74,6 +76,10 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
  */
 @SuppressWarnings("ForLoopReplaceableByForEach")
 public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
+    /** */
+    private final long clientCacheMsgTimeout =
+        IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT, 10_000);
+
     /** Late affinity assignment flag. */
     private boolean lateAffAssign;
 
@@ -99,6 +105,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private final ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
         new ConcurrentHashMap8<>();
 
+    /** */
+    private final ThreadLocal<ClientCacheChangeDiscoveryMessage> clientCacheChanges = new ThreadLocal<>();
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -358,6 +367,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param crd Coordinator flag.
      * @param topVer Current topology version.
      * @param discoCache Discovery data cache.
+     * @return Map of started caches (cache ID to near enabled flag).
      */
     @Nullable private Map<Integer, Boolean> processClientCacheStartRequests(
         ClientCacheChangeDummyDiscoveryMessage msg,
@@ -459,24 +469,27 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     fetchFut);
 
                 GridDhtPartitionFullMap partMap;
+                ClientCacheDhtTopologyFuture topFut;
 
                 if (res != null) {
                     partMap = res.partitionMap();
 
                     assert partMap != null : res;
 
-                    ClientCacheDhtTopologyFuture topFut = new ClientCacheDhtTopologyFuture(topVer);
-
-                    grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
-
-                    grp.topology().update(topVer, partMap, null);
+                    topFut = new ClientCacheDhtTopologyFuture(topVer);
                 }
                 else {
-//                    cctx.cache().completeClientCacheChangeFuture(msg.requestId(), new IgniteCheckedException("test"));
-//
-//                    return;
-                    // TODO 5272: mark as 'no server nodes'
+                    partMap = new GridDhtPartitionFullMap(cctx.localNodeId(), cctx.localNode().order(), 1);
+
+                    topFut = new ClientCacheDhtTopologyFuture(topVer,
+                        new ClusterTopologyServerNotFoundException("All server nodes left grid."));
                 }
+
+                grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
+
+                grp.topology().update(topVer, partMap, null);
+
+                topFut.validate(grp, discoCache.allNodes());
             }
             catch (IgniteCheckedException e) {
                 cctx.cache().closeCaches(startedCaches);
@@ -555,19 +568,70 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         Set<Integer> closedCaches = processCacheCloseRequests(msg, crd, topVer);
 
-        if (startedCaches != null || closedCaches != null) {
-            ClientCacheChangeDiscoveryMessage msg0 = new ClientCacheChangeDiscoveryMessage(startedCaches, closedCaches);
+        if (startedCaches != null || closedCaches != null)
+            scheduleClientChangeMessage(startedCaches, closedCaches);
+    }
+
+    /**
+     * @param timeoutObj Timeout object.
+     */
+    void sendClientCacheChangesMessage(ClientCacheUpdateTimeout timeoutObj) {
+        ClientCacheChangeDiscoveryMessage msg = clientCacheChanges.get();
+
+        if (msg != null && msg.updateTimeoutObject() == timeoutObj) {
+            assert !msg.empty() : msg;
 
             try {
-                cctx.discovery().sendCustomEvent(msg0);
+                cctx.discovery().sendCustomEvent(msg);
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to send discovery event: " + e, e);
             }
+
+            clientCacheChanges.remove();
         }
     }
 
     /**
+     * @param startedCaches Started caches.
+     * @param closedCaches Closed caches.
+     */
+    private void scheduleClientChangeMessage(Map<Integer, Boolean> startedCaches, Set<Integer> closedCaches) {
+        ClientCacheChangeDiscoveryMessage msg = clientCacheChanges.get();
+
+        if (msg == null) {
+            msg = new ClientCacheChangeDiscoveryMessage(startedCaches, closedCaches);
+
+            clientCacheChanges.set(msg);
+        }
+        else {
+            msg.merge(startedCaches, closedCaches);
+
+            if (msg.empty()) {
+                cctx.time().removeTimeoutObject(msg.updateTimeoutObject());
+
+                clientCacheChanges.remove();
+
+                return;
+            }
+        }
+
+        if (msg.updateTimeoutObject() != null)
+            cctx.time().removeTimeoutObject(msg.updateTimeoutObject());
+
+        long timeout = clientCacheMsgTimeout;
+
+        if (timeout <= 0)
+            timeout = 10_000;
+
+        ClientCacheUpdateTimeout timeoutObj = new ClientCacheUpdateTimeout(cctx, timeout);
+
+        msg.updateTimeoutObject(timeoutObj);
+
+        cctx.time().addTimeoutObject(timeoutObj);
+    }
+
+    /**
      * Called on exchange initiated for cache start/stop request.
      *
      * @param fut Exchange future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/217f8dbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
index 38d9921..8639ffe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
@@ -17,9 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -34,24 +39,97 @@ public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage
     private final IgniteUuid id = IgniteUuid.randomUuid();;
 
     /** */
+    @GridToStringInclude
     private Map<Integer, Boolean> startedCaches;
 
     /** */
+    @GridToStringInclude
     private Set<Integer> closedCaches;
 
+    /** Update timeout object, used to batch multiple starts/closes into a single discovery message. */
+    private transient ClientCacheUpdateTimeout updateTimeoutObj;
+
     /**
-     * @param startedCaches
-     * @param closedCaches
+     * @param startedCaches Started caches.
+     * @param closedCaches Closed caches.
      */
     public ClientCacheChangeDiscoveryMessage(Map<Integer, Boolean> startedCaches, Set<Integer> closedCaches) {
         this.startedCaches = startedCaches;
         this.closedCaches = closedCaches;
     }
 
+    /**
+     * Merges the given changes into this message; a start and a close of the same cache cancel each other out.
+     * @param startedCaches Started caches.
+     * @param closedCaches Closed caches.
+     */
+    public void merge(@Nullable Map<Integer, Boolean> startedCaches, @Nullable Set<Integer> closedCaches) {
+        Map<Integer, Boolean> startedCaches0 = this.startedCaches;
+        Set<Integer> closedCaches0 = this.closedCaches;
+
+        if (startedCaches != null) {
+            if (startedCaches0 == null)
+                startedCaches0 = new HashMap<>();
+
+            for (Map.Entry<Integer, Boolean> e : startedCaches.entrySet()) {
+                if (closedCaches0 != null && closedCaches0.remove(e.getKey()))
+                    continue;
+
+                Boolean old = startedCaches0.put(e.getKey(), e.getValue());
+                assert old == null : e.getKey();
+            }
+
+            this.startedCaches = startedCaches0;
+        }
+
+        if (closedCaches != null) {
+            if (closedCaches0 == null)
+                closedCaches0 = new HashSet<>();
+
+            for (Integer cacheId : closedCaches) {
+                // Map.remove() returns the previous value (null if absent): compare with null, never unbox it.
+                if (startedCaches0 != null && startedCaches0.remove(cacheId) != null)
+                    continue;
+
+                boolean add = closedCaches0.add(cacheId);
+                assert add : cacheId;
+            }
+
+            this.closedCaches = closedCaches0;
+        }
+    }
+
+    /**
+     * @return {@code True} if there is no info about started/closed caches.
+     */
+    public boolean empty() {
+        return F.isEmpty(startedCaches) && F.isEmpty(closedCaches);
+    }
+
+    /**
+     * @return Update timeout object.
+     */
+    public ClientCacheUpdateTimeout updateTimeoutObject() {
+        return updateTimeoutObj;
+    }
+
+    /**
+     * @param updateTimeoutObj Update timeout object.
+     */
+    public void updateTimeoutObject(ClientCacheUpdateTimeout updateTimeoutObj) {
+        this.updateTimeoutObj = updateTimeoutObj;
+    }
+
+    /**
+     * @return Started caches map (cache ID to near enabled flag).
+     */
     @Nullable public Map<Integer, Boolean> startedCaches() {
         return startedCaches;
     }
 
+    /**
+     * @return Closed caches.
+     */
     @Nullable public Set<Integer> closedCaches() {
         return closedCaches;
     }
@@ -70,4 +148,9 @@ public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage
     @Override public boolean isMutable() {
         return false;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ClientCacheChangeDiscoveryMessage.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/217f8dbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java
new file mode 100644
index 0000000..aab3a3e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+
+/**
+ *
+ */
+class ClientCacheUpdateTimeout extends GridTimeoutObjectAdapter implements CachePartitionExchangeWorkerTask {
+    /** */
+    private final GridCacheSharedContext cctx;
+
+    /**
+     * @param cctx Context.
+     * @param timeout Timeout.
+     */
+    ClientCacheUpdateTimeout(GridCacheSharedContext cctx, long timeout) {
+        super(timeout);
+
+        this.cctx = cctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        if (!cctx.kernalContext().isStopping())
+            cctx.exchange().addCustomTask(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/217f8dbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2b8d491..901667f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -337,6 +337,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * @param task Task to run in exchange worker thread.
+     */
+    public void addCustomTask(CachePartitionExchangeWorkerTask task) {
+        assert task != null;
+
+        exchWorker.addCustomTask(task);
+    }
+
+    /**
      * @return Reconnect partition exchange future.
      */
     public IgniteInternalFuture<?> reconnectExchangeFuture() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/217f8dbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 2f7fef2..b1e6b7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -377,7 +377,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      *
      * @param task Task.
      */
-    public void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
+    void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
         if (task instanceof SchemaExchangeWorkerTask) {
             SchemaAbstractDiscoveryMessage msg = ((SchemaExchangeWorkerTask)task).message();
 
@@ -399,6 +399,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             sharedCtx.affinity().processClientCachesChanges(task0);
         }
+        else if (task instanceof ClientCacheUpdateTimeout) {
+            ClientCacheUpdateTimeout task0 = (ClientCacheUpdateTimeout)task;
+
+            sharedCtx.affinity().sendClientCacheChangesMessage(task0);
+        }
         else
             U.warn(log, "Unsupported custom exchange task: " + task);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/217f8dbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
index d374f29..ac0efa7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
@@ -18,37 +18,57 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  *
  */
-public class ClientCacheDhtTopologyFuture extends GridFinishedFuture<AffinityTopologyVersion>
+public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter
     implements GridDhtTopologyFuture {
+    /** */
+    final AffinityTopologyVersion topVer;
+
     /**
-     * @param topVer Exchange topology version.
+     * @param topVer Topology version.
      */
     public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer) {
-        super(topVer);
+        assert topVer != null;
+
+        this.topVer = topVer;
+
+        onDone(topVer);
+    }
 
+    /**
+     * @param topVer Topology version.
+     * @param e Error.
+     */
+    public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer, IgniteCheckedException e) {
+        assert e != null;
         assert topVer != null;
+
+        this.topVer = topVer;
+
+        onDone(e);
     }
 
-    /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return result();
+    /**
+     * @param grp Cache group.
+     * @param topNodes Topology nodes.
+     */
+    public void validate(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
+        grpValidRes = U.newHashMap(1);
+
+        grpValidRes.put(grp.groupId(), validateCacheGroup(grp,topNodes));
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Throwable validateCache(GridCacheContext cctx,
-        boolean recovery,
-        boolean read,
-        @Nullable Object key,
-        @Nullable Collection<?> keys) {
-        return null;
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/217f8dbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
new file mode 100644
index 0000000..e70f383
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.PartitionLossPolicy;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_ALL;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+
+/**
+ *
+ */
+public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<AffinityTopologyVersion>
+    implements GridDhtTopologyFuture {
+    /** Cache groups validation results. */
+    protected volatile Map<Integer, CacheValidation> grpValidRes;
+
+    /**
+     * @param grp Cache group.
+     * @param topNodes Topology nodes.
+     * @return Validation result.
+     */
+    protected final CacheValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
+        Collection<Integer> lostParts = grp.isLocal() ?
+            Collections.<Integer>emptyList() : grp.topology().lostPartitions();
+
+        boolean valid = true;
+
+        if (!grp.systemCache()) {
+            TopologyValidator validator = grp.topologyValidator();
+
+            if (validator != null)
+                valid = validator.validate(topNodes);
+        }
+
+        return new CacheValidation(valid, lostParts);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public final Throwable validateCache(
+        GridCacheContext cctx,
+        boolean recovery,
+        boolean read,
+        @Nullable Object key,
+        @Nullable Collection<?> keys
+    ) {
+        assert isDone() : this;
+
+        Throwable err = error();
+
+        if (err != null)
+            return err;
+
+        if (!cctx.shared().kernalContext().state().active())
+            return new CacheInvalidStateException(
+                "Failed to perform cache operation (cluster is not activated): " + cctx.name());
+
+        CacheGroupContext grp = cctx.group();
+
+        PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy();
+
+        if (grp.needsRecovery() && !recovery) {
+            if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL))
+                return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " +
+                    cctx.name());
+        }
+
+        if (grp.needsRecovery() || grp.topologyValidator() != null) {
+            CacheValidation validation = grpValidRes.get(grp.groupId());
+
+            if (validation == null)
+                return null;
+
+            if (!validation.valid && !read)
+                return new IgniteCheckedException("Failed to perform cache operation " +
+                    "(cache topology is not valid): " + cctx.name());
+
+            if (recovery || !grp.needsRecovery())
+                return null;
+
+            if (key != null) {
+                int p = cctx.affinity().partition(key);
+
+                CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, key, p,
+                    validation.lostParts, partLossPlc);
+
+                if (ex != null)
+                    return ex;
+            }
+
+            if (keys != null) {
+                for (Object k : keys) {
+                    int p = cctx.affinity().partition(k);
+
+                    CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, k, p,
+                        validation.lostParts, partLossPlc);
+
+                    if (ex != null)
+                        return ex;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param read Read flag.
+     * @param key Key to check.
+     * @param part Partition this key belongs to.
+     * @param lostParts Collection of lost partitions.
+     * @param plc Partition loss policy.
+     * @return Invalid state exception if this operation is disallowed.
+     */
+    private CacheInvalidStateException validatePartitionOperation(
+        String cacheName,
+        boolean read,
+        Object key,
+        int part,
+        Collection<Integer> lostParts,
+        PartitionLossPolicy plc
+    ) {
+        if (lostParts.contains(part)) {
+            if (!read) {
+                assert plc == READ_WRITE_ALL || plc == READ_WRITE_SAFE;
+
+                if (plc == READ_WRITE_SAFE) {
+                    return new CacheInvalidStateException("Failed to execute cache operation " +
+                        "(all partition owners have left the grid, partition data has been lost) [" +
+                        "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
+                }
+            }
+            else {
+                // Read.
+                if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE)
+                    return new CacheInvalidStateException("Failed to execute cache operation " +
+                        "(all partition owners have left the grid, partition data has been lost) [" +
+                        "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Cache validation result.
+     */
+    protected static class CacheValidation {
+        /** Topology validation result. */
+        private boolean valid;
+
+        /** Lost partitions on this topology version. */
+        private Collection<Integer> lostParts;
+
+        /**
+         * @param valid Valid flag.
+         * @param lostParts Lost partitions.
+         */
+        private CacheValidation(boolean valid, Collection<Integer> lostParts) {
+            this.valid = valid;
+            this.lostParts = lostParts;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/217f8dbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cd99df7..571cc3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -57,7 +56,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
@@ -66,11 +64,11 @@ import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
@@ -95,10 +93,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DU
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.IgniteSystemProperties.getLong;
-import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL;
-import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
-import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_ALL;
-import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -109,8 +103,8 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
  * Future for exchanging partition maps.
  */
 @SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
-public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion>
-    implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask, IgniteDiagnosticAware {
+public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapter
+    implements Comparable<GridDhtPartitionsExchangeFuture>, CachePartitionExchangeWorkerTask, IgniteDiagnosticAware {
     /** Dummy flag. */
     private final boolean dummy;
 
@@ -194,9 +188,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** */
     private CacheAffinityChangeMessage affChangeMsg;
 
-    /** Cache groups validation results. */
-    private volatile Map<Integer, CacheValidation> grpValidRes;
-
     /** Skip preload flag. */
     private boolean skipPreload;
 
@@ -1199,19 +1190,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 discoEvt.type() == EVT_NODE_JOINED)
                 detectLostPartitions();
 
-            Map<Integer, CacheValidation> m = new HashMap<>(cctx.cache().cacheGroups().size());
-
-            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                Collection<Integer> lostParts = grp.isLocal() ?
-                    Collections.<Integer>emptyList() : grp.topology().lostPartitions();
-
-                boolean valid = true;
+            Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
 
-                if (grp.topologyValidator() != null && !grp.systemCache())
-                    valid = grp.topologyValidator().validate(discoEvt.topologyNodes());
-
-                m.put(grp.groupId(), new CacheValidation(valid, lostParts));
-            }
+            for (CacheGroupContext grp : cctx.cache().cacheGroups())
+                m.put(grp.groupId(), validateCacheGroup(grp, discoEvt.topologyNodes()));
 
             grpValidRes = m;
         }
@@ -1251,113 +1233,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         return dummy;
     }
 
-    /** {@inheritDoc} */
-    @Nullable @Override public Throwable validateCache(
-        GridCacheContext cctx,
-        boolean recovery,
-        boolean read,
-        @Nullable Object key,
-        @Nullable Collection<?> keys
-    ) {
-        assert isDone() : this;
-
-        Throwable err = error();
-
-        if (err != null)
-            return err;
-
-        if (!cctx.shared().kernalContext().state().active())
-            return new CacheInvalidStateException(
-                "Failed to perform cache operation (cluster is not activated): " + cctx.name());
-
-        CacheGroupContext grp = cctx.group();
-
-        PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy();
-
-        if (grp.needsRecovery() && !recovery) {
-            if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL))
-                return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " +
-                    cctx.name());
-        }
-
-        if (grp.needsRecovery() || grp.topologyValidator() != null) {
-            CacheValidation validation = grpValidRes.get(grp.groupId());
-
-            if (validation == null)
-                return null;
-
-            if (!validation.valid && !read)
-                return new IgniteCheckedException("Failed to perform cache operation " +
-                    "(cache topology is not valid): " + cctx.name());
-
-            if (recovery || !grp.needsRecovery())
-                return null;
-
-            if (key != null) {
-                int p = cctx.affinity().partition(key);
-
-                CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, key, p,
-                    validation.lostParts, partLossPlc);
-
-                if (ex != null)
-                    return ex;
-            }
-
-            if (keys != null) {
-                for (Object k : keys) {
-                    int p = cctx.affinity().partition(k);
-
-                    CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, k, p,
-                        validation.lostParts, partLossPlc);
-
-                    if (ex != null)
-                        return ex;
-                }
-            }
-        }
-
-        return null;
-    }
-
-    /**
-     * @param cacheName Cache name.
-     * @param read Read flag.
-     * @param key Key to check.
-     * @param part Partition this key belongs to.
-     * @param lostParts Collection of lost partitions.
-     * @param plc Partition loss policy.
-     * @return Invalid state exception if this operation is disallowed.
-     */
-    private CacheInvalidStateException validatePartitionOperation(
-        String cacheName,
-        boolean read,
-        Object key,
-        int part,
-        Collection<Integer> lostParts,
-        PartitionLossPolicy plc
-    ) {
-        if (lostParts.contains(part)) {
-            if (!read) {
-                assert plc == READ_WRITE_ALL || plc == READ_WRITE_SAFE;
-
-                if (plc == READ_WRITE_SAFE) {
-                    return new CacheInvalidStateException("Failed to execute cache operation " +
-                        "(all partition owners have left the grid, partition data has been lost) [" +
-                        "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
-                }
-            }
-            else {
-                // Read.
-                if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE)
-                    return new CacheInvalidStateException("Failed to execute cache operation " +
-                        "(all partition owners have left the grid, partition data has been lost) [" +
-                        "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
-            }
-        }
-
-        return null;
-    }
-
     /**
      * Cleans up resources to avoid excessive memory usage.
      */
@@ -2149,26 +2024,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         NONE
     }
 
-    /**
-     * Cache validation result.
-     */
-    private static class CacheValidation {
-        /** Topology validation result. */
-        private boolean valid;
-
-        /** Lost partitions on this topology version. */
-        private Collection<Integer> lostParts;
-
-        /**
-         * @param valid Valid flag.
-         * @param lostParts Lost partitions.
-         */
-        private CacheValidation(boolean valid, Collection<Integer> lostParts) {
-            this.valid = valid;
-            this.lostParts = lostParts;
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext diagCtx) {
         if (!isDone()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/217f8dbd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
new file mode 100644
index 0000000..f32e15f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheServerNotFoundException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteClientCacheStartFailoverTest extends GridCommonAbstractTest {
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientStartCoordinatorFailsAtomic() throws Exception {
+        clientStartCoordinatorFails(ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientStartCoordinatorFailsTx() throws Exception {
+        clientStartCoordinatorFails(TRANSACTIONAL);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void clientStartCoordinatorFails(CacheAtomicityMode atomicityMode) throws Exception {
+        Ignite srv0 = startGrids(3);
+
+        final int KEYS = 500;
+
+        IgniteCache<Object, Object> cache = srv0.createCache(cacheConfiguration(DEFAULT_CACHE_NAME, atomicityMode, 1));
+
+        for (int i = 0; i < KEYS; i++)
+            cache.put(i, i);
+
+        client = true;
+
+        final Ignite c = startGrid(3);
+
+        TestRecordingCommunicationSpi.spi(srv0).blockMessages(GridDhtAffinityAssignmentResponse.class, c.name());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                c.cache(DEFAULT_CACHE_NAME);
+
+                return null;
+            }
+        }, "start-cache");
+
+        U.sleep(1000);
+
+        assertFalse(fut.isDone());
+
+        stopGrid(0);
+
+        fut.get();
+
+        cache = c.cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < KEYS; i++) {
+            assertEquals(i, cache.get(i));
+
+            cache.put(i, i + 1);
+
+            assertEquals(i + 1, cache.get(i));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientStartLastServerFailsAtomic() throws Exception {
+        clientStartLastServerFails(ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientStartLastServerFailsTx() throws Exception {
+        clientStartLastServerFails(TRANSACTIONAL);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void clientStartLastServerFails(CacheAtomicityMode atomicityMode) throws Exception {
+        startGrids(3);
+
+        CacheConfiguration<Object, Object> cfg = cacheConfiguration(DEFAULT_CACHE_NAME, atomicityMode, 1);
+
+        cfg.setNodeFilter(new TestNodeFilter(getTestIgniteInstanceName(1)));
+
+        Ignite srv1 = ignite(1);
+
+        srv1.createCache(cfg);
+
+        client = true;
+
+        final Ignite c = startGrid(3);
+
+        client = false;
+
+        TestRecordingCommunicationSpi.spi(srv1).blockMessages(GridDhtAffinityAssignmentResponse.class, c.name());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                c.cache(DEFAULT_CACHE_NAME);
+
+                return null;
+            }
+        }, "start-cache");
+
+        U.sleep(1000);
+
+        assertFalse(fut.isDone());
+
+        stopGrid(1);
+
+        fut.get();
+
+        final IgniteCache<Object, Object> clientCache = c.cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 10; i++) {
+            final int k = i;
+
+            GridTestUtils.assertThrows(log, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    clientCache.get(k);
+
+                    return null;
+                }
+            }, CacheServerNotFoundException.class, null);
+
+            GridTestUtils.assertThrows(log, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    clientCache.put(k, k);
+
+                    return null;
+                }
+            }, CacheServerNotFoundException.class, null);
+
+            GridTestUtils.assertThrows(log, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    clientCache.remove(k);
+
+                    return null;
+                }
+            }, CacheServerNotFoundException.class, null);
+        }
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        for (int i = 0; i < 100; i++) {
+            assertNull(clientCache.get(i));
+
+            clientCache.put(i, i);
+
+            assertEquals(i, clientCache.get(i));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRebalanceState() throws Exception {
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        List<String> cacheNames = startCaches(ignite(0), 100);
+
+        client = true;
+
+        Ignite c = startGrid(SRVS);
+
+        assertTrue(c.configuration().isClientMode());
+
+        awaitPartitionMapExchange();
+
+        client = false;
+
+        TestRecordingCommunicationSpi.spi(ignite(0)).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode clusterNode, Message msg) {
+                return msg instanceof GridDhtPartitionsFullMessage &&
+                    ((GridDhtPartitionsFullMessage) msg).exchangeId() == null;
+            }
+        });
+
+        startGrid(SRVS + 1);
+
+        for (String cacheName : cacheNames)
+            c.cache(cacheName);
+
+        U.sleep(1000);
+
+        for (int i = 0; i < SRVS + 1; i++) {
+            AffinityTopologyVersion topVer = new AffinityTopologyVersion(SRVS + 2);
+
+            IgniteKernal node = (IgniteKernal)ignite(i);
+
+            for (String cacheName : cacheNames) {
+                GridDhtPartitionTopology top = node.context().cache().internalCache(cacheName).context().topology();
+
+                assertEquals(topVer, top.topologyVersion());
+
+                assertFalse(top.rebalanceFinished(topVer));
+            }
+        }
+
+        TestRecordingCommunicationSpi.spi(ignite(0)).stopBlock();
+
+        for (int i = 0; i < SRVS + 1; i++) {
+            final AffinityTopologyVersion topVer = new AffinityTopologyVersion(SRVS + 2, 1);
+
+            final IgniteKernal node = (IgniteKernal)ignite(i);
+
+            for (String cacheName : cacheNames) {
+                final GridDhtPartitionTopology top = node.context().cache().internalCache(cacheName).context().topology();
+
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        return top.rebalanceFinished(topVer);
+                    }
+                }, 5000);
+
+                assertTrue(top.rebalanceFinished(topVer));
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRebalanceStateConcurrentStart() throws Exception {
+        final int SRVS1 = 3;
+        final int CLIENTS = 5;
+        final int SRVS2 = 5;
+
+        startGrids(SRVS1);
+
+        Ignite srv0 = ignite(0);
+
+        final int KEYS = 1000;
+
+        final List<String> cacheNames = startCaches(srv0, KEYS);
+
+        client = true;
+
+        final List<Ignite> clients = new ArrayList<>();
+
+        for (int i = 0; i < CLIENTS; i++)
+            clients.add(startGrid(SRVS1 + i));
+
+        client = false;
+
+        final CyclicBarrier barrier = new CyclicBarrier(clients.size() + SRVS2);
+
+        final AtomicInteger clientIdx = new AtomicInteger();
+
+        final Set<Integer> keys = new HashSet<>();
+
+        for (int i = 0; i < KEYS; i++)
+            keys.add(i);
+
+        IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                barrier.await();
+
+                Ignite client = clients.get(clientIdx.getAndIncrement());
+
+                for (String cacheName : cacheNames)
+                    client.cache(cacheName);
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                for (int i = 0; i < 10; i++) {
+                    for (String cacheName : cacheNames) {
+                        IgniteCache<Object, Object> cache = client.cache(cacheName);
+
+                        Map<Object, Object> map0 = cache.getAll(keys);
+
+                        assertEquals(KEYS, map0.size());
+
+                        cache.put(rnd.nextInt(KEYS), i);
+                    }
+                }
+
+                return null;
+            }
+        }, clients.size(), "client-cache-start");
+
+        final AtomicInteger srvIdx = new AtomicInteger(SRVS1 + CLIENTS);
+
+        IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                barrier.await();
+
+                startGrid(srvIdx.incrementAndGet());
+
+                return null;
+            }
+        }, SRVS2, "node-start");
+
+        fut1.get();
+        fut2.get();
+
+        final AffinityTopologyVersion topVer = new AffinityTopologyVersion(SRVS1 + SRVS2 + CLIENTS, 1);
+
+        for (Ignite client : clients) {
+            for (String cacheName : cacheNames) {
+                final GridDhtPartitionTopology top =
+                    ((IgniteKernal)client).context().cache().internalCache(cacheName).context().topology();
+
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        return top.rebalanceFinished(topVer);
+                    }
+                }, 5000);
+
+                assertTrue(top.rebalanceFinished(topVer));
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientStartCloseServersRestart() throws Exception {
+        final int SRVS = 4;
+        final int CLIENTS = 4;
+
+        startGrids(SRVS);
+
+        final List<String> cacheNames = startCaches(ignite(0), 1000);
+
+        client = true;
+
+        final List<Ignite> clients = new ArrayList<>();
+
+        for (int i = 0; i < CLIENTS; i++)
+            clients.add(startGrid(SRVS + i));
+
+        client = false;
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                while (!stop.get()) {
+                    int nodeIdx = rnd.nextInt(SRVS);
+
+                    stopGrid(nodeIdx);
+
+                    U.sleep(rnd.nextLong(200) + 1);
+
+                    startGrid(nodeIdx);
+
+                    U.sleep(rnd.nextLong(200) + 1);
+                }
+
+                return null;
+            }
+        }, "restart");
+
+        final AtomicInteger clientIdx = new AtomicInteger();
+
+        IgniteInternalFuture<?> clientsFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                Ignite client = clients.get(clientIdx.getAndIncrement());
+
+                assertTrue(client.configuration().isClientMode());
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                while (!stop.get()) {
+                    for (String cacheName : cacheNames)
+                        client.cache(cacheName);
+
+                    for (String cacheName : cacheNames) {
+                        IgniteCache<Object, Object> cache = client.cache(cacheName);
+
+                        cache.put(rnd.nextInt(1000), rnd.nextInt());
+
+                        cache.get(rnd.nextInt(1000));
+                    }
+
+                    for (String cacheName : cacheNames) {
+                        IgniteCache<Object, Object> cache = client.cache(cacheName);
+
+                        cache.close();
+                    }
+                }
+
+                return null;
+            }
+        }, CLIENTS, "client-thread");
+
+        try {
+            U.sleep(10_000);
+
+            stop.set(true);
+
+            restartFut.get();
+            clientsFut.get();
+        }
+        finally {
+            stop.set(true);
+        }
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (Ignite client : clients) {
+            for (String cacheName : cacheNames) {
+                IgniteCache<Object, Object> cache = client.cache(cacheName);
+
+                for (int i = 0; i < 10; i++) {
+                    Integer key = rnd.nextInt(1000);
+
+                    cache.put(key, i);
+
+                    assertEquals(i, cache.get(key));
+                }
+            }
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @param keys Number of keys to put in caches.
+     * @return Cache names.
+     */
+    private List<String> startCaches(Ignite node, int keys) {
+        List<String> cacheNames = new ArrayList<>();
+
+        final Map<Integer, Integer> map = new HashMap<>();
+
+        for (int i = 0; i < keys; i++)
+            map.put(i, i);
+
+        for (int i = 0; i < 3; i++) {
+            CacheConfiguration<Object, Object> ccfg = cacheConfiguration("atomic-" + i, ATOMIC, i);
+
+            IgniteCache<Object, Object> cache = node.createCache(ccfg);
+
+            cacheNames.add(ccfg.getName());
+
+            cache.putAll(map);
+        }
+
+        for (int i = 0; i < 3; i++) {
+            CacheConfiguration<Object, Object> ccfg = cacheConfiguration("tx-" + i, TRANSACTIONAL, i);
+
+            IgniteCache<Object, Object> cache = node.createCache(ccfg);
+
+            cacheNames.add(ccfg.getName());
+
+            cache.putAll(map);
+        }
+
+        return cacheNames;
+    }
+
+    /**
+     * @param name Cache name.
+     * @param atomicityMode Cache atomicity mode.
+     * @param backups Number of backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(String name, CacheAtomicityMode atomicityMode, int backups) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(name);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+    /**
+     *
+     */
+    private static class TestNodeFilter implements IgnitePredicate<ClusterNode> {
+        /** */
+        private final String includeName;
+
+        /**
+         * @param includeName Node to include.
+         */
+        public TestNodeFilter(String includeName) {
+            this.includeName = includeName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return includeName.equals(node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/217f8dbd/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index aea7a2f..58eceaa 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -115,6 +115,7 @@ import org.springframework.beans.BeansException;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.support.FileSystemXmlApplicationContext;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -193,6 +194,7 @@ public abstract class GridAbstractTest extends TestCase {
         System.setProperty(IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "10000");
         System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "false");
         System.setProperty(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, "1");
+        System.setProperty(IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT, "1000");
 
         if (BINARY_MARSHALLER)
             GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());

http://git-wip-us.apache.org/repos/asf/ignite/blob/217f8dbd/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 5851551..c69cac6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorNode
 import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheNoSyncForGetTest;
 import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest;
+import org.apache.ignite.internal.processors.cache.IgniteClientCacheStartFailoverTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
 import org.apache.ignite.internal.processors.cache.IgniteNearClientCacheCloseTest;
 import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest;
@@ -274,6 +275,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
 
         suite.addTest(new TestSuite(IgniteOnePhaseCommitNearReadersTest.class));
         suite.addTest(new TestSuite(IgniteNearClientCacheCloseTest.class));
+        suite.addTest(new TestSuite(IgniteClientCacheStartFailoverTest.class));
 
         return suite;
     }


Mime
View raw message