Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7F24A18AB0 for ; Fri, 21 Aug 2015 11:52:18 +0000 (UTC) Received: (qmail 98746 invoked by uid 500); 21 Aug 2015 11:52:18 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 98714 invoked by uid 500); 21 Aug 2015 11:52:18 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 98703 invoked by uid 99); 21 Aug 2015 11:52:18 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 11:52:18 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id F0780C08D9 for ; Fri, 21 Aug 2015 11:52:17 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.795 X-Spam-Level: * X-Spam-Status: No, score=1.795 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.006, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id a1AeD05OxD1e for ; Fri, 21 Aug 2015 11:52:03 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id A7F5223134 for ; Fri, 21 Aug 2015 11:52:03 +0000 (UTC) Received: (qmail 97564 invoked by uid 99); 21 Aug 2015 11:52:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 11:52:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2511DE1786; Fri, 21 Aug 2015 11:52:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 21 Aug 2015 11:52:06 -0000 Message-Id: <18b6c78743574049830c6be4dd42a77d@git.apache.org> In-Reply-To: <686106f7db0742ffb55f415a7e7c65ce@git.apache.org> References: <686106f7db0742ffb55f415a7e7c65ce@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/40] incubator-ignite git commit: IGNITE-1265 - Entry processor must always have the correct cache value. IGNITE-1265 - Entry processor must always have the correct cache value. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5065a1ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5065a1ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5065a1ec Branch: refs/heads/ignite-1258 Commit: 5065a1eccb3d71b2573d37bb6ff2c78a1bbc107c Parents: ccaa2b2 Author: Alexey Goncharuk Authored: Tue Aug 18 19:35:50 2015 -0700 Committer: Alexey Goncharuk Committed: Tue Aug 18 19:35:50 2015 -0700 ---------------------------------------------------------------------- .../dht/GridClientPartitionTopology.java | 20 +++ .../dht/GridDhtPartitionTopology.java | 7 + .../dht/GridDhtPartitionTopologyImpl.java | 20 +++ .../cache/distributed/dht/GridDhtTxLocal.java | 4 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 136 +++++++++++++++++-- .../cache/transactions/IgniteTxEntry.java | 18 +++ .../IgniteCacheEntryProcessorNodeJoinTest.java | 54 ++++---- .../cache/IgniteCacheInvokeReadThroughTest.java | 2 +- 8 files changed, 223 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index c3f3e7f..531678e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -331,6 +331,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) { + lock.readLock().lock(); + + try { + GridDhtPartitionMap partMap = node2part.get(nodeId); + + if (partMap != null) { + GridDhtPartitionState state = partMap.get(part); + + return state == null ? EVICTED : state; + } + + return EVICTED; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public Collection nodes(int p, AffinityTopologyVersion topVer) { lock.readLock().lock(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index c551fb3..7b08510 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -129,6 +129,13 @@ public interface GridDhtPartitionTopology { public GridDhtPartitionMap localPartitionMap(); /** + * @param nodeId Node ID. + * @param part Partition. + * @return Partition state. + */ + public GridDhtPartitionState partitionState(UUID nodeId, int part); + + /** * @return Current update sequence. */ public long updateSequence(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index de7f876..f356138 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -614,6 +614,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) { + lock.readLock().lock(); + + try { + GridDhtPartitionMap partMap = node2part.get(nodeId); + + if (partMap != null) { + GridDhtPartitionState state = partMap.get(part); + + return state == null ? EVICTED : state; + } + + return EVICTED; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public Collection nodes(int p, AffinityTopologyVersion topVer) { Collection affNodes = cctx.affinity().nodes(p, topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 6a72c89..7da6e07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -363,8 +363,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa * @return Future that will be completed when locks are acquired. */ public IgniteInternalFuture prepareAsync( - @Nullable Iterable reads, - @Nullable Iterable writes, + @Nullable Collection reads, + @Nullable Collection writes, Map verMap, long msgId, IgniteUuid nearMiniId, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 2b7e1bc..ad1023f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -135,6 +135,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture lockKeys = new GridConcurrentHashSet<>(); + /** Force keys future for correct transforms. */ + private IgniteInternalFuture forceKeysFut; + /** Locks ready flag. */ private volatile boolean locksReady; @@ -291,7 +294,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture>() { + @Override public void apply(IgniteInternalFuture f) { + try { + f.get(); + + prepare0(); + } + catch (IgniteCheckedException e) { + onError(e); + } + } + }); + } return true; } @@ -709,7 +732,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture reads, Iterable writes, + @SuppressWarnings("TypeMayBeWeakened") + public void prepare(Collection reads, Collection writes, Map> txNodes) { if (tx.empty()) { tx.setRollbackOnly(); @@ -721,6 +745,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture> forceKeys = null; + + for (IgniteTxEntry entry : writes) + forceKeys = checkNeedRebalanceKeys(entry, forceKeys); + + forceKeysFut = forceRebalanceKeys(forceKeys); + } + readyLocks(); mapIfLocked(); @@ -735,12 +768,75 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture> checkNeedRebalanceKeys( + IgniteTxEntry e, + Map> map + ) { + if (retVal || !F.isEmpty(e.entryProcessors())) { + if (map == null) + map = new HashMap<>(); + + Collection keys = map.get(e.cacheId()); + + if (keys == null) { + keys = new ArrayList<>(); + + map.put(e.cacheId(), keys); + } + + keys.add(e.key()); + } + + return map; + } + + private IgniteInternalFuture forceRebalanceKeys(Map> keysMap) { + if (F.isEmpty(keysMap)) + return null; + + GridCompoundFuture compFut = null; + IgniteInternalFuture lastForceFut = null; + for (Map.Entry> entry : keysMap.entrySet()) { + if (lastForceFut != null && compFut == null) { + compFut = new GridCompoundFuture(); + + compFut.add(lastForceFut); + } + + int cacheId = entry.getKey(); + + Collection keys = entry.getValue(); + + lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion()); + + if (compFut != null) + compFut.add(lastForceFut); + } + + if (compFut != null) { + compFut.markInitialized(); + + return compFut; + } + else { + assert lastForceFut != null; + + return lastForceFut; + } + } + + /** + * + */ + private void prepare0() { try { // We are holding transaction-level locks for entries here, so we can get next write version. onEntriesLocked(); @@ -957,7 +1053,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture futDhtMap, - Map futNearMap) { + Map futNearMap + ) { if (entry.cached().isLocal()) return false; @@ -1024,14 +1121,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture nodes, - Map globalMap, Map locMap) { + private boolean map( + IgniteTxEntry entry, + Iterable nodes, + Map globalMap, + Map locMap + ) { boolean ret = false; if (nodes != null) { for (ClusterNode n : nodes) { GridDistributedTxMapping global = globalMap.get(n.id()); + if (!F.isEmpty(entry.entryProcessors())) { + GridDhtPartitionState state = entry.context().topology().partitionState(n.id(), + entry.cached().partition()); + + if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) { + CacheObject procVal = entry.entryProcessorCalculatedValue(); + + entry.op(procVal == null ? DELETE : UPDATE); + entry.value(procVal, true, false); + entry.entryProcessors(null); + } + } + if (global == null) globalMap.put(n.id(), global = new GridDistributedTxMapping(n)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 247d350..7890831 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -79,6 +79,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { @GridDirectTransient private Collection, Object[]>> entryProcessorsCol; + /** Transient field for calculated entry processor value. */ + @GridDirectTransient + private CacheObject entryProcessorCalcVal; + /** Transform closure bytes. */ @GridToStringExclude private byte[] transformClosBytes; @@ -775,6 +779,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { return expiryPlc; } + /** + * @return Entry processor calculated value. + */ + public CacheObject entryProcessorCalculatedValue() { + return entryProcessorCalcVal; + } + + /** + * @param entryProcessorCalcVal Entry processor calculated value. + */ + public void entryProcessorCalculatedValue(CacheObject entryProcessorCalcVal) { + this.entryProcessorCalcVal = entryProcessorCalcVal; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java index 9c17ebd..94bfd8f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java @@ -122,38 +122,44 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes final AtomicReference error = new AtomicReference<>(); final int started = 6; - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { - @Override public void run() { - try { - for (int i = 0; i < started; i++) { - U.sleep(1_000); - - startGrid(GRID_CNT + i); + try { + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + for (int i = 0; i < started; i++) { + U.sleep(1_000); + + startGrid(GRID_CNT + i); + } + } + catch (Exception e) { + error.compareAndSet(null, e); } } - catch (Exception e) { - error.compareAndSet(null, e); - } - } - }, 1, "starter"); + }, 1, "starter"); - try { - checkIncrement(invokeAll); - } - finally { - stop.set(true); + try { + checkIncrement(invokeAll); + } + finally { + stop.set(true); - fut.get(getTestTimeout()); - } + fut.get(getTestTimeout()); + } - for (int i = 0; i < NUM_SETS; i++) { - for (int g = 0; g < GRID_CNT + started; g++) { - Set vals = ignite(g).>cache(null).get("set-" + i); + for (int i = 0; i < NUM_SETS; i++) { + for (int g = 0; g < GRID_CNT + started; g++) { + Set vals = ignite(g).>cache(null).get("set-" + i); - assertNotNull(vals); - assertEquals(100, vals.size()); + assertNotNull(vals); + assertEquals(100, vals.size()); + } } } + finally { + for (int i = 0; i < started; i++) + stopGrid(GRID_CNT + i); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java index 10ab1ab..b72540d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java @@ -34,7 +34,7 @@ import static org.apache.ignite.cache.CacheMode.*; public class IgniteCacheInvokeReadThroughTest extends IgniteCacheAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-114"); +// fail("https://issues.apache.org/jira/browse/IGNITE-114"); } /** */