Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4D7BD185C5 for ; Thu, 13 Aug 2015 16:36:31 +0000 (UTC) Received: (qmail 8978 invoked by uid 500); 13 Aug 2015 16:36:31 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 8945 invoked by uid 500); 13 Aug 2015 16:36:31 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 8936 invoked by uid 99); 13 Aug 2015 16:36:30 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Aug 2015 16:36:30 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 7964FDD7CC for ; Thu, 13 Aug 2015 16:36:30 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.775 X-Spam-Level: * X-Spam-Status: No, score=1.775 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id E75A-ILJN2TB for ; Thu, 13 Aug 2015 16:36:20 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 8019251AEE for ; Thu, 13 Aug 2015 14:37:17 +0000 (UTC) Received: (qmail 37901 invoked by uid 99); 13 Aug 2015 14:37:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Aug 2015 14:37:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B022BE6B11; Thu, 13 Aug 2015 14:37:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Message-Id: <1f2444848c704581903e1fc16b95d507@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-ignite git commit: # Register client continuous listeners on node join Date: Thu, 13 Aug 2015 14:37:15 +0000 (UTC) Repository: incubator-ignite Updated Branches: refs/heads/master aed83af5f -> 35e3e4e04 # Register client continuous listeners on node join Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/35e3e4e0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/35e3e4e0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/35e3e4e0 Branch: refs/heads/master Commit: 35e3e4e048fa34b5b23ebd0ae235424f3e3492d9 Parents: aed83af Author: sboikov Authored: Thu Aug 13 17:37:00 2015 +0300 Committer: sboikov Committed: Thu Aug 13 17:37:00 2015 +0300 ---------------------------------------------------------------------- .../continuous/GridContinuousProcessor.java | 44 ++++++++++++++++---- .../IgniteCacheContinuousQueryClientTest.java | 33 ++++++++++++--- .../IgniteCacheQuerySelfTestSuite.java | 1 + scripts/git-format-patch.sh | 2 +- 4 files changed, 66 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index daa9494..5f1c4bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -193,10 +193,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { unregisterRemote(routineId); if (snd.isClient()) { - Map infoMap = clientInfos.get(snd.id()); + Map clientRoutineMap = clientInfos.get(snd.id()); - if (infoMap != null) - infoMap.remove(msg.routineId()); + if (clientRoutineMap != null) + clientRoutineMap.remove(msg.routineId()); } } } @@ -370,6 +370,34 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } for (Map.Entry> entry : data.clientInfos.entrySet()) { + UUID clientNodeId = entry.getKey(); + + Map clientRoutineMap = entry.getValue(); + + for (Map.Entry e : clientRoutineMap.entrySet()) { + UUID routineId = e.getKey(); + LocalRoutineInfo info = e.getValue(); + + try { + if (info.prjPred != null) + ctx.resource().injectGeneric(info.prjPred); + + if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) { + if (registerHandler(clientNodeId, + routineId, + info.hnd, + info.bufSize, + info.interval, + info.autoUnsubscribe, + false)) + info.hnd.onListenerRegistered(routineId, ctx); + } + } + catch (IgniteCheckedException err) { + U.error(log, "Failed to register continuous handler.", err); + } + } + Map map = clientInfos.get(entry.getKey()); if (map == null) { @@ -723,17 +751,17 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } if (node.isClient()) { - Map clientRouteMap = clientInfos.get(node.id()); + Map clientRoutineMap = clientInfos.get(node.id()); - if (clientRouteMap == null) { - clientRouteMap = new HashMap<>(); + if (clientRoutineMap == null) { + clientRoutineMap = new HashMap<>(); - Map old = clientInfos.put(node.id(), clientRouteMap); + Map old = clientInfos.put(node.id(), clientRoutineMap); assert old == null; } - clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), + clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), hnd, data.bufferSize(), data.interval(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java index bb413a0..d66d1d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java @@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import org.apache.ignite.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.junits.common.*; @@ -38,7 +40,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; */ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest { /** */ - protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** */ private boolean client; @@ -47,6 +49,8 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setCacheMode(PARTITIONED); @@ -60,6 +64,13 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest return cfg; } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + /** * @throws Exception If failed. */ @@ -80,15 +91,27 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest QueryCursor cur = clientNode.cache(null).query(qry); - Ignite joined = startGrid(4); + Ignite joined1 = startGrid(4); - IgniteCache joinedCache = joined.cache(null); + IgniteCache joinedCache1 = joined1.cache(null); - joinedCache.put(primaryKey(joinedCache), 1); + joinedCache1.put(primaryKey(joinedCache1), 1); assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); cur.close(); + + lsnr.latch = new CountDownLatch(1); + + Ignite joined2 = startGrid(5); + + IgniteCache joinedCache2 = joined2.cache(null); + + joinedCache2.put(primaryKey(joinedCache2), 2); + + U.sleep(1000); + + assertEquals("Unexpected event received.", 1, lsnr.latch.getCount()); } /** @@ -96,7 +119,7 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest */ private static class CacheEventListener implements CacheEntryUpdatedListener { /** */ - private final CountDownLatch latch = new CountDownLatch(1); + private volatile CountDownLatch latch = new CountDownLatch(1); /** */ @LoggerResource http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 2d7d0ce..a3849d7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -98,6 +98,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class); + suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class); // Reduce fields queries. suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/scripts/git-format-patch.sh ---------------------------------------------------------------------- diff --git a/scripts/git-format-patch.sh b/scripts/git-format-patch.sh index b11c73d..83aee3e 100755 --- a/scripts/git-format-patch.sh +++ b/scripts/git-format-patch.sh @@ -20,7 +20,7 @@ # Git patch-file maker. # echo 'Usage: scripts/git-format-patch.sh [-ih|--ignitehome ] [-idb|--ignitedefbranch ] [-ph|--patchhome ]' -echo 'It is a script to create patch between Current branch (branch with changes) and Default branche. The script is safe and do not broke or lose your changes.' +echo 'It is a script to create patch between Current branch (branch with changes) and Default branch. The script is safe and does not break or lose your changes.' echo "It should be called from IGNITE_HOME directory." echo "Patch will be created at PATCHES_HOME (= IGNITE_HOME, by default) between Default branch (IGNITE_DEFAULT_BRANCH) and Current branch." echo "Note: you can use ${IGNITE_HOME}/scripts/git-patch-prop-local.sh to set your own local properties (to rewrite settings at git-patch-prop-local.sh). "