ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/3] incubator-ignite git commit: # ignite-901
Date Tue, 07 Jul 2015 14:47:38 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 55bd754f1 -> 89fb3951a


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/89fb3951/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
index 303b892..b0dc965 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cache.query.annotations.*;
 import org.apache.ignite.configuration.*;
@@ -33,6 +32,10 @@ import javax.cache.*;
 import java.util.*;
 import java.util.concurrent.*;
 
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
 /**
  *
  */
@@ -42,7 +45,7 @@ public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstr
 
     /** {@inheritDoc} */
     @Override protected int serverCount() {
-        return 1;
+        return 3;
     }
 
     /** {@inheritDoc} */
@@ -55,8 +58,9 @@ public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstr
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
         CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<Integer, Person>(QUERY_CACHE)
-            .setCacheMode(CacheMode.PARTITIONED)
-            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setCacheMode(PARTITIONED)
+            .setAtomicityMode(ATOMIC)
+            .setBackups(1)
             .setIndexedTypes(Integer.class, Person.class);
 
         cfg.setCacheConfiguration(ccfg);
@@ -64,6 +68,13 @@ public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstr
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        grid(0).getOrCreateCache(QUERY_CACHE).removeAll();
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -78,24 +89,31 @@ public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstr
 
         final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
 
-        clnCache.removeAll();
-
         clnCache.put(1, new Person(1, "name1", "surname1"));
         clnCache.put(2, new Person(2, "name2", "surname2"));
         clnCache.put(3, new Person(3, "name3", "surname3"));
 
-        SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key<>0");
+        final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key <> 0");
+
         qry.setPageSize(1);
 
-        QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(qry);
+        QueryCursor<Cache.Entry<Integer, Person>> cur = clnCache.query(qry);
 
         reconnectClientNode(cln, srv, new Runnable() {
             @Override public void run() {
                 srvCache.put(4, new Person(4, "name4", "surname4"));
+
+                try {
+                    clnCache.query(qry);
+
+                    fail();
+                } catch (CacheException e) {
+                    check(e);
+                }
             }
         });
 
-        List<Cache.Entry<Integer, Person>> res = qryCursor.getAll();
+        List<Cache.Entry<Integer, Person>> res = cur.getAll();
 
         assertNotNull(res);
         assertEquals(4, res.size());
@@ -104,7 +122,7 @@ public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstr
     /**
      * @throws Exception If failed.
      */
-    public void testQueryReconnectInProgress() throws Exception {
+    public void testReconnectQueryInProgress() throws Exception {
         Ignite cln = grid(serverCount());
 
         assertTrue(cln.cluster().localNode().isClient());
@@ -113,27 +131,24 @@ public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstr
 
         final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
 
-        clnCache.removeAll();
-
         clnCache.put(1, new Person(1, "name1", "surname1"));
         clnCache.put(2, new Person(2, "name2", "surname2"));
         clnCache.put(3, new Person(3, "name3", "surname3"));
 
-        final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key<>0");
-        qry.setPageSize(1);
+        blockMessage(GridQueryNextPageResponse.class);
 
-        final QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(qry);
+        final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key <> 0");
 
-        BlockTpcCommunicationSpi commSpi = commSpi(srv);
+        qry.setPageSize(1);
 
-        commSpi.blockMsg(GridQueryNextPageResponse.class);
+        final QueryCursor<Cache.Entry<Integer, Person>> cur1 = clnCache.query(qry);
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
                 try {
-                    qryCursor.getAll();
+                    cur1.getAll();
                 }
-                catch (Exception e) {
+                catch (CacheException e) {
                     checkAndWait(e);
 
                     return true;
@@ -152,12 +167,17 @@ public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstr
 
         assertNotDone(fut);
 
-        commSpi.unblockMsg();
+        unblockMessage();
 
         reconnectClientNode(cln, srv, null);
 
-        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+        assertTrue((Boolean) fut.get(2, SECONDS));
+
+        QueryCursor<Cache.Entry<Integer, Person>> cur2 = clnCache.query(qry);
+
+        assertEquals(3, cur2.getAll().size());
     }
+
     /**
      * @throws Exception If failed.
      */
@@ -172,45 +192,69 @@ public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstr
 
         final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
 
-        clnCache.removeAll();
-
-        clnCache.put(1, new Person(1, "name1", "surname1"));
-        clnCache.put(2, new Person(2, "name2", "surname2"));
-        clnCache.put(3, new Person(3, "name3", "surname3"));
+        for (int i = 0; i < 100; i++)
+            clnCache.put(i, new Person(i, "name-" + i, "surname-" + i));
 
-        ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
+        final ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
 
         scanQry.setPageSize(1);
+
         scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
             @Override public boolean apply(Integer integer, Person person) {
                 return true;
             }
         });
 
-        final QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
+        QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
 
         reconnectClientNode(cln, srv, new Runnable() {
             @Override public void run() {
-                srvCache.put(4, new Person(4, "name4", "surname4"));
+                srvCache.put(1000, new Person(1000, "name", "surname"));
+
+                try {
+                    clnCache.query(scanQry);
+
+                    fail();
+                }
+                catch (CacheException e) {
+                    check(e);
+                }
             }
         });
 
-        IgniteInternalFuture<List<Cache.Entry<Integer, Person>>> f = GridTestUtils
-            .runAsync(new Callable<List<Cache.Entry<Integer, Person>>>() {
-                @Override public List<Cache.Entry<Integer, Person>> call() throws Exception {
-                    return qryCursor.getAll();
-                }
-            });
+        try {
+            qryCursor.getAll();
 
-        List<Cache.Entry<Integer, Person>> res = f.get(2, TimeUnit.SECONDS);
+            fail();
+        }
+        catch (CacheException e) {
+            checkAndWait(e);
+        }
 
-        assertEquals(4, res.size());
+        qryCursor = clnCache.query(scanQry);
+
+        assertEquals(101, qryCursor.getAll().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryReconnectInProgress1() throws Exception {
+        scanQueryReconnectInProgress(false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testScanQueryReconnectInProgress() throws Exception {
+    public void testScanQueryReconnectInProgress2() throws Exception {
+        scanQueryReconnectInProgress(true);
+    }
+
+    /**
+     * @param setPart If {@code true} sets partition for scan query.
+     * @throws Exception If failed.
+     */
+    private void scanQueryReconnectInProgress(boolean setPart) throws Exception {
         Ignite cln = grid(serverCount());
 
         assertTrue(cln.cluster().localNode().isClient());
@@ -223,27 +267,29 @@ public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstr
         clnCache.put(2, new Person(2, "name2", "surname2"));
         clnCache.put(3, new Person(3, "name3", "surname3"));
 
-        ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
+        final ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
 
         scanQry.setPageSize(1);
+
         scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
             @Override public boolean apply(Integer integer, Person person) {
                 return true;
             }
         });
 
-        final QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
+        if (setPart)
+            scanQry.setPartition(1);
 
-        BlockTpcCommunicationSpi commSpi = commSpi(srv);
-
-        commSpi.blockMsg(GridCacheQueryResponse.class);
+        blockMessage(GridCacheQueryResponse.class);
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
                 try {
+                    QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
+
                     qryCursor.getAll();
                 }
-                catch (Exception e) {
+                catch (CacheException e) {
                     checkAndWait(e);
 
                     return true;
@@ -262,11 +308,37 @@ public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstr
 
         assertNotDone(fut);
 
-        commSpi.unblockMsg();
+        unblockMessage();
 
         reconnectClientNode(cln, srv, null);
 
-        assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
+        assertTrue((Boolean)fut.get(2, SECONDS));
+
+        QueryCursor<Cache.Entry<Integer, Person>> qryCursor2 = clnCache.query(scanQry);
+
+        assertEquals(setPart ? 1 : 3, qryCursor2.getAll().size());
+    }
+
+    /**
+     * @param clazz Message class.
+     */
+    private void blockMessage(Class<?> clazz) {
+        for (int i = 0; i < serverCount(); i++) {
+            BlockTpcCommunicationSpi commSpi = commSpi(grid(i));
+
+            commSpi.blockMessage(clazz);
+        }
+    }
+
+    /**
+     *
+     */
+    private void unblockMessage() {
+        for (int i = 0; i < serverCount(); i++) {
+            BlockTpcCommunicationSpi commSpi = commSpi(grid(i));
+
+            commSpi.unblockMessage();
+        }
     }
 
     /**


Mime
View raw message