Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4D75C200C1A for ; Mon, 13 Feb 2017 12:20:08 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 4BEB4160B60; Mon, 13 Feb 2017 11:20:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2068F160B4D for ; Mon, 13 Feb 2017 12:20:06 +0100 (CET) Received: (qmail 65305 invoked by uid 500); 13 Feb 2017 11:20:06 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 65296 invoked by uid 99); 13 Feb 2017 11:20:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Feb 2017 11:20:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 33B2CDFC68; Mon, 13 Feb 2017 11:20:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: IGNITE-4664 - Added lifecycle and injection support for TopologyValidator. Fixes #1514 Date: Mon, 13 Feb 2017 11:20:06 +0000 (UTC) archived-at: Mon, 13 Feb 2017 11:20:08 -0000 Repository: ignite Updated Branches: refs/heads/master 262a34108 -> 3ef7a0e03 IGNITE-4664 - Added lifecycle and injection support for TopologyValidator. Fixes #1514 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3ef7a0e0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3ef7a0e0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3ef7a0e0 Branch: refs/heads/master Commit: 3ef7a0e03b9cb36fb4037cb075512adac95cc3f7 Parents: 262a341 Author: Aleksei Scherbakov Authored: Mon Feb 13 14:19:27 2017 +0300 Committer: Alexey Goncharuk Committed: Mon Feb 13 14:19:27 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 5 + .../cache/GridCacheLifecycleAwareSelfTest.java | 33 ++ ...niteTopologyValidatorGridSplitCacheTest.java | 334 +++++++++++++++++++ .../IgniteTopologyValidatorTestSuit.java | 1 + 4 files changed, 373 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3ef7a0e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index b0a78f4..7093403 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -528,6 +528,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { prepare(cfg, cfg.getAffinityMapper(), false); prepare(cfg, cfg.getEvictionFilter(), false); prepare(cfg, cfg.getInterceptor(), false); + prepare(cfg, cfg.getTopologyValidator(), false); NearCacheConfiguration nearCfg = cfg.getNearConfiguration(); @@ -563,6 +564,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { cleanup(cfg, cfg.getEvictionPolicy(), false); cleanup(cfg, cfg.getAffinity(), false); cleanup(cfg, cfg.getAffinityMapper(), false); + cleanup(cfg, cfg.getEvictionFilter(), false); + cleanup(cfg, cfg.getInterceptor(), false); + cleanup(cfg, cfg.getTopologyValidator(), false); cleanup(cfg, cctx.store().configuredStore(), false); if (!CU.isUtilityCache(cfg.getName()) && !CU.isSystemCache(cfg.getName())) { @@ -3561,6 +3565,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ret.add(ccfg.getEvictionFilter()); ret.add(ccfg.getEvictionPolicy()); ret.add(ccfg.getInterceptor()); + ret.add(ccfg.getTopologyValidator()); NearCacheConfiguration nearCfg = ccfg.getNearConfiguration(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3ef7a0e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java index 81a6433..aa31ff9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.UUID; import javax.cache.Cache; import javax.cache.integration.CacheLoaderException; +import org.apache.ignite.Ignite; import org.apache.ignite.cache.CacheInterceptor; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.AffinityFunction; @@ -39,10 +40,12 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.configuration.TopologyValidator; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.resources.CacheNameResource; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.junits.common.GridAbstractLifecycleAwareSelfTest; import org.jetbrains.annotations.Nullable; @@ -256,6 +259,30 @@ public class GridCacheLifecycleAwareSelfTest extends GridAbstractLifecycleAwareS } } + /** + */ + private static class TestTopologyValidator extends TestLifecycleAware implements TopologyValidator { + @IgniteInstanceResource + private Ignite ignite; + + /** + */ + public TestTopologyValidator() { + super(CACHE_NAME); + } + + /** {@inheritDoc} */ + @Override public boolean validate(Collection nodes) { + return false; + } + + @Override public void start() { + super.start(); + + assertNotNull(ignite); + } + } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception { @@ -324,6 +351,12 @@ public class GridCacheLifecycleAwareSelfTest extends GridAbstractLifecycleAwareS ccfg.setInterceptor(interceptor); + TestTopologyValidator topValidator = new TestTopologyValidator(); + + lifecycleAwares.add(topValidator); + + ccfg.setTopologyValidator(topValidator); + cfg.setCacheConfiguration(ccfg); return cfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/3ef7a0e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java new file mode 100644 index 0000000..3593ad6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.resources.CacheNameResource; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * Tests complex scenario with topology validator. + * Grid is split between to data centers, defined by attribute {@link #DC_NODE_ATTR}. + * If only nodes from single DC are left in topology, grid is moved into inoperative state until special + * activator node'll enter a topology, enabling grid operations. + */ +public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstractTest { + /** */ + private static final String DC_NODE_ATTR = "dc"; + + /** */ + private static final String ACTIVATOR_NODE_ATTR = "split.resolved"; + + /** */ + private static final int GRID_CNT = 4; + + /** */ + private static final int CACHES_CNT = 10; + + /** */ + private static final int RESOLVER_GRID_IDX = GRID_CNT; + + /** */ + private static final int CONFIGLESS_GRID_IDX = GRID_CNT + 1; + + /** */ + private static CountDownLatch initLatch = new CountDownLatch(GRID_CNT); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + int idx = getTestGridIndex(gridName); + + cfg.setUserAttributes(F.asMap(DC_NODE_ATTR, idx % 2)); + + if (idx != CONFIGLESS_GRID_IDX) { + if (idx == RESOLVER_GRID_IDX) { + cfg.setClientMode(true); + + cfg.setUserAttributes(F.asMap(ACTIVATOR_NODE_ATTR, "true")); + } + else { + CacheConfiguration[] ccfgs = new CacheConfiguration[CACHES_CNT]; + + for (int cnt = 0; cnt < CACHES_CNT; cnt++) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(testCacheName(cnt)); + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(0); + ccfg.setTopologyValidator(new SplitAwareTopologyValidator()); + + ccfgs[cnt] = ccfg; + } + + cfg.setCacheConfiguration(ccfgs); + } + } + + return cfg; + } + + /** + * @param idx Index. + */ + private String testCacheName(int idx) { + return "test" + idx; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * Tests topology split scenario. + * @throws Exception + */ + public void testTopologyValidator() throws Exception { + assertTrue(initLatch.await(10, TimeUnit.SECONDS)); + + // Tests what each node is able to do puts. + tryPut(0, 1, 2, 3); + + clearAll(); + + stopGrid(1); + + stopGrid(3); + + awaitPartitionMapExchange(); + + try { + tryPut(0, 2); + + fail(); + } + catch (Exception e) { + // No-op. + } + + resolveSplit(); + + tryPut(0, 2); + + clearAll(); + + startGrid(CONFIGLESS_GRID_IDX); + + awaitPartitionMapExchange(); + + tryPut(CONFIGLESS_GRID_IDX); + + stopGrid(CONFIGLESS_GRID_IDX); + + awaitPartitionMapExchange(); + + try { + tryPut(0, 2); + + fail(); + } + catch (Exception e) { + // No-op. + } + + resolveSplit(); + + tryPut(0, 2); + + clearAll(); + + startGrid(1); + + awaitPartitionMapExchange(); + + tryPut(0, 1, 2); + } + + /** */ + private void clearAll() { + for (int i = 0; i < CACHES_CNT; i++) + grid(0).cache(testCacheName(i)).clear(); + } + + /** + * Resolves split by client node join. + */ + private void resolveSplit() throws Exception { + startGrid(RESOLVER_GRID_IDX); + + stopGrid(RESOLVER_GRID_IDX); + } + + /** + * @param grids Grids to test. + */ + private void tryPut(int... grids) { + for (int i = 0; i < grids.length; i++) { + IgniteEx g = grid(grids[i]); + + for (int cnt = 0; cnt < CACHES_CNT; cnt++) { + String cacheName = testCacheName(cnt); + + for (int k = 0; k < 100; k++) { + if (g.affinity(cacheName).isPrimary(g.localNode(), k)) { + log().info("Put " + k + " to node " + g.localNode().id().toString()); + + IgniteCache cache = g.cache(cacheName); + + cache.put(k, k); + + assertEquals(1, cache.localSize()); + + break; + } + } + } + } + } + + /** + * Prevents cache from performing any operation if only nodes from single data center are left in topology. + */ + private static class SplitAwareTopologyValidator implements TopologyValidator, LifecycleAware { + /** */ + private static final long serialVersionUID = 0L; + + @CacheNameResource + private String cacheName; + + @IgniteInstanceResource + private Ignite ignite; + + @LoggerResource + private IgniteLogger log; + + /** */ + private transient volatile long activatorTopVer; + + /** {@inheritDoc} */ + @Override public boolean validate(Collection nodes) { + if (!F.view(nodes, new IgnitePredicate() { + @Override public boolean apply(ClusterNode node) { + return !node.isClient() && node.attribute(DC_NODE_ATTR) == null; + } + }).isEmpty()) + return false; + + IgniteKernal kernal = (IgniteKernal)ignite.cache(cacheName).unwrap(Ignite.class); + + GridDhtCacheAdapter dht = kernal.context().cache().internalCache(cacheName).context().dht(); + + long cacheTopVer = dht.topology().topologyVersionFuture().topologyVersion().topologyVersion(); + + if (hasSplit(nodes)) { + boolean resolved = activatorTopVer != 0 && cacheTopVer >= activatorTopVer; + + if (!resolved) + log.info("Grid segmentation is detected, switching to inoperative state."); + + return resolved; + } + else + activatorTopVer = 0; + + return true; + } + + /** */ + private boolean hasSplit(Collection nodes) { + ClusterNode prev = null; + + for (ClusterNode node : nodes) { + if (node.isClient()) + continue; + + if (prev != null && + !prev.attribute(DC_NODE_ATTR).equals(node.attribute(DC_NODE_ATTR))) + return false; + + prev = node; + } + + return true; + } + + @Override public void start() throws IgniteException { + if (ignite.cluster().localNode().isClient()) + return; + + initLatch.countDown(); + + ignite.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + + ClusterNode node = discoEvt.eventNode(); + + if (isMarkerNode(node)) + activatorTopVer = discoEvt.topologyVersion(); + + return true; + } + }, EventType.EVT_NODE_LEFT); + } + + /** + * @param node Node. + */ + private boolean isMarkerNode(ClusterNode node) { + return node.isClient() && node.attribute(ACTIVATOR_NODE_ATTR) != null; + } + + @Override public void stop() throws IgniteException { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/3ef7a0e0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuit.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuit.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuit.java index b100127..8c4cd11 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuit.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteTopologyValidatorTestSuit.java @@ -37,6 +37,7 @@ public class IgniteTopologyValidatorTestSuit extends TestSuite { suite.addTest(new TestSuite(IgniteTopologyValidatorPartitionedTxCacheTest.class)); suite.addTest(new TestSuite(IgniteTopologyValidatorReplicatedAtomicCacheTest.class)); suite.addTest(new TestSuite(IgniteTopologyValidatorReplicatedTxCacheTest.class)); + suite.addTest(new TestSuite(IgniteTopologyValidatorGridSplitCacheTest.class)); return suite; }