Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1EC7F200C0E for ; Tue, 17 Jan 2017 15:04:43 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 1D815160B43; Tue, 17 Jan 2017 14:04:43 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 98668160B5C for ; Tue, 17 Jan 2017 15:04:40 +0100 (CET) Received: (qmail 95214 invoked by uid 500); 17 Jan 2017 14:04:39 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 93785 invoked by uid 99); 17 Jan 2017 14:04:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jan 2017 14:04:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 81CFEE964D; Tue, 17 Jan 2017 14:04:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Tue, 17 Jan 2017 14:05:05 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [29/50] [abbrv] ignite git commit: IGNITE-4424 REPLICATED cache isn't synced across nodes archived-at: Tue, 17 Jan 2017 14:04:43 -0000 IGNITE-4424 REPLICATED cache isn't synced across nodes Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/82dd9128 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/82dd9128 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/82dd9128 Branch: refs/heads/ignite-gg-11810-1 Commit: 82dd912889b0dfca213edb1374c1fa0ed79411fd Parents: 27ba69b Author: Anton Vinogradov Authored: Fri Dec 30 13:41:34 2016 +0300 Committer: Anton Vinogradov Committed: Mon Jan 16 12:18:14 2017 +0300 ---------------------------------------------------------------------- .../GridNearAtomicAbstractUpdateFuture.java | 34 ++- .../GridNearAtomicSingleUpdateFuture.java | 44 ++-- .../dht/atomic/GridNearAtomicUpdateFuture.java | 57 ++--- .../AtomicPutAllChangingTopologyTest.java | 212 +++++++++++++++++++ .../IgniteCacheFailoverTestSuite.java | 3 + 5 files changed, 284 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/82dd9128/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 2fbabaa..c92e0f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -212,14 +212,18 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt // Cannot remap. remapCnt = 1; - map(topVer); + GridCacheVersion futVer = addAtomicFuture(topVer); + + if (futVer != null) + map(topVer, futVer); } } /** * @param topVer Topology version. + * @param futVer Future version */ - protected abstract void map(AffinityTopologyVersion topVer); + protected abstract void map(AffinityTopologyVersion topVer, GridCacheVersion futVer); /** * Maps future on ready topology. @@ -302,7 +306,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt * @param req Request. * @param e Error. */ - protected void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) { + protected final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) { synchronized (mux) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), req.nodeId(), @@ -314,4 +318,28 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt onResult(req.nodeId(), res, true); } } + + /** + * Adds future prevents topology change before operation complete. + * Should be invoked before topology lock released. + * + * @param topVer Topology version. + * @return Future version in case future added. + */ + protected final GridCacheVersion addAtomicFuture(AffinityTopologyVersion topVer) { + GridCacheVersion futVer = cctx.versions().next(topVer); + + synchronized (mux) { + assert this.futVer == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; + + this.topVer = topVer; + this.futVer = futVer; + } + + if (storeFuture() && !cctx.mvcc().addAtomicFuture(futVer, this)) + return null; + + return futVer; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/82dd9128/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index bd231cf..7376aff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -348,14 +348,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda @Override public void apply(final IgniteInternalFuture fut) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - try { - AffinityTopologyVersion topVer = fut.get(); - - map(topVer); - } - catch (IgniteCheckedException e) { - onDone(e); - } + mapOnTopology(); } }); } @@ -388,7 +381,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda @Override protected void mapOnTopology() { cache.topology().readLock(); - AffinityTopologyVersion topVer = null; + AffinityTopologyVersion topVer; + + GridCacheVersion futVer; try { if (cache.topology().stopping()) { @@ -410,6 +405,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } topVer = fut.topologyVersion(); + + futVer = addAtomicFuture(topVer); } else { if (waitTopFut) { @@ -435,11 +432,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda cache.topology().readUnlock(); } - map(topVer); + if (futVer != null) + map(topVer, futVer); } /** {@inheritDoc} */ - protected void map(AffinityTopologyVersion topVer) { + @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) { Collection topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -449,11 +447,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda return; } - Exception err = null; - GridNearAtomicAbstractUpdateRequest singleReq0 = null; - - GridCacheVersion futVer = cctx.versions().next(topVer); - GridCacheVersion updVer; // Assign version on near node in CLOCK ordering mode even if fastMap is false. @@ -470,16 +463,17 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda else updVer = null; + Exception err = null; + GridNearAtomicAbstractUpdateRequest singleReq0 = null; + try { singleReq0 = mapSingleUpdate(topVer, futVer, updVer); synchronized (mux) { - assert this.futVer == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; + assert this.futVer == futVer || (this.isDone() && this.error() != null); + assert this.topVer == topVer; - this.topVer = topVer; this.updVer = updVer; - this.futVer = futVer; resCnt = 0; @@ -496,14 +490,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda return; } - if (storeFuture()) { - if (!cctx.mvcc().addAtomicFuture(futVer, this)) { - assert isDone() : this; - - return; - } - } - // Optimize mapping for single key. mapSingle(singleReq0.nodeId(), singleReq0); } @@ -511,7 +497,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda /** * @return Future version. */ - GridCacheVersion onFutureDone() { + private GridCacheVersion onFutureDone() { GridCacheVersion ver0; GridFutureAdapter fut0; http://git-wip-us.apache.org/repos/asf/ignite/blob/82dd9128/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index cd64117..950e5bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -456,14 +456,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu @Override public void apply(final IgniteInternalFuture fut) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - try { - AffinityTopologyVersion topVer = fut.get(); - - map(topVer, remapKeys); - } - catch (IgniteCheckedException e) { - onDone(e); - } + mapOnTopology(); } }); } @@ -497,7 +490,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu @Override protected void mapOnTopology() { cache.topology().readLock(); - AffinityTopologyVersion topVer = null; + AffinityTopologyVersion topVer; + + GridCacheVersion futVer; try { if (cache.topology().stopping()) { @@ -519,6 +514,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } topVer = fut.topologyVersion(); + + futVer = addAtomicFuture(topVer); } else { if (waitTopFut) { @@ -544,7 +541,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu cache.topology().readUnlock(); } - map(topVer, null); + if (futVer != null) + map(topVer, futVer, remapKeys); } /** @@ -602,15 +600,18 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - protected void map(AffinityTopologyVersion topVer) { - map(topVer, null); + @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) { + map(topVer, futVer, null); } /** * @param topVer Topology version. + * @param futVer Future ID. * @param remapKeys Keys to remap. */ - void map(AffinityTopologyVersion topVer, @Nullable Collection remapKeys) { + void map(AffinityTopologyVersion topVer, + GridCacheVersion futVer, + @Nullable Collection remapKeys) { Collection topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -620,14 +621,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu return; } - Exception err = null; - GridNearAtomicFullUpdateRequest singleReq0 = null; - Map mappings0 = null; - - int size = keys.size(); - - GridCacheVersion futVer = cctx.versions().next(topVer); - GridCacheVersion updVer; // Assign version on near node in CLOCK ordering mode even if fastMap is false. @@ -644,6 +637,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu else updVer = null; + Exception err = null; + GridNearAtomicFullUpdateRequest singleReq0 = null; + Map mappings0 = null; + + int size = keys.size(); + try { if (size == 1 && !fastMap) { assert remapKeys == null || remapKeys.size() == 1; @@ -676,12 +675,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } synchronized (mux) { - assert this.futVer == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; + assert this.futVer == futVer || (this.isDone() && this.error() != null); + assert this.topVer == topVer; - this.topVer = topVer; this.updVer = updVer; - this.futVer = futVer; resCnt = 0; @@ -701,14 +698,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu return; } - if (storeFuture()) { - if (!cctx.mvcc().addAtomicFuture(futVer, this)) { - assert isDone() : this; - - return; - } - } - // Optimize mapping for single key. if (singleReq0 != null) mapSingle(singleReq0.nodeId(), singleReq0); @@ -725,7 +714,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu /** * @return Future version. */ - GridCacheVersion onFutureDone() { + private GridCacheVersion onFutureDone() { GridCacheVersion ver0; GridFutureAdapter fut0; http://git-wip-us.apache.org/repos/asf/ignite/blob/82dd9128/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/AtomicPutAllChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/AtomicPutAllChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/AtomicPutAllChangingTopologyTest.java new file mode 100644 index 0000000..878cb17 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/AtomicPutAllChangingTopologyTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CachePeekMode.BACKUP; +import static org.apache.ignite.cache.CachePeekMode.PRIMARY; +import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** */ +public class AtomicPutAllChangingTopologyTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES_CNT = 3; + + /** */ + public static final String CACHE_NAME = "test-cache"; + + /** */ + private static final int CACHE_SIZE = 20_000; + + /** */ + private static volatile CountDownLatch FILLED_LATCH; + + /** + * @return Cache configuration. + */ + private CacheConfiguration cacheConfig() { + return new CacheConfiguration() + .setAtomicityMode(ATOMIC) + .setCacheMode(REPLICATED) + .setAffinity(new FairAffinityFunction(false, 1)) + .setWriteSynchronizationMode(FULL_SYNC) + .setRebalanceMode(SYNC) + .setName(CACHE_NAME); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPutAllOnChangingTopology() throws Exception { + List futs = new LinkedList<>(); + + for (int i = 1; i < NODES_CNT; i++) + futs.add(startNodeAsync(i)); + + futs.add(startSeedNodeAsync()); + + boolean failed = false; + + for (IgniteInternalFuture fut : futs) { + try { + fut.get(); + } + catch (Throwable th) { + log.error("Check failed.", th); + + failed = true; + } + } + + if (failed) + throw new RuntimeException("Test Failed."); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + FILLED_LATCH = new CountDownLatch(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @return Future. + * @throws IgniteCheckedException If failed. + */ + private IgniteInternalFuture startSeedNodeAsync() throws IgniteCheckedException { + return GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + Ignite node = startGrid(0); + + log.info("Creating cache."); + + IgniteCache cache = node.getOrCreateCache(cacheConfig()); + + log.info("Created cache."); + + Map data = new HashMap<>(CACHE_SIZE); + + for (int i = 0; i < CACHE_SIZE; i++) + data.put(i, i); + + log.info("Filling."); + + cache.putAll(data); + + log.info("Filled."); + + FILLED_LATCH.countDown(); + + checkCacheState(node, cache); + + return true; + } + }); + } + + /** + * @param nodeId Node index. + * @return Future. + * @throws IgniteCheckedException If failed. + */ + private IgniteInternalFuture startNodeAsync(final int nodeId) throws IgniteCheckedException { + return GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + Ignite node = startGrid(nodeId); + + log.info("Getting cache."); + + IgniteCache cache = node.getOrCreateCache(cacheConfig()); + + log.info("Got cache."); + + FILLED_LATCH.await(); + + log.info("Got Filled."); + + cache.put(1, nodeId); + + checkCacheState(node, cache); + + return true; + } + }); + } + + /** + * @param node Node. + * @param cache Cache. + * @throws Exception If failed. + */ + private void checkCacheState(Ignite node, IgniteCache cache) throws Exception { + int locSize = cache.localSize(PRIMARY, BACKUP); + int locSize2 = -1; + + if (locSize != CACHE_SIZE) { + U.sleep(5000); + + // Rechecking. + locSize2 = cache.localSize(PRIMARY, BACKUP); + } + + assertEquals("Wrong cache size on node [node=" + node.configuration().getGridName() + + ", expected= " + CACHE_SIZE + + ", actual=" + locSize + + ", actual2=" + locSize2 + "]", + locSize, CACHE_SIZE); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/82dd9128/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java index 26cea39..986b8d4 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtR import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheTxNodeFailureSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridNearCacheTxNodeFailureSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteAtomicLongChangingTopologySelfTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.AtomicPutAllChangingTopologyTest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicClientInvalidPartitionHandlingSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicClientRemoveFailureTest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCacheAtomicInvalidPartitionHandlingSelfTest; @@ -97,6 +98,8 @@ public class IgniteCacheFailoverTestSuite extends TestSuite { suite.addTestSuite(GridCacheTxNodeFailureSelfTest.class); suite.addTestSuite(GridNearCacheTxNodeFailureSelfTest.class); + suite.addTestSuite(AtomicPutAllChangingTopologyTest.class); + return suite; } }