Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1918B200C48 for ; Thu, 6 Apr 2017 11:31:55 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 17C0E160BA4; Thu, 6 Apr 2017 09:31:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 00586160BA6 for ; Thu, 6 Apr 2017 11:31:53 +0200 (CEST) Received: (qmail 15337 invoked by uid 500); 6 Apr 2017 09:31:53 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 15198 invoked by uid 99); 6 Apr 2017 09:31:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Apr 2017 09:31:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ABF61E93DC; Thu, 6 Apr 2017 09:31:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 06 Apr 2017 09:31:56 -0000 Message-Id: <7ebee9aeb9fe4540a249dc7c789f7bec@git.apache.org> In-Reply-To: <3ba27144dbdb41f0bb9f3db0049366f4@git.apache.org> References: <3ba27144dbdb41f0bb9f3db0049366f4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/50] [abbrv] ignite git commit: IGNITE-4284 - Fix. Failed second client node join with continuous query and peer class loading enabled. archived-at: Thu, 06 Apr 2017 09:31:55 -0000 IGNITE-4284 - Fix. Failed second client node join with continuous query and peer class loading enabled. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/357c20ab Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/357c20ab Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/357c20ab Branch: refs/heads/ignite-4003 Commit: 357c20ab9593390fb7af25f8638188595c5f6cd4 Parents: b689624 Author: dkarachentsev Authored: Thu Mar 30 12:50:42 2017 +0300 Committer: dkarachentsev Committed: Thu Mar 30 12:50:42 2017 +0300 ---------------------------------------------------------------------- .../continuous/GridContinuousProcessor.java | 39 ++--- .../ignite/custom/DummyEventFilterFactory.java | 47 ++++++ .../ContinuousQueryPeerClassLoadingTest.java | 142 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite3.java | 2 + 4 files changed, 214 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/357c20ab/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 9fd9b6d..6a4f57d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; @@ -495,29 +496,34 @@ public class GridContinuousProcessor extends GridProcessorAdapter { for (Map.Entry> entry : data.clientInfos.entrySet()) { UUID clientNodeId = entry.getKey(); - if (!ctx.localNodeId().equals(clientNodeId)) { + if (!ctx.clientNode()) { Map clientRoutineMap = entry.getValue(); for (Map.Entry e : clientRoutineMap.entrySet()) { UUID routineId = e.getKey(); LocalRoutineInfo info = e.getValue(); - try { - if (info.prjPred != null) - ctx.resource().injectGeneric(info.prjPred); - - if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) { - registerHandler(clientNodeId, - routineId, - info.hnd, - info.bufSize, - info.interval, - info.autoUnsubscribe, - false); + GridCacheContext cctx = ctx.cache().context().cacheContext(CU.cacheId(info.hnd.cacheName())); + + // Do not register handler if it's not affinity node. + if (cctx == null || cctx.affinityNode()) { + try { + if (info.prjPred != null) + ctx.resource().injectGeneric(info.prjPred); + + if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) { + registerHandler(clientNodeId, + routineId, + info.hnd, + info.bufSize, + info.interval, + info.autoUnsubscribe, + false); + } + } + catch (IgniteCheckedException err) { + U.error(log, "Failed to register continuous handler.", err); } - } - catch (IgniteCheckedException err) { - U.error(log, "Failed to register continuous handler.", err); } } } @@ -583,6 +589,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @return Routine ID. * @throws IgniteCheckedException If failed. */ + @SuppressWarnings("unchecked") public UUID registerStaticRoutine( String cacheName, CacheEntryUpdatedListener locLsnr, http://git-wip-us.apache.org/repos/asf/ignite/blob/357c20ab/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java b/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java new file mode 100644 index 0000000..e0688bc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.custom; + +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListenerException; + +/** + * Must be not in org.apache.ignite.internal + */ +public class DummyEventFilterFactory implements Factory> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public CacheEntryEventFilter create() { + return new DummyEventFilter(); + } + + /** + * + */ + private static class DummyEventFilter implements CacheEntryEventFilter { + /** {@inheritDoc} */ + @Override public boolean evaluate( + final CacheEntryEvent evt) throws CacheEntryListenerException { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/357c20ab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java new file mode 100644 index 0000000..e5d1d60 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.custom.DummyEventFilterFactory; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Checks if filter factory correctly deployed on all nodes. + */ +public class ContinuousQueryPeerClassLoadingTest extends GridCommonAbstractTest { + /** */ + public static final String CACHE_NAME = "test-cache"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(true); + cfg.setClientMode(gridName.contains("client")); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteFilterFactoryClient() throws Exception { + check("server", "client1", "client2"); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteFilterFactoryServer1() throws Exception { + check("server1", "server2", "client"); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteFilterFactoryServer2() throws Exception { + check("server1", "server2", "server3"); + } + + /** + * @param node1Name Node 1 name. + * @param node2Name Node 2 name. + * @param node3Name Node 3 name. + */ + private void check(String node1Name, String node2Name, String node3Name) throws Exception { + final Ignite node1 = startGrid(node1Name); + + final IgniteCache cache = node1.getOrCreateCache(CACHE_NAME); + + for (int i = 0; i < 10; i++) + cache.put(i, String.valueOf(i)); + + final Ignite node2 = startGrid(node2Name); + + final ContinuousQuery qry1 = new ContinuousQuery<>(); + final ContinuousQuery qry2 = new ContinuousQuery<>(); + + qry1.setRemoteFilterFactory(new DummyEventFilterFactory()); + qry2.setRemoteFilterFactory(new DummyEventFilterFactory()); + + final AtomicInteger client1Evts = new AtomicInteger(0); + final AtomicInteger client2Evts = new AtomicInteger(0); + + final CountDownLatch latch1 = new CountDownLatch(20); + final CountDownLatch latch2 = new CountDownLatch(10); + + qry1.setLocalListener(new CacheEntryUpdatedListener() { + @Override public void onUpdated( + final Iterable> evts) throws CacheEntryListenerException { + System.out.println(">> Client 1 events " + evts); + for (CacheEntryEvent evt : evts) + latch1.countDown(); + } + }); + + qry2.setLocalListener(new CacheEntryUpdatedListener() { + @Override public void onUpdated( + final Iterable> evts) throws CacheEntryListenerException { + System.out.println(">> Client 2 events " + evts); + for (CacheEntryEvent evt : evts) + latch2.countDown(); + } + }); + + final IgniteCache cache1 = node2.cache(CACHE_NAME); + + cache1.query(qry1); + + for (int i = 10; i < 20; i++) + cache.put(i, String.valueOf(i)); + + // Fail on start second client. + final Ignite node3 = startGrid(node3Name); + + final IgniteCache cache2 = node3.cache(CACHE_NAME); + + cache2.query(qry2); + + for (int i = 20; i < 30; i++) + cache.put(i, String.valueOf(i)); + + assert latch1.await(5, TimeUnit.SECONDS) : latch1.getCount(); + assert latch2.await(5, TimeUnit.SECONDS) : latch2.getCount(); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/357c20ab/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index a865788..6b2fea0 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBin import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationSwapEnabledTest; +import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryPeerClassLoadingTest; import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryRemoteFilterMissingInClassPathSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest; @@ -123,6 +124,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(CacheKeepBinaryIterationNearEnabledTest.class); suite.addTestSuite(IgniteCacheContinuousQueryBackupQueueTest.class); suite.addTestSuite(IgniteCacheContinuousQueryNoUnsubscribeTest.class); + suite.addTestSuite(ContinuousQueryPeerClassLoadingTest.class); return suite; }