From commits-return-121587-archive-asf-public=cust-asf.ponee.io@ignite.apache.org Wed Nov 7 12:27:24 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 231A1180649 for ; Wed, 7 Nov 2018 12:27:22 +0100 (CET) Received: (qmail 6351 invoked by uid 500); 7 Nov 2018 11:27:22 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 6342 invoked by uid 99); 7 Nov 2018 11:27:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Nov 2018 11:27:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C9B05E0AD2; Wed, 7 Nov 2018 11:27:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: IGNITE-9975 Fixed LOST partitions handling - Fixes #5214. Date: Wed, 7 Nov 2018 11:27:21 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/master dbff7e263 -> b9cec42a0 IGNITE-9975 Fixed LOST partitions handling - Fixes #5214. Signed-off-by: Alexey Goncharuk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b9cec42a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b9cec42a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b9cec42a Branch: refs/heads/master Commit: b9cec42a08a2e4608c7a76ddbc401f88feed1b5b Parents: dbff7e2 Author: Anton Kalashnikov Authored: Wed Nov 7 14:25:48 2018 +0300 Committer: Alexey Goncharuk Committed: Wed Nov 7 14:25:48 2018 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 10 +- .../dht/preloader/GridDhtPreloader.java | 3 + .../topology/GridDhtPartitionTopologyImpl.java | 9 +- .../ignite/cache/ResetLostPartitionTest.java | 260 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite5.java | 1 - .../ignite/testsuites/IgnitePdsTestSuite4.java | 3 + 6 files changed, 282 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cec42a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 94116fe..fa6f278 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -548,6 +548,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @param cacheOrGroupName Group or cache name for reset lost partitions. + * @return {@code True} if reset lost partition exchange. + */ + public boolean resetLostPartitionFor(String cacheOrGroupName) { + return exchActions != null && exchActions.cachesToResetLostPartitions().contains(cacheOrGroupName); + } + + /** * @return {@code True} if activate cluster exchange. */ public boolean activateCluster() { @@ -2165,8 +2173,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - exchActions = null; - if (firstDiscoEvt instanceof DiscoveryCustomEvent) ((DiscoveryCustomEvent)firstDiscoEvt).customMessage(null); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cec42a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index f6032bb..eed0816 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -167,6 +167,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (ctx.kernalContext().clientNode() || rebTopVer.equals(AffinityTopologyVersion.NONE)) return false; // No-op. + if (exchFut.resetLostPartitionFor(grp.cacheOrGroupName())) + return true; + if (exchFut.localJoinExchange()) return true; // Required, can have outdated updSeq partition counter if node reconnects. http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cec42a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index 01f3d6c..45f3282 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -2129,8 +2129,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (locPart != null && locPart.state() == LOST) { boolean marked = locPart.own(); - if (marked) + if (marked) { updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer); + + long updateCntr = locPart.updateCounter(); + + //Set update counters to 0, for full rebalance. + locPart.updateCounter(updateCntr, -updateCntr); + locPart.initialUpdateCounter(0); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cec42a/modules/core/src/test/java/org/apache/ignite/cache/ResetLostPartitionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/ResetLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/cache/ResetLostPartitionTest.java new file mode 100644 index 0000000..6f1e78b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/ResetLostPartitionTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; + +/** + * + */ +public class ResetLostPartitionTest extends GridCommonAbstractTest { + /** Ip finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + /** Cache name. */ + private static final String[] CACHE_NAMES = {"cacheOne", "cacheTwo", "cacheThree"}; + /** Cache size */ + public static final int CACHE_SIZE = 100000 / CACHE_NAMES.length; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + DataStorageConfiguration storageCfg = new DataStorageConfiguration(); + + storageCfg.getDefaultDataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(300L * 1024 * 1024); + + cfg.setDataStorageConfiguration(storageCfg); + + CacheConfiguration[] ccfg = new CacheConfiguration[] { + cacheConfiguration(CACHE_NAMES[0], CacheAtomicityMode.ATOMIC), + cacheConfiguration(CACHE_NAMES[1], CacheAtomicityMode.ATOMIC), + cacheConfiguration(CACHE_NAMES[2], CacheAtomicityMode.TRANSACTIONAL) + }; + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @param cacheName Cache name. + * @param mode Cache atomicity mode. + * @return Configured cache configuration. + */ + private CacheConfiguration cacheConfiguration(String cacheName, CacheAtomicityMode mode) { + return new CacheConfiguration<>(cacheName) + .setCacheMode(CacheMode.PARTITIONED) + .setAtomicityMode(mode) + .setBackups(1) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setPartitionLossPolicy(PartitionLossPolicy.READ_ONLY_SAFE) + .setAffinity(new RendezvousAffinityFunction(false, 1024)) + .setIndexedTypes(String.class, String.class); + } + + /** Client configuration */ + private IgniteConfiguration getClientConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(true); + cfg.setClientMode(true); + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + return cfg; + } + + /** + * Test to restore lost partitions after grid reactivation. + * + * @throws Exception if fail. + */ + public void testReactivateGridBeforeResetLostPartitions() throws Exception { + doRebalanceAfterPartitionsWereLost(true); + } + + /** + * Test to restore lost partitions on working grid. + * + * @throws Exception if fail. + */ + public void testResetLostPartitions() throws Exception { + doRebalanceAfterPartitionsWereLost(false); + } + + /** + * @param reactivateGridBeforeResetPart Reactive grid before try to reset lost partitions. + * @throws Exception if fail. + */ + private void doRebalanceAfterPartitionsWereLost(boolean reactivateGridBeforeResetPart) throws Exception { + startGrids(3); + + grid(0).cluster().active(true); + + Ignite igniteClient = startGrid(getClientConfiguration("client")); + + for (String cacheName : CACHE_NAMES) { + IgniteCache cache = igniteClient.cache(cacheName); + + for (int j = 0; j < CACHE_SIZE; j++) + cache.put(j, "Value" + j); + } + + stopGrid("client"); + + String dn2DirName = grid(1).name().replace(".", "_"); + + stopGrid(1); + + //Clean up the pds and WAL for second data node. + U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR + "/" + dn2DirName, true)); + + U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR + "/wal/" + dn2DirName, true)); + + //Here we have two from three data nodes and cache with 1 backup. So there is no data loss expected. + assertEquals(CACHE_NAMES.length * CACHE_SIZE, averageSizeAroundAllNodes()); + + //Start node 2 with empty PDS. Rebalance will be started. + startGrid(1); + + //During rebalance stop node 3. Rebalance will be stopped. + stopGrid(2); + + //Start node 3. + startGrid(2); + + //Loss data expected because rebalance to node 1 have not finished and node 2 was stopped. + assertTrue(CACHE_NAMES.length * CACHE_SIZE > averageSizeAroundAllNodes()); + + for (String cacheName : CACHE_NAMES) { + //Node 1 will have only OWNING partitions. + assertTrue(getPartitionsStates(0, cacheName).stream().allMatch(state -> state == OWNING)); + + //Node 3 will have only OWNING partitions. + assertTrue(getPartitionsStates(2, cacheName).stream().allMatch(state -> state == OWNING)); + } + + boolean hasLost = false; + for (String cacheName : CACHE_NAMES) { + //Node 2 will have OWNING and LOST partitions. + hasLost |= getPartitionsStates(1, cacheName).stream().anyMatch(state -> state == LOST); + } + + assertTrue(hasLost); + + if (reactivateGridBeforeResetPart) { + grid(0).cluster().active(false); + grid(0).cluster().active(true); + } + + //Try to reset lost partitions. + grid(2).resetLostPartitions(Arrays.asList(CACHE_NAMES)); + + awaitPartitionMapExchange(); + + for (String cacheName : CACHE_NAMES) { + //Node 2 will have only OWNING partitions. + assertTrue(getPartitionsStates(1, cacheName).stream().allMatch(state -> state == OWNING)); + } + + //All data was back. + assertEquals(CACHE_NAMES.length * CACHE_SIZE, averageSizeAroundAllNodes()); + + //Stop node 2 for checking rebalance correctness from this node. + stopGrid(2); + + //Rebalance should be successfully finished. + assertEquals(CACHE_NAMES.length * CACHE_SIZE, averageSizeAroundAllNodes()); + } + + /** + * @param gridNumber Grid number. + * @param cacheName Cache name. + * @return Partitions states for given cache name. + */ + private List getPartitionsStates(int gridNumber, String cacheName) { + CacheGroupContext cgCtx = grid(gridNumber).context().cache().cacheGroup(CU.cacheId(cacheName)); + + GridDhtPartitionTopologyImpl top = (GridDhtPartitionTopologyImpl)cgCtx.topology(); + + return top.localPartitions().stream() + .map(GridDhtLocalPartition::state) + .collect(Collectors.toList()); + } + + /** + * Checks that all nodes see the correct size. + */ + private int averageSizeAroundAllNodes() { + int totalSize = 0; + + for (Ignite ignite : IgnitionEx.allGrids()) { + for (String cacheName : CACHE_NAMES) { + totalSize += ignite.cache(cacheName).size(); + } + } + + return totalSize / IgnitionEx.allGrids().size(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cec42a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java index a583317..39f3cea 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java @@ -106,7 +106,6 @@ public class IgniteCacheTestSuite5 extends TestSuite { suite.addTestSuite(ConcurrentCacheStartTest.class); - //suite.addTestSuite(GridCacheAtomicPreloadSelfTest.class); //suite.addTestSuite(IgniteCacheContainsKeyColocatedAtomicSelfTest.class); //suite.addTestSuite(IgniteCacheContainsKeyNearAtomicSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cec42a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java index 64615e2..027f341 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.cache.ResetLostPartitionTest; import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest; @@ -47,6 +48,8 @@ public class IgnitePdsTestSuite4 extends TestSuite { suite.addTestSuite(IgnitePdsPartitionPreloadTest.class); + suite.addTestSuite(ResetLostPartitionTest.class); + return suite; }