ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: cc
Date Thu, 25 May 2017 09:21:55 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075-cc b65a63d82 -> 52b716b8d


cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/52b716b8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/52b716b8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/52b716b8

Branch: refs/heads/ignite-5075-cc
Commit: 52b716b8dfd73651d461eebe4098efd71688f05c
Parents: b65a63d
Author: sboikov <sboikov@gridgain.com>
Authored: Thu May 25 12:01:52 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu May 25 12:18:27 2017 +0300

----------------------------------------------------------------------
 ...ContinuousQueryFailoverAbstractSelfTest.java | 73 ++++++++++----------
 1 file changed, 36 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/52b716b8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index befd1d7..3108edc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -400,7 +400,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends
GridC
 
     /**
      * @param ignite Ignite.
-     * @param topVer Topology version.
+     * @param topVer Major topology version.
+     * @param minorVer Minor topology version.
      * @throws Exception If failed.
      */
     private void waitRebalanceFinished(Ignite ignite, long topVer, int minorVer) throws Exception
{
@@ -511,9 +512,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends
GridC
      * @param nodes Count nodes.
      * @param killedNodeIdx Killed node index.
      * @param updCntrs Update counters.
-     * @return {@code True} if counters matches.
      */
-    private boolean checkPartCounter(int nodes, int killedNodeIdx, Map<Integer, Long>
updCntrs) {
+    private void checkPartCounter(int nodes, int killedNodeIdx, Map<Integer, Long>
updCntrs) {
         for (int i = 0; i < nodes; i++) {
             if (i == killedNodeIdx)
                 continue;
@@ -527,8 +527,6 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends
GridC
                     assertEquals(e.getValue(), act.get(e.getKey()).get2());
             }
         }
-
-        return true;
     }
 
     /**
@@ -753,8 +751,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends
GridC
         assert GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
                 // (SRV_NODES + 1 client node) - 1 primary - backup nodes.
-                return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /** client node
*/)
-                    - 1 /** Primary node */ - backups;
+                return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /* client node
*/)
+                    - 1 /* Primary node */ - backups;
             }
         }, 5000L);
 
@@ -1253,6 +1251,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends
GridC
     /**
      * @param expEvts Expected events.
      * @param lsnr Listener.
+     * @throws Exception If failed.
      */
     private void checkEvents(final List<T3<Object, Object, Object>> expEvts,
final CacheEventListener3 lsnr,
         boolean allowLoseEvt) throws Exception {
@@ -1347,9 +1346,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends
GridC
 
         QueryCursor<?> cur = qryClient.cache(DEFAULT_CACHE_NAME).query(qry);
 
-        final Collection<Object> backupQueue = backupQueue(ignite(1));
-
-        assertEquals(0, backupQueue.size());
+        assertEquals(0, backupQueue(ignite(1)).size());
 
         IgniteCache<Object, Object> cache0 = ignite(0).cache(DEFAULT_CACHE_NAME);
 
@@ -1367,11 +1364,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest
extends GridC
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
-                return backupQueue.isEmpty();
+                return backupQueue(ignite(1)).isEmpty();
             }
         }, 2000);
 
-        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() <
BACKUP_ACK_THRESHOLD);
+        assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)),
+            backupQueue(ignite(1)).size() < BACKUP_ACK_THRESHOLD);
 
         if (!latch.await(5, SECONDS))
             fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount()
+ ']');
@@ -1389,11 +1387,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest
extends GridC
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
-                return backupQueue.isEmpty();
+                return backupQueue(ignite(1)).isEmpty();
             }
         }, ACK_FREQ + 2000);
 
-        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
+        assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)), backupQueue(ignite(1)).isEmpty());
 
         if (!latch.await(5, SECONDS))
             fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount()
+ ']');
@@ -1421,9 +1419,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends
GridC
 
         QueryCursor<?> cur = qryClient.cache(DEFAULT_CACHE_NAME).query(qry);
 
-        final Collection<Object> backupQueue = backupQueue(ignite(0));
-
-        assertEquals(0, backupQueue.size());
+        assertEquals(0, backupQueue(ignite(0)).size());
 
         long ttl = 100;
 
@@ -1433,9 +1429,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends
GridC
 
         final List<Integer> keys = primaryKeys(ignite(1).cache(DEFAULT_CACHE_NAME),
BACKUP_ACK_THRESHOLD);
 
-        CountDownLatch latch = new CountDownLatch(keys.size());
-
-        lsnr.latch = latch;
+        lsnr.latch = new CountDownLatch(keys.size());
 
         for (Integer key : keys) {
             log.info("Put: " + key);
@@ -1445,11 +1439,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest
extends GridC
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
-                return backupQueue.isEmpty();
+                return backupQueue(ignite(0)).isEmpty();
             }
         }, 2000);
 
-        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() <
BACKUP_ACK_THRESHOLD);
+        assertTrue("Backup queue is not cleared: " + backupQueue(ignite(0)),
+            backupQueue(ignite(0)).size() < BACKUP_ACK_THRESHOLD);
 
         boolean wait = waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
@@ -1461,14 +1456,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest
extends GridC
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
-                return backupQueue.isEmpty();
+                return backupQueue(ignite(0)).isEmpty();
             }
         }, 2000);
 
-        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() <
BACKUP_ACK_THRESHOLD);
+        assertTrue("Backup queue is not cleared: " + backupQueue(ignite(0)), backupQueue(ignite(0)).size()
< BACKUP_ACK_THRESHOLD);
 
-        if (backupQueue.size() != 0) {
-            for (Object o : backupQueue) {
+        if (backupQueue(ignite(0)).size() != 0) {
+            for (Object o : backupQueue(ignite(0))) {
                 CacheContinuousQueryEntry e = (CacheContinuousQueryEntry)o;
 
                 assertNotSame("Evicted entry added to backup queue.", -1L, e.updateCounter());
@@ -1494,9 +1489,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends
GridC
 
         QueryCursor<?> cur = cache.query(qry);
 
-        final Collection<Object> backupQueue = backupQueue(ignite(1));
-
-        assertEquals(0, backupQueue.size());
+        assertEquals(0, backupQueue(ignite(1)).size());
 
         List<Integer> keys = primaryKeys(cache, BACKUP_ACK_THRESHOLD);
 
@@ -1512,11 +1505,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest
extends GridC
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
-                return backupQueue.isEmpty();
+                return backupQueue(ignite(1)).isEmpty();
             }
         }, 3000);
 
-        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() <
BACKUP_ACK_THRESHOLD);
+        assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)),
+            backupQueue(ignite(1)).size() < BACKUP_ACK_THRESHOLD);
 
         if (!latch.await(5, SECONDS))
             fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount()
+ ']');
@@ -1533,20 +1527,25 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest
extends GridC
 
         ConcurrentMap<Object, Object> infos = GridTestUtils.getFieldValue(proc, "rmtInfos");
 
-        Collection<Object> backupQueue = null;
+        Collection<Object> backupQueue = new ArrayList<>();
 
         for (Object info : infos.values()) {
             GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
 
-            if (hnd.isQuery() && DEFAULT_CACHE_NAME.equals(hnd.cacheName())) {
-                backupQueue = GridTestUtils.getFieldValue(hnd, CacheContinuousQueryHandler.class,
"backupQueue");
+            if (hnd.isQuery() && hnd.cacheName().equals(DEFAULT_CACHE_NAME)) {
+                Map<Integer, CacheContinuousQueryEventBuffer> map = GridTestUtils.getFieldValue(hnd,
+                    CacheContinuousQueryHandler.class, "entryBufs");
+
+                for (CacheContinuousQueryEventBuffer buf : map.values()) {
+                    Collection<Object> q = GridTestUtils.getFieldValue(buf,
+                        CacheContinuousQueryEventBuffer.class, "backupQ");
 
-                break;
+                    if (q != null)
+                        backupQueue.addAll(q);
+                }
             }
         }
 
-        assertNotNull(backupQueue);
-
         return backupQueue;
     }
 


Mime
View raw message