ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ivasilin...@apache.org
Subject [22/36] incubator-ignite git commit: # ignite-901 client reconnect support
Date Thu, 16 Jul 2015 15:51:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
new file mode 100644
index 0000000..be3234d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -0,0 +1,1202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import junit.framework.*;
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstractTest {
+    /** */
+    private static final int SRV_CNT = 3;
+
+    /** */
+    private static final String STATIC_CACHE = "static-cache";
+
+    /** */
+    private UUID nodeId;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TestCommunicationSpi commSpi = new TestCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setNetworkTimeout(5000);
+
+        if (nodeId != null) {
+            cfg.setNodeId(nodeId);
+
+            nodeId = null;
+        }
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(STATIC_CACHE);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(SRV_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnect() throws Exception {
+        clientMode = true;
+
+        IgniteEx client = startGrid(SRV_CNT);
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>());
+
+        final IgniteCache<Object, Object> staticCache = client.cache(STATIC_CACHE);
+
+        staticCache.put(1, 1);
+
+        assertEquals(1, staticCache.get(1));
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setName("nearCache");
+
+        final IgniteCache<Object, Object> nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>());
+
+        nearCache.put(1, 1);
+
+        assertEquals(1, nearCache.localPeek(1));
+
+        cache.put(1, 1);
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        final AtomicReference<IgniteInternalFuture> blockPutRef = new AtomicReference<>();
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    assertEquals(1, reconnectLatch.getCount());
+
+                    blockPutRef.set(GridTestUtils.runAsync(new Callable() {
+                        @Override public Object call() throws Exception {
+                            log.info("Start put.");
+
+                            try {
+                                cache.put(2, 2);
+
+                                fail();
+                            }
+                            catch (CacheException e) {
+                                log.info("Expected exception: " + e);
+
+                                IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException) e.getCause();
+
+                                e0.reconnectFuture().get();
+                            }
+
+                            cache.put(2, 2);
+
+                            log.info("Finish put.");
+
+                            return null;
+                        }
+                    }));
+
+                    disconnectLatch.countDown();
+                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    assertEquals(0, disconnectLatch.getCount());
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        log.info("Fail client.");
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        waitReconnectEvent(disconnectLatch);
+
+        IgniteInternalFuture putFut = blockPutRef.get();
+
+        assertNotDone(putFut);
+
+        U.sleep(5000);
+
+        assertNotDone(putFut);
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown();
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        checkCacheDiscoveryData(srv, client, null, true, true, false);
+
+        checkCacheDiscoveryData(srv, client, "nearCache", true, true, true);
+
+        checkCacheDiscoveryData(srv, client, STATIC_CACHE, true, true, false);
+
+        assertEquals(1, cache.get(1));
+
+        putFut.get();
+
+        assertEquals(2, cache.get(2));
+
+        cache.put(3, 3);
+
+        assertEquals(3, cache.get(3));
+
+        assertNull(nearCache.localPeek(1));
+
+        staticCache.put(10, 10);
+
+        assertEquals(10, staticCache.get(10));
+
+        nearCache.put(20, 20);
+
+        srv.cache(nearCache.getName()).put(20, 21);
+
+        assertEquals(21, nearCache.localPeek(20));
+
+        this.clientMode = false;
+
+        IgniteEx srv2 = startGrid(SRV_CNT + 1);
+
+        Integer key = primaryKey(srv2.cache(null));
+
+        cache.put(key, 4);
+
+        assertEquals(4, cache.get(key));
+
+        checkCacheDiscoveryData(srv2, client, null, true, true, false);
+
+        checkCacheDiscoveryData(srv2, client, "nearCache", true, true, true);
+
+        checkCacheDiscoveryData(srv2, client, STATIC_CACHE, true, true, false);
+
+        staticCache.put(20, 20);
+
+        assertEquals(20, staticCache.get(20));
+
+        srv.cache(nearCache.getName()).put(20, 22);
+
+        assertEquals(22, nearCache.localPeek(20));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectTransactions() throws Exception {
+        clientMode = true;
+
+        IgniteEx client = startGrid(SRV_CNT);
+
+        Ignite srv = clientRouter(client);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+        final IgniteTransactions txs = client.transactions();
+
+        final Transaction tx = txs.txStart(OPTIMISTIC, REPEATABLE_READ);
+
+        cache.put(1, 1);
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                try {
+                    tx.commit();
+
+                    fail();
+                } catch (IgniteClientDisconnectedException e) {
+                    log.info("Expected error: " + e);
+
+                    assertNotNull(e.reconnectFuture());
+                }
+
+                try {
+                    txs.txStart();
+
+                    fail();
+                } catch (IgniteClientDisconnectedException e) {
+                    log.info("Expected error: " + e);
+
+                    assertNotNull(e.reconnectFuture());
+                }
+            }
+        });
+
+        assertNull(txs.tx());
+
+        try (Transaction tx0 = txs.txStart(OPTIMISTIC, REPEATABLE_READ)) {
+            cache.put(1, 1);
+
+            assertEquals(1, cache.get(1));
+
+            tx0.commit();
+        }
+
+        try (Transaction tx0 = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            cache.put(2, 2);
+
+            assertEquals(2, cache.get(2));
+
+            tx0.commit();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectTransactionInProgress1() throws Exception {
+        clientMode = true;
+
+        IgniteEx client = startGrid(SRV_CNT);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+        reconnectTransactionInProgress1(client, OPTIMISTIC, cache);
+
+        reconnectTransactionInProgress1(client, PESSIMISTIC, cache);
+    }
+
+    /**
+     * @param client Client.
+     * @param txConcurrency Transaction concurrency mode.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void reconnectTransactionInProgress1(IgniteEx client,
+        final TransactionConcurrency txConcurrency,
+        final IgniteCache<Object, Object> cache)
+        throws Exception
+    {
+        Ignite srv = clientRouter(client);
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+        final TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        final IgniteTransactions txs = client.transactions();
+
+        final CountDownLatch afterPut1 = new CountDownLatch(1);
+
+        final CountDownLatch afterPut2 = new CountDownLatch(1);
+
+        final CountDownLatch putFailed = new CountDownLatch(1);
+
+        IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                try {
+                    log.info("Start tx1: " + txConcurrency);
+
+                    try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) {
+                        cache.put(1, 1);
+
+                        afterPut1.countDown();
+
+                        afterPut2.await();
+
+                        cache.put(2, 2);
+
+                        fail();
+                    }
+                    catch (CacheException e) {
+                        log.info("Expected exception: " + e);
+
+                        putFailed.countDown();
+
+                        IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause();
+
+                        e0.reconnectFuture().get();
+                    }
+
+                    log.info("Start tx2: " + txConcurrency);
+
+                    try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) {
+                        cache.put(1, 1);
+
+                        cache.put(2, 2);
+
+                        tx.commit();
+                    }
+
+                    assertEquals(1, cache.get(1));
+                    assertEquals(2, cache.get(2));
+
+                    try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) {
+                        cache.put(3, 3);
+
+                        cache.put(4, 4);
+
+                        tx.commit();
+                    }
+
+                    assertEquals(1, cache.get(1));
+                    assertEquals(2, cache.get(2));
+                    assertEquals(3, cache.get(3));
+                    assertEquals(4, cache.get(4));
+
+                    cache.removeAll();
+
+                    return true;
+                }
+                catch (AssertionFailedError e) {
+                    throw e;
+                }
+                catch (Throwable e) {
+                    log.error("Unexpected error", e);
+
+                    fail("Unexpected error: " + e);
+
+                    return false;
+                }
+            }
+        });
+
+        assertTrue(afterPut1.await(5000, MILLISECONDS));
+
+        assertNotDone(fut);
+
+        srvSpi.failNode(client.localNode().id(), null);
+
+        waitReconnectEvent(disconnectLatch);
+
+        afterPut2.countDown();
+
+        assertTrue(putFailed.await(5000, MILLISECONDS));
+
+        clientSpi.writeLatch.countDown();
+
+        waitReconnectEvent(reconnectLatch);
+
+        assertTrue(fut.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectTransactionInProgress2() throws Exception {
+        clientMode = true;
+
+        final IgniteEx client = startGrid(SRV_CNT);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        txInProgressFails(client, ccfg, GridNearTxPrepareResponse.class, OPTIMISTIC, 1);
+
+        txInProgressFails(client, ccfg, GridNearTxPrepareResponse.class, PESSIMISTIC, 2);
+
+        txInProgressFails(client, ccfg, GridNearTxFinishResponse.class, OPTIMISTIC, 3);
+
+        txInProgressFails(client, ccfg, GridNearTxFinishResponse.class, PESSIMISTIC, 4);
+
+        txInProgressFails(client, ccfg, GridNearLockResponse.class, PESSIMISTIC, 5);
+    }
+
+    /**
+     * @param client Client.
+     * @param ccfg Cache configuration.
+     * @param msgToBlock Message to block.
+     * @param txConcurrency Transaction concurrency mode.
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void txInProgressFails(final IgniteEx client,
+        final CacheConfiguration<Object, Object> ccfg,
+        Class<?> msgToBlock,
+        final TransactionConcurrency txConcurrency,
+        final Integer key) throws Exception {
+        log.info("Test tx failure [msg=" + msgToBlock + ", txMode=" + txConcurrency + ", key=" + key + ']');
+
+        checkOperationInProgressFails(client, ccfg, msgToBlock,
+            new CI1<IgniteCache<Object, Object>>() {
+                @Override public void apply(IgniteCache<Object, Object> cache) {
+                    try (Transaction tx = client.transactions().txStart(txConcurrency, REPEATABLE_READ)) {
+                        log.info("Put1: " + key);
+
+                        cache.put(key, key);
+
+                        Integer key2 = key + 1;
+
+                        log.info("Put2: " + key2);
+
+                        cache.put(key2, key2);
+
+                        log.info("Commit [key1=" + key + ", key2=" + key2 + ']');
+
+                        tx.commit();
+                    }
+                }
+            }
+        );
+
+        IgniteCache<Object, Object> cache = client.cache(ccfg.getName());
+
+        assertEquals(key, cache.get(key));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectExchangeInProgress() throws Exception {
+        clientMode = true;
+
+        IgniteEx client = startGrid(SRV_CNT);
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        TestCommunicationSpi coordCommSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi();
+
+        coordCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, client.localNode().id());
+
+        clientMode = false;
+
+        startGrid(SRV_CNT + 1);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        try {
+            coordCommSpi.stopBlock(true);
+
+            fail();
+        }
+        catch (IgniteException e) {
+            log.info("Expected error: " + e);
+        }
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName("newCache");
+
+        ccfg.setCacheMode(REPLICATED);
+
+        log.info("Start new cache.");
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+        cache.put(1, 1);
+
+        assertEquals(1, cache.get(1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectInitialExchangeInProgress() throws Exception {
+        final UUID clientId = UUID.randomUUID();
+
+        Ignite srv = grid(0);
+
+        final CountDownLatch joinLatch = new CountDownLatch(1);
+
+        srv.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_NODE_JOINED && ((DiscoveryEvent)evt).eventNode().id().equals(clientId)) {
+                    info("Client joined: " + evt);
+
+                    joinLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_NODE_JOINED);
+
+        TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+        srvCommSpi.blockMessages(GridDhtPartitionsFullMessage.class, clientId);
+
+        clientMode = true;
+
+        nodeId = clientId;
+
+        IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                try {
+                    Ignition.start(getConfiguration(getTestGridName(SRV_CNT)));
+
+                    fail();
+
+                    return false;
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    log.info("Expected start error: " + e);
+
+                    try {
+                        e.reconnectFuture().get();
+
+                        fail();
+                    }
+                    catch (IgniteException e0) {
+                        log.info("Expected future error: " + e0);
+                    }
+
+                    return true;
+                }
+                catch (Throwable e) {
+                    log.error("Unexpected error: " + e, e);
+
+                    throw e;
+                }
+            }
+        });
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        assertTrue(joinLatch.await(5000, MILLISECONDS));
+
+        U.sleep(1000);
+
+        assertNotDone(fut);
+
+        srvSpi.failNode(clientId, null);
+
+        srvCommSpi.stopBlock(false);
+
+        assertTrue(fut.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectOperationInProgress() throws Exception {
+        clientMode = true;
+
+        IgniteEx client = startGrid(SRV_CNT);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
+                    info("Client disconnected: " + evt);
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED)
+                    info("Client reconnected: " + evt);
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        IgniteInClosure<IgniteCache<Object, Object>> putOp = new CI1<IgniteCache<Object, Object>>() {
+            @Override public void apply(IgniteCache<Object, Object> cache) {
+                cache.put(1, 1);
+            }
+        };
+
+        IgniteInClosure<IgniteCache<Object, Object>> getOp = new CI1<IgniteCache<Object, Object>>() {
+            @Override public void apply(IgniteCache<Object, Object> cache) {
+                cache.get(1);
+            }
+        };
+
+        int cnt = 0;
+
+        for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) {
+            CacheAtomicWriteOrderMode[] writeOrders =
+                atomicityMode == ATOMIC ? CacheAtomicWriteOrderMode.values() :
+                new CacheAtomicWriteOrderMode[]{CacheAtomicWriteOrderMode.CLOCK};
+
+            for (CacheAtomicWriteOrderMode writeOrder : writeOrders) {
+                for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+                    CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+                    ccfg.setAtomicityMode(atomicityMode);
+
+                    ccfg.setAtomicWriteOrderMode(writeOrder);
+
+                    ccfg.setName("cache-" + cnt++);
+
+                    ccfg.setWriteSynchronizationMode(syncMode);
+
+                    if (syncMode != FULL_ASYNC) {
+                        Class<?> cls = (ccfg.getAtomicityMode() == ATOMIC) ?
+                            GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class;
+
+                        log.info("Test cache put [atomicity=" + atomicityMode +
+                            ", writeOrder=" + writeOrder +
+                            ", syncMode=" + syncMode + ']');
+
+                        checkOperationInProgressFails(client, ccfg, cls, putOp);
+
+                        client.destroyCache(ccfg.getName());
+                    }
+
+                    log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']');
+
+                    checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getOp);
+
+                    client.destroyCache(ccfg.getName());
+                }
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectCacheDestroyed() throws Exception {
+        clientMode = true;
+
+        final IgniteEx client = startGrid(SRV_CNT);
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(client);
+
+        final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srv.destroyCache(null);
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return clientCache.get(1);
+            }
+        }, IllegalStateException.class, null);
+
+        checkCacheDiscoveryData(srv, client, null, false, false, false);
+
+        IgniteCache<Object, Object> clientCache0 = client.getOrCreateCache(new CacheConfiguration<>());
+
+        checkCacheDiscoveryData(srv, client, null, true, true, false);
+
+        clientCache0.put(1, 1);
+
+        assertEquals(1, clientCache0.get(1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectCacheDestroyedAndCreated() throws Exception {
+        clientMode = true;
+
+        final Ignite client = startGrid(SRV_CNT);
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(client);
+
+        final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+        assertEquals(ATOMIC,
+            clientCache.getConfiguration(CacheConfiguration.class).getAtomicityMode());
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srv.destroyCache(null);
+
+                CacheConfiguration ccfg = new CacheConfiguration();
+
+                ccfg.setAtomicityMode(TRANSACTIONAL);
+
+                srv.getOrCreateCache(ccfg);
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return clientCache.get(1);
+            }
+        }, IllegalStateException.class, null);
+
+        checkCacheDiscoveryData(srv, client, null, true, false, false);
+
+        IgniteCache<Object, Object> clientCache0 = client.cache(null);
+
+        checkCacheDiscoveryData(srv, client, null, true, true, false);
+
+        assertEquals(TRANSACTIONAL,
+            clientCache0.getConfiguration(CacheConfiguration.class).getAtomicityMode());
+
+        clientCache0.put(1, 1);
+
+        assertEquals(1, clientCache0.get(1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectMarshallerCache() throws Exception {
+        clientMode = true;
+
+        final Ignite client = startGrid(SRV_CNT);
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(client);
+
+        final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+        final IgniteCache<Object, Object> srvCache = srv.cache(null);
+
+        assertNotNull(srvCache);
+
+        clientCache.put(1, new TestClass1());
+        srvCache.put(2, new TestClass2());
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                assertNotNull(srvCache.get(1));
+                assertNotNull(srvCache.get(2));
+
+                srvCache.put(3, new TestClass3());
+            }
+        });
+
+        srvCache.put(4, new TestClass4());
+
+        assertNotNull(clientCache.get(1));
+        assertNotNull(clientCache.get(2));
+        assertNotNull(clientCache.get(3));
+        assertNotNull(clientCache.get(4));
+
+        clientCache.put(5, new TestClass5());
+
+        assertNotNull(srvCache.get(5));
+        assertNotNull(clientCache.get(5));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectClusterRestart() throws Exception {
+        clientMode = true;
+
+        final Ignite client = startGrid(SRV_CNT);
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+        clientCache.put(1, new TestClass1());
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        for (int i = 0; i < SRV_CNT; i++)
+            stopGrid(i);
+
+        assertTrue(disconnectLatch.await(30_000, MILLISECONDS));
+
+        clientMode = false;
+
+        Ignite srv = startGrid(0);
+
+        assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return clientCache.get(1);
+            }
+        }, IllegalStateException.class, null);
+
+        IgniteCache<Object, Object> srvCache = srv.getOrCreateCache(new CacheConfiguration<>());
+
+        srvCache.put(1, new TestClass1());
+        srvCache.put(2, new TestClass2());
+
+        IgniteCache<Object, Object> clientCache2 = client.cache(null);
+
+        assertNotNull(clientCache2);
+
+        assertNotNull(clientCache2.get(1));
+        assertNotNull(clientCache2.get(2));
+    }
+
+    /**
+     *
+     */
+    static class TestClass1 implements Serializable {}
+
+    /**
+     *
+     */
+    static class TestClass2 implements Serializable {}
+
+    /**
+     *
+     */
+    static class TestClass3 implements Serializable {}
+
+    /**
+     *
+     */
+    static class TestClass4 implements Serializable {}
+
+    /**
+     *
+     */
+    static class TestClass5 implements Serializable {}
+
+    /**
+     * @param client Client.
+     * @param ccfg Cache configuration.
+     * @param msgToBlock Message to block.
+     * @param c Cache operation closure.
+     * @throws Exception If failed.
+     */
+    private void checkOperationInProgressFails(IgniteEx client,
+        final CacheConfiguration<Object, Object> ccfg,
+        Class<?> msgToBlock,
+        final IgniteInClosure<IgniteCache<Object, Object>> c)
+        throws Exception
+    {
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
+
+        for (int i = 0; i < SRV_CNT; i++) {
+            TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)grid(i).configuration().getCommunicationSpi();
+
+            srvCommSpi.blockMessages(msgToBlock, client.localNode().id());
+        }
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                IgniteClientDisconnectedException e0 = null;
+
+                try {
+                    c.apply(cache);
+
+                    fail();
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    log.info("Expected exception: " + e);
+
+                    e0 = e;
+                }
+                catch (CacheException e) {
+                    log.info("Expected exception: " + e);
+
+                    assertTrue("Unexpected cause: " + e.getCause(),
+                        e.getCause() instanceof IgniteClientDisconnectedException);
+
+                    e0 = (IgniteClientDisconnectedException)e.getCause();
+                }
+
+                assertNotNull(e0);
+                assertNotNull(e0.reconnectFuture());
+
+                e0.reconnectFuture().get();
+
+                c.apply(cache);
+
+                return null;
+            }
+        });
+
+        Thread.sleep(1000);
+
+        assertNotDone(fut);
+
+        log.info("Fail client: " + client.localNode().id());
+
+        srvSpi.failNode(client.localNode().id(), null);
+
+        fut.get();
+
+        for (int i = 0; i < SRV_CNT; i++)
+            ((TestCommunicationSpi)grid(i).configuration().getCommunicationSpi()).stopBlock(false);
+
+        cache.put(1, 1);
+
+        assertEquals(1, cache.get(1));
+    }
+
+    /**
+     * @param srv Server node.
+     * @param client Client node.
+     * @param cacheName Cache name.
+     * @param cacheExists Cache exists flag.
+     * @param clientCache {@code True} if client node has client cache.
+     * @param clientNear {@code True} if client node has near-enabled client cache.
+     */
+    private void checkCacheDiscoveryData(Ignite srv,
+        Ignite client,
+        String cacheName,
+        boolean cacheExists,
+        boolean clientCache,
+        boolean clientNear)
+    {
+        GridDiscoveryManager srvDisco = ((IgniteKernal)srv).context().discovery();
+        GridDiscoveryManager clientDisco = ((IgniteKernal)client).context().discovery();
+
+        ClusterNode srvNode = ((IgniteKernal)srv).localNode();
+        ClusterNode clientNode = ((IgniteKernal)client).localNode();
+
+        assertFalse(srvDisco.cacheAffinityNode(clientNode, cacheName));
+        assertFalse(clientDisco.cacheAffinityNode(clientNode, cacheName));
+
+        assertEquals(cacheExists, srvDisco.cacheAffinityNode(srvNode, cacheName));
+
+        if (clientNear)
+            assertTrue(srvDisco.cacheNearNode(clientNode, cacheName));
+        else
+            assertEquals(clientCache, srvDisco.cacheClientNode(clientNode, cacheName));
+
+        assertEquals(cacheExists, clientDisco.cacheAffinityNode(srvNode, cacheName));
+
+        if (clientNear)
+            assertTrue(clientDisco.cacheNearNode(clientNode, cacheName));
+        else
+            assertEquals(clientCache, clientDisco.cacheClientNode(clientNode, cacheName));
+
+        if (cacheExists) {
+            if (clientCache || clientNear) {
+                assertTrue(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode));
+                assertTrue(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode));
+            }
+            else {
+                assertFalse(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode));
+                assertFalse(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode));
+            }
+        }
+        else {
+            assertTrue(client.cluster().forClientNodes(cacheName).nodes().isEmpty());
+            assertTrue(srv.cluster().forClientNodes(cacheName).nodes().isEmpty());
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** */
+        private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
+
+        /** */
+        private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                Object msg0 = ((GridIoMessage)msg).message();
+
+                synchronized (this) {
+                    Set<UUID> blockNodes = blockCls.get(msg0.getClass());
+
+                    if (F.contains(blockNodes, node.id())) {
+                        log.info("Block message [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) +
+                            ", msg=" + msg0 + ']');
+
+                        blockedMsgs.add(new T2<>(node, (GridIoMessage)msg));
+
+                        return;
+                    }
+                }
+            }
+
+            super.sendMessage(node, msg);
+        }
+
+        /**
+         * @param cls Message class.
+         * @param nodeId Node ID.
+         */
+        void blockMessages(Class<?> cls, UUID nodeId) {
+            synchronized (this) {
+                Set<UUID> set = blockCls.get(cls);
+
+                if (set == null) {
+                    set = new HashSet<>();
+
+                    blockCls.put(cls, set);
+                }
+
+                set.add(nodeId);
+            }
+        }
+
+        /**
+         * @param snd Send messages flag.
+         */
+        void stopBlock(boolean snd) {
+            synchronized (this) {
+                blockCls.clear();
+
+                if (snd) {
+                    for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+                        ClusterNode node = msg.get1();
+
+                        log.info("Send blocked message: [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) +
+                            ", msg=" + msg.get2().message() + ']');
+
+                        super.sendMessage(msg.get1(), msg.get2());
+                    }
+                }
+
+                blockedMsgs.clear();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
new file mode 100644
index 0000000..ed811d9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueueReconnect() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        queueReconnect(colCfg);
+
+        colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(ATOMIC);
+
+        queueReconnect(colCfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueueReconnectRemoved() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        queueReconnectRemoved(colCfg);
+
+        colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(ATOMIC);
+
+        queueReconnectRemoved(colCfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueueReconnectInProgress() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        queueReconnectInProgress(colCfg);
+
+        colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(ATOMIC);
+
+        queueReconnectInProgress(colCfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSetReconnect() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        setReconnect(colCfg);
+
+        colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(ATOMIC);
+
+        setReconnect(colCfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSetReconnectRemoved() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(ATOMIC);
+
+        setReconnectRemove(colCfg);
+
+        colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        setReconnectRemove(colCfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSetReconnectInProgress() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(ATOMIC);
+
+        setReconnectInProgress(colCfg);
+
+        colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        setReconnectInProgress(colCfg);
+    }
+
+    /**
+     * @param colCfg Collection configuration.
+     * @throws Exception If failed.
+     */
+    private void setReconnect(CollectionConfiguration colCfg) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final String setName = "set-" + colCfg.getAtomicityMode();
+
+        IgniteSet<String> clientSet = client.set(setName, colCfg);
+
+        final IgniteSet<String> srvSet = srv.set(setName, null);
+
+        assertTrue(clientSet.add("1"));
+
+        assertFalse(srvSet.add("1"));
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                assertTrue(srvSet.add("2"));
+            }
+        });
+
+        assertFalse(clientSet.add("2"));
+
+        assertTrue(clientSet.remove("2"));
+
+        assertFalse(srvSet.contains("2"));
+    }
+
+    /**
+     * @param colCfg Collection configuration.
+     * @throws Exception If failed.
+     */
+    private void setReconnectRemove(CollectionConfiguration colCfg) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(client);
+
+        final String setName = "set-rm-" + colCfg.getAtomicityMode();
+
+        final IgniteSet<String> clientSet = client.set(setName, colCfg);
+
+        final IgniteSet<String> srvSet = srv.set(setName, null);
+
+        assertTrue(clientSet.add("1"));
+
+        assertFalse(srvSet.add("1"));
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvSet.close();
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                clientSet.add("fail");
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+
+        IgniteSet<String> newClientSet = client.set(setName, colCfg);
+
+        IgniteSet<String> newSrvSet = srv.set(setName, null);
+
+        assertTrue(newClientSet.add("1"));
+
+        assertFalse(newSrvSet.add("1"));
+
+        newSrvSet.close();
+    }
+
+    /**
+     * @param colCfg Collection configuration.
+     * @throws Exception If failed.
+     */
+    private void setReconnectInProgress(final CollectionConfiguration colCfg) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(client);
+
+        final String setName = "set-in-progress-" + colCfg.getAtomicityMode();
+
+        final IgniteSet<String> clientSet = client.set(setName, colCfg);
+
+        final IgniteSet<String> srvSet = srv.set(setName, null);
+
+        assertTrue(clientSet.add("1"));
+
+        assertFalse(srvSet.add("1"));
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        if (colCfg.getAtomicityMode() == ATOMIC)
+            commSpi.blockMessage(GridNearAtomicUpdateResponse.class);
+        else
+            commSpi.blockMessage(GridNearTxPrepareResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    for (int i = 0; i < 100; i++)
+                        clientSet.add("2");
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMessage();
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+
+        assertTrue(clientSet.add("3"));
+
+        assertFalse(srvSet.add("3"));
+
+        srvSet.close();
+    }
+
+    /**
+     * @param colCfg Collection configuration.
+     * @throws Exception If failed.
+     */
+    private void queueReconnect(CollectionConfiguration colCfg) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final String setName = "queue-" + colCfg.getAtomicityMode();
+
+        IgniteQueue<String> clientQueue = client.queue(setName, 10, colCfg);
+
+        final IgniteQueue<String> srvQueue = srv.queue(setName, 10, null);
+
+        assertTrue(clientQueue.offer("1"));
+
+        assertTrue(srvQueue.contains("1"));
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                assertTrue(srvQueue.add("2"));
+            }
+        });
+
+        assertTrue(clientQueue.contains("2"));
+
+        assertEquals("1", clientQueue.poll());
+    }
+
+    /**
+     * @param colCfg Collection configuration.
+     * @throws Exception If failed.
+     */
+    private void queueReconnectRemoved(CollectionConfiguration colCfg) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final String setName = "queue-rmv" + colCfg.getAtomicityMode();
+
+        final IgniteQueue<String> clientQueue = client.queue(setName, 10, colCfg);
+
+        final IgniteQueue<String> srvQueue = srv.queue(setName, 10, null);
+
+        assertTrue(clientQueue.add("1"));
+
+        assertTrue(srvQueue.add("2"));
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvQueue.close();
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                clientQueue.add("fail");
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+
+        IgniteQueue<String> newClientQueue = client.queue(setName, 10, colCfg);
+
+        IgniteQueue<String> newSrvQueue = srv.queue(setName, 10, null);
+
+        assertTrue(newClientQueue.add("1"));
+
+        assertTrue(newSrvQueue.add("2"));
+    }
+
+    /**
+     * @param colCfg Collection configuration.
+     * @throws Exception If failed.
+     */
+    private void queueReconnectInProgress(final CollectionConfiguration colCfg) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final String setName = "queue-rmv" + colCfg.getAtomicityMode();
+
+        final IgniteQueue<String> clientQueue = client.queue(setName, 10, colCfg);
+
+        final IgniteQueue<String> srvQueue = srv.queue(setName, 10, null);
+
+        assertTrue(clientQueue.offer("1"));
+
+        assertTrue(srvQueue.contains("1"));
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        if (colCfg.getAtomicityMode() == ATOMIC)
+            commSpi.blockMessage(GridNearAtomicUpdateResponse.class);
+        else
+            commSpi.blockMessage(GridNearTxPrepareResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    clientQueue.add("2");
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMessage();
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", (Boolean)fut.get());
+
+        assertTrue(clientQueue.add("3"));
+
+        assertEquals("1", clientQueue.poll());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
new file mode 100644
index 0000000..e9667a1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectAffinityCallInProgress() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        IgniteCache<Integer, Integer> cache = client.getOrCreateCache("test-cache");
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, i);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        commSpi.blockMessage(GridJobExecuteResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    client.compute().affinityCall("test-cache", 40, new IgniteCallable<Object>() {
+                        @Override public Integer call() throws Exception {
+                            return 42;
+                        }
+                    });
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMessage();
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectBroadcastInProgress() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        commSpi.blockMessage(GridJobExecuteResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    client.compute().broadcast(new IgniteCallable<Object>() {
+                        @Override public Object call() throws Exception {
+                            return 42;
+                        }
+                    });
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMessage();
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectApplyInProgress() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        commSpi.blockMessage(GridJobExecuteResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    client.compute().apply(new IgniteClosure<Integer, Integer>() {
+                        @Override public Integer apply(Integer o) {
+                            return o + 1;
+                        }
+                    }, Arrays.asList(1, 2, 3));
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMessage();
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
new file mode 100644
index 0000000..2bfdc85b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+
+import javax.cache.event.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientReconnectAbstractTest {
+    /** */
+    private static volatile CountDownLatch latch;
+
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEventListenerReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        EventListener lsnr = new EventListener();
+
+        UUID opId = client.events().remoteListen(lsnr, null, EventType.EVT_JOB_STARTED);
+
+        lsnr.latch = new CountDownLatch(1);
+
+        log.info("Created remote listener: " + opId);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        waitReconnectEvent(reconnectLatch);
+
+        client.compute().run(new DummyJob());
+
+        assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+        lsnr.latch = new CountDownLatch(1);
+
+        srv.compute().run(new DummyJob());
+
+        assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+        lsnr.latch = new CountDownLatch(1);
+
+        log.info("Stop listen, should not get events anymore.");
+
+        client.events().stopRemoteListen(opId);
+
+        assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMessageListenerReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final String topic = "testTopic";
+
+        MessageListener locLsnr  = new MessageListener();
+
+        UUID opId = client.message().remoteListen(topic, new RemoteMessageListener());
+
+        client.message().localListen(topic, locLsnr);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        waitReconnectEvent(reconnectLatch);
+
+        locLsnr.latch = new CountDownLatch(1);
+        latch = new CountDownLatch(2);
+
+        client.message().send(topic, "msg1");
+
+        assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+        assertTrue(latch.await(5000, MILLISECONDS));
+
+        locLsnr.latch = new CountDownLatch(1);
+        latch = new CountDownLatch(2);
+
+        srv.message().send(topic, "msg2");
+
+        assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+        assertTrue(latch.await(5000, MILLISECONDS));
+
+        log.info("Stop listen, should not get remote messages anymore.");
+
+        client.message().stopRemoteListen(opId);
+
+        srv.message().send(topic, "msg3");
+
+        locLsnr.latch = new CountDownLatch(1);
+        latch = new CountDownLatch(1);
+
+        assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+        assertFalse(latch.await(3000, MILLISECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheContinuousQueryReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+        CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setAutoUnsubscribe(true);
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = clientCache.query(qry);
+
+        for (int i = 0; i < 5; i++) {
+            log.info("Iteration: " + i);
+
+            continuousQueryReconnect(client, clientCache, lsnr);
+        }
+
+        log.info("Close cursor, should not get cache events anymore.");
+
+        cur.close();
+
+        lsnr.latch = new CountDownLatch(1);
+
+        clientCache.put(3, 3);
+
+        assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+    }
+
+    /**
+     * @param client Client.
+     * @param clientCache Client cache.
+     * @param lsnr Continuous query listener.
+     * @throws Exception If failed.
+     */
+    private void continuousQueryReconnect(Ignite client,
+        IgniteCache<Object, Object> clientCache,
+        CacheEventListener lsnr)
+        throws Exception
+    {
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        };
+
+        client.events().localListen(p, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        waitReconnectEvent(reconnectLatch);
+
+        client.events().stopLocalListen(p);
+
+        lsnr.latch = new CountDownLatch(1);
+
+        clientCache.put(1, 1);
+
+        assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+        lsnr.latch = new CountDownLatch(1);
+
+        srv.cache(null).put(2, 2);
+
+        assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+    }
+
+    /**
+     *
+     */
+    private static class EventListener implements P2<UUID, Event> {
+        /** */
+        private volatile CountDownLatch latch;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID uuid, Event evt) {
+            assertTrue(ignite.cluster().localNode().isClient());
+
+            ignite.log().info("Received event: " + evt);
+
+            if (latch != null)
+                latch.countDown();
+
+            return true;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class MessageListener implements P2<UUID, Object> {
+        /** */
+        private volatile CountDownLatch latch;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID uuid, Object msg) {
+            assertTrue(ignite.cluster().localNode().isClient());
+
+            ignite.log().info("Local listener received message: " + msg);
+
+            if (latch != null)
+                latch.countDown();
+
+            return true;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class RemoteMessageListener implements P2<UUID, Object> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID uuid, Object msg) {
+            ignite.log().info("Remote listener received message: " + msg);
+
+            if (latch != null)
+                latch.countDown();
+
+            return true;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+        /** */
+        private volatile CountDownLatch latch;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            int cnt = 0;
+
+            for (CacheEntryEvent<?, ?> evt : evts) {
+                ignite.log().info("Received cache event: " + evt);
+
+                cnt++;
+            }
+
+            assertEquals(1, cnt);
+
+            if (latch != null)
+                latch.countDown();
+        }
+    }
+
+    /**
+     *
+     */
+    static class DummyJob implements IgniteRunnable {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            ignite.log().info("Job run.");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
new file mode 100644
index 0000000..feeebe5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconnectAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnect() throws Exception {
+        final Ignite client = ignite(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        long topVer = 4;
+
+        IgniteCluster cluster = client.cluster();
+
+        cluster.nodeLocalMap().put("locMapKey", 10);
+
+        Map<Integer, Integer> nodeCnt = new HashMap<>();
+
+        nodeCnt.put(1, 1);
+        nodeCnt.put(2, 2);
+        nodeCnt.put(3, 3);
+        nodeCnt.put(4, 4);
+
+        for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {
+            Collection<ClusterNode> nodes = cluster.topology(e.getKey());
+
+            assertNotNull("No nodes for topology: " + e.getKey(), nodes);
+            assertEquals((int)e.getValue(), nodes.size());
+        }
+
+        ClusterNode locNode = cluster.localNode();
+
+        assertEquals(topVer, locNode.order());
+
+        TestTcpDiscoverySpi srvSpi = spi(clientRouter(client));
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    IgniteFuture<?> fut = client.cluster().clientReconnectFuture();
+
+                    assertNotNull(fut);
+                    assertFalse(fut.isDone());
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        waitReconnectEvent(reconnectLatch);
+
+        topVer += 2; // Client failed and rejoined.
+
+        locNode = cluster.localNode();
+
+        assertEquals(topVer, locNode.order());
+        assertEquals(topVer, cluster.topologyVersion());
+
+        nodeCnt.put(5, 3);
+        nodeCnt.put(6, 4);
+
+        for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {
+            Collection<ClusterNode> nodes = cluster.topology(e.getKey());
+
+            assertNotNull("No nodes for topology: " + e.getKey(), nodes);
+            assertEquals((int)e.getValue(), nodes.size());
+        }
+
+        assertEquals(10, cluster.nodeLocalMap().get("locMapKey"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
new file mode 100644
index 0000000..1b6523a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.testframework.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteClientReconnectAbstractTest {
+    /** */
+    private static final Integer THREADS = 1;
+
+    /** */
+    private volatile CyclicBarrier barrier;
+
+    /** */
+    protected static final long TEST_TIME = 90_000;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(30_000);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TEST_TIME * 60_000;
+    }
+
+    /**
+     * @param c Test closure.
+     * @throws Exception If failed.
+     */
+    protected final void reconnectFailover(final Callable<Void> c) throws Exception {
+        final Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final AtomicBoolean stop = new AtomicBoolean(false);
+
+        final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    int iter = 0;
+
+                    while (!stop.get()) {
+                        try {
+                            c.call();
+                        }
+                        catch (CacheException e) {
+                            checkAndWait(e);
+                        }
+                        catch (IgniteClientDisconnectedException e) {
+                            checkAndWait(e);
+                        }
+
+                        if (++iter % 100 == 0)
+                            log.info("Iteration: " + iter);
+
+                        if (barrier != null)
+                            barrier.await();
+                    }
+
+                    return null;
+                } catch (Throwable e) {
+                    log.error("Unexpected error in operation thread: " + e, e);
+
+                    stop.set(true);
+
+                    throw e;
+                }
+            }
+        }, THREADS, "test-operation-thread");
+
+        final AtomicReference<CountDownLatch> disconnected = new AtomicReference<>();
+        final AtomicReference<CountDownLatch> reconnected = new AtomicReference<>();
+
+        IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    CountDownLatch latch = reconnected.get();
+
+                    assertNotNull(latch);
+                    assertEquals(1, latch.getCount());
+
+                    latch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    CountDownLatch latch = disconnected.get();
+
+                    assertNotNull(latch);
+                    assertEquals(1, latch.getCount());
+
+                    latch.countDown();
+                }
+
+                return true;
+            }
+        };
+
+        client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        try {
+            long stopTime = System.currentTimeMillis() + TEST_TIME;
+
+            String err = null;
+
+            while (System.currentTimeMillis() < stopTime && !fut.isDone()) {
+                U.sleep(500);
+
+                CountDownLatch disconnectLatch = new CountDownLatch(1);
+                CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+                disconnected.set(disconnectLatch);
+                reconnected.set(reconnectLatch);
+
+                UUID nodeId = client.cluster().localNode().id();
+
+                log.info("Fail client: " + nodeId);
+
+                srvSpi.failNode(nodeId, null);
+
+                if (!disconnectLatch.await(10_000, MILLISECONDS)) {
+                    err = "Failed to wait for disconnect";
+
+                    break;
+                }
+
+                if (!reconnectLatch.await(10_000, MILLISECONDS)) {
+                    err = "Failed to wait for reconnect";
+
+                    break;
+                }
+
+                barrier = new CyclicBarrier(THREADS + 1, new Runnable() {
+                    @Override public void run() {
+                        barrier = null;
+                    }
+                });
+
+                try {
+                    barrier.await(10, SECONDS);
+                }
+                catch (TimeoutException e) {
+                    err = "Operations hang or fail with unexpected error.";
+
+                    break;
+                }
+            }
+
+            if (err != null) {
+                log.error(err);
+
+                U.dumpThreads(log);
+
+                CyclicBarrier barrier0 = barrier;
+
+                if (barrier0 != null)
+                    barrier0.reset();
+
+                stop.set(true);
+
+                fut.get();
+
+                fail(err);
+            }
+
+            stop.set(true);
+
+            fut.get();
+        }
+        finally {
+            client.events().stopLocalListen(p);
+
+            stop.set(true);
+        }
+    }
+}


Mime
View raw message