Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B0732200B85 for ; Wed, 31 Aug 2016 08:32:56 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AEE16160AB5; Wed, 31 Aug 2016 06:32:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6B471160ADC for ; Wed, 31 Aug 2016 08:32:53 +0200 (CEST) Received: (qmail 58190 invoked by uid 500); 31 Aug 2016 06:32:52 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 57271 invoked by uid 99); 31 Aug 2016 06:32:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 Aug 2016 06:32:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DBD5DE1075; Wed, 31 Aug 2016 06:32:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.apache.org Date: Wed, 31 Aug 2016 06:33:11 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [21/38] ignite git commit: ignite-2968 Deadlock detection for optimistic tx and near caches archived-at: Wed, 31 Aug 2016 06:32:56 -0000 ignite-2968 Deadlock detection for optimistic tx and near caches Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0465874d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0465874d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0465874d Branch: refs/heads/ignite-3443 Commit: 0465874d9dddcf962a82a2ef38589121201f0b75 Parents: 2891703 Author: agura Authored: Wed Aug 24 21:13:29 2016 +0300 Committer: agura Committed: Mon Aug 29 16:01:16 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 19 +- .../GridCachePartitionExchangeManager.java | 7 + .../GridDistributedTxPrepareRequest.java | 4 +- .../distributed/dht/GridDhtLockFuture.java | 53 +- .../distributed/dht/GridDhtTxFinishFuture.java | 4 +- .../cache/distributed/dht/GridDhtTxLocal.java | 26 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 109 +++- .../dht/GridDhtTxPrepareRequest.java | 4 +- .../colocated/GridDhtColocatedLockFuture.java | 37 +- .../distributed/near/GridNearLockFuture.java | 90 ++- ...arOptimisticSerializableTxPrepareFuture.java | 13 +- .../near/GridNearOptimisticTxPrepareFuture.java | 263 ++++++--- ...ridNearOptimisticTxPrepareFutureAdapter.java | 5 +- .../GridNearPessimisticTxPrepareFuture.java | 8 +- .../near/GridNearTxFinishFuture.java | 5 +- .../cache/distributed/near/GridNearTxLocal.java | 16 +- .../near/GridNearTxPrepareRequest.java | 4 +- .../cache/transactions/IgniteInternalTx.java | 3 +- .../cache/transactions/IgniteTxAdapter.java | 37 +- .../cache/transactions/IgniteTxHandler.java | 9 +- .../transactions/IgniteTxLocalAdapter.java | 19 +- .../cache/transactions/IgniteTxManager.java | 86 ++- .../cache/transactions/IgniteTxStateImpl.java | 11 +- .../cache/transactions/TxDeadlockDetection.java | 51 +- .../cache/IgniteTxConfigCacheSelfTest.java | 91 ++- .../IgniteTxTimeoutAbstractTest.java | 8 +- ...tionedMultiNodeLongTxTimeoutFullApiTest.java | 34 ++ ...nabledMultiNodeLongTxTimeoutFullApiTest.java | 41 ++ .../local/GridCacheLocalTxTimeoutSelfTest.java | 5 +- .../transactions/DepthFirstSearchTest.java | 100 +++- .../TxDeadlockDetectionNoHangsTest.java | 246 ++++++++ .../transactions/TxDeadlockDetectionTest.java | 13 +- ...timisticDeadlockDetectionCrossCacheTest.java | 257 +++++++++ .../TxOptimisticDeadlockDetectionTest.java | 574 +++++++++++++++++++ ...simisticDeadlockDetectionCrossCacheTest.java | 165 ++++-- .../TxPessimisticDeadlockDetectionTest.java | 50 +- .../IgniteCacheFullApiSelfTestSuite.java | 4 + .../TxDeadlockDetectionTestSuite.java | 6 + .../commands/cache/VisorCacheStopCommand.scala | 5 +- 39 files changed, 2127 insertions(+), 355 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 57fa68e..f692bf4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -4493,17 +4493,30 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** - * @return All MVCC local candidates. + * @return All MVCC local and non near candidates. */ + @SuppressWarnings("ForLoopReplaceableByForEach") @Nullable public synchronized List mvccAllLocal() { GridCacheMvcc mvcc = extras != null ? extras.mvcc() : null; if (mvcc == null) return null; - List locs = mvcc.allLocal(); + List allLocs = mvcc.allLocal(); - return (locs == null || locs.isEmpty()) ? null : new ArrayList<>(locs); + if (allLocs == null || allLocs.isEmpty()) + return null; + + List locs = new ArrayList<>(allLocs.size()); + + for (int i = 0; i < allLocs.size(); i++) { + GridCacheMvccCandidate loc = allLocs.get(i); + + if (!loc.nearLocal()) + locs.add(loc); + } + + return locs.isEmpty() ? null : locs; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index e6ab046..4eb61e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1302,6 +1302,13 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana for (GridCacheFuture fut : mvcc.atomicFutures()) U.warn(log, ">>> " + fut); + + if (tm != null) { + U.warn(log, "Pending transaction deadlock detection futures:"); + + for (IgniteInternalFuture fut : tm.deadlockDetectionFutures()) + U.warn(log, ">>> " + fut); + } } for (GridCacheContext ctx : cctx.cacheContexts()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index 72e68db..c691374 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -154,6 +154,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** * @param tx Cache transaction. + * @param timeout Transactions timeout. * @param reads Read entries. * @param writes Write entries. * @param txNodes Transaction nodes mapping. @@ -162,6 +163,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage */ public GridDistributedTxPrepareRequest( IgniteInternalTx tx, + long timeout, @Nullable Collection reads, Collection writes, Map> txNodes, @@ -174,12 +176,12 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage threadId = tx.threadId(); concurrency = tx.concurrency(); isolation = tx.isolation(); - timeout = tx.timeout(); invalidate = tx.isInvalidate(); txSize = tx.size(); sys = tx.system(); plc = tx.ioPolicy(); + this.timeout = timeout; this.reads = reads; this.writes = writes; this.txNodes = txNodes; http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 64b8745..b005b29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -242,12 +242,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture msgLog = cctx.shared().txLockMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridDhtLockFuture.class); } - - if (timeout > 0) { - timeoutObj = new LockTimeoutObject(); - - cctx.time().addTimeoutObject(timeoutObj); - } } /** {@inheritDoc} */ @@ -298,8 +292,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture /** * @return Entries. */ - public synchronized Collection entriesCopy() { - return new ArrayList<>(entries()); + public Collection entriesCopy() { + synchronized (futs) { + return new ArrayList<>(entries()); + } } /** @@ -412,7 +408,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture return null; } - synchronized (this) { + synchronized (futs) { entries.add(c == null || c.reentry() ? null : entry); if (c != null && !c.reentry()) @@ -614,7 +610,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture * @param t Error. */ public void onError(Throwable t) { - synchronized (this) { + synchronized (futs) { if (err != null) return; @@ -654,15 +650,16 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture * @param entry Entry whose lock ownership changed. */ @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { - if (isDone()) + if (isDone() || (inTx() && tx.remainingTime() == -1)) return false; // Check other futures. if (log.isDebugEnabled()) log.debug("Received onOwnerChanged() callback [entry=" + entry + ", owner=" + owner + "]"); if (owner != null && owner.version().equals(lockVer)) { - synchronized (this) { - pendingLocks.remove(entry.key()); + synchronized (futs) { + if (!pendingLocks.remove(entry.key())) + return false; } if (checkLocks()) @@ -677,8 +674,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture /** * @return {@code True} if locks have been acquired. */ - private synchronized boolean checkLocks() { - return pendingLocks.isEmpty(); + private boolean checkLocks() { + synchronized (futs) { + return pendingLocks.isEmpty(); + } } /** {@inheritDoc} */ @@ -709,7 +708,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture if (isDone() || (err == null && success && !checkLocks())) return false; - synchronized (this) { + synchronized (futs) { if (this.err == null) this.err = err; } @@ -776,13 +775,19 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture } readyLocks(); + + if (timeout > 0) { + timeoutObj = new LockTimeoutObject(); + + cctx.time().addTimeoutObject(timeoutObj); + } } /** * @param entries Entries. */ private void map(Iterable entries) { - synchronized (this) { + synchronized (futs) { if (mapped) return; @@ -842,6 +847,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture if (log.isDebugEnabled()) log.debug("Mapped DHT lock future [dhtMap=" + F.nodeIds(dhtMap.keySet()) + ", dhtLockFut=" + this + ']'); + long timeout = inTx() ? tx.remainingTime() : this.timeout; + // Create mini futures. for (Map.Entry> mapped : dhtMap.entrySet()) { ClusterNode n = mapped.getKey(); @@ -853,6 +860,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture if (cnt > 0) { assert !n.id().equals(cctx.localNodeId()); + if (inTx() && tx.remainingTime() == -1) + return; + MiniFuture fut = new MiniFuture(n, dhtMapping); GridDhtLockRequest req = new GridDhtLockRequest( @@ -1109,7 +1119,14 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture if (log.isDebugEnabled()) log.debug("Timed out waiting for lock response: " + this); - timedOut = true; + synchronized (futs) { + timedOut = true; + + // Stop locks and responses processing. + pendingLocks.clear(); + + futs.clear(); + } boolean releaseLocks = !(inTx() && cctx.tm().deadlockDetectionEnabled()); http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 4ece775..d2e26b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -133,6 +133,7 @@ public final class GridDhtTxFinishFuture extends GridCompoundIdentityFutur } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public boolean onNodeLeft(UUID nodeId) { for (IgniteInternalFuture fut : futures()) if (isMini(fut)) { @@ -391,8 +392,7 @@ public final class GridDhtTxFinishFuture extends GridCompoundIdentityFutur * @param nearMap Near map. * @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for. */ - private boolean finish(Map dhtMap, - Map nearMap) { + private boolean finish(Map dhtMap, Map nearMap) { if (tx.onePhaseCommit()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index c9d4345..b659abb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -314,6 +314,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa true); } + long timeout = remainingTime(); + // For pessimistic mode we don't distribute prepare request. GridDhtTxPrepareFuture fut = prepFut; @@ -322,11 +324,16 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture( cctx, this, + timeout, nearMiniId, Collections.emptyMap(), true, - needReturnValue()))) + needReturnValue()))) { + if (timeout == -1) + prepFut.onError(timeoutException()); + return prepFut; + } } else // Prepare was called explicitly. @@ -334,15 +341,16 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (!state(PREPARING)) { if (setRollbackOnly()) { - if (timedOut()) - fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + this)); + if (timeout == -1) + fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + + this)); else fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() + ", tx=" + this + ']')); } else - fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare [state=" + state() - + ", tx=" + this + ']')); + fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare [state=" + + state() + ", tx=" + this + ']')); return fut; } @@ -394,6 +402,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa // In optimistic mode prepare still can be called explicitly from salvageTx. GridDhtTxPrepareFuture fut = prepFut; + long timeout = remainingTime(); + if (fut == null) { init(); @@ -401,6 +411,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture( cctx, this, + timeout, nearMiniId, verMap, last, @@ -410,6 +421,9 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; + if (timeout == -1) + f.onError(timeoutException()); + return chainOnePhasePrepare(f); } } @@ -427,7 +441,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa fut.complete(); if (setRollbackOnly()) { - if (timedOut()) + if (timeout == -1) fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + this)); else http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index e9805aa..1bdd9b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -59,8 +59,10 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; +import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -204,9 +206,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture dhtVerMap, boolean last, @@ -243,6 +250,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture 0 ? new PrepareTimeoutObject(timeout) : null; } /** {@inheritDoc} */ @@ -269,7 +278,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture fut = futs.get(i); @@ -543,9 +548,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture keys = new HashSet<>(); for (IgniteTxEntry txEntry : tx.allEntries()) { @@ -1434,7 +1453,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture tx.removeMapping(node.id()); // Primary node left the grid, so fail the future. - GridDhtColocatedLockFuture.this.onDone(newTopologyException(e, node.id())); + GridDhtColocatedLockFuture.this.onDone(false, newTopologyException(e, node.id())); onDone(true); } @@ -1494,7 +1513,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture else remap(); } - else { + else { int i = 0; for (KeyCacheObject k : keys) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 4b6448b..3d9b6ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -20,10 +20,12 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; @@ -48,8 +50,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -63,7 +67,9 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -481,6 +487,38 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture requestedKeys() { + synchronized (futs) { + if (timeoutObj != null && timeoutObj.requestedKeys != null) + return timeoutObj.requestedKeys; + + return requestedKeys0(); + } + } + + /** + * @return Keys for which locks requested from remote nodes but response isn't received. + */ + private Set requestedKeys0() { + for (IgniteInternalFuture miniFut : futures()) { + if (isMini(miniFut) && !miniFut.isDone()) { + MiniFuture mini = (MiniFuture)miniFut; + + Set requestedKeys = U.newHashSet(mini.keys.size()); + + for (KeyCacheObject key : mini.keys) + requestedKeys.add(new IgniteTxKey(key, cctx.cacheId())); + + return requestedKeys; + } + } + + return null; + } + + /** * Finds pending mini future by the given mini ID. * * @param miniId Mini ID to find. @@ -621,6 +659,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture keys = new HashSet<>(); + + for (IgniteTxEntry txEntry : tx.allEntries()) { + if (!txEntry.locked()) + keys.add(txEntry.txKey()); + } + + IgniteInternalFuture fut = cctx.tm().detectDeadlock(tx, keys); + + fut.listen(new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture fut) { + try { + TxDeadlock deadlock = fut.get(); + + if (deadlock != null) + err = new TransactionDeadlockException(deadlock.toString(cctx.shared())); + } + catch (IgniteCheckedException e) { + err = e; + + U.warn(log, "Failed to detect deadlock.", e); + } + + onComplete(false, true); + } + }); + } + else + onComplete(false, true); } /** {@inheritDoc} */ @@ -1466,7 +1546,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture requestedKeys() { + synchronized (futs) { + for (int i = 0; i < futs.size(); i++) { + IgniteInternalFuture fut = futs.get(i); + + if (isMini(fut) && !fut.isDone()) { + MiniFuture miniFut = (MiniFuture)fut; + + Collection entries = miniFut.mapping().entries(); + + Set keys = U.newHashSet(entries.size()); + + for (IgniteTxEntry entry : entries) + keys.add(entry.txKey()); + + return keys; + } + } + } + + return null; + } + + /** * Finds pending mini future by the given mini ID. * * @param miniId Mini ID to find. @@ -264,7 +307,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (!txStateCheck) { if (tx.setRollbackOnly()) { - if (tx.timedOut()) + if (tx.remainingTime() == -1) onError(new IgniteTxTimeoutCheckedException("Transaction timed out and " + "was rolled back: " + this), false); else @@ -437,89 +480,97 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa final ClusterNode n = m.node(); - GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( - futId, - tx.topologyVersion(), - tx, - null, - m.writes(), - m.near(), - txMapping.transactionNodes(), - m.last(), - tx.onePhaseCommit(), - tx.needReturnValue() && tx.implicit(), - tx.implicitSingle(), - m.explicitLock(), - tx.subjectId(), - tx.taskNameHash(), - m.clientFirst(), - tx.activeCachesDeploymentEnabled()); - - for (IgniteTxEntry txEntry : m.entries()) { - if (txEntry.op() == TRANSFORM) - req.addDhtVersion(txEntry.txKey(), null); - } + long timeout = tx.remainingTime(); + + if (timeout != -1) { + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( + futId, + tx.topologyVersion(), + tx, + timeout, + null, + m.writes(), + m.near(), + txMapping.transactionNodes(), + m.last(), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + m.explicitLock(), + tx.subjectId(), + tx.taskNameHash(), + m.clientFirst(), + tx.activeCachesDeploymentEnabled()); + + for (IgniteTxEntry txEntry : m.entries()) { + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); + } - // Must lock near entries separately. - if (m.near()) { - try { - tx.optimisticLockEntries(req.writes()); + // Must lock near entries separately. + if (m.near()) { + try { + tx.optimisticLockEntries(req.writes()); - tx.userPrepare(); - } - catch (IgniteCheckedException e) { - onError(e, false); + tx.userPrepare(); + } + catch (IgniteCheckedException e) { + onError(e, false); + } } - } - final MiniFuture fut = new MiniFuture(this, m, mappings); + final MiniFuture fut = new MiniFuture(this, m, mappings); - req.miniId(fut.futureId()); + req.miniId(fut.futureId()); - add(fut); // Append new future. + add(fut); // Append new future. - // If this is the primary node for the keys. - if (n.isLocal()) { - // At this point, if any new node joined, then it is - // waiting for this transaction to complete, so - // partition reassignments are not possible here. - IgniteInternalFuture prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); + // If this is the primary node for the keys. + if (n.isLocal()) { + // At this point, if any new node joined, then it is + // waiting for this transaction to complete, so + // partition reassignments are not possible here. + IgniteInternalFuture prepFut = + cctx.tm().txHandler().prepareTx(n.id(), tx, req); - prepFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture prepFut) { - try { - fut.onResult(n.id(), prepFut.get()); + prepFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture prepFut) { + try { + fut.onResult(prepFut.get()); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + } } - catch (IgniteCheckedException e) { - fut.onResult(e); + }); + } + else { + try { + cctx.io().send(n, req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, sent request [txId=" + tx.nearXidVersion() + + ", node=" + n.id() + ']'); } } - }); - } - else { - try { - cctx.io().send(n, req, tx.ioPolicy()); + catch (ClusterTopologyCheckedException e) { + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); - if (msgLog.isDebugEnabled()) { - msgLog.debug("Near optimistic prepare fut, sent request [txId=" + tx.nearXidVersion() + - ", node=" + n.id() + ']'); + fut.onNodeLeft(e, false); } - } - catch (ClusterTopologyCheckedException e) { - e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, failed to sent request [txId=" + tx.nearXidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); + } - fut.onNodeLeft(e, false); - } - catch (IgniteCheckedException e) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Near optimistic prepare fut, failed to sent request [txId=" + tx.nearXidVersion() + - ", node=" + n.id() + - ", err=" + e + ']'); + fut.onResult(e); } - - fut.onResult(e); } } + else + onTimeout(); } finally { if (set) @@ -623,6 +674,61 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa return cur; } + /** + * + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private void onTimeout() { + if (cctx.tm().deadlockDetectionEnabled()) { + Set keys = null; + + if (keyLockFut != null) + keys = new HashSet<>(keyLockFut.lockKeys); + else { + if (futs != null && !futs.isEmpty()) { + for (int i = 0; i < futs.size(); i++) { + IgniteInternalFuture fut = futs.get(i); + + if (isMini(fut) && !fut.isDone()) { + MiniFuture miniFut = (MiniFuture)fut; + + Collection entries = miniFut.mapping().entries(); + + keys = U.newHashSet(entries.size()); + + for (IgniteTxEntry entry : entries) + keys.add(entry.txKey()); + + break; + } + } + } + } + + add(new GridEmbeddedFuture<>(new IgniteBiClosure() { + @Override public GridNearTxPrepareResponse apply(TxDeadlock deadlock, Exception e) { + if (e != null) + U.warn(log, "Failed to detect deadlock.", e); + else { + e = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout for " + + "transaction [timeout=" + tx.timeout() + ", tx=" + tx + ']', + deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx)) : null); + } + + onDone(null, e); + + return null; + } + }, cctx.tm().detectDeadlock(tx, keys))); + } + else { + ERR_UPD.compareAndSet(this, null, new IgniteTxTimeoutCheckedException("Failed to acquire lock " + + "within provided timeout for transaction [timeout=" + tx.timeout() + ", tx=" + tx + ']')); + + onComplete(false); + } + } + /** {@inheritDoc} */ @Override public String toString() { Collection futs = F.viewReadOnly(futures(), new C1, String>() { @@ -652,7 +758,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa private static final long serialVersionUID = 0L; /** Receive result flag updater. */ - private static AtomicIntegerFieldUpdater RCV_RES_UPD = + private static final AtomicIntegerFieldUpdater RCV_RES_UPD = AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes"); /** Parent future. */ @@ -745,15 +851,21 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } /** - * @param nodeId Failed node ID. * @param res Result callback. */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - void onResult(UUID nodeId, final GridNearTxPrepareResponse res) { + void onResult(final GridNearTxPrepareResponse res) { if (isDone()) return; if (RCV_RES_UPD.compareAndSet(this, 0, 1)) { + if (parent.cctx.tm().deadlockDetectionEnabled() && + (parent.tx.remainingTime() == -1 || res.error() instanceof IgniteTxTimeoutCheckedException)) { + parent.onTimeout(); + + return; + } + if (res.error() != null) { // Fail the whole compound future. parent.onError(res.error(), false); @@ -801,8 +913,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa */ private void remap() { parent.prepareOnTopology(true, new Runnable() { - @Override - public void run() { + @Override public void run() { onDone((GridNearTxPrepareResponse) null); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index 4d77a3c..a00cf3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -40,8 +40,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT * @param cctx Context. * @param tx Transaction. */ - public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, - GridNearTxLocal tx) { + public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, GridNearTxLocal tx) { super(cctx, tx); assert tx.optimistic() : tx; @@ -172,7 +171,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT protected static class KeyLockFuture extends GridFutureAdapter { /** */ @GridToStringInclude - private Collection lockKeys = new GridConcurrentHashSet<>(); + protected Collection lockKeys = new GridConcurrentHashSet<>(); /** */ private volatile boolean allKeysAdded; http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index ef2edc9..34b8281 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -153,7 +153,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA @Override public void prepare() { if (!tx.state(PREPARING)) { if (tx.setRollbackOnly()) { - if (tx.timedOut()) + if (tx.remainingTime() == -1) onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx)); else onDone(new IgniteCheckedException("Invalid transaction state for prepare " + @@ -222,6 +222,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA checkOnePhase(); + long timeout = tx.remainingTime(); + + if (timeout == -1) + onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx)); + for (final GridDistributedTxMapping m : mappings.values()) { final ClusterNode node = m.node(); @@ -229,6 +234,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA futId, tx.topologyVersion(), tx, + timeout, m.reads(), m.writes(), m.near(), http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index adde63c..bb5d482 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -633,6 +633,9 @@ public final class GridNearTxFinishFuture extends GridCompoundIdentityFutu if (m.explicitLock()) syncMode = FULL_SYNC; + // Version to be added in completed versions on primary node. + GridCacheVersion completedVer = !commit && tx.timeout() > 0 ? tx.xidVersion() : null; + GridNearTxFinishRequest req = new GridNearTxFinishRequest( futId, tx.xidVersion(), @@ -645,7 +648,7 @@ public final class GridNearTxFinishFuture extends GridCompoundIdentityFutu m.explicitLock(), tx.storeEnabled(), tx.topologyVersion(), - null, + completedVer, // Reuse 'baseVersion' to do not add new fields in message. null, null, tx.size(), http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 28c60d4..410baf8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -796,6 +796,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut; if (fut == null) { + long timeout = remainingTime(); + // Future must be created before any exception can be thrown. if (optimistic()) { fut = serializable() ? @@ -807,6 +809,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (!PREP_FUT_UPD.compareAndSet(this, null, fut)) return prepFut; + + if (timeout == -1) { + fut.onDone(this, timeoutException()); + + return fut; + } } else // Prepare was called explicitly. @@ -964,8 +972,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { Map> txNodes, boolean last ) { + long timeout = remainingTime(); + if (state() != PREPARING) { - if (timedOut()) + if (timeout == -1) return new GridFinishedFuture<>( new IgniteTxTimeoutCheckedException("Transaction timed out: " + this)); @@ -975,11 +985,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() + ", tx=" + this + ']')); } + if (timeout == -1) + return new GridFinishedFuture<>(timeoutException()); + init(); GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture( cctx, this, + timeout, IgniteUuid.randomUuid(), Collections.emptyMap(), last, http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 9dfdb43..e55566b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -94,6 +94,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param futId Future ID. * @param topVer Topology version. * @param tx Transaction. + * @param timeout Transaction timeout. * @param reads Read entries. * @param writes Write entries. * @param near {@code True} if mapping is for near caches. @@ -112,6 +113,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { IgniteUuid futId, AffinityTopologyVersion topVer, IgniteInternalTx tx, + long timeout, Collection reads, Collection writes, boolean near, @@ -126,7 +128,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { boolean firstClientReq, boolean addDepInfo ) { - super(tx, reads, writes, txNodes, onePhaseCommit, addDepInfo); + super(tx, timeout, reads, writes, txNodes, onePhaseCommit, addDepInfo); assert futId != null; assert !firstClientReq || tx.optimistic() : tx; http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 8c0425d..dd900fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedExceptio import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.lang.IgniteAsyncSupported; @@ -46,7 +45,7 @@ import org.jetbrains.annotations.Nullable; /** * Transaction managed by cache ({@code 'Ex'} stands for external). */ -public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { +public interface IgniteInternalTx extends AutoCloseable { /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index f76f4bf..eb2989e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -712,7 +712,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /** * @return Transaction timeout exception. */ - protected final IgniteCheckedException timeoutException() { + public final IgniteCheckedException timeoutException() { return new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout " + "for transaction [timeout=" + timeout() + ", tx=" + this + ']'); } @@ -1032,7 +1032,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement * @return {@code True} if state changed. */ @SuppressWarnings({"TooBroadScope"}) - private boolean state(TransactionState state, boolean timedOut) { + protected boolean state(TransactionState state, boolean timedOut) { boolean valid = false; TransactionState prev; @@ -1154,24 +1154,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public IgniteUuid timeoutId() { - return xidVer.asGridUuid(); - } - - /** {@inheritDoc} */ - @Override public long endTime() { - long endTime = timeout == 0 ? Long.MAX_VALUE : startTime + timeout; - - return endTime > 0 ? endTime : endTime < 0 ? Long.MAX_VALUE : endTime; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - if (local() && !dht()) - state(MARKED_ROLLBACK, true); - } - - /** {@inheritDoc} */ @Override public boolean timedOut() { return timedOut; } @@ -2387,21 +2369,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public IgniteUuid timeoutId() { - return null; - } - - /** {@inheritDoc} */ - @Override public long endTime() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - // No-op. - } - - /** {@inheritDoc} */ @Override public boolean equals(Object o) { return this == o || o instanceof IgniteInternalTx && xid.equals(((IgniteInternalTx)o).xid()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 7c3c206..e67e60f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -112,8 +112,7 @@ public class IgniteTxHandler { * @param req Request. * @return Prepare future. */ - public IgniteInternalFuture processNearTxPrepareRequest(final UUID nearNodeId, - final GridNearTxPrepareRequest req) { + public IgniteInternalFuture processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) { if (txPrepareMsgLog.isDebugEnabled()) { txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() + ", node=" + nearNodeId + ']'); @@ -361,7 +360,7 @@ public class IgniteTxHandler { req.deployInfo() != null); try { - ctx.io().send(nearNode, res, req.policy()); + ctx.io().send(nearNodeId, res, req.policy()); if (txPrepareMsgLog.isDebugEnabled()) { txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() + @@ -667,6 +666,10 @@ public class IgniteTxHandler { assert nodeId != null; assert req != null; + // 'baseVersion' message field is re-used for version to be added in completed versions. + if (!req.commit() && req.baseVersion() != null) + ctx.tm().addRolledbackTx(null, req.baseVersion()); + // Transaction on local cache only. if (locTx != null && !locTx.nearLocallyMapped() && !locTx.colocatedLocallyMapped()) return new GridFinishedFuture(locTx); http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index d9aca4a..9ad7fb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -107,6 +107,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; +import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; import static org.apache.ignite.transactions.TransactionState.PREPARING; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; @@ -547,14 +548,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @SuppressWarnings({"CatchGenericClass"}) public void userPrepare() throws IgniteCheckedException { if (state() != PREPARING) { - if (timedOut()) + if (remainingTime() == -1) throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); TransactionState state = state(); setRollbackOnly(); - throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + state + ", tx=" + this + ']'); + throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + + state + ", tx=" + this + ']'); } checkValid(); @@ -629,7 +631,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig TransactionState state = state(); if (state != COMMITTING) { - if (timedOut()) + if (remainingTime() == -1) throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); setRollbackOnly(); @@ -3540,8 +3542,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * @throws IgniteCheckedException If transaction check failed. */ protected void checkValid() throws IgniteCheckedException { + if (local() && !dht() && remainingTime() == -1) + state(MARKED_ROLLBACK, true); + if (isRollbackOnly()) { - if (timedOut()) + if (remainingTime() == -1) throw new IgniteTxTimeoutCheckedException("Cache transaction timed out: " + this); TransactionState state = state(); @@ -3556,10 +3561,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig throw new IgniteCheckedException("Cache transaction marked as rollback-only: " + this); } - - if (remainingTime() == -1 && setRollbackOnly()) - throw new IgniteTxTimeoutCheckedException("Cache transaction timed out " + - "(was rolled back automatically): " + this); } /** {@inheritDoc} */ @@ -3604,7 +3605,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig TransactionState state = state(); - assert state == TransactionState.ACTIVE || timedOut() : + assert state == TransactionState.ACTIVE || remainingTime() == -1 : "Invalid tx state for adding entry [op=" + op + ", val=" + val + ", entry=" + entry + ", filter=" + Arrays.toString(filter) + ", txCtx=" + cctx.tm().txContextVersion() + ", tx=" + this + ']';