Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B454B10E70 for ; Fri, 10 Apr 2015 06:47:06 +0000 (UTC) Received: (qmail 44851 invoked by uid 500); 10 Apr 2015 06:47:06 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 44822 invoked by uid 500); 10 Apr 2015 06:47:06 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 44813 invoked by uid 99); 10 Apr 2015 06:47:06 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Apr 2015 06:47:06 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 10 Apr 2015 06:47:01 +0000 Received: (qmail 44203 invoked by uid 99); 10 Apr 2015 06:46:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Apr 2015 06:46:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2DDD8DFC56; Fri, 10 Apr 2015 06:46:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 10 Apr 2015 06:46:42 -0000 Message-Id: In-Reply-To: <1379bb6c7e9547a6a7f1ee0d8a87f721@git.apache.org> References: <1379bb6c7e9547a6a7f1ee0d8a87f721@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/12] incubator-ignite git commit: IGNITE-674 - Merging 6.6.4 fixes to Ignite. X-Virus-Checked: Checked by ClamAV on apache.org IGNITE-674 - Merging 6.6.4 fixes to Ignite. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ab01e7dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ab01e7dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ab01e7dd Branch: refs/heads/ignite-sprint-3 Commit: ab01e7dd37803a5a39b71d58429bccae79def1a4 Parents: 7e86251 Author: Alexey Goncharuk Authored: Fri Apr 3 15:23:38 2015 -0700 Committer: Alexey Goncharuk Committed: Fri Apr 3 15:23:38 2015 -0700 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 1 - .../processors/cache/GridCacheContext.java | 58 ++++- .../cache/GridCacheEvictionManager.java | 6 +- .../cache/GridCacheExplicitLockSpan.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 28 ++- .../processors/cache/GridCacheMvcc.java | 2 +- .../cache/GridCacheMvccCandidate.java | 50 ++++- .../processors/cache/GridCacheMvccManager.java | 44 +++- .../distributed/GridDistributedCacheEntry.java | 12 +- .../distributed/dht/GridDhtCacheEntry.java | 31 ++- .../distributed/dht/GridDhtLockFuture.java | 15 +- .../dht/GridDhtTransactionalCacheAdapter.java | 14 +- .../distributed/dht/GridDhtTxFinishFuture.java | 22 +- .../cache/distributed/dht/GridDhtTxLocal.java | 2 + .../distributed/dht/GridDhtTxLocalAdapter.java | 30 ++- .../distributed/dht/GridDhtTxPrepareFuture.java | 28 ++- .../dht/colocated/GridDhtColocatedCache.java | 32 ++- .../colocated/GridDhtColocatedLockFuture.java | 46 +++- .../GridDhtPartitionsExchangeFuture.java | 39 +++- .../distributed/near/GridNearCacheEntry.java | 10 +- .../distributed/near/GridNearLockFuture.java | 11 +- .../near/GridNearTxFinishFuture.java | 8 +- .../cache/distributed/near/GridNearTxLocal.java | 7 + .../near/GridNearTxPrepareFuture.java | 122 +++++++---- .../near/GridNearTxPrepareRequest.java | 64 ++++-- .../cache/transactions/IgniteTxAdapter.java | 3 +- .../cache/transactions/IgniteTxHandler.java | 120 +++++++--- .../transactions/IgniteTxLocalAdapter.java | 76 ++++--- .../cache/transactions/IgniteTxManager.java | 7 + .../util/future/GridCompoundFuture.java | 1 + .../internal/util/future/GridFutureAdapter.java | 18 +- .../ignite/internal/util/worker/GridWorker.java | 6 +- .../dht/IgniteCacheLockFailoverSelfTest.java | 148 +++++++++++++ .../dht/IgniteCacheMultiTxLockSelfTest.java | 217 +++++++++++++++++++ .../testframework/junits/GridAbstractTest.java | 4 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 3 + 36 files changed, 1052 insertions(+), 235 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 9551680..f75bcf4 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -18,7 +18,6 @@ package org.apache.ignite.codegen; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; import org.apache.ignite.internal.processors.datastreamer.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index dfa82e6..6c3bd60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1416,6 +1416,7 @@ public class GridCacheContext implements Externalizable { UUID nearNodeId, AffinityTopologyVersion topVer, GridDhtCacheEntry entry, + GridCacheVersion explicitLockVer, IgniteLogger log, Map> dhtMap, @Nullable Map> nearMap @@ -1431,6 +1432,8 @@ public class GridCacheContext implements Externalizable { boolean ret = map(entry, dhtRemoteNodes, dhtMap); + Collection nearRemoteNodes = null; + if (nearMap != null) { Collection readers = entry.readers(); @@ -1445,8 +1448,21 @@ public class GridCacheContext implements Externalizable { else if (log.isDebugEnabled()) log.debug("Entry has no near readers: " + entry); - if (nearNodes != null && !nearNodes.isEmpty()) - ret |= map(entry, F.view(nearNodes, F.notIn(dhtNodes)), nearMap); + if (nearNodes != null && !nearNodes.isEmpty()) { + nearRemoteNodes = F.view(nearNodes, F.notIn(dhtNodes)); + + ret |= map(entry, nearRemoteNodes, nearMap); + } + } + + if (explicitLockVer != null) { + Collection dhtNodeIds = new ArrayList<>(dhtRemoteNodes); + Collection nearNodeIds = F.isEmpty(nearRemoteNodes) ? null : new ArrayList<>(nearRemoteNodes); + + if (!F.isEmpty(nearNodeIds)) + U.dumpStack("Added near mapped nodes: " + entry + ", " + nearNodeIds); + + entry.mappings(explicitLockVer, dhtNodeIds, nearNodeIds); } return ret; @@ -1454,6 +1470,44 @@ public class GridCacheContext implements Externalizable { /** * @param entry Entry. + * @param log Log. + * @param dhtMap Dht mappings. + * @param nearMap Near mappings. + * @return {@code True} if mapped. + * @throws GridCacheEntryRemovedException If reader for entry is removed. + */ + public boolean dhtMap( + GridDhtCacheEntry entry, + GridCacheVersion explicitLockVer, + IgniteLogger log, + Map> dhtMap, + Map> nearMap + ) throws GridCacheEntryRemovedException { + assert explicitLockVer != null; + + GridCacheMvccCandidate cand = entry.candidate(explicitLockVer); + + if (cand != null) { + Collection dhtNodes = cand.mappedDhtNodes(); + + if (log.isDebugEnabled()) + log.debug("Mapping explicit lock to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']'); + + Collection nearNodes = cand.mappedNearNodes(); + + boolean ret = map(entry, dhtNodes, dhtMap); + + if (nearNodes != null && !nearNodes.isEmpty()) + ret |= map(entry, nearNodes, nearMap); + + return ret; + } + + return false; + } + + /** + * @param entry Entry. * @param nodes Nodes. * @param map Map. * @return {@code True} if mapped. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index fd56568..b0c9243 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -253,11 +253,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { if (plcEnabled && evictSync && !cctx.isNear()) { // Add dummy event to worker. - ClusterNode locNode = cctx.localNode(); - - DiscoveryEvent evt = new DiscoveryEvent(locNode, "Dummy event.", EVT_NODE_JOINED, locNode); - - evt.topologySnapshot(locNode.order(), cctx.discovery().topology(locNode.order())); + DiscoveryEvent evt = cctx.discovery().localJoinEvent(); backupWorker.addEvent(evt); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java index 242e769..0e555c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java @@ -45,7 +45,7 @@ public class GridCacheExplicitLockSpan extends ReentrantLock { private final Map> cands = new HashMap<>(); /** Span lock release future. */ - @GridToStringInclude + @GridToStringExclude private final GridFutureAdapter releaseFut = new GridFutureAdapter<>(); /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 542d0ad..76fd69b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2623,7 +2623,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { ttlAndExpireTimeExtras(ttl, expireTime); - if (expireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl()) + if (expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl()) cctx.ttl().addTrackedEntry(this); this.ver = ver; @@ -3508,26 +3508,32 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { try { synchronized (this) { - CacheObject expiredVal = val; + CacheObject expiredVal = saveValueForIndexUnlocked(); boolean hasOldBytes = valPtr != 0; boolean expired = checkExpired(); if (expired) { - if (cctx.deferredDelete() && !detached() && !isInternal()) { - if (!deletedUnlocked()) { - update(null, 0L, 0L, ver); + if (!obsolete()) { + if (cctx.deferredDelete() && !detached() && !isInternal()) { + if (!deletedUnlocked()) { + update(null, 0L, 0L, ver); - deletedUnlocked(true); + deletedUnlocked(true); - deferred = true; + deferred = true; + } + } + else { + if (markObsolete0(obsoleteVer, true)) + obsolete = true; // Success, will return "true". } } - else { - if (markObsolete0(obsoleteVer, true)) - obsolete = true; // Success, will return "true". - } + + clearIndex(expiredVal); + + releaseSwap(); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) { cctx.events().addEvent(partition(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java index b0f5b2e..83a108f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java @@ -1014,7 +1014,7 @@ public final class GridCacheMvcc { for (Iterator it = rmts.iterator(); it.hasNext(); ) { GridCacheMvccCandidate cand = it.next(); - if (!cand.tx() && nodeId.equals(cand.nodeId())) { + if (!cand.tx() && (nodeId.equals(cand.nodeId()) || nodeId.equals(cand.otherNodeId()))) { cand.setUsed(); // Mark as used to be consistent. cand.setRemoved(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java index c5f26cb..894e465 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java @@ -17,10 +17,12 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -94,6 +96,14 @@ public class GridCacheMvccCandidate implements Externalizable, /** Other lock version (near version vs dht version). */ private transient GridCacheVersion otherVer; + /** Mapped DHT node IDs. */ + @GridToStringInclude + private transient volatile Collection mappedDhtNodes; + + /** Mapped near node IDs. */ + @GridToStringInclude + private transient volatile Collection mappedNearNodes; + /** Owned lock version by the moment this candidate was added. */ @GridToStringInclude private transient volatile GridCacheVersion ownerVer; @@ -229,13 +239,6 @@ public class GridCacheMvccCandidate implements Externalizable, } /** - * @return {@code True} if has reentry. - */ - public boolean hasReentry() { - return reentry != null; - } - - /** * @return Removed reentry candidate or {@code null}. */ @Nullable public GridCacheMvccCandidate unenter() { @@ -282,6 +285,39 @@ public class GridCacheMvccCandidate implements Externalizable, } /** + * @return Mapped node IDs. + */ + public Collection mappedDhtNodes() { + return mappedDhtNodes; + } + + /** + * @return Mapped node IDs. + */ + public Collection mappedNearNodes() { + return mappedNearNodes; + } + + /** + * @param mappedDhtNodes Mapped DHT node IDs. + */ + public void mappedNodeIds(Collection mappedDhtNodes, Collection mappedNearNodes) { + this.mappedDhtNodes = mappedDhtNodes; + this.mappedNearNodes = mappedNearNodes; + } + + /** + * @param node Node to remove. + */ + public void removeMappedNode(ClusterNode node) { + if (mappedDhtNodes.contains(node)) + mappedDhtNodes = new ArrayList<>(F.view(mappedDhtNodes, F.notEqualTo(node))); + + if (mappedNearNodes != null && mappedNearNodes.contains(node)) + mappedNearNodes = new ArrayList<>(F.view(mappedNearNodes, F.notEqualTo(node))); + } + + /** * @return Near version. */ public GridCacheVersion otherVersion() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 081ea78..a569e56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -174,17 +174,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { if (log.isDebugEnabled()) log.debug("Processing node left [nodeId=" + discoEvt.eventNode().id() + "]"); - for (GridDistributedCacheEntry entry : locked()) { - try { - entry.removeExplicitNodeLocks(discoEvt.eventNode().id()); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Attempted to remove node locks from removed entry in mvcc manager " + - "disco callback (will ignore): " + entry); - } - } - for (Collection> futsCol : futs.values()) { for (GridCacheFuture fut : futsCol) { if (!fut.trackable()) { @@ -239,6 +228,39 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @return Collection of pending explicit locks. + */ + public Collection activeExplicitLocks() { + return pendingExplicit.values(); + } + + /** + * @return Collection of active futures. + */ + public Collection> activeFutures() { + return F.flatCollections(futs.values()); + } + + /** + * @param leftNodeId Left node ID. + * @param topVer Topology version. + */ + public void removeExplicitNodeLocks(UUID leftNodeId, AffinityTopologyVersion topVer) { + for (GridDistributedCacheEntry entry : locked()) { + try { + entry.removeExplicitNodeLocks(leftNodeId); + + entry.context().evicts().touch(entry, topVer); + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Attempted to remove node locks from removed entry in mvcc manager " + + "disco callback (will ignore): " + entry); + } + } + } + + /** * @param from From version. * @param to To version. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java index a1d83d5..9fb9f46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java @@ -316,8 +316,11 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { refreshRemotes(); - if (emptyAfter) + if (emptyAfter) { mvccExtras(null); + + onUnlock(); + } } } @@ -326,6 +329,13 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { } /** + * + */ + public void onUnlock() { + // No-op. + } + + /** * Unlocks local lock. * * @return Removed candidate, or null if thread still holds the lock. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index e6d5173..7762763 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -297,7 +297,9 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { /** * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition. */ - public void onUnlock() { + @Override public void onUnlock() { + super.onUnlock(); + locPart.onUnlock(); } @@ -644,13 +646,34 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { * @return Candidate, if one existed for the version, or {@code null} if candidate was not found. * @throws GridCacheEntryRemovedException If removed. */ - @Nullable public synchronized GridCacheMvccCandidate mappings(GridCacheVersion ver) - throws GridCacheEntryRemovedException { + @Nullable public synchronized GridCacheMvccCandidate mappings( + GridCacheVersion ver, + Collection dhtNodeIds, + Collection nearNodeIds + ) throws GridCacheEntryRemovedException { checkObsolete(); GridCacheMvcc mvcc = mvccExtras(); - return mvcc == null ? null : mvcc.candidate(ver); + GridCacheMvccCandidate cand = mvcc == null ? null : mvcc.candidate(ver); + + if (cand != null) + cand.mappedNodeIds(dhtNodeIds, nearNodeIds); + + return cand; + } + + /** + * @param ver Version. + * @param mappedNode Mapped node to remove. + */ + public synchronized void removeMapping(GridCacheVersion ver, ClusterNode mappedNode) { + GridCacheMvcc mvcc = mvccExtras(); + + GridCacheMvccCandidate cand = mvcc == null ? null : mvcc.candidate(ver); + + if (cand != null) + cand.removeMappedNode(mappedNode); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 7e93946..9898cf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -188,7 +188,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture extends GridDhtCach req.implicitTx(), req.implicitSingleTx(), ctx.system(), + false, ctx.ioPolicy(), PESSIMISTIC, req.isolation(), @@ -1165,13 +1166,9 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCach while (true) { GridDistributedCacheEntry entry = peekExx(key); - boolean created = false; - - if (entry == null) { - entry = entryExx(key); - - created = true; - } + if (entry == null) + // Nothing to unlock. + break; try { entry.doneRemote( @@ -1195,9 +1192,6 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCach "(added to cancelled locks set): " + req); } - if (created && entry.markObsolete(req.version())) - removeEntry(entry); - ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion()); break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 728145a..8e8a0a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -243,13 +243,6 @@ public final class GridDhtTxFinishFuture extends GridCompoundIdentityFutur } /** - * Completes this future. - */ - void complete() { - onComplete(); - } - - /** * Initializes future. */ public void finish() { @@ -279,10 +272,13 @@ public final class GridDhtTxFinishFuture extends GridCompoundIdentityFutur if (tx.onePhaseCommit()) return false; - boolean res = false; - boolean sync = commit ? tx.syncCommit() : tx.syncRollback(); + if (tx.explicitLock()) + sync = true; + + boolean res = false; + // Create mini futures. for (GridDistributedTxMapping dhtMapping : dhtMap.values()) { ClusterNode n = dhtMapping.node(); @@ -313,8 +309,8 @@ public final class GridDhtTxFinishFuture extends GridCompoundIdentityFutur tx.system(), tx.ioPolicy(), tx.isSystemInvalidate(), - tx.syncCommit(), - tx.syncRollback(), + sync, + sync, tx.completedBase(), tx.committedVersions(), tx.rolledbackVersions(), @@ -365,8 +361,8 @@ public final class GridDhtTxFinishFuture extends GridCompoundIdentityFutur tx.system(), tx.ioPolicy(), tx.isSystemInvalidate(), - tx.syncCommit(), - tx.syncRollback(), + sync, + sync, tx.completedBase(), tx.committedVersions(), tx.rolledbackVersions(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 00e8e3e..f3266df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -107,6 +107,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa boolean implicit, boolean implicitSingle, boolean sys, + boolean explicitLock, GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, @@ -126,6 +127,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa implicit, implicitSingle, sys, + explicitLock, plc, concurrency, isolation, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 5e37d96..e0f2008 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -65,6 +65,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { private long dhtThreadId; /** */ + protected boolean explicitLock; + + /** */ private boolean needsCompletedVers; /** Versions of pending locks for entries of this tx. */ @@ -96,6 +99,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { boolean implicit, boolean implicitSingle, boolean sys, + boolean explicitLock, GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, @@ -113,6 +117,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { assert cctx != null; + this.explicitLock = explicitLock; + threadId = Thread.currentThread().getId(); dhtThreadId = threadId; } @@ -179,6 +185,20 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** + * @return Explicit lock flag. + */ + public boolean explicitLock() { + return explicitLock; + } + + /** + * @param explicitLock Explicit lock flag. + */ + public void explicitLock(boolean explicitLock) { + this.explicitLock = explicitLock; + } + + /** * @return DHT thread ID. */ long dhtThreadId() { @@ -227,8 +247,12 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { if (nearEntryMap == null) nearEntryMap = new GridLeanMap<>(); - cacheCtx.dhtMap(nearNodeId(), topologyVersion(), - (GridDhtCacheEntry)e.cached(), log, dhtEntryMap, nearEntryMap); + cacheCtx.dhtMap( + (GridDhtCacheEntry)e.cached(), + e.explicitVersion(), + log, + dhtEntryMap, + nearEntryMap); } break; @@ -828,6 +852,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(), - "dhtNodes", dhtMap.keySet(), "super", super.toString()); + "dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index e0fc719..87e5a29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -856,13 +856,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundIdentityFutu if (!F.isEmpty(nearWrites)) { for (IgniteTxEntry entry : nearWrites) { try { - GridCacheMvccCandidate added = entry.cached().candidate(version()); + if (entry.explicitVersion() == null) { + GridCacheMvccCandidate added = entry.cached().candidate(version()); - assert added != null; - assert added.dhtLocal(); + assert added != null : "Missing candidate for cache entry:" + entry; + assert added.dhtLocal(); - if (added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); + if (added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); + } break; } @@ -912,15 +914,17 @@ public final class GridDhtTxPrepareFuture extends GridCompoundIdentityFutu for (IgniteTxEntry entry : nearMapping.writes()) { try { - GridCacheMvccCandidate added = entry.cached().candidate(version()); + if (entry.explicitVersion() == null) { + GridCacheMvccCandidate added = entry.cached().candidate(version()); - assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " + - "[added=" + added + ", entry=" + entry + ']'; - assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" + - "[added=" + added + ", entry=" + entry + ']'; + assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " + + "[added=" + added + ", entry=" + entry + ']'; + assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" + + "[added=" + added + ", entry=" + entry + ']'; - if (added != null && added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); + if (added != null && added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); + } break; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index eb6d9a6..e941d30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; @@ -501,9 +502,20 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte assert !n.isLocal(); - if (!F.isEmpty(req.keys())) - // We don't wait for reply to this message. - ctx.io().send(n, req, ctx.ioPolicy()); + if (!F.isEmpty(req.keys())) { + try { + // We don't wait for reply to this message. + ctx.io().send(n, req, ctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send unlock request (node has left the grid) [keys=" + req.keys() + + ", n=" + n + ", e=" + e + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send unlock request [keys=" + req.keys() + ", n=" + n + ']', e); + } + } } } catch (IgniteCheckedException ex) { @@ -584,8 +596,18 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte if (!F.isEmpty(req.keys())) { req.completedVersions(committed, rolledback); - // We don't wait for reply to this message. - ctx.io().send(n, req, ctx.ioPolicy()); + try { + // We don't wait for reply to this message. + ctx.io().send(n, req, ctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send unlock request (node has left the grid) [keys=" + req.keys() + + ", n=" + n + ", e=" + e + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send unlock request [keys=" + req.keys() + ", n=" + n + ']', e); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 3087dff..7b05065 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -141,6 +141,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentity this.accessTtl = accessTtl; this.filter = filter; + ignoreInterrupts(true); + threadId = tx == null ? Thread.currentThread().getId() : tx.threadId(); lockVer = tx != null ? tx.xidVersion() : cctx.versions().next(); @@ -517,8 +519,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentity */ void map() { // Obtain the topology version to use. - AffinityTopologyVersion topVer = tx != null ? tx.topologyVersionSnapshot() : - cctx.mvcc().lastExplicitLockTopologyVersion(threadId); + AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); + + if (topVer != null && tx != null) + tx.topologyVersion(topVer); + + if (topVer == null && tx != null) + topVer = tx.topologyVersionSnapshot(); if (topVer != null) { // Continue mapping on the same topology version as it was before. @@ -651,15 +658,30 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentity Collection distributedKeys = new ArrayList<>(mappedKeys.size()); for (KeyCacheObject key : mappedKeys) { - boolean explicit; - IgniteTxKey txKey = cctx.txKey(key); - while (true) { - GridDistributedCacheEntry entry = null; + GridDistributedCacheEntry entry = null; + + if (tx != null) { + IgniteTxEntry txEntry = tx.entry(txKey); + + if (txEntry != null) { + entry = (GridDistributedCacheEntry)txEntry.cached(); + + if (entry != null && !(loc ^ entry.detached())) { + entry = cctx.colocated().entryExx(key, topVer, true); + + txEntry.cached(entry); + } + } + } + + boolean explicit; + while (true) { try { - entry = cctx.colocated().entryExx(key, topVer, true); + if (entry == null) + entry = cctx.colocated().entryExx(key, topVer, true); if (!cctx.isAll(entry, filter)) { if (log.isDebugEnabled()) @@ -941,6 +963,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentity // Assign keys to primary nodes. Collection distributedKeys = new ArrayList<>(keys.size()); + boolean explicit = false; + for (KeyCacheObject key : keys) { if (!cctx.affinity().primary(cctx.localNode(), key, topVer)) { // Remove explicit locks added so far. @@ -950,7 +974,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentity return false; } - addLocalKey(key, topVer, distributedKeys); + explicit |= addLocalKey(key, topVer, distributedKeys); if (isDone()) return true; @@ -958,8 +982,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentity trackable = false; - if (tx != null) + if (tx != null) { + if (explicit) + tx.markExplicit(cctx.localNodeId()); + tx.colocatedLocallyMapped(true); + } if (!distributedKeys.isEmpty()) { if (tx != null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index ddf0df1..ccc2d8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; @@ -564,7 +565,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter>> " + tx); + + U.warn(log, "Pending explicit locks:"); + + for (GridCacheExplicitLockSpan lockSpan : cctx.mvcc().activeExplicitLocks()) + U.warn(log, ">>> " + lockSpan); + + U.warn(log, "Pending cache futures:"); + + for (GridCacheFuture fut : cctx.mvcc().activeFutures()) + U.warn(log, ">>> " + fut); + } + + /** * @param cacheId Cache ID to check. * @return {@code True} if cache is stopping by this exchange. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index ce80ad9..74342fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -499,7 +499,15 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { return null; // Local lock for near cache is a local lock. - cand = mvcc.addNearLocal(this, locId, dhtNodeId, threadId, ver, timeout, tx, implicitSingle); + cand = mvcc.addNearLocal( + this, + locId, + dhtNodeId, + threadId, + ver, + timeout, + tx, + implicitSingle); owner = mvcc.anyOwner(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index d6d1a1b..a25d546 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -152,6 +152,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFutu private IgniteUuid futId; /** Transaction. */ - @GridToStringExclude + @GridToStringInclude private GridNearTxLocal tx; /** Commit flag. */ @@ -85,12 +85,12 @@ public final class GridNearTxFinishFuture extends GridCompoundIdentityFutu public GridNearTxFinishFuture(GridCacheSharedContext cctx, GridNearTxLocal tx, boolean commit) { super(cctx.kernalContext(), F.identityReducer(tx)); - assert cctx != null; - this.cctx = cctx; this.tx = tx; this.commit = commit; + ignoreInterrupts(true); + mappings = tx.mappings(); futId = IgniteUuid.randomUuid(); @@ -264,7 +264,7 @@ public final class GridNearTxFinishFuture extends GridCompoundIdentityFutu * @return Synchronous flag. */ private boolean isSync() { - return commit ? tx.syncCommit() : tx.syncRollback(); + return tx.explicitLock() || (commit ? tx.syncCommit() : tx.syncRollback()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index fa916b0..1b2d204 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -59,14 +60,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { new ConcurrentHashMap8<>(); /** Future. */ + @GridToStringExclude private final AtomicReference> prepFut = new AtomicReference<>(); /** */ + @GridToStringExclude private final AtomicReference commitFut = new AtomicReference<>(); /** */ + @GridToStringExclude private final AtomicReference rollbackFut = new AtomicReference<>(); @@ -126,6 +130,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { implicit, implicitSingle, sys, + false, plc, concurrency, isolation, @@ -527,6 +532,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { * @return {@code True} if mapping was found. */ public boolean markExplicit(UUID nodeId) { + explicitLock = true; + GridDistributedTxMapping m = mappings.get(nodeId); if (m != null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 2da78fb..39a33b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -68,7 +68,7 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut private IgniteUuid futId; /** Transaction. */ - @GridToStringExclude + @GridToStringInclude private GridNearTxLocal tx; /** Error. */ @@ -198,6 +198,7 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut tx.removeKeysMapping(nodeId, mappings); } + if (e instanceof IgniteTxRollbackCheckedException) { if (marked) { try { @@ -308,62 +309,59 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut */ public void prepare() { if (tx.optimistic()) { - GridDhtTopologyFuture topFut = topologyReadLock(); + // Obtain the topology version to use. + AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); - try { - if (topFut == null) { - assert isDone(); + if (topVer != null) { + tx.topologyVersion(topVer); - return; - } + prepare0(); - if (topFut.isDone()) { - try { - if (!tx.state(PREPARING)) { - if (tx.setRollbackOnly()) { - if (tx.timedOut()) - onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + - "was rolled back: " + this)); - else - onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " + - "[state=" + tx.state() + ", tx=" + this + ']')); - } - else - onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + - "prepare [state=" + tx.state() + ", tx=" + this + ']')); + return; + } - return; - } + prepareOnTopology(); - tx.topologyVersion(topFut.topologyVersion()); + } + else + preparePessimistic(); + } - // Make sure to add future before calling prepare. - cctx.mvcc().addFuture(this); + /** + * + */ + private void prepareOnTopology() { + GridDhtTopologyFuture topFut = topologyReadLock(); - prepare0(); - } - catch (TransactionTimeoutException | TransactionOptimisticException e) { - onError(cctx.localNodeId(), null, e); - } - } - else { - topFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture t) { - cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override public void run() { - prepare(); - } - }); - } - }); - } + try { + if (topFut == null) { + assert isDone(); + + return; + } + + if (topFut.isDone()) { + tx.topologyVersion(topFut.topologyVersion()); + + prepare0(); } - finally { - topologyReadUnlock(); + else { + topFut.listen(new CI1>() { + @Override + public void apply(IgniteInternalFuture t) { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { + @Override + public void run() { + prepareOnTopology(); + } + }); + } + }); } } - else - preparePessimistic(); + finally { + topologyReadUnlock(); + } } /** @@ -431,12 +429,34 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut assert tx.optimistic(); try { + if (!tx.state(PREPARING)) { + if (tx.setRollbackOnly()) { + if (tx.timedOut()) + onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + + "was rolled back: " + this)); + else + onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); + } + else + onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + + "prepare [state=" + tx.state() + ", tx=" + this + ']')); + + return; + } + + // Make sure to add future before calling prepare. + cctx.mvcc().addFuture(this); + prepare( tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.emptyList(), tx.writeEntries()); markInitialized(); } + catch (TransactionTimeoutException | TransactionOptimisticException e) { + onError(cctx.localNodeId(), null, e); + } catch (IgniteCheckedException e) { onDone(e); } @@ -592,6 +612,7 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut tx.onePhaseCommit(), tx.needReturnValue() && tx.implicit(), tx.implicitSingle(), + m.explicitLock(), tx.subjectId(), tx.taskNameHash()); @@ -682,6 +703,7 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut tx.onePhaseCommit(), tx.needReturnValue() && tx.implicit(), tx.implicitSingle(), + m.explicitLock(), tx.subjectId(), tx.taskNameHash()); @@ -790,6 +812,12 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut cur.add(entry); + if (entry.explicitVersion() != null) { + tx.markExplicit(primary.id()); + + cur.markExplicitLock(); + } + entry.nodeId(primary.id()); if (cacheCtx.isNear()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 846022c..f0587ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -66,6 +66,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { /** Implicit single flag. */ private boolean implicitSingle; + /** Explicit lock flag. Set to true if at leat one entry was explicitly locked. */ + private boolean explicitLock; + /** Subject ID. */ private UUID subjId; @@ -109,6 +112,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { boolean onePhaseCommit, boolean retVal, boolean implicitSingle, + boolean explicitLock, @Nullable UUID subjId, int taskNameHash ) { @@ -123,6 +127,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { this.lastBackups = lastBackups; this.retVal = retVal; this.implicitSingle = implicitSingle; + this.explicitLock = explicitLock; this.subjId = subjId; this.taskNameHash = taskNameHash; } @@ -198,6 +203,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { } /** + * @return Explicit lock flag. + */ + public boolean explicitLock() { + return explicitLock; + } + + /** * @return Topology version. */ @Override public AffinityTopologyVersion topologyVersion() { @@ -259,60 +271,66 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { switch (writer.state()) { case 25: - if (!writer.writeIgniteUuid("futId", futId)) + if (!writer.writeBoolean("explicitLock", explicitLock)) return false; writer.incrementState(); case 26: - if (!writer.writeBoolean("implicitSingle", implicitSingle)) + if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); case 27: - if (!writer.writeBoolean("last", last)) + if (!writer.writeBoolean("implicitSingle", implicitSingle)) return false; writer.incrementState(); case 28: - if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID)) + if (!writer.writeBoolean("last", last)) return false; writer.incrementState(); case 29: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID)) return false; writer.incrementState(); case 30: - if (!writer.writeBoolean("near", near)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 31: - if (!writer.writeBoolean("retVal", retVal)) + if (!writer.writeBoolean("near", near)) return false; writer.incrementState(); case 32: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("retVal", retVal)) return false; writer.incrementState(); case 33: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 34: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 35: if (!writer.writeMessage("topVer", topVer)) return false; @@ -335,7 +353,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { switch (reader.state()) { case 25: - futId = reader.readIgniteUuid("futId"); + explicitLock = reader.readBoolean("explicitLock"); if (!reader.isLastRead()) return false; @@ -343,7 +361,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 26: - implicitSingle = reader.readBoolean("implicitSingle"); + futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) return false; @@ -351,7 +369,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 27: - last = reader.readBoolean("last"); + implicitSingle = reader.readBoolean("implicitSingle"); if (!reader.isLastRead()) return false; @@ -359,7 +377,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 28: - lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID); + last = reader.readBoolean("last"); if (!reader.isLastRead()) return false; @@ -367,7 +385,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 29: - miniId = reader.readIgniteUuid("miniId"); + lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID); if (!reader.isLastRead()) return false; @@ -375,7 +393,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 30: - near = reader.readBoolean("near"); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -383,7 +401,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 31: - retVal = reader.readBoolean("retVal"); + near = reader.readBoolean("near"); if (!reader.isLastRead()) return false; @@ -391,7 +409,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 32: - subjId = reader.readUuid("subjId"); + retVal = reader.readBoolean("retVal"); if (!reader.isLastRead()) return false; @@ -399,7 +417,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 33: - taskNameHash = reader.readInt("taskNameHash"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -407,6 +425,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 34: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 35: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -426,7 +452,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 35; + return 36; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 1b66b4a..735e373 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1164,7 +1164,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public void onTimeout() { - state(MARKED_ROLLBACK, true); + if (local() && !dht()) + state(MARKED_ROLLBACK, true); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index da30a94..8e47105 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -268,6 +268,7 @@ public class IgniteTxHandler { req.implicitSingle(), req.implicitSingle(), req.system(), + req.explicitLock(), req.policy(), req.concurrency(), req.isolation(), @@ -292,6 +293,9 @@ public class IgniteTxHandler { } if (tx != null) { + if (req.explicitLock()) + tx.explicitLock(req.explicitLock()); + tx.transactionNodes(req.transactionNodes()); if (req.onePhaseCommit()) { @@ -538,6 +542,7 @@ public class IgniteTxHandler { true, false, /* we don't know, so assume false. */ req.system(), + req.explicitLock(), req.policy(), PESSIMISTIC, READ_COMMITTED, @@ -556,6 +561,10 @@ public class IgniteTxHandler { tx.topologyVersion(req.topologyVersion()); } + else { + if (req.explicitLock()) + tx.explicitLock(req.explicitLock()); + } tx.storeEnabled(req.storeEnabled()); @@ -580,20 +589,44 @@ public class IgniteTxHandler { return commitFut; } else { - assert tx != null : "Transaction is null for near rollback request [nodeId=" + + assert tx != null || req.explicitLock() : "Transaction is null for near rollback request [nodeId=" + nodeId + ", req=" + req + "]"; - tx.syncRollback(req.syncRollback()); + if (tx != null) { + tx.syncRollback(req.syncRollback()); - tx.nearFinishFutureId(req.futureId()); - tx.nearFinishMiniId(req.miniId()); + tx.nearFinishFutureId(req.futureId()); + tx.nearFinishMiniId(req.miniId()); - IgniteInternalFuture rollbackFut = tx.rollbackAsync(); + IgniteInternalFuture rollbackFut = tx.rollbackAsync(); - // Only for error logging. - rollbackFut.listen(CU.errorLogger(log)); + // Only for error logging. + rollbackFut.listen(CU.errorLogger(log)); - return rollbackFut; + return rollbackFut; + } + else { + // Always send finish response. + GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(), + req.futureId(), req.miniId(), null); + + try { + ctx.io().send(nodeId, res, req.policy()); + } + catch (Throwable e) { + // Double-check. + if (ctx.discovery().node(nodeId) == null) { + if (log.isDebugEnabled()) + log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + + ']'); + } + else + U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " + + "res=" + res + ']', e); + } + + return null; + } } } catch (Throwable e) { @@ -757,15 +790,36 @@ public class IgniteTxHandler { if (nearTx != null) finish(nodeId, nearTx, req); - if (dhtTx != null && !dhtTx.done()) { - dhtTx.finishFuture().listen(new CI1>() { - @Override public void apply(IgniteInternalFuture igniteTxIgniteFuture) { - sendReply(nodeId, req); - } - }); + if (req.replyRequired()) { + IgniteInternalFuture completeFut; + + IgniteInternalFuture dhtFin = dhtTx == null ? null : dhtTx.done() ? null : dhtTx.finishFuture(); + IgniteInternalFuture nearFin = nearTx == null ? null : nearTx.done() ? null : nearTx.finishFuture(); + + if (dhtFin != null && nearFin != null) { + GridCompoundFuture fut = new GridCompoundFuture(); + + fut.add(dhtFin); + fut.add(nearFin); + + fut.markInitialized(); + + completeFut = fut; + } + else + completeFut = dhtFin != null ? dhtFin : nearFin; + + if (completeFut != null) { + completeFut.listen(new CI1>() { + @Override + public void apply(IgniteInternalFuture igniteTxIgniteFuture) { + sendReply(nodeId, req); + } + }); + } + else + sendReply(nodeId, req); } - else - sendReply(nodeId, req); } /** @@ -874,21 +928,19 @@ public class IgniteTxHandler { * @param req Request. */ protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req) { - if (req.replyRequired()) { - GridCacheMessage res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId()); + GridCacheMessage res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId()); - try { - ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); - } - catch (Throwable e) { - // Double-check. - if (ctx.discovery().node(nodeId) == null) { - if (log.isDebugEnabled()) - log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + ']'); - } - else - U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e); + try { + ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + } + catch (Throwable e) { + // Double-check. + if (ctx.discovery().node(nodeId) == null) { + if (log.isDebugEnabled()) + log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + ']'); } + else + U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e); } } @@ -942,6 +994,16 @@ public class IgniteTxHandler { return null; } + + if (ctx.discovery().node(nodeId) == null) { + tx.state(ROLLING_BACK); + + tx.state(ROLLED_BACK); + + ctx.tm().uncommitTx(tx); + + return null; + } } if (!tx.isSystemInvalidate() && !F.isEmpty(req.writes())) {