Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C80BC185F5 for ; Fri, 18 Sep 2015 02:29:10 +0000 (UTC) Received: (qmail 66985 invoked by uid 500); 18 Sep 2015 02:29:10 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 66853 invoked by uid 500); 18 Sep 2015 02:29:10 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 66195 invoked by uid 99); 18 Sep 2015 02:29:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Sep 2015 02:29:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4AFCDE10F5; Fri, 18 Sep 2015 02:29:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.apache.org Date: Fri, 18 Sep 2015 02:29:31 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [23/50] [abbrv] ignite git commit: IGNITE-1482 - Fixed incorrect cache value for replace() on changing topology. IGNITE-1482 - Fixed incorrect cache value for replace() on changing topology. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/367d805d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/367d805d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/367d805d Branch: refs/heads/ignite-843 Commit: 367d805d10ea071532fe99c6b67cfc97cc8f2fb9 Parents: 91dd7c1 Author: sboikov Authored: Tue Sep 15 14:54:20 2015 +0300 Committer: sboikov Committed: Tue Sep 15 14:54:20 2015 +0300 ---------------------------------------------------------------------- .../GridDistributedTxRemoteAdapter.java | 8 +-- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../IgniteCacheEntryProcessorNodeJoinTest.java | 73 ++++++++++++++++++++ 3 files changed, 78 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/367d805d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index c930d88..f969737 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -521,7 +521,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter if (updateNearCache(cacheCtx, txEntry.key(), topVer)) nearCached = cacheCtx.dht().near().peekExx(txEntry.key()); - if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters())) + if (!F.isEmpty(txEntry.entryProcessors())) txEntry.cached().unswap(false); IgniteBiTuple res = @@ -573,12 +573,12 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter // Invalidate only for near nodes (backups cannot be invalidated). if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear())) cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true, - topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE, + topVer, null, replicate ? DR_BACKUP : DR_NONE, near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName()); else { cached.innerSet(this, eventNodeId(), nodeId, val, false, false, - txEntry.ttl(), true, true, topVer, txEntry.filters(), + txEntry.ttl(), true, true, topVer, null, replicate ? DR_BACKUP : DR_NONE, txEntry.conflictExpireTime(), near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName()); @@ -598,7 +598,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } else if (op == DELETE) { cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true, - topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE, + topVer, null, replicate ? DR_BACKUP : DR_NONE, near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName()); // Keep near entry up to date. http://git-wip-us.apache.org/repos/asf/ignite/blob/367d805d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 89fc0ae..81cc272 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -842,7 +842,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture> map ) { - if (retVal || !F.isEmpty(e.entryProcessors())) { + if (retVal || !F.isEmpty(e.entryProcessors()) || !F.isEmpty(e.filters())) { if (map == null) map = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/367d805d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java index af9477e..6b4d473 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java @@ -30,7 +30,9 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -201,6 +203,77 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes } } + /** + * @throws Exception If failed. + */ + public void testReplaceNodeJoin() throws Exception { + final AtomicReference error = new AtomicReference<>(); + final int started = 6; + + try { + int keys = 100; + + final AtomicBoolean done = new AtomicBoolean(false); + + for (int i = 0; i < keys; i++) + ignite(0).cache(null).put(i, 0); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + for (int i = 0; i < started; i++) { + U.sleep(1_000); + + IgniteEx grid = startGrid(GRID_CNT + i); + + info("Test started grid [idx=" + (GRID_CNT + i) + ", nodeId=" + grid.localNode().id() + ']'); + } + } + catch (Exception e) { + error.compareAndSet(null, e); + } + finally { + done.set(true); + } + } + }, 1, "starter"); + + int updVal = 0; + + try { + while (!done.get()) { + info("Will put: " + (updVal + 1)); + + for (int i = 0; i < keys; i++) + assertTrue("Failed [key=" + i + ", oldVal=" + updVal+ ']', + ignite(0).cache(null).replace(i, updVal, updVal + 1)); + + updVal++; + } + } + finally { + fut.get(getTestTimeout()); + } + + for (int i = 0; i < keys; i++) { + for (int g = 0; g < GRID_CNT + started; g++) { + Integer val = ignite(g).cache(null).get(i); + + GridCacheEntryEx entry = ((IgniteKernal)grid(g)).internalCache(null).peekEx(i); + + if (updVal != val) + info("Invalid value for grid [g=" + g + ", entry=" + entry + ']'); + + assertEquals((Integer)updVal, val); + } + } + } + finally { + for (int i = 0; i < started; i++) + stopGrid(GRID_CNT + i); + } + } + /** */ private static class Processor implements EntryProcessor, Void>, Serializable { /** */