Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3D2D7200C80 for ; Thu, 25 May 2017 11:21:57 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3BB7A160BCA; Thu, 25 May 2017 09:21:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5A7B5160BC7 for ; Thu, 25 May 2017 11:21:56 +0200 (CEST) Received: (qmail 72417 invoked by uid 500); 25 May 2017 09:21:55 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 72408 invoked by uid 99); 25 May 2017 09:21:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 May 2017 09:21:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 767B7DFAEB; Thu, 25 May 2017 09:21:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: cc Date: Thu, 25 May 2017 09:21:55 +0000 (UTC) archived-at: Thu, 25 May 2017 09:21:57 -0000 Repository: ignite Updated Branches: refs/heads/ignite-5075-cc b65a63d82 -> 52b716b8d cc Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/52b716b8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/52b716b8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/52b716b8 Branch: refs/heads/ignite-5075-cc Commit: 52b716b8dfd73651d461eebe4098efd71688f05c Parents: b65a63d Author: sboikov Authored: Thu May 25 12:01:52 2017 +0300 Committer: sboikov Committed: Thu May 25 12:18:27 2017 +0300 ---------------------------------------------------------------------- ...ContinuousQueryFailoverAbstractSelfTest.java | 73 ++++++++++---------- 1 file changed, 36 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/52b716b8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index befd1d7..3108edc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -400,7 +400,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * @param ignite Ignite. - * @param topVer Topology version. + * @param topVer Major topology version. + * @param minorVer Minor topology version. * @throws Exception If failed. */ private void waitRebalanceFinished(Ignite ignite, long topVer, int minorVer) throws Exception { @@ -511,9 +512,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC * @param nodes Count nodes. * @param killedNodeIdx Killed node index. * @param updCntrs Update counters. - * @return {@code True} if counters matches. */ - private boolean checkPartCounter(int nodes, int killedNodeIdx, Map updCntrs) { + private void checkPartCounter(int nodes, int killedNodeIdx, Map updCntrs) { for (int i = 0; i < nodes; i++) { if (i == killedNodeIdx) continue; @@ -527,8 +527,6 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC assertEquals(e.getValue(), act.get(e.getKey()).get2()); } } - - return true; } /** @@ -753,8 +751,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC assert GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { // (SRV_NODES + 1 client node) - 1 primary - backup nodes. - return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /** client node */) - - 1 /** Primary node */ - backups; + return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /* client node */) + - 1 /* Primary node */ - backups; } }, 5000L); @@ -1253,6 +1251,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * @param expEvts Expected events. * @param lsnr Listener. + * @throws Exception If failed. */ private void checkEvents(final List> expEvts, final CacheEventListener3 lsnr, boolean allowLoseEvt) throws Exception { @@ -1347,9 +1346,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC QueryCursor cur = qryClient.cache(DEFAULT_CACHE_NAME).query(qry); - final Collection backupQueue = backupQueue(ignite(1)); - - assertEquals(0, backupQueue.size()); + assertEquals(0, backupQueue(ignite(1)).size()); IgniteCache cache0 = ignite(0).cache(DEFAULT_CACHE_NAME); @@ -1367,11 +1364,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return backupQueue.isEmpty(); + return backupQueue(ignite(1)).isEmpty(); } }, 2000); - assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); + assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)), + backupQueue(ignite(1)).size() < BACKUP_ACK_THRESHOLD); if (!latch.await(5, SECONDS)) fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); @@ -1389,11 +1387,11 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return backupQueue.isEmpty(); + return backupQueue(ignite(1)).isEmpty(); } }, ACK_FREQ + 2000); - assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty()); + assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)), backupQueue(ignite(1)).isEmpty()); if (!latch.await(5, SECONDS)) fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); @@ -1421,9 +1419,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC QueryCursor cur = qryClient.cache(DEFAULT_CACHE_NAME).query(qry); - final Collection backupQueue = backupQueue(ignite(0)); - - assertEquals(0, backupQueue.size()); + assertEquals(0, backupQueue(ignite(0)).size()); long ttl = 100; @@ -1433,9 +1429,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC final List keys = primaryKeys(ignite(1).cache(DEFAULT_CACHE_NAME), BACKUP_ACK_THRESHOLD); - CountDownLatch latch = new CountDownLatch(keys.size()); - - lsnr.latch = latch; + lsnr.latch = new CountDownLatch(keys.size()); for (Integer key : keys) { log.info("Put: " + key); @@ -1445,11 +1439,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return backupQueue.isEmpty(); + return backupQueue(ignite(0)).isEmpty(); } }, 2000); - assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); + assertTrue("Backup queue is not cleared: " + backupQueue(ignite(0)), + backupQueue(ignite(0)).size() < BACKUP_ACK_THRESHOLD); boolean wait = waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { @@ -1461,14 +1456,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return backupQueue.isEmpty(); + return backupQueue(ignite(0)).isEmpty(); } }, 2000); - assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); + assertTrue("Backup queue is not cleared: " + backupQueue(ignite(0)), backupQueue(ignite(0)).size() < BACKUP_ACK_THRESHOLD); - if (backupQueue.size() != 0) { - for (Object o : backupQueue) { + if (backupQueue(ignite(0)).size() != 0) { + for (Object o : backupQueue(ignite(0))) { CacheContinuousQueryEntry e = (CacheContinuousQueryEntry)o; assertNotSame("Evicted entry added to backup queue.", -1L, e.updateCounter()); @@ -1494,9 +1489,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC QueryCursor cur = cache.query(qry); - final Collection backupQueue = backupQueue(ignite(1)); - - assertEquals(0, backupQueue.size()); + assertEquals(0, backupQueue(ignite(1)).size()); List keys = primaryKeys(cache, BACKUP_ACK_THRESHOLD); @@ -1512,11 +1505,12 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return backupQueue.isEmpty(); + return backupQueue(ignite(1)).isEmpty(); } }, 3000); - assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); + assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)), + backupQueue(ignite(1)).size() < BACKUP_ACK_THRESHOLD); if (!latch.await(5, SECONDS)) fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); @@ -1533,20 +1527,25 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC ConcurrentMap infos = GridTestUtils.getFieldValue(proc, "rmtInfos"); - Collection backupQueue = null; + Collection backupQueue = new ArrayList<>(); for (Object info : infos.values()) { GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd"); - if (hnd.isQuery() && DEFAULT_CACHE_NAME.equals(hnd.cacheName())) { - backupQueue = GridTestUtils.getFieldValue(hnd, CacheContinuousQueryHandler.class, "backupQueue"); + if (hnd.isQuery() && hnd.cacheName().equals(DEFAULT_CACHE_NAME)) { + Map map = GridTestUtils.getFieldValue(hnd, + CacheContinuousQueryHandler.class, "entryBufs"); + + for (CacheContinuousQueryEventBuffer buf : map.values()) { + Collection q = GridTestUtils.getFieldValue(buf, + CacheContinuousQueryEventBuffer.class, "backupQ"); - break; + if (q != null) + backupQueue.addAll(q); + } } } - assertNotNull(backupQueue); - return backupQueue; }