ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject incubator-ignite git commit: IGNITE-901 Added tests.
Date Mon, 06 Jul 2015 12:39:02 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 401efd7c3 -> 92908b91c


IGNITE-901 Added tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/92908b91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/92908b91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/92908b91

Branch: refs/heads/ignite-901
Commit: 92908b91ce067415af9e6e682b05fc602cc87da9
Parents: 401efd7
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Mon Jul 6 15:38:55 2015 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Mon Jul 6 15:38:55 2015 +0300

----------------------------------------------------------------------
 .../IgniteClientReconnectAbstractTest.java      |  48 ++-
 .../IgniteClientReconnectApiBlockTest.java      |  23 --
 .../IgniteClientReconnectAtomicsTest.java       | 138 +++++--
 .../IgniteClientReconnectCollectionsTest.java   |  63 +++-
 .../IgniteClientReconnectComputeTest.java       |  87 +++--
 .../IgniteClientReconnectQueriesTest.java       |  28 --
 .../IgniteClientReconnectServicesTest.java      | 171 +++++++--
 .../IgniteClientReconnectStreamerTest.java      | 137 ++++++-
 .../IgniteClientReconnectTestSuite.java         |   1 -
 .../IgniteClientReconnectQueriesTest.java       | 356 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   4 +
 11 files changed, 885 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 0f8aadd..99dddf9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -35,7 +35,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.spi.discovery.tcp.messages.*;
 import org.apache.ignite.testframework.junits.common.*;
-import org.eclipse.jetty.util.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -212,6 +211,32 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
     }
 
     /**
+     * @param e Client disconnected exception.
+     */
+    protected void checkAndWait(Exception e) {
+        log.info("Expected exception: " + e);
+
+        if (e instanceof IgniteClientDisconnectedException){
+            ((IgniteClientDisconnectedException)e).reconnectFuture().get();
+
+            return;
+        }
+
+        IgniteClientDisconnectedException discException = X.cause(e, IgniteClientDisconnectedException.class);
+
+        if (discException != null)
+            discException.reconnectFuture().get();
+
+        IgniteClientDisconnectedCheckedException discCheckedException =
+            X.cause(e, IgniteClientDisconnectedCheckedException.class);
+
+        if (discCheckedException != null)
+            discCheckedException.reconnectFuture().get();
+        else
+            fail("Unexpected exception: " + e);
+    }
+
+    /**
      *
      */
     protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
@@ -242,9 +267,11 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
         /** */
         volatile Class msgClass;
 
+        /** */
         AtomicBoolean collectStart = new AtomicBoolean(false);
 
-        ConcurrentHashSet<String> classes = new ConcurrentHashSet<>();
+        /** */
+        ConcurrentHashMap<String, ClusterNode> classes = new ConcurrentHashMap<>();
 
         /** */
         @LoggerResource
@@ -255,7 +282,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
             Class msgClass0 = msgClass;
 
             if (collectStart.get() && msg instanceof GridIoMessage)
-                classes.add(((GridIoMessage)msg).message().getClass().getName());
+                classes.put(((GridIoMessage)msg).message().getClass().getName(), node);
 
             if (msgClass0 != null && msg instanceof GridIoMessage
                 && ((GridIoMessage)msg).message().getClass().equals(msgClass)) {
@@ -280,5 +307,20 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
         public void unblockMsg() {
             msgClass = null;
         }
+
+        /**
+         * Start collect messages.
+         */
+        public void start() {
+            collectStart.set(true);
+        }
+
+        /**
+         * Print collected messages.
+         */
+        public void print() {
+            for (String s : classes.keySet())
+                log.error(s);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
index 9aed13b..e056641 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java
@@ -510,29 +510,6 @@ public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbst
     }
 
     /**
-     * @param e Client disconnected exception.
-     */
-    private void checkAndWait(Exception e) {
-        log.info("Expected exception: " + e);
-
-        if (e instanceof IgniteClientDisconnectedException){
-            ((IgniteClientDisconnectedException)e).reconnectFuture().get();
-
-            return;
-        }
-
-        if (e.getCause() instanceof IgniteClientDisconnectedException) {
-            IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause();
-
-            e0.reconnectFuture().get();
-
-            return;
-        }
-
-        fail("Unexpected exception: " + e);
-    }
-
-    /**
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 884d5f2..91311ef 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -24,7 +24,7 @@ import org.apache.ignite.testframework.*;
 import java.util.concurrent.*;
 
 /**
- * TODO IGNITE-901 create another after removed.
+ *
  */
 public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstractTest {
     /** {@inheritDoc} */
@@ -66,6 +66,8 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
         assertEquals(1003L, srvAtomicSeq.incrementAndGet());
 
         assertEquals(3L, clientAtomicSeq.incrementAndGet());
+
+        clientAtomicSeq.close();
     }
 
     /**
@@ -106,6 +108,14 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
                 return null;
             }
         }, IllegalStateException.class, null);
+
+        IgniteAtomicSequence newClientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true);
+
+        assertEquals(0, newClientAtomicSeq.get());
+
+        assertEquals(1, newClientAtomicSeq.incrementAndGet());
+
+        newClientAtomicSeq.close();
     }
 
     /**
@@ -132,10 +142,18 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                for (int i = 0; i < 3000; i++)
-                    clientAtomicSeq.incrementAndGet();
-
-                return null;
+                for (int i = 0; i < 3000; i++) {
+                    try {
+                        clientAtomicSeq.incrementAndGet();
+                    }
+                    catch (Exception e) {
+                        checkAndWait(e);
+
+                        return true;
+                    }
+                }
+
+                return false;
             }
         });
 
@@ -150,13 +168,9 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         commSpi.unblockMsg();
 
-        reconnectClientNode(client, srv, new Runnable() {
-            @Override public void run() {
-                // Check that future failed.
-                assertNotNull(fut.error());
-                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-            }
-        });
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
 
         // Check that after reconnect working.
         assert clientAtomicSeq.incrementAndGet() >= 0;
@@ -242,6 +256,21 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
                 return null;
             }
         }, IllegalStateException.class, null);
+
+        IgniteAtomicReference<String> newClientAtomicRef =
+            client.atomicReference("atomicRefRemoved", "1st value", true);
+
+        IgniteAtomicReference<String> newSrvAtomicRef = srv.atomicReference("atomicRefRemoved", "1st value", false);
+
+        assertEquals("1st value", newClientAtomicRef.get());
+        assertTrue(newClientAtomicRef.compareAndSet("1st value", "2st value"));
+        assertEquals("2st value", newClientAtomicRef.get());
+
+        assertEquals("2st value", newSrvAtomicRef.get());
+        assertTrue(newSrvAtomicRef.compareAndSet("2st value", "3st value"));
+        assertEquals("3st value", newSrvAtomicRef.get());
+
+        newClientAtomicRef.close();
     }
 
     /**
@@ -273,7 +302,16 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                return clientAtomicRef.compareAndSet("3st value", "4st value");
+                try {
+                    clientAtomicRef.compareAndSet("3st value", "4st value");
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
             }
         });
 
@@ -288,13 +326,9 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         servCommSpi.unblockMsg();
 
-        reconnectClientNode(client, srv, new Runnable() {
-            @Override public void run() {
-                // Check that future failed.
-                assertNotNull(fut.error());
-                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-            }
-        });
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
 
         // Check that after reconnect working.
         assertEquals("3st value", clientAtomicRef.get());
@@ -384,6 +418,20 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
                 return null;
             }
         }, IllegalStateException.class, null);
+
+        IgniteAtomicStamped newClientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true);
+
+        assertEquals(true, newClientAtomicStamped.compareAndSet(0, 1, 0, 1));
+        assertEquals(1, newClientAtomicStamped.value());
+        assertEquals(1, newClientAtomicStamped.stamp());
+
+        IgniteAtomicStamped newSrvAtomicStamped = srv.atomicStamped("atomicStampedRemoved", 0, 0, false);
+
+        assertEquals(true, newSrvAtomicStamped.compareAndSet(1, 2, 1, 2));
+        assertEquals(2, newSrvAtomicStamped.value());
+        assertEquals(2, newSrvAtomicStamped.stamp());
+
+        newClientAtomicStamped.close();
     }
 
     /**
@@ -414,7 +462,16 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                return clientAtomicStamped.compareAndSet(2, 3, 2, 3);
+                try {
+                    clientAtomicStamped.compareAndSet(2, 3, 2, 3);
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
             }
         });
 
@@ -429,13 +486,9 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         servCommSpi.unblockMsg();
 
-        reconnectClientNode(client, srv, new Runnable() {
-            @Override public void run() {
-                // Check that future failed.
-                assertNotNull(fut.error());
-                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-            }
-        });
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
 
         // Check that after reconnect working.
         assertEquals(true, clientAtomicStamped.compareAndSet(2, 3, 2, 3));
@@ -511,6 +564,14 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
                 return null;
             }
         }, IllegalStateException.class, null);
+
+        IgniteAtomicLong newClientAtomicLong = client.atomicLong("atomicLongRmv", 0, true);
+
+        assertEquals(0L, newClientAtomicLong.getAndAdd(1));
+
+        IgniteAtomicLong newSrvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false);
+
+        assertEquals(1L, newSrvAtomicLong.getAndAdd(1));
     }
 
     /**
@@ -533,7 +594,16 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                return clientAtomicLong.getAndAdd(1);
+                try {
+                    clientAtomicLong.getAndAdd(1);
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
             }
         });
 
@@ -548,13 +618,9 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         commSpi.unblockMsg();
 
-        reconnectClientNode(client, srv, new Runnable() {
-            @Override public void run() {
-                // Check that future failed.
-                assertNotNull(fut.error());
-                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-            }
-        });
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
 
         // Check that after reconnect working.
         assertEquals(1, clientAtomicLong.addAndGet(1));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
index 77e9c03..f24a8ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -29,7 +29,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
 
 /**
- * TODO IGNITE-901 create another after removed.
+ *
  */
 public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectAbstractTest {
     /** {@inheritDoc} */
@@ -224,6 +224,16 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
                 return null;
             }
         }, IllegalStateException.class, null);
+
+        IgniteSet<String> newClientSet = client.set(setName, colCfg);
+
+        IgniteSet<String> newSrvSet = srv.set(setName, null);
+
+        assertTrue(newClientSet.add("1"));
+
+        assertFalse(newSrvSet.add("1"));
+
+        newSrvSet.close();
     }
 
     /**
@@ -256,7 +266,17 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                return clientSet.add("2");
+                try {
+                    for (int i = 0; i < 100; i++)
+                        clientSet.add("2");
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
             }
         });
 
@@ -271,13 +291,9 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
 
         commSpi.unblockMsg();
 
-        reconnectClientNode(client, srv, new Runnable() {
-            @Override public void run() {
-                // Check that future failed.
-                assertNotNull("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", fut.error());
-                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-            }
-        });
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
 
         assertTrue(clientSet.add("3"));
 
@@ -352,6 +368,14 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
                 return null;
             }
         }, IllegalStateException.class, null);
+
+        IgniteQueue<String> newClientQueue = client.queue(setName, 10, colCfg);
+
+        IgniteQueue<String> newSrvQueue = srv.queue(setName, 10, null);
+
+        assertTrue(newClientQueue.add("1"));
+
+        assertTrue(newSrvQueue.add("2"));
     }
 
     /**
@@ -384,7 +408,16 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                return clientQueue.add("2");
+                try {
+                    clientQueue.add("2");
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
             }
         });
 
@@ -399,13 +432,9 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
 
         commSpi.unblockMsg();
 
-        reconnectClientNode(client, srv, new Runnable() {
-            @Override public void run() {
-                // Check that future failed.
-                assertNotNull("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", fut.error());
-                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-            }
-        });
+        reconnectClientNode(client, srv, null);
+
+        assertTrue("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", (Boolean)fut.get());
 
         assertTrue(clientQueue.add("3"));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
index 186459e..212fdc1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
@@ -59,11 +59,20 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                return client.compute().affinityCall("test-cache", 40, new IgniteCallable<Object>() {
-                    @Override public Integer call() throws Exception {
-                        return 42;
-                    }
-                });
+                try {
+                    client.compute().affinityCall("test-cache", 40, new IgniteCallable<Object>() {
+                        @Override public Integer call() throws Exception {
+                            return 42;
+                        }
+                    });
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
             }
         });
 
@@ -78,13 +87,9 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
 
         commSpi.unblockMsg();
 
-        reconnectClientNode(client, srv, new Runnable() {
-            @Override public void run() {
-                // Check that future failed.
-                assertNotNull(fut.error());
-                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-            }
-        });
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
     }
 
     /**
@@ -103,11 +108,20 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                return client.compute().broadcast(new IgniteCallable<Object>() {
-                    @Override public Object call() throws Exception {
-                        return 42;
-                    }
-                });
+                try {
+                    client.compute().broadcast(new IgniteCallable<Object>() {
+                        @Override public Object call() throws Exception {
+                            return 42;
+                        }
+                    });
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
             }
         });
 
@@ -122,13 +136,9 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
 
         commSpi.unblockMsg();
 
-        reconnectClientNode(client, srv, new Runnable() {
-            @Override public void run() {
-                // Check that future failed.
-                assertNotNull(fut.error());
-                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-            }
-        });
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
     }
 
     /**
@@ -147,11 +157,20 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                return client.compute().apply(new IgniteClosure<Integer, Integer>() {
-                    @Override public Integer apply(Integer o) {
-                        return o + 1;
-                    }
-                }, Arrays.asList(1, 2, 3));
+                try {
+                    client.compute().apply(new IgniteClosure<Integer, Integer>() {
+                        @Override public Integer apply(Integer o) {
+                            return o + 1;
+                        }
+                    }, Arrays.asList(1, 2, 3));
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
             }
         });
 
@@ -166,12 +185,8 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
 
         commSpi.unblockMsg();
 
-        reconnectClientNode(client, srv, new Runnable() {
-            @Override public void run() {
-                // Check that future failed.
-                assertNotNull(fut.error());
-                assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass());
-            }
-        });
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectQueriesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectQueriesTest.java
deleted file mode 100644
index 813d06c..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectQueriesTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal;
-
-/**
- * TODO IGNITE-901.
- */
-public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected int serverCount() {
-        return 1;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
index e79f2aa..681efa4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
@@ -18,19 +18,16 @@
 package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
-import org.apache.ignite.events.*;
+import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.processors.service.*;
-import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.services.*;
+import org.apache.ignite.testframework.*;
 
 import java.util.concurrent.*;
 
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.EventType.*;
-
 /**
- * TODO IGNITE-901: fail after disconnect, disconnect when operation in progress, service deployed on client.
+ *
  */
 public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbstractTest {
     /** {@inheritDoc} */
@@ -53,9 +50,9 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst
 
         IgniteServices services = client.services();
 
-        services.deployClusterSingleton("s1", new TestServiceImpl());
+        services.deployClusterSingleton("testReconnect", new TestServiceImpl());
 
-        TestService srvc = services.serviceProxy("s1", TestService.class, false);
+        TestService srvc = services.serviceProxy("testReconnect", TestService.class, false);
 
         assertNotNull(srvc);
 
@@ -63,37 +60,159 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
+        reconnectClientNode(client, srv, null);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        DummyService.exeLatch("testReconnect2", latch);
+
+        services.deployClusterSingleton("testReconnect2", new DummyService());
+
+        assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
+
+        assertEquals((Object) 4L, srvc.test());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServiceRemove() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
 
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+        IgniteServices clnServices = client.services();
 
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
-                    info("Disconnected: " + evt);
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
+        final IgniteServices srvServices = srv.services();
 
-                    reconnectLatch.countDown();
+        srvServices.deployClusterSingleton("testServiceRemove", new TestServiceImpl());
+
+        final TestService srvc = clnServices.serviceProxy("testServiceRemove", TestService.class, false);
+
+        assertNotNull(srvc);
+
+        assertNotNull(srvc.test());
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvServices.cancel("testServiceRemove");
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return srvc.test();
+            }
+        }, IgniteException.class, null);
+
+        clnServices.deployClusterSingleton("testServiceRemove", new TestServiceImpl());
+
+        TestService newSrvc = clnServices.serviceProxy("testServiceRemove", TestService.class, false);
+
+        assertNotNull(newSrvc);
+        assertNotNull(newSrvc.test());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectInDeploying() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final IgniteServices services = client.services();
+
+        Ignite srv = clientRouter(client);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        commSpi.blockMsg(GridContinuousMessage.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    services.deployClusterSingleton("testReconnectInDeploying", new TestServiceImpl());
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
                 }
 
-                return true;
+                return false;
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+        });
 
-        srvSpi.failNode(client.cluster().localNode().id(), null);
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
 
-        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+        assertNotDone(fut);
 
-        CountDownLatch latch = new CountDownLatch(1);
+        commSpi.unblockMsg();
 
-        DummyService.exeLatch("s2", latch);
+        reconnectClientNode(client, srv, null);
 
-        services.deployClusterSingleton("s2", new DummyService());
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+    }
 
-        assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectInProgress() throws Exception {
+        Ignite client = grid(serverCount());
 
-        assertEquals((Object) 4L, srvc.test());
+        assertTrue(client.cluster().localNode().isClient());
+
+        final IgniteServices services = client.services();
+
+        final Ignite srv = clientRouter(client);
+
+        services.deployClusterSingleton("testReconnectInProgress", new TestServiceImpl());
+
+        final TestService srvc = services.serviceProxy("testReconnectInProgress", TestService.class, false);
+
+        assertNotNull(srvc);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        commSpi.blockMsg(GridJobExecuteResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    srvc.test();
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMsg();
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
index 1fa5e73..8112ce5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
@@ -17,12 +17,147 @@
 
 package org.apache.ignite.internal;
 
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.datastreamer.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+
 /**
- * TODO IGNITE-901.
+ *
  */
 public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbstractTest {
+    /** */
+    public static final String CACHE_NAME = "streamer";
+
     /** {@inheritDoc} */
     @Override protected int serverCount() {
         return 1;
     }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>(CACHE_NAME)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setCacheMode(CacheMode.PARTITIONED);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStreamerReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final IgniteCache<Object, Object> srvCache = srv.cache(CACHE_NAME);
+
+        IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer("streamer");
+
+        for (int i = 0; i < 50; i++)
+            streamer.addData(i, i);
+
+        streamer.flush();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return srvCache.localSize() == 50;
+            }
+        }, 2000L);
+        reconnectClientNode(client, srv, null);
+
+        for (int i = 0; i < 50; i++)
+            streamer.addData(i, i);
+
+        streamer.flush();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return srvCache.localSize() == 100;
+            }
+        }, 2000L);
+
+        streamer.close();
+
+        streamer.future().get(2, TimeUnit.SECONDS);
+
+        srvCache.removeAll();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStreamerReconnectInProgress() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        final IgniteCache<Object, Object> srvCache = srv.cache(CACHE_NAME);
+
+        final IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer("streamer");
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        commSpi.blockMsg(DataStreamerResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    for (int i = 0; i < 50; i++)
+                        streamer.addData(i, i);
+
+                    streamer.flush();
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMsg();
+
+        reconnectClientNode(client, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+
+        for (int i = 0; i < 50; i++)
+            streamer.addData(i, i);
+
+        streamer.flush();
+
+        assertTrue(srv.cache(CACHE_NAME).localSize() >= 0);
+
+        srvCache.removeAll();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
index 17e33c7..fb41f0f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -41,7 +41,6 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
         suite.addTestSuite(IgniteClientReconnectAtomicsTest.class);
         suite.addTestSuite(IgniteClientReconnectCollectionsTest.class);
         suite.addTestSuite(IgniteClientReconnectServicesTest.class);
-        suite.addTestSuite(IgniteClientReconnectQueriesTest.class);
         suite.addTestSuite(IgniteClientReconnectStreamerTest.class);
 
         return suite;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
new file mode 100644
index 0000000..f75e780
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstractTest {
+    /** */
+    public static final String QUERY_CACHE = "query";
+
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<Integer, Person>(QUERY_CACHE)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setIndexedTypes(Integer.class, Person.class);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryReconnect() throws Exception {
+        Ignite cln = grid(serverCount());
+
+        assertTrue(cln.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(cln);
+
+        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+        final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
+
+        clnCache.removeAll();
+
+        clnCache.put(1, new Person(1, "name1", "surname1"));
+        clnCache.put(2, new Person(2, "name2", "surname2"));
+        clnCache.put(3, new Person(3, "name3", "surname3"));
+
+        SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key<>0");
+        qry.setPageSize(1);
+
+        QueryCursor<Cache.Entry<Integer, Person>> queryCursor = clnCache.query(qry);
+
+        reconnectClientNode(cln, srv, new Runnable() {
+            @Override public void run() {
+                srvCache.put(4, new Person(4, "name4", "surname4"));
+            }
+        });
+
+        List<Cache.Entry<Integer, Person>> result = queryCursor.getAll();
+
+        assertNotNull(result);
+        assertEquals(4, result.size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryReconnectInProgress() throws Exception {
+        Ignite cln = grid(serverCount());
+
+        assertTrue(cln.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(cln);
+
+        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+        clnCache.removeAll();
+
+        clnCache.put(1, new Person(1, "name1", "surname1"));
+        clnCache.put(2, new Person(2, "name2", "surname2"));
+        clnCache.put(3, new Person(3, "name3", "surname3"));
+
+        final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key<>0");
+        qry.setPageSize(1);
+
+        final QueryCursor<Cache.Entry<Integer, Person>> queryCursor = clnCache.query(qry);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        commSpi.blockMsg(GridQueryNextPageResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    queryCursor.getAll();
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMsg();
+
+        reconnectClientNode(cln, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+    }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryReconnect() throws Exception {
+        Ignite cln = grid(serverCount());
+
+        assertTrue(cln.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(cln);
+
+        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+        final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
+
+        clnCache.removeAll();
+
+        clnCache.put(1, new Person(1, "name1", "surname1"));
+        clnCache.put(2, new Person(2, "name2", "surname2"));
+        clnCache.put(3, new Person(3, "name3", "surname3"));
+
+        ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
+
+        scanQry.setPageSize(1);
+        scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
+            @Override public boolean apply(Integer integer, Person person) {
+                return true;
+            }
+        });
+
+        final QueryCursor<Cache.Entry<Integer, Person>> queryCursor = clnCache.query(scanQry);
+
+        reconnectClientNode(cln, srv, new Runnable() {
+            @Override public void run() {
+                srvCache.put(4, new Person(4, "name4", "surname4"));
+            }
+        });
+
+        IgniteInternalFuture<List<Cache.Entry<Integer, Person>>> f = GridTestUtils
+            .runAsync(new Callable<List<Cache.Entry<Integer, Person>>>() {
+                @Override public List<Cache.Entry<Integer, Person>> call() throws Exception {
+                    return queryCursor.getAll();
+                }
+            });
+
+        List<Cache.Entry<Integer, Person>> result = f.get(2, TimeUnit.SECONDS);
+
+        assertEquals(4, result.size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryReconnectInProgress() throws Exception {
+        Ignite cln = grid(serverCount());
+
+        assertTrue(cln.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(cln);
+
+        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+        clnCache.put(1, new Person(1, "name1", "surname1"));
+        clnCache.put(2, new Person(2, "name2", "surname2"));
+        clnCache.put(3, new Person(3, "name3", "surname3"));
+
+        ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
+
+        scanQry.setPageSize(1);
+        scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
+            @Override public boolean apply(Integer integer, Person person) {
+                return true;
+            }
+        });
+
+        final QueryCursor<Cache.Entry<Integer, Person>> queryCursor = clnCache.query(scanQry);
+
+        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+
+        commSpi.blockMsg(GridCacheQueryResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    queryCursor.getAll();
+                }
+                catch (Exception e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        commSpi.unblockMsg();
+
+        reconnectClientNode(cln, srv, null);
+
+        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+    }
+
+    /**
+     *
+     */
+    public static class Person {
+        /** */
+        @QuerySqlField
+        public int id;
+
+        /** */
+        @QuerySqlField
+        public String name;
+
+        /** */
+        @QuerySqlField
+        public String surname;
+
+        /**
+         * @param id Id.
+         * @param name Name.
+         * @param surname Surname.
+         */
+        public Person(int id, String name, String surname) {
+            this.id = id;
+            this.name = name;
+            this.surname = surname;
+        }
+
+        /**
+         * @return Id.
+         */
+        public int getId() {
+            return id;
+        }
+
+        /**
+         * @param id Set id.
+         */
+        public void setId(int id) {
+            this.id = id;
+        }
+
+        /**
+         * @return Name.
+         */
+        public String getName() {
+            return name;
+        }
+
+        /**
+         * @param name Name.
+         */
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        /**
+         * @return Surname.
+         */
+        public String getSurname() {
+            return surname;
+        }
+
+        /**
+         * @param surname Surname.
+         */
+        public void setSurname(String surname) {
+            this.surname = surname;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            return this == o || !(o == null || getClass() != o.getClass()) && id == ((Person)o).id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/92908b91/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 181ff0c..9f0f850 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.*;
+import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
@@ -122,6 +123,9 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         //Unmarshallig query test.
         suite.addTestSuite(IgniteCacheP2pUnmarshallingQueryErrorTest.class);
 
+        // Reconnect client query test.
+        suite.addTestSuite(IgniteClientReconnectQueriesTest.class);
+
         return suite;
     }
 }


Mime
View raw message