ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [21/35] incubator-ignite git commit: # ignite-901 client reconnect support
Date Thu, 16 Jul 2015 16:09:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
new file mode 100644
index 0000000..7cfc329
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectFailoverAbstractTest {
+    /** */
+    protected static final String ATOMIC_CACHE = "ATOMIC_CACHE";
+
+    /** */
+    protected static final String TX_CACHE = "TX_CACHE";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg1 = new CacheConfiguration();
+
+        ccfg1.setName(ATOMIC_CACHE);
+        ccfg1.setBackups(1);
+        ccfg1.setAtomicityMode(ATOMIC);
+
+        CacheConfiguration ccfg2 = new CacheConfiguration();
+
+        ccfg2.setName(TX_CACHE);
+        ccfg2.setBackups(1);
+        ccfg2.setAtomicityMode(TRANSACTIONAL);
+
+        cfg.setCacheConfiguration(ccfg1, ccfg2);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectAtomicCache() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        final IgniteCache<Integer, Integer> cache = client.cache(ATOMIC_CACHE);
+
+        assertNotNull(cache);
+
+        assertEquals(ATOMIC, cache.getConfiguration(CacheConfiguration.class).getAtomicityMode());
+
+        reconnectFailover(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                TreeMap<Integer, Integer> map = new TreeMap<>();
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                for (int i = 0; i < 10; i++) {
+                    Integer key = rnd.nextInt(0, 100_000);
+
+                    cache.put(key, key);
+
+                    assertEquals(key, cache.get(key));
+
+                    map.put(key, key);
+                }
+
+                cache.putAll(map);
+
+                Map<Integer, Integer> res = cache.getAll(map.keySet());
+
+                assertEquals(map, res);
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectTxCache() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        final IgniteCache<Integer, Integer> cache = client.cache(TX_CACHE);
+
+        assertNotNull(cache);
+
+        assertEquals(TRANSACTIONAL, cache.getConfiguration(CacheConfiguration.class).getAtomicityMode());
+
+        final IgniteTransactions txs = client.transactions();
+
+        reconnectFailover(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                TreeMap<Integer, Integer> map = new TreeMap<>();
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                for (int i = 0; i < 5; i++) {
+                    Integer key = rnd.nextInt(0, 100_000);
+
+                    cache.put(key, key);
+
+                    assertEquals(key, cache.get(key));
+
+                    map.put(key, key);
+                }
+
+                for (TransactionConcurrency txConcurrency : TransactionConcurrency.values()) {
+                    try (Transaction tx = txs.txStart(txConcurrency, REPEATABLE_READ)) {
+                        for (Map.Entry<Integer, Integer> e : map.entrySet()) {
+                            cache.put(e.getKey(), e.getValue());
+
+                            assertNotNull(cache.get(e.getKey()));
+                        }
+
+                        tx.commit();
+                    }
+                }
+
+                cache.putAll(map);
+
+                Map<Integer, Integer> res = cache.getAll(map.keySet());
+
+                assertEquals(map, res);
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectComputeApi() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        final IgniteCompute comp = client.compute();
+
+        reconnectFailover(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                comp.call(new DummyClosure());
+
+                comp.broadcast(new DummyClosure());
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectStreamerApi() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        reconnectFailover(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                stream(ATOMIC_CACHE);
+
+                stream(TX_CACHE);
+
+                return null;
+            }
+
+            private void stream(String cacheName) {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                try (IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(cacheName)) {
+                    streamer.allowOverwrite(true);
+
+                    streamer.perNodeBufferSize(10);
+
+                    for (int i = 0; i < 100; i++)
+                        streamer.addData(rnd.nextInt(100_000), 0);
+                }
+            }
+        });
+    }
+
+    /**
+     *
+     */
+    public static class DummyClosure implements IgniteCallable<Object> {
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return 1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
new file mode 100644
index 0000000..31b4192
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.service.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.services.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        IgniteServices services = client.services();
+
+        services.deployClusterSingleton("testReconnect", new TestServiceImpl());
+
+        TestService srvc = services.serviceProxy("testReconnect", TestService.class, false);
+
+        assertNotNull(srvc);
+
+        long topVer = grid(0).cluster().topologyVersion();
+
+        assertEquals((Object)topVer, srvc.test());
+
+        Ignite srv = clientRouter(client);
+
+        reconnectClientNode(client, srv, null);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        DummyService.exeLatch("testReconnect2", latch);
+
+        services.deployClusterSingleton("testReconnect2", new DummyService());
+
+        assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
+
+        assertEquals((Object)(topVer + 2), srvc.test());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServiceRemove() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        IgniteServices clnServices = client.services();
+
+        final IgniteServices srvServices = srv.services();
+
+        srvServices.deployClusterSingleton("testServiceRemove", new TestServiceImpl());
+
+        final TestService srvc = clnServices.serviceProxy("testServiceRemove", TestService.class, false);
+
+        assertNotNull(srvc);
+
+        assertNotNull(srvc.test());
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvServices.cancel("testServiceRemove");
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return srvc.test();
+            }
+        }, IgniteException.class, null);
+
+        clnServices.deployClusterSingleton("testServiceRemove", new TestServiceImpl());
+
+        TestService newSrvc = clnServices.serviceProxy("testServiceRemove", TestService.class, false);
+
+        assertNotNull(newSrvc);
+        assertNotNull(newSrvc.test());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectInDeploying() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final IgniteServices services = client.services();
+
+        Ignite srv = clientRouter(client);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        commSpi.blockMessage(GridNearTxPrepareResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    services.deployClusterSingleton("testReconnectInDeploying", new TestServiceImpl());
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMessage();
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectInProgress() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final IgniteServices services = client.services();
+
+        final Ignite srv = clientRouter(client);
+
+        services.deployClusterSingleton("testReconnectInProgress", new TestServiceImpl());
+
+        final TestService srvc = services.serviceProxy("testReconnectInProgress", TestService.class, false);
+
+        assertNotNull(srvc);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        commSpi.blockMessage(GridJobExecuteResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    srvc.test();
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMessage();
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+    }
+
+    /**
+     *
+     */
+    public static interface TestService {
+        /**
+         * @return Topology version.
+         */
+        public Long test();
+    }
+
+    /**
+     *
+     */
+    public static class TestServiceImpl implements Service, TestService {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public void cancel(ServiceContext ctx) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void init(ServiceContext ctx) throws Exception {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute(ServiceContext ctx) throws Exception {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Long test() {
+            assertFalse(ignite.cluster().localNode().isClient());
+
+            return ignite.cluster().topologyVersion();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
new file mode 100644
index 0000000..98c3d0f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectStopTest extends IgniteClientReconnectAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStopWhenDisconnected() throws Exception {
+        clientMode = true;
+
+        Ignite client = startGrid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        waitReconnectEvent(disconnectLatch);
+
+        IgniteFuture<?> reconnectFut = null;
+
+        try {
+            client.getOrCreateCache(new CacheConfiguration<>());
+
+            fail();
+        }
+        catch (IgniteClientDisconnectedException e) {
+            log.info("Expected operation exception: " + e);
+
+            reconnectFut = e.reconnectFuture();
+        }
+
+        assertNotNull(reconnectFut);
+
+        client.close();
+
+        try {
+            reconnectFut.get();
+
+            fail();
+        }
+        catch (IgniteException e) {
+            log.info("Expected reconnect exception: " + e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
new file mode 100644
index 0000000..a4cf77f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.datastreamer.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.testframework.*;
+
+import javax.cache.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbstractTest {
+    /** */
+    public static final String CACHE_NAME = "streamer";
+
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>(CACHE_NAME)
+            .setAtomicityMode(ATOMIC)
+            .setCacheMode(PARTITIONED);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStreamerReconnect() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final IgniteCache<Object, Object> srvCache = srv.cache(CACHE_NAME);
+
+        IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME);
+
+        for (int i = 0; i < 50; i++)
+            streamer.addData(i, i);
+
+        streamer.flush();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return srvCache.localSize() == 50;
+            }
+        }, 2000L);
+
+        assertEquals(50, srvCache.localSize());
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                try {
+                    client.dataStreamer(CACHE_NAME);
+
+                    fail();
+                }
+                catch (IgniteClientDisconnectedException e) {
+                    assertNotNull(e.reconnectFuture());
+                }
+            }
+        });
+
+        checkStreamerClosed(streamer);
+
+        streamer = client.dataStreamer(CACHE_NAME);
+
+        for (int i = 50; i < 100; i++)
+            streamer.addData(i, i);
+
+        streamer.flush();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return srvCache.localSize() == 100;
+            }
+        }, 2000L);
+
+        assertEquals(100, srvCache.localSize());
+
+        streamer.close();
+
+        streamer.future().get(2, TimeUnit.SECONDS);
+
+        srvCache.removeAll();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStreamerReconnectInProgress() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final IgniteCache<Object, Object> srvCache = srv.cache(CACHE_NAME);
+
+        final IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        commSpi.blockMessage(DataStreamerResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    for (int i = 0; i < 50; i++)
+                        streamer.addData(i, i);
+
+                    streamer.flush();
+                }
+                catch (CacheException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+                finally {
+                    streamer.close();
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMessage();
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+
+        checkStreamerClosed(streamer);
+
+        IgniteDataStreamer<Integer, Integer> streamer2 = client.dataStreamer(CACHE_NAME);
+
+        for (int i = 0; i < 50; i++)
+            streamer2.addData(i, i);
+
+        streamer2.close();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return srvCache.localSize() == 50;
+            }
+        }, 2000L);
+
+        assertEquals(50, srvCache.localSize());
+    }
+
+    /**
+     * @param streamer Streamer.
+     */
+    private void checkStreamerClosed(IgniteDataStreamer<Integer, Integer> streamer) {
+        try {
+            streamer.addData(100, 100);
+
+            fail();
+        }
+        catch (CacheException e) {
+            checkAndWait(e);
+        }
+
+        try {
+            streamer.flush();
+
+            fail();
+        }
+        catch (CacheException e) {
+            checkAndWait(e);
+        }
+
+        try {
+            streamer.future().get();
+
+            fail();
+        }
+        catch (CacheException e) {
+            checkAndWait(e);
+        }
+
+        streamer.tryFlush();
+
+        streamer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 27c2a61..a392245 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -62,6 +62,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setClientReconnectDisabled(true);
 
         if (getTestGridName(nodeCount() - 1).equals(gridName) || getTestGridName(nodeCount() - 2).equals(gridName))
             cfg.setClientMode(true);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
index 9780080..62f5d41 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.managers.deployment;
 
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.resource.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.jdk.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.deployment.*;
@@ -95,5 +96,11 @@ public class GridDeploymentManagerStopSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public boolean unregister(String rsrcName) { return false; }
+
+        /** {@inheritDoc} */
+        @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) { /* No-op. */ }
+
+        /** {@inheritDoc} */
+        @Override public void onClientReconnected(boolean clusterRestarted) { /* No-op. */ }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java
index 074f6ff..9c30f23 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java
@@ -263,7 +263,7 @@ public abstract class IgniteCacheAbstractStopBusySelfTest extends GridCommonAbst
 
         e.printStackTrace(pw);
 
-        assertTrue(sw.toString().contains("grid is stopping"));
+        assertTrue(sw.toString().contains("node is stopping"));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
index 071341e..8703d32 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -27,7 +26,7 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 
-import javax.cache.Cache;
+import javax.cache.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -89,7 +88,8 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest {
                             @Override public void apply(IgniteFuture<?> f) {
                                 try {
                                     f.get();
-                                } catch (IgniteException ignore) {
+                                }
+                                catch (CacheException ignore) {
                                     // This may be debugged.
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
index af3ea9d..30bf5dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.transactions.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.indexing.*;
 import org.apache.ignite.testframework.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
index d78add6..53404cc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
@@ -75,7 +75,7 @@ public class IgniteCacheSystemTransactionsSelfTest extends GridCacheAbstractSelf
             jcache.get("1");
             jcache.put("1", "11");
 
-            GridCacheAdapter<Object, Object> utilityCache = ignite.context().cache().utilityCache();
+            IgniteInternalCache<Object, Object> utilityCache = ignite.context().cache().utilityCache();
 
             utilityCache.getAndPutIfAbsent("2", "2");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
index 19e40bf..7a2e8b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java
@@ -220,7 +220,8 @@ public class GridCacheReplicatedInvalidateSelfTest extends GridCommonAbstractTes
             Object msg0 = ((GridIoMessage)msg).message();
 
             if (!(msg0 instanceof GridClockDeltaSnapshotMessage)) {
-                info("Sending message [locNodeId=" + getLocalNodeId() + ", destNodeId= " + destNode.id()
+                info("Sending message [locNodeId=" + ignite.cluster().localNode().id() +
+                    ", destNodeId= " + destNode.id()
                     + ", msg=" + msg + ']');
 
                 synchronized (msgCntMap) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index e9d7a45..9a883b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -55,8 +55,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
                 new GridCacheDeploymentManager<K, V>(),
                 new GridCachePartitionExchangeManager<K, V>(),
                 new GridCacheIoManager(),
-                null,
-                new CacheNoopJtaManager()
+                new CacheNoopJtaManager(),
+                null
             ),
             defaultCacheConfiguration(),
             CacheType.USER,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index ec6a526..63db0c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -111,6 +111,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     /** */
     private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
 
+    /** */
+    private boolean reconnectDisabled;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -159,6 +162,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         disco.setJoinTimeout(joinTimeout);
         disco.setNetworkTimeout(netTimeout);
 
+        disco.setClientReconnectDisabled(reconnectDisabled);
+
         disco.afterWrite(afterWrite);
 
         cfg.setDiscoverySpi(disco);
@@ -524,7 +529,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         TestTcpDiscoverySpi spi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
 
-        spi.pauseAll();
+        spi.pauseAll(false);
 
         try {
             spi.brakeConnection();
@@ -568,7 +573,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         TestTcpDiscoverySpi spi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
 
-        spi.pauseAll();
+        spi.pauseAll(false);
 
         try {
             spi.brakeConnection();
@@ -606,7 +611,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         attachListeners(2, 2);
 
-        ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll();
+        ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll(true);
 
         stopGrid("server-2");
 
@@ -633,6 +638,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     public void testClientSegmentation() throws Exception {
         clientsPerSrv = 1;
 
+        reconnectDisabled = true;
+
         startServerNodes(3);
         startClientNodes(3);
 
@@ -656,6 +663,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         final TcpDiscoverySpi disco = (TcpDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi();
 
         try {
+            log.info("Fail server: " + 2);
+
             failServer(2);
 
             await(srvFailedLatch);
@@ -886,8 +895,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         try {
             startClientNodes(1);
 
-            assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode)G.ignite("client-0")
-                .cluster().localNode()).clientRouterNodeId());
+            assertEquals(G.ignite("server-0").cluster().localNode().id(),
+                ((TcpDiscoveryNode) G.ignite("client-0").cluster().localNode()).clientRouterNodeId());
 
             checkNodes(2, 1);
 
@@ -1206,6 +1215,528 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectAfterFail() throws Exception {
+        reconnectAfterFail(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectAfterFailTopologyChanged() throws Exception {
+        reconnectAfterFail(true);
+    }
+
+    /**
+     * @param changeTop If {@code true} topology is changed after client disconnects.
+     * @throws Exception If failed.
+     */
+    private void reconnectAfterFail(final boolean changeTop) throws Exception {
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        Ignite srv = G.ignite("server-0");
+
+        TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+
+        Ignite client = G.ignite("client-0");
+
+        final ClusterNode clientNode = client.cluster().localNode();
+
+        final UUID clientId = clientNode.id();
+
+        final TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+        assertEquals(2L, clientNode.order());
+
+        final CountDownLatch failLatch = new CountDownLatch(1);
+
+        final CountDownLatch joinLatch = new CountDownLatch(1);
+
+        srv.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                info("Server event: " + evt);
+
+                DiscoveryEvent evt0 = (DiscoveryEvent)evt;
+
+                if (evt0.eventNode().id().equals(clientId) && (evt.type() == EVT_NODE_FAILED)) {
+                    if (evt.type() == EVT_NODE_FAILED)
+                        failLatch.countDown();
+                }
+                else if (evt.type() == EVT_NODE_JOINED) {
+                    TcpDiscoveryNode node = (TcpDiscoveryNode)evt0.eventNode();
+
+                    if ("client-0".equals(node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME))) {
+                        assertEquals(changeTop ? 5L : 4L, node.order());
+
+                        joinLatch.countDown();
+                    }
+                }
+
+                return true;
+            }
+        }, EVT_NODE_FAILED, EVT_NODE_JOINED);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                info("Client event: " + evt);
+
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    assertEquals(1, reconnectLatch.getCount());
+
+                    disconnectLatch.countDown();
+
+                    if (changeTop)
+                        clientSpi.pauseAll(false);
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    assertEquals(0, disconnectLatch.getCount());
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        if (changeTop) {
+            Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+            srvNodeIds.add(g.cluster().localNode().id());
+
+            clientSpi.resumeAll();
+        }
+
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        assertTrue(failLatch.await(5000, MILLISECONDS));
+        assertTrue(joinLatch.await(5000, MILLISECONDS));
+
+        long topVer = changeTop ? 5L : 4L;
+
+        assertEquals(topVer, client.cluster().localNode().order());
+
+        assertEquals(topVer, client.cluster().topologyVersion());
+
+        Collection<ClusterNode> clientTop = client.cluster().topology(topVer);
+
+        assertEquals(changeTop ? 3 : 2, clientTop.size());
+
+        clientNodeIds.remove(clientId);
+
+        clientNodeIds.add(client.cluster().localNode().id());
+
+        checkNodes(changeTop ? 2 : 1, 1);
+
+        Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+        srvNodeIds.add(g.cluster().localNode().id());
+
+        checkNodes(changeTop ? 3 : 2, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectAfterFailConcurrentJoin() throws Exception {
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        Ignite srv = G.ignite("server-0");
+
+        TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+
+        Ignite client = G.ignite("client-0");
+
+        final ClusterNode clientNode = client.cluster().localNode();
+
+        assertEquals(2L, clientNode.order());
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                info("Client event: " + evt);
+
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    assertEquals(1, reconnectLatch.getCount());
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    assertEquals(0, disconnectLatch.getCount());
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        final int CLIENTS = 20;
+
+        clientsPerSrv = CLIENTS + 1;
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                latch.await();
+
+                Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
+
+                clientNodeIds.add(g.cluster().localNode().id());
+
+                return null;
+            }
+        }, CLIENTS, "start-client");
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        latch.countDown();
+
+        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+        assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+        clientNodeIds.add(client.cluster().localNode().id());
+
+        fut.get();
+
+        checkNodes(1, CLIENTS + 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientFailReconnectDisabled() throws Exception {
+        reconnectDisabled = true;
+
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        Ignite srv = G.ignite("server-0");
+
+        TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+
+        Ignite client = G.ignite("client-0");
+
+        final CountDownLatch segmentedLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_NODE_SEGMENTED)
+                    segmentedLatch.countDown();
+
+                return false;
+            }
+        }, EVT_NODE_SEGMENTED);
+
+        srvFailedLatch = new CountDownLatch(1);
+
+        attachListeners(1, 0);
+
+        log.info("Fail client node.");
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(srvFailedLatch.await(5000, MILLISECONDS));
+        assertTrue(segmentedLatch.await(5000, MILLISECONDS));
+
+        checkNodes(1, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception {
+        reconnectSegmentedAfterJoinTimeout(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception {
+        reconnectSegmentedAfterJoinTimeout(false);
+    }
+
+    /**
+     * @param failSrv If {@code true} fails server, otherwise server does not send join message.
+     * @throws Exception If failed.
+     */
+    private void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception {
+        netTimeout = 4000;
+        joinTimeout = 5000;
+
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        final Ignite srv = G.ignite("server-0");
+        Ignite client = G.ignite("client-0");
+
+        TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+        TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch segmentedLatch = new CountDownLatch(1);
+        final AtomicBoolean err = new AtomicBoolean(false);
+
+        if (!failSrv) {
+            srvFailedLatch = new CountDownLatch(1);
+
+            attachListeners(1, 0);
+        }
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    log.info("Disconnected event.");
+
+                    assertEquals(1, segmentedLatch.getCount());
+                    assertEquals(1, disconnectLatch.getCount());
+                    assertFalse(err.get());
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_NODE_SEGMENTED) {
+                    log.info("Segmented event.");
+
+                    assertEquals(1, segmentedLatch.getCount());
+                    assertEquals(0, disconnectLatch.getCount());
+                    assertFalse(err.get());
+
+                    segmentedLatch.countDown();
+                }
+                else {
+                    log.error("Unexpected event: " + evt);
+
+                    err.set(true);
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED);
+
+        if (failSrv) {
+            log.info("Fail server.");
+
+            failServer(0);
+        }
+        else {
+            log.info("Fail client connection.");
+
+            srvSpi.failClientReconnect.set(1_000_000);
+            srvSpi.skipNodeAdded = true;
+
+            clientSpi.brakeConnection();
+        }
+
+        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+        assertTrue(segmentedLatch.await(10_000, MILLISECONDS));
+
+        waitSegmented(client);
+
+        assertFalse(err.get());
+
+        if (!failSrv) {
+            await(srvFailedLatch);
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return srv.cluster().nodes().size() == 1;
+                }
+            }, 10_000);
+
+            checkNodes(1, 0);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectClusterRestart() throws Exception {
+        netTimeout = 3000;
+        joinTimeout = 60_000;
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+        final AtomicBoolean err = new AtomicBoolean(false);
+
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        Ignite srv = G.ignite("server-0");
+        Ignite client = G.ignite("client-0");
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    log.info("Disconnected event.");
+
+                    assertEquals(1, reconnectLatch.getCount());
+                    assertEquals(1, disconnectLatch.getCount());
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    log.info("Reconnected event.");
+
+                    assertEquals(1, reconnectLatch.getCount());
+                    assertEquals(0, disconnectLatch.getCount());
+                    assertFalse(err.get());
+
+                    reconnectLatch.countDown();
+                }
+                else {
+                    log.error("Unexpected event: " + evt);
+
+                    err.set(true);
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED);
+
+        log.info("Stop server.");
+
+        srv.close();
+
+        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+        srvNodeIds.clear();
+        srvIdx.set(0);
+
+        Thread.sleep(3000);
+
+        log.info("Restart server.");
+
+        startServerNodes(1);
+
+        assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+        clientNodeIds.clear();
+        clientNodeIds.add(client.cluster().localNode().id());
+
+        checkNodes(1, 1);
+
+        assertFalse(err.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDisconnectAfterNetworkTimeout() throws Exception {
+        netTimeout = 5000;
+        joinTimeout = 60_000;
+        maxMissedClientHbs = 2;
+
+        startServerNodes(1);
+
+        startClientNodes(1);
+
+        final Ignite srv = G.ignite("server-0");
+        Ignite client = G.ignite("client-0");
+
+        TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+        TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+        final AtomicBoolean err = new AtomicBoolean(false);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override
+            public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    log.info("Disconnected event.");
+
+                    assertEquals(1, reconnectLatch.getCount());
+                    assertEquals(1, disconnectLatch.getCount());
+                    assertFalse(err.get());
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    log.info("Reconnected event.");
+
+                    assertEquals(1, reconnectLatch.getCount());
+                    assertEquals(0, disconnectLatch.getCount());
+                    assertFalse(err.get());
+
+                    reconnectLatch.countDown();
+                }
+                else {
+                    log.error("Unexpected event: " + evt);
+
+                    err.set(true);
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED);
+
+        log.info("Fail client connection1.");
+
+        srvSpi.failClientReconnect.set(1_000_000);
+        srvSpi.skipNodeAdded = true;
+
+        clientSpi.brakeConnection();
+
+        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+        log.info("Fail client connection2.");
+
+        srvSpi.failClientReconnect.set(0);
+        srvSpi.skipNodeAdded = false;
+
+        clientSpi.brakeConnection();
+
+        assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+        clientNodeIds.clear();
+
+        clientNodeIds.add(client.cluster().localNode().id());
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override
+            public boolean apply() {
+                return srv.cluster().nodes().size() == 2;
+            }
+        }, 10_000);
+
+        checkNodes(1, 1);
+
+        assertFalse(err.get());
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @throws Exception If failed.
+     */
+    private void waitSegmented(final Ignite ignite) throws Exception {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return IgniteState.STOPPED_ON_SEGMENTATION == Ignition.state(ignite.name());
+            }
+        }, 5000);
+
+        assertEquals(IgniteState.STOPPED_ON_SEGMENTATION, Ignition.state(ignite.name()));
+    }
+
+    /**
      * @param clientIdx Client index.
      * @param srvIdx Server index.
      * @throws Exception In case of error.
@@ -1401,7 +1932,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     private void checkRemoteNodes(Ignite ignite, int expCnt) {
         Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
 
-        assertEquals(expCnt, nodes.size());
+        assertEquals("Unexpected state for node: " + ignite.name(), expCnt, nodes.size());
 
         for (ClusterNode node : nodes) {
             UUID id = node.id();
@@ -1420,7 +1951,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @throws InterruptedException If interrupted.
      */
     private void await(CountDownLatch latch) throws InterruptedException {
-        assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
+        assertTrue("Latch count: " + latch.getCount(), latch.await(10_000, MILLISECONDS));
     }
 
     /**
@@ -1471,6 +2002,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         /** */
         private volatile String delayJoinAckFor;
 
+        /** */
+        private volatile boolean skipNodeAdded;
+
         /**
          * @param lock Lock.
          */
@@ -1543,6 +2077,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
             boolean fail = false;
 
+            if (skipNodeAdded &&
+                (msg instanceof TcpDiscoveryNodeAddedMessage || msg instanceof TcpDiscoveryNodeAddFinishedMessage)) {
+                log.info("Skip message: " + msg);
+
+                return;
+            }
+
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
                 fail = failNodeAdded.getAndDecrement() > 0;
             else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
@@ -1577,12 +2118,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         }
 
         /**
-         *
+         * @param suspend If {@code true} suspends worker threads.
          */
-        public void pauseAll() {
+        public void pauseAll(boolean suspend) {
             pauseResumeOperation(true, openSockLock, writeLock);
 
-            impl.workerThread().suspend();
+            if (suspend)
+                impl.workerThread().suspend();
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
index 159c451..dacbf55 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
@@ -317,4 +317,9 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
     @Override public ClusterMetrics metrics() throws IgniteException {
         throw new UnsupportedOperationException("Operation is not supported yet.");
     }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteFuture<?> clientReconnectFuture() {
+        throw new UnsupportedOperationException("Operation is not supported yet.");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
new file mode 100644
index 0000000..66c9835
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.*;
+import org.apache.ignite.internal.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectTestSuite extends TestSuite {
+    /**
+     * @return Test suite.
+     * @throws Exception In case of error.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("Ignite Client Reconnect Test Suite");
+
+        suite.addTestSuite(IgniteClientReconnectStopTest.class);
+        suite.addTestSuite(IgniteClientReconnectApiExceptionTest.class);
+        suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class);
+        suite.addTestSuite(IgniteClientReconnectCacheTest.class);
+        suite.addTestSuite(IgniteClientReconnectContinuousProcessorTest.class);
+        suite.addTestSuite(IgniteClientReconnectComputeTest.class);
+        suite.addTestSuite(IgniteClientReconnectAtomicsTest.class);
+        suite.addTestSuite(IgniteClientReconnectCollectionsTest.class);
+        suite.addTestSuite(IgniteClientReconnectServicesTest.class);
+        suite.addTestSuite(IgniteClientReconnectStreamerTest.class);
+        suite.addTestSuite(IgniteClientReconnectFailoverTest.class);
+
+        return suite;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 06c0961..c76dbe7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1439,6 +1439,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             fut.get();
     }
 
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+        rdcQryExec.onDisconnected(reconnectFut);
+    }
+
     /**
      * Wrapper to store connection and flag is schema set or not.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index af29647..2b2996d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -26,6 +26,7 @@ import org.h2.table.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
+import javax.cache.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -40,7 +41,7 @@ public abstract class GridMergeIndex extends BaseIndex {
     private static final int MAX_FETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_MAX_SIZE, 10_000);
 
     /** All rows number. */
-    private final AtomicInteger expectedRowsCnt = new AtomicInteger(0);
+    private final AtomicInteger expRowsCnt = new AtomicInteger(0);
 
     /** Remaining rows per source node ID. */
     private final ConcurrentMap<UUID, Counter> remainingRows = new ConcurrentHashMap8<>();
@@ -75,8 +76,8 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public long getRowCount(Session session) {
-        return expectedRowsCnt.get();
+    @Override public long getRowCount(Session ses) {
+        return expRowsCnt.get();
     }
 
     /** {@inheritDoc} */
@@ -93,6 +94,23 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
+     * @param e Error.
+     */
+    public void fail(final CacheException e) {
+        for (UUID nodeId0 : remainingRows.keySet()) {
+            addPage0(new GridResultPage(null, nodeId0, null) {
+                @Override public boolean isFail() {
+                    return true;
+                }
+
+                @Override public void fetchNextPage() {
+                    throw e;
+                }
+            });
+        }
+    }
+
+    /**
      * @param nodeId Node ID.
      */
     public void fail(UUID nodeId) {
@@ -120,7 +138,7 @@ public abstract class GridMergeIndex extends BaseIndex {
             assert !cnt.initialized : "Counter is already initialized.";
 
             cnt.addAndGet(allRows);
-            expectedRowsCnt.addAndGet(allRows);
+            expRowsCnt.addAndGet(allRows);
 
             // We need this separate flag to handle case when the first source contains only one page
             // and it will signal that all remaining counters are zero and fetch is finished.
@@ -162,7 +180,7 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
+    @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
         if (fetched == null)
             throw new IgniteException("Fetched result set was too large.");
 
@@ -176,7 +194,7 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @return {@code true} If we have fetched all the remote rows.
      */
     public boolean fetchedAll() {
-        return fetchedCnt == expectedRowsCnt.get();
+        return fetchedCnt == expRowsCnt.get();
     }
 
     /**
@@ -200,32 +218,32 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public void close(Session session) {
+    @Override public void close(Session ses) {
         // No-op.
     }
 
     /** {@inheritDoc} */
-    @Override public void add(Session session, Row row) {
+    @Override public void add(Session ses, Row row) {
         throw DbException.getUnsupportedException("add");
     }
 
     /** {@inheritDoc} */
-    @Override public void remove(Session session, Row row) {
+    @Override public void remove(Session ses, Row row) {
         throw DbException.getUnsupportedException("remove row");
     }
 
     /** {@inheritDoc} */
-    @Override public double getCost(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) {
+    @Override public double getCost(Session ses, int[] masks, TableFilter filter, SortOrder sortOrder) {
         return getRowCountApproximation() + Constants.COST_ROW_OFFSET;
     }
 
     /** {@inheritDoc} */
-    @Override public void remove(Session session) {
+    @Override public void remove(Session ses) {
         throw DbException.getUnsupportedException("remove index");
     }
 
     /** {@inheritDoc} */
-    @Override public void truncate(Session session) {
+    @Override public void truncate(Session ses) {
         throw DbException.getUnsupportedException("truncate");
     }
 
@@ -235,7 +253,7 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public Cursor findFirstOrLast(Session session, boolean first) {
+    @Override public Cursor findFirstOrLast(Session ses, boolean first) {
         throw DbException.getUnsupportedException("findFirstOrLast");
     }
 
@@ -299,6 +317,7 @@ public abstract class GridMergeIndex extends BaseIndex {
         private Iterator<Row> stream;
 
         /**
+         * @param stream Iterator.
          */
         public FetchingCursor(Iterator<Row> stream) {
             super(new FetchedIterator());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 32d1c95..cde3288 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.h2.command.*;
@@ -47,6 +48,7 @@ import org.h2.table.*;
 import org.h2.tools.*;
 import org.h2.util.*;
 import org.h2.value.*;
+import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import javax.cache.*;
@@ -234,10 +236,15 @@ public class GridReduceQueryExecutor {
                     Object errState = r.state.get();
 
                     if (errState != null) {
+                        CacheException err0 = errState instanceof CacheException ? (CacheException)errState : null;
+
+                        if (err0 != null && err0.getCause() instanceof IgniteClientDisconnectedException)
+                            throw err0;
+
                         CacheException e = new CacheException("Failed to fetch data from node: " + node.id());
 
-                        if (errState instanceof CacheException)
-                            e.addSuppressed((Throwable)errState);
+                        if (err0 != null)
+                            e.addSuppressed(err0);
 
                         throw e;
                     }
@@ -301,6 +308,7 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param cctx Cache context.
      * @return {@code true} If cache context
      */
     private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) {
@@ -481,6 +489,12 @@ public class GridReduceQueryExecutor {
             runs.put(qryReqId, r);
 
             try {
+                if (ctx.clientDisconnected()) {
+                    throw new CacheException("Query was cancelled, client node disconnected.",
+                        new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(),
+                        "Client node disconnected."));
+                }
+
                 Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
 
                 if (qry.explain()) {
@@ -506,8 +520,14 @@ public class GridReduceQueryExecutor {
                     Object state = r.state.get();
 
                     if (state != null) {
-                        if (state instanceof CacheException)
-                            throw new CacheException("Failed to run map query remotely.", (CacheException)state);
+                        if (state instanceof CacheException) {
+                            CacheException err = (CacheException)state;
+
+                            if (err.getCause() instanceof IgniteClientDisconnectedException)
+                                throw err;
+
+                            throw new CacheException("Failed to run map query remotely.", err);
+                        }
 
                         if (state instanceof AffinityTopologyVersion) {
                             retry = true;
@@ -550,7 +570,20 @@ public class GridReduceQueryExecutor {
             catch (IgniteCheckedException | RuntimeException e) {
                 U.closeQuiet(r.conn);
 
-                throw new CacheException("Failed to run reduce query locally.", e);
+                if (e instanceof CacheException)
+                    throw (CacheException)e;
+
+                Throwable cause = e;
+
+                if (e instanceof IgniteCheckedException) {
+                    Throwable disconnectedErr =
+                        ((IgniteCheckedException)e).getCause(IgniteClientDisconnectedException.class);
+
+                    if (disconnectedErr != null)
+                        cause = disconnectedErr;
+                }
+
+                throw new CacheException("Failed to run reduce query locally.", cause);
             }
             finally {
                 if (!runs.remove(qryReqId, r))
@@ -1082,6 +1115,17 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param reconnectFut Reconnect future.
+     */
+    public void onDisconnected(IgniteFuture<?> reconnectFut) {
+        CacheException err = new CacheException("Query was cancelled, client node disconnected.",
+            new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."));
+
+        for (Map.Entry<Long, QueryRun> e : runs.entrySet())
+            e.getValue().disconnected(err);
+    }
+
+    /**
      *
      */
     private static class QueryRun {
@@ -1104,7 +1148,7 @@ public class GridReduceQueryExecutor {
          * @param o Fail state object.
          * @param nodeId Node ID.
          */
-        void state(Object o, UUID nodeId) {
+        void state(Object o, @Nullable UUID nodeId) {
             assert o != null;
             assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
 
@@ -1117,6 +1161,20 @@ public class GridReduceQueryExecutor {
             for (GridMergeTable tbl : tbls) // Fail all merge indexes.
                 tbl.getScanIndex(null).fail(nodeId);
         }
+
+        /**
+         * @param e Error.
+         */
+        void disconnected(CacheException e) {
+            if (!state.compareAndSet(null, e))
+                return;
+
+            while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
+                latch.countDown();
+
+            for (GridMergeTable tbl : tbls) // Fail all merge indexes.
+                tbl.getScanIndex(null).fail(e);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
new file mode 100644
index 0000000..23320ae
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectCacheQueriesFailoverTest extends IgniteClientReconnectFailoverAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setIndexedTypes(Integer.class, Person.class);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        final IgniteCache<Integer, Person> cache = grid(serverCount()).cache(null);
+
+        assertNotNull(cache);
+
+        for (int i = 0; i <= 10_000; i++)
+            cache.put(i, new Person(i, "name-" + i));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectCacheQueries() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        final IgniteCache<Integer, Person> cache = client.cache(null);
+
+        assertNotNull(cache);
+
+        reconnectFailover(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                SqlQuery<Integer, Person> sqlQry = new SqlQuery<>(Person.class, "where id > 1");
+
+                try {
+                    assertEquals(9999, cache.query(sqlQry).getAll().size());
+                }
+                catch (CacheException e) {
+                    if (e.getCause() instanceof IgniteClientDisconnectedException)
+                        throw e;
+                    else
+                        log.info("Ignore error: " + e);
+                }
+
+                try {
+                    SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select avg(p.id) from Person p");
+
+                    List<List<?>> res = cache.query(fieldsQry).getAll();
+
+                    assertEquals(1, res.size());
+
+                    Double avg = (Double)res.get(0).get(0);
+
+                    assertEquals(5_000, avg.intValue());
+                }
+                catch (CacheException e) {
+                    if (e.getCause() instanceof IgniteClientDisconnectedException)
+                        throw e;
+                    else
+                        log.info("Ignore error: " + e);
+                }
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectScanQuery() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        final IgniteCache<Integer, Person> cache = client.cache(null);
+
+        assertNotNull(cache);
+
+        final Affinity<Integer> aff = client.affinity(null);
+
+        final Map<Integer, Integer> partMap = new HashMap<>();
+
+        for (int i = 0; i < aff.partitions(); i++)
+            partMap.put(i, 0);
+
+        for (int i = 0; i <= 10_000; i++) {
+            Integer part = aff.partition(i);
+
+            Integer size = partMap.get(part);
+
+            partMap.put(part, size + 1);
+        }
+
+        reconnectFailover(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ScanQuery<Integer, Person> qry = new ScanQuery<>(new IgniteBiPredicate<Integer, Person>() {
+                    @Override public boolean apply(Integer key, Person val) {
+                        return val.getId() % 2 == 1;
+                    }
+                });
+
+                assertEquals(5000, cache.query(qry).getAll().size());
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                Integer part = rnd.nextInt(0, aff.partitions());
+
+                qry = new ScanQuery<>(part);
+
+                assertEquals((int)partMap.get(part), cache.query(qry).getAll().size());
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     *
+     */
+    public static class Person {
+        /** */
+        @QuerySqlField
+        public int id;
+
+        /** */
+        @QuerySqlField
+        public String name;
+
+        /**
+         * @param id Id.
+         * @param name Name.
+         */
+        public Person(int id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * @return Id.
+         */
+        public int getId() {
+            return id;
+        }
+
+        /**
+         * @param id Set id.
+         */
+        public void setId(int id) {
+            this.id = id;
+        }
+
+        /**
+         * @return Name.
+         */
+        public String getName() {
+            return name;
+        }
+
+        /**
+         * @param name Name.
+         */
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            return this == o || !(o == null || getClass() != o.getClass()) && id == ((Person)o).id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+}


Mime
View raw message