Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A99E3200D12 for ; Fri, 22 Sep 2017 13:21:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A7EA11609A7; Fri, 22 Sep 2017 11:21:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 515B2160BDC for ; Fri, 22 Sep 2017 13:21:43 +0200 (CEST) Received: (qmail 68106 invoked by uid 500); 22 Sep 2017 11:21:42 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 67758 invoked by uid 99); 22 Sep 2017 11:21:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Sep 2017 11:21:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3FCE9F5A9E; Fri, 22 Sep 2017 11:21:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 22 Sep 2017 11:21:57 -0000 Message-Id: <6c4cb971246c459ab9abdc9767b9dcfc@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [18/35] ignite git commit: ignite-6181 Tx rollback on timeout archived-at: Fri, 22 Sep 2017 11:21:46 -0000 ignite-6181 Tx rollback on timeout Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5af30cf1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5af30cf1 Diff: 
http://git-wip-us.apache.org/repos/asf/ignite/diff/5af30cf1 Branch: refs/heads/ignite-3479 Commit: 5af30cf118aeb3910398e6b15dbe2a51b62746d7 Parents: 27295f2 Author: sboikov Authored: Fri Sep 22 11:20:16 2017 +0300 Committer: sboikov Committed: Fri Sep 22 11:20:16 2017 +0300 ---------------------------------------------------------------------- .../IgniteDiagnosticPrepareContext.java | 4 +- .../processors/cache/CacheMetricsImpl.java | 2 +- .../processors/cache/GridCacheAdapter.java | 18 +- .../processors/cache/GridCacheMapEntry.java | 9 +- .../processors/cache/GridCacheMvccManager.java | 5 +- .../GridDistributedTxRemoteAdapter.java | 2 +- .../distributed/dht/GridDhtTxFinishFuture.java | 2 +- .../cache/distributed/dht/GridDhtTxLocal.java | 6 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 4 +- .../colocated/GridDhtColocatedLockFuture.java | 79 ++- .../distributed/near/GridNearLockFuture.java | 116 ++-- .../near/GridNearOptimisticTxPrepareFuture.java | 3 +- .../near/GridNearTransactionalCache.java | 3 - .../near/GridNearTxFastFinishFuture.java | 82 +++ .../near/GridNearTxFinishFuture.java | 23 +- .../cache/distributed/near/GridNearTxLocal.java | 303 ++++++--- .../distributed/near/NearTxFinishFuture.java | 31 + .../cache/transactions/IgniteInternalTx.java | 2 +- .../cache/transactions/IgniteTxAdapter.java | 13 +- .../cache/transactions/IgniteTxHandler.java | 26 +- .../transactions/IgniteTxLocalAdapter.java | 10 +- .../cache/transactions/IgniteTxLocalEx.java | 7 +- .../cache/transactions/IgniteTxManager.java | 80 +-- .../timeout/GridTimeoutProcessor.java | 23 +- .../processors/cache/CacheTxFastFinishTest.java | 9 +- .../cache/IgniteTxConfigCacheSelfTest.java | 14 + .../IgniteCacheThreadLocalTxTest.java | 223 +++++++ .../IgniteOptimisticTxSuspendResumeTest.java | 6 +- ...ionedMultiNodeLongTxTimeout2FullApiTest.java | 34 + .../TxRollbackOnTimeoutNearCacheTest.java | 28 + ...ollbackOnTimeoutNoDeadlockDetectionTest.java | 47 ++ 
.../transactions/TxRollbackOnTimeoutTest.java | 655 +++++++++++++++++++ .../IgniteCacheFullApiSelfTestSuite.java | 2 + .../testsuites/IgniteCacheTestSuite6.java | 10 + .../hadoop/impl/HadoopTxConfigCacheTest.java | 4 +- 35 files changed, 1627 insertions(+), 258 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java index 378dc74..ed8d35e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java @@ -75,7 +75,7 @@ public class IgniteDiagnosticPrepareContext { * @param keys Entry keys. * @param msg Initial message. 
*/ - public void txKeyInfo(UUID nodeId, int cacheId, Collection keys, String msg) { + public void txKeyInfo(UUID nodeId, int cacheId, Collection keys, String msg) { closure(nodeId).add(msg, new TxEntriesInfoClosure(cacheId, keys)); } @@ -280,4 +280,4 @@ public class IgniteDiagnosticPrepareContext { } } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index d03a6f8..413b60d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -328,7 +328,7 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public int getTxDhtThreadMapSize() { - return cctx.isNear() && dhtCtx != null ? 
dhtCtx.tm().threadMapSize() : -1; + return cctx.tm().threadMapSize(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 92a8245..32b1b99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -78,6 +78,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgniteTransactionsEx; import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.cluster.IgniteClusterEx; @@ -1857,7 +1858,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache(e); } - tx = ctx.tm().threadLocalTx(ctx.systemTx() ? 
ctx : null); + tx = ctx.tm().threadLocalTx(ctx); } if (tx == null || tx.implicit()) { @@ -4057,7 +4058,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache fut, final IgniteUuid futId) { + public boolean addFuture(final GridCacheFuture fut, final IgniteUuid futId) { GridCacheFuture old = futs.put(futId, fut); assert old == null : old; - onFutureAdded(fut); + return onFutureAdded(fut); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index ea6461d..e5bcc46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -855,7 +855,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter // Note that we don't evict near entries here - // they will be deleted by their corresponding transactions. 
if (state(ROLLING_BACK) || state() == UNKNOWN) { - cctx.tm().rollbackTx(this); + cctx.tm().rollbackTx(this, false); state(ROLLED_BACK); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 5311ddc..6380710 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -227,7 +227,7 @@ public final class GridDhtTxFinishFuture extends GridCacheCompoundIdentity try { boolean nodeStop = err != null && X.hasCause(err, NodeStoppingException.class); - this.tx.tmFinish(err == null, nodeStop); + this.tx.tmFinish(err == null, nodeStop, false); } catch (IgniteCheckedException finishErr) { U.error(log, "Failed to finish tx: " + tx, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index ab5631e..28cc018 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -436,7 
+436,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (prepFut != null) prepFut.get(); // Check for errors. - boolean finished = localFinish(commit); + boolean finished = localFinish(commit, false); if (!finished) err = new IgniteCheckedException("Failed to finish transaction [commit=" + commit + @@ -542,7 +542,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"}) - @Override public boolean localFinish(boolean commit) throws IgniteCheckedException { + @Override public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException { assert nearFinFutId != null || isInvalidate() || !commit || isSystemInvalidate() || onePhaseCommit() || state() == PREPARED : "Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit + @@ -550,7 +550,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa assert nearMiniId != 0; - return super.localFinish(commit); + return super.localFinish(commit, clearThreadMap); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 86eac42..e4a7141 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -734,7 +734,7 @@ public abstract 
class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"}) - @Override public boolean localFinish(boolean commit) throws IgniteCheckedException { + @Override public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Finishing dht local tx [tx=" + this + ", commit=" + commit + "]"); @@ -773,7 +773,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { if (commit && !isRollbackOnly()) userCommit(); else - userRollback(); + userRollback(clearThreadMap); } catch (IgniteCheckedException e) { err = e; http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 58c6319..e4f4601 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -54,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; @@ -99,6 +101,10 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF /** Logger. */ private static IgniteLogger msgLog; + /** Done field updater. */ + private static final AtomicIntegerFieldUpdater DONE_UPD = + AtomicIntegerFieldUpdater.newUpdater(GridDhtColocatedLockFuture.class, "done"); + /** Cache registry. */ @GridToStringExclude private final GridCacheContext cctx; @@ -146,6 +152,10 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF /** Map of current values. */ private final Map> valMap; + /** */ + @SuppressWarnings("UnusedDeclaration") + private volatile int done; + /** Trackable flag (here may be non-volatile). */ private boolean trackable; @@ -226,12 +236,6 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF log = U.logger(cctx.kernalContext(), logRef, GridDhtColocatedLockFuture.class); } - if (timeout > 0) { - timeoutObj = new LockTimeoutObject(); - - cctx.time().addTimeoutObject(timeoutObj); - } - valMap = new ConcurrentHashMap8<>(); } @@ -322,6 +326,8 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF else { IgniteTxEntry txEntry = tx.entry(txKey); + assert txEntry != null; + txEntry.cached(entry); // Check transaction entries (corresponding tx entries must be enlisted in transaction). 
@@ -332,7 +338,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF threadId, lockVer, true, - tx.entry(txKey).locked(), + txEntry.locked(), inTx(), inTx() && tx.implicitSingle(), false, @@ -399,7 +405,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF * @param success Success flag. */ public void complete(boolean success) { - onComplete(success, true); + onComplete(success, true, true); } /** @@ -533,7 +539,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF /** {@inheritDoc} */ @Override public boolean cancel() { if (onCancelled()) - onComplete(false, true); + onComplete(false, true, true); return isCancelled(); } @@ -556,7 +562,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF if (err != null) success = false; - return onComplete(success, true); + return onComplete(success, true, true); } /** @@ -564,19 +570,32 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF * * @param success {@code True} if lock was acquired. * @param distribute {@code True} if need to distribute lock removal in case of failure. + * @param restoreTimeout {@code True} if need restore tx timeout callback. * @return {@code True} if complete by this operation. */ - private boolean onComplete(boolean success, boolean distribute) { - if (log.isDebugEnabled()) + private boolean onComplete(boolean success, boolean distribute, boolean restoreTimeout) { + if (log.isDebugEnabled()) { log.debug("Received onComplete(..) callback [success=" + success + ", distribute=" + distribute + ", fut=" + this + ']'); + } + + if (!DONE_UPD.compareAndSet(this, 0, 1)) + return false; if (!success) undoLocks(distribute, true); - if (tx != null) + if (tx != null) { cctx.tm().txContext(tx); + if (restoreTimeout && tx.trackTimeout()) { + // Need restore timeout before onDone is called and next tx operation can proceed. 
+ boolean add = tx.addTimeoutHandler(); + + assert add; + } + } + if (super.onDone(success, err)) { if (log.isDebugEnabled()) log.debug("Completing future: " + this); @@ -675,6 +694,30 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF * part. Note that if primary node leaves grid, the future will fail and transaction will be rolled back. */ void map() { + if (tx != null && tx.trackTimeout()) { + if (!tx.removeTimeoutHandler()) { + tx.finishFuture().listen(new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture fut) { + IgniteTxTimeoutCheckedException err = new IgniteTxTimeoutCheckedException("Failed to " + + "acquire lock, transaction was rolled back on timeout [timeout=" + tx.timeout() + + ", tx=" + tx + ']'); + + onError(err); + + onComplete(false, false, false); + } + }); + + return; + } + } + + if (timeout > 0) { + timeoutObj = new LockTimeoutObject(); + + cctx.time().addTimeoutObject(timeoutObj); + } + // Obtain the topology version to use. 
AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); @@ -930,7 +973,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF if (log.isDebugEnabled()) log.debug("Entry being locked did not pass filter (will not lock): " + entry); - onComplete(false, false); + onComplete(false, false, true); return; } @@ -1307,7 +1350,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF if (log.isDebugEnabled()) log.debug("Entry being locked did not pass filter (will not lock): " + entry); - onComplete(false, false); + onComplete(false, false, true); return false; } @@ -1419,12 +1462,12 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF U.warn(log, "Failed to detect deadlock.", e); } - onComplete(false, true); + onComplete(false, true, true); } }); } else - onComplete(false, true); + onComplete(false, true, true); } /** {@inheritDoc} */ @@ -1673,4 +1716,4 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString()); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index c947715..3d9989d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -27,6 +27,7 @@ import java.util.Map; import 
java.util.Queue; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -50,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; @@ -89,6 +91,10 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture logRef = new AtomicReference<>(); + /** Done field updater. */ + private static final AtomicIntegerFieldUpdater DONE_UPD = + AtomicIntegerFieldUpdater.newUpdater(GridNearLockFuture.class, "done"); + /** */ private static IgniteLogger log; @@ -142,8 +148,9 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture> valMap; - /** Trackable flag. */ - private boolean trackable = true; + /** */ + @SuppressWarnings("UnusedDeclaration") + private volatile int done; /** Keys locked so far. 
*/ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) @@ -184,6 +191,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture cctx, @@ -230,12 +238,6 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture 0) { - timeoutObj = new LockTimeoutObject(); - - cctx.time().addTimeoutObject(timeoutObj); - } - valMap = new ConcurrentHashMap8<>(); } @@ -260,12 +262,12 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture>() { + @Override public void apply(IgniteInternalFuture fut) { + IgniteTxTimeoutCheckedException err = new IgniteTxTimeoutCheckedException("Failed to " + + "acquire lock, transaction was rolled back on timeout [timeout=" + tx.timeout() + + ", tx=" + tx + ']'); + + onError(err); + + onComplete(false, false, false); + } + }); + + return; + } + } + + if (timeout > 0) { + timeoutObj = new LockTimeoutObject(); + + cctx.time().addTimeoutObject(timeoutObj); + } + + boolean added = cctx.mvcc().addFuture(this); + + assert added : this; + // Obtain the topology version to use. 
long threadId = Thread.currentThread().getId(); @@ -971,19 +1014,13 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture val = entry.versionedValue(); @@ -1032,9 +1066,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture extends GridNearCacheAdapter opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery()); - if (!ctx.mvcc().addFuture(fut)) - throw new IllegalStateException("Duplicate future ID: " + fut); - fut.map(); return fut; http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java new file mode 100644 index 0000000..7222697 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFastFinishFuture.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.future.GridFutureAdapter; + +import static org.apache.ignite.transactions.TransactionState.COMMITTED; +import static org.apache.ignite.transactions.TransactionState.COMMITTING; +import static org.apache.ignite.transactions.TransactionState.PREPARED; +import static org.apache.ignite.transactions.TransactionState.PREPARING; +import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; +import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; + +/** + * + */ +public class GridNearTxFastFinishFuture extends GridFutureAdapter implements NearTxFinishFuture { + /** */ + private final GridNearTxLocal tx; + + /** */ + private final boolean commit; + + /** + * @param tx Transaction. + * @param commit Commit flag. 
+ */ + GridNearTxFastFinishFuture(GridNearTxLocal tx, boolean commit) { + this.tx = tx; + this.commit = commit; + } + + /** {@inheritDoc} */ + @Override public boolean commit() { + return commit; + } + + /** + * + */ + public void finish() { + try { + if (commit) { + tx.state(PREPARING); + tx.state(PREPARED); + tx.state(COMMITTING); + + tx.context().tm().fastFinishTx(tx, true); + + tx.state(COMMITTED); + } + else { + tx.state(PREPARING); + tx.state(PREPARED); + tx.state(ROLLING_BACK); + + tx.context().tm().fastFinishTx(tx, false); + + tx.state(ROLLED_BACK); + } + } + finally { + onDone(tx); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index c45eb7b..b6a8855 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -67,7 +67,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN; * */ public final class GridNearTxFinishFuture extends GridCacheCompoundIdentityFuture - implements GridCacheFuture { + implements GridCacheFuture, NearTxFinishFuture { /** */ private static final long serialVersionUID = 0L; @@ -136,6 +136,11 @@ public final class GridNearTxFinishFuture extends GridCacheCompoundIdentit } /** {@inheritDoc} */ + @Override public boolean commit() { + return commit; + } + + /** {@inheritDoc} */ @Override public IgniteUuid futureId() { return futId; } @@ -278,6 +283,13 @@ public final class 
GridNearTxFinishFuture extends GridCacheCompoundIdentit } } + /** + * + */ + void forceFinish() { + super.onDone(tx, null, false); + } + /** {@inheritDoc} */ @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) { if (isDone()) @@ -310,7 +322,7 @@ public final class GridNearTxFinishFuture extends GridCacheCompoundIdentit err = new TransactionRollbackException("Failed to commit transaction.", err); try { - tx.localFinish(err == null); + tx.localFinish(err == null, true); } catch (IgniteCheckedException e) { if (err != null) @@ -327,7 +339,7 @@ public final class GridNearTxFinishFuture extends GridCacheCompoundIdentit finishOnePhase(commit); try { - tx.tmFinish(commit, nodeStop); + tx.tmFinish(commit, nodeStop, true); } catch (IgniteCheckedException e) { U.error(log, "Failed to finish tx: " + tx, e); @@ -386,9 +398,10 @@ public final class GridNearTxFinishFuture extends GridCacheCompoundIdentit * Initializes future. * * @param commit Commit flag. + * @param clearThreadMap If {@code true} removes {@link GridNearTxLocal} from thread map. 
*/ @SuppressWarnings("ForLoopReplaceableByForEach") - void finish(boolean commit) { + public void finish(boolean commit, boolean clearThreadMap) { if (tx.onNeedCheckBackup()) { assert tx.onePhaseCommit(); @@ -402,7 +415,7 @@ public final class GridNearTxFinishFuture extends GridCacheCompoundIdentit } try { - if (tx.localFinish(commit) || (!commit && tx.state() == UNKNOWN)) { + if (tx.localFinish(commit, clearThreadMap) || (!commit && tx.state() == UNKNOWN)) { if ((tx.onePhaseCommit() && needFinishOnePhase(commit)) || (!tx.onePhaseCommit() && mappings != null)) { if (mappings.single()) { GridDistributedTxMapping mapping = mappings.singleMapping(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 058a3ff..8b043d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxy; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import 
org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; @@ -89,6 +90,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionConcurrency; @@ -107,7 +109,7 @@ import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxE import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; -import static org.apache.ignite.transactions.TransactionState.PREPARED; +import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; import static org.apache.ignite.transactions.TransactionState.PREPARING; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; @@ -117,7 +119,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN; * Replicated user transaction. */ @SuppressWarnings("unchecked") -public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoCloseable { +public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject, AutoCloseable { /** */ private static final long serialVersionUID = 0L; @@ -126,12 +128,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, IgniteInternalFuture.class, "prepFut"); /** Prepare future updater. 
*/ - private static final AtomicReferenceFieldUpdater COMMIT_FUT_UPD = - AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "commitFut"); - - /** Rollback future updater. */ - private static final AtomicReferenceFieldUpdater ROLLBACK_FUT_UPD = - AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "rollbackFut"); + private static final AtomicReferenceFieldUpdater FINISH_FUT_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, NearTxFinishFuture.class, "finishFut"); /** DHT mappings. */ private IgniteTxMappings mappings; @@ -144,12 +142,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea /** Commit future. */ @SuppressWarnings("UnusedDeclaration") @GridToStringExclude - private volatile GridNearTxFinishFuture commitFut; - - /** Rollback future. */ - @SuppressWarnings("UnusedDeclaration") - @GridToStringExclude - private volatile GridNearTxFinishFuture rollbackFut; + private volatile NearTxFinishFuture finishFut; /** True if transaction contains near cache entries mapped to local node. */ private boolean nearLocallyMapped; @@ -170,6 +163,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea protected boolean transform; /** */ + private boolean trackTimeout; + + /** */ @GridToStringExclude private TransactionProxyImpl proxy; @@ -229,6 +225,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea mappings = implicitSingle ? 
new IgniteTxMappingsSingleImpl() : new IgniteTxMappingsImpl(); initResult(); + + if (timeout() > 0 && !implicit()) + trackTimeout = cctx.time().addTimeoutObject(this); } /** {@inheritDoc} */ @@ -3044,7 +3043,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"}) - @Override public boolean localFinish(boolean commit) throws IgniteCheckedException { + @Override public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Finishing near local tx [tx=" + this + ", commit=" + commit + "]"); @@ -3080,7 +3079,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea if (commit && !isRollbackOnly()) userCommit(); else - userRollback(); + userRollback(clearThreadMap); } catch (IgniteCheckedException e) { err = e; @@ -3146,6 +3145,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea if (!PREP_FUT_UPD.compareAndSet(this, null, fut)) return prepFut; + if (trackTimeout) + removeTimeoutHandler(); + if (timeout == -1) { fut.onDone(this, timeoutException()); @@ -3178,14 +3180,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea if (awaitLastFuture) txState().awaitLastFuture(cctx); - prepareAsync().get(); - } - - /** - * @return Prepare future. 
- */ - private IgniteInternalFuture prepareAsync() { - return prepareNearTxLocal(); + prepareNearTxLocal().get(); } /** @@ -3202,42 +3197,43 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea if (log.isDebugEnabled()) log.debug("Committing near local tx: " + this); + NearTxFinishFuture fut = finishFut; + + if (fut != null) + return chainFinishFuture(fut, true); + if (fastFinish()) { - state(PREPARING); - state(PREPARED); - state(COMMITTING); + GridNearTxFastFinishFuture fut0; - cctx.tm().fastFinishTx(this, true); + if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFastFinishFuture(this, true))) + return chainFinishFuture(finishFut, true); - state(COMMITTED); + fut0.finish(); - return new GridFinishedFuture<>((IgniteInternalTx)this); + return fut0; } - final IgniteInternalFuture prepareFut = prepareNearTxLocal(); + final GridNearTxFinishFuture fut0; - GridNearTxFinishFuture fut = commitFut; + if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFinishFuture<>(cctx, this, true))) + return chainFinishFuture(finishFut, true); - if (fut != null || - !COMMIT_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, this, true))) - return commitFut; + cctx.mvcc().addFuture(fut0, fut0.futureId()); - cctx.mvcc().addFuture(fut, fut.futureId()); + final IgniteInternalFuture prepareFut = prepareNearTxLocal(); prepareFut.listen(new CI1>() { @Override public void apply(IgniteInternalFuture f) { - GridNearTxFinishFuture fut0 = commitFut; - try { // Make sure that here are no exceptions. 
prepareFut.get(); - fut0.finish(true); + fut0.finish(true, true); } catch (Error | RuntimeException e) { COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); - fut0.finish(false); + fut0.finish(false, true); throw e; } @@ -3245,12 +3241,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); if (!(e instanceof NodeStoppingException)) - fut0.finish(false); + fut0.finish(false, true); } } }); - return fut; + return fut0; } /** {@inheritDoc} */ @@ -3269,30 +3265,42 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea * @return Rollback future. */ public IgniteInternalFuture rollbackNearTxLocalAsync() { + return rollbackNearTxLocalAsync(false); + } + + /** + * @param onTimeout {@code True} if rolled back asynchronously on timeout. + * @return Rollback future. + */ + private IgniteInternalFuture rollbackNearTxLocalAsync(final boolean onTimeout) { if (log.isDebugEnabled()) log.debug("Rolling back near tx: " + this); + if (!onTimeout && trackTimeout) + removeTimeoutHandler(); + + NearTxFinishFuture fut = finishFut; + + if (fut != null) + return chainFinishFuture(finishFut, false); + if (fastFinish()) { - state(PREPARING); - state(PREPARED); - state(ROLLING_BACK); + GridNearTxFastFinishFuture fut0; - cctx.tm().fastFinishTx(this, false); + if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFastFinishFuture(this, false))) + return chainFinishFuture(finishFut, false); - state(ROLLED_BACK); + fut0.finish(); - return new GridFinishedFuture<>((IgniteInternalTx)this); + return fut0; } - GridNearTxFinishFuture fut = rollbackFut; + final GridNearTxFinishFuture fut0; - if (fut != null) - return fut; + if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFinishFuture<>(cctx, this, false))) + return chainFinishFuture(finishFut, false); - if (!ROLLBACK_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, 
this, false))) - return rollbackFut; - - cctx.mvcc().addFuture(fut, fut.futureId()); + cctx.mvcc().addFuture(fut0, fut0.futureId()); IgniteInternalFuture prepFut = this.prepFut; @@ -3307,7 +3315,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']'); } - fut.finish(false); + fut0.finish(false, !onTimeout); } else { prepFut.listen(new CI1>() { @@ -3321,14 +3329,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']'); } - GridNearTxFinishFuture fut0 = rollbackFut; - - fut0.finish(false); + fut0.finish(false, !onTimeout); } }); } - return fut; + return fut0; } /** {@inheritDoc} */ @@ -3337,6 +3343,64 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea } /** + * @param fut Already started finish future. + * @param commit Commit flag. + * @return Finish future. 
+ */ + private IgniteInternalFuture chainFinishFuture(final NearTxFinishFuture fut, final boolean commit) { + assert fut != null; + + if (fut.commit() != commit) { + final GridNearTxLocal tx = this; + + if (!commit) { + final GridNearTxFinishFuture rollbackFut = new GridNearTxFinishFuture<>(cctx, this, false); + + fut.listen(new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture fut0) { + if (FINISH_FUT_UPD.compareAndSet(tx, fut, rollbackFut)) { + if (tx.state() == COMMITTED) { + if (log.isDebugEnabled()) + log.debug("Failed to rollback, transaction is already committed: " + tx); + + rollbackFut.forceFinish(); + + assert rollbackFut.isDone() : rollbackFut; + } + else { + if (!cctx.mvcc().addFuture(rollbackFut, rollbackFut.futureId())) + return; + + rollbackFut.finish(false, true); + } + } + } + }); + + return rollbackFut; + } + else { + final GridFutureAdapter fut0 = new GridFutureAdapter<>(); + + fut.listen(new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture fut) { + if (timedOut()) + fut0.onDone(new IgniteTxTimeoutCheckedException("Failed to commit transaction, " + + "transaction is concurrently rolled back on timeout: " + tx)); + else + fut0.onDone(new IgniteCheckedException("Failed to commit transaction, " + + "transaction is concurrently rolled back: " + tx)); + } + }); + + return fut0; + } + } + + return fut; + } + + /** * @return {@code True} if 'fast finish' path can be used for transaction completion. 
*/ private boolean fastFinish() { @@ -3693,38 +3757,53 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea @Override public void close() throws IgniteCheckedException { TransactionState state = state(); - if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) - rollback(); + try { + if (state == COMMITTED || state == ROLLED_BACK) + return; - synchronized (this) { - try { - while (!done()) - wait(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + if (trackTimeout) + removeTimeoutHandler(); - if (!done()) - throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " + - this, e); + if (state != ROLLING_BACK && state != COMMITTING) + rollback(); + + synchronized (this) { + try { + while (!done()) + wait(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + if (!done()) + throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " + + this, e); + } } } + finally { + // It is possible tx was rolled back asynchronously on timeout and thread map is not cleared yet. 
+ boolean cleanup = state == ROLLED_BACK && timedOut(); - if (accessMap != null) { - assert optimistic(); + if (cleanup) + cctx.tm().clearThreadMap(this); - for (Map.Entry e : accessMap.entrySet()) { - if (e.getValue().entries() != null) { - GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId()); + if (accessMap != null) { + assert optimistic(); - if (cctx0.isNear()) - cctx0.near().dht().sendTtlUpdateRequest(e.getValue()); - else - cctx0.dht().sendTtlUpdateRequest(e.getValue()); + for (Map.Entry e : accessMap.entrySet()) { + if (e.getValue().entries() != null) { + GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId()); + + if (cctx0.isNear()) + cctx0.near().dht().sendTtlUpdateRequest(e.getValue()); + else + cctx0.dht().sendTtlUpdateRequest(e.getValue()); + } } - } - accessMap = null; + accessMap = null; + } } } @@ -4000,13 +4079,69 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea /** * @param threadId new owner of transaction. - * @throws IgniteCheckedException if method executed not in the middle of resume or suspend. */ public void threadId(long threadId) { this.threadId = threadId; } /** + * @return {@code True} if need register callback which cancels tx on timeout. + */ + public boolean trackTimeout() { + return trackTimeout; + } + + /** + * Removes timeout handler. + * + * @return {@code True} if handler was removed. + */ + public boolean removeTimeoutHandler() { + assert trackTimeout; + + return cctx.time().removeTimeoutObject(this); + } + + /** + * @return {@code True} if handler was added. 
+ */ + public boolean addTimeoutHandler() { + assert trackTimeout; + + return cctx.time().addTimeoutObject(this); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return xid(); + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return startTime() + timeout(); + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (state(MARKED_ROLLBACK, true) || (state() == MARKED_ROLLBACK)) { + if (log.isDebugEnabled()) + log.debug("Will rollback tx on timeout: " + this); + + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + // Note: if rollback asynchronously on timeout should not clear thread map + // since thread started tx still should be able to see this tx. + rollbackNearTxLocalAsync(true); + } + }); + } + else { + if (log.isDebugEnabled()) + log.debug("Skip rollback tx on timeout: " + this); + } + } + + /** * Post-lock closure. * * @param Return type. http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java new file mode 100644 index 0000000..132c754 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; + +/** + * + */ +public interface NearTxFinishFuture extends IgniteInternalFuture { + /** + * @return Commit flag. + */ + boolean commit(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 7598003..9e06d9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -634,4 +634,4 @@ public interface IgniteInternalTx { * @param e Commit error. 
*/ public void commitError(Throwable e); -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index c447436..b5178b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -370,6 +370,13 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement consistentIdMapper = new ConsistentIdMapper(cctx.discovery()); } + /** + * @return Shared cache context. + */ + public GridCacheSharedContext context() { + return cctx; + } + /** {@inheritDoc} */ @Override public boolean localResult() { assert originatingNodeId() != null; @@ -987,7 +994,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement * @return {@code True} if state changed. 
*/ @SuppressWarnings({"TooBroadScope"}) - protected boolean state(TransactionState state, boolean timedOut) { + protected final boolean state(TransactionState state, boolean timedOut) { boolean valid = false; TransactionState prev; @@ -1068,7 +1075,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement if (valid) { this.state = state; - this.timedOut = timedOut; + + if (timedOut) + this.timedOut = true; if (log.isDebugEnabled()) log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index b60bab5..38c877b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -957,28 +957,30 @@ public class IgniteTxHandler { } } catch (Throwable e) { - tx.commitError(e); + U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e); - tx.systemInvalidate(true); + if (tx != null) { + tx.commitError(e); - U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e); + tx.systemInvalidate(true); - IgniteInternalFuture res = null; + try { + IgniteInternalFuture res = tx.rollbackDhtLocalAsync(); - try { - res = tx.rollbackDhtLocalAsync(); + // Only for error logging. + res.listen(CU.errorLogger(log)); - // Only for error logging. 
- res.listen(CU.errorLogger(log)); - } - catch (Throwable e1) { - e.addSuppressed(e1); + return res; + } + catch (Throwable e1) { + e.addSuppressed(e1); + } } if (e instanceof Error) throw (Error)e; - return res; + return new GridFinishedFuture<>(e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 7ab921c..143e5cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; @@ -898,10 +899,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * Commits transaction to transaction manager. Used for one-phase commit transactions only. * * @param commit If {@code true} commits transaction, otherwise rollbacks. + * @param clearThreadMap If {@code true} removes {@link GridNearTxLocal} from thread map. 
* @param nodeStop If {@code true} tx is cancelled on node stop. * @throws IgniteCheckedException If failed. */ - public void tmFinish(boolean commit, boolean nodeStop) throws IgniteCheckedException { + public void tmFinish(boolean commit, boolean nodeStop, boolean clearThreadMap) throws IgniteCheckedException { assert onePhaseCommit(); if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) { @@ -910,7 +912,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (commit) cctx.tm().commitTx(this); else - cctx.tm().rollbackTx(this); + cctx.tm().rollbackTx(this, clearThreadMap); } state(commit ? COMMITTED : ROLLED_BACK); @@ -957,7 +959,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } /** {@inheritDoc} */ - @Override public void userRollback() throws IgniteCheckedException { + @Override public void userRollback(boolean clearThreadMap) throws IgniteCheckedException { TransactionState state = state(); if (state != ROLLING_BACK && state != ROLLED_BACK) { @@ -975,7 +977,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) { - cctx.tm().rollbackTx(this); + cctx.tm().rollbackTx(this, clearThreadMap); if (!internal()) { Collection stores = txState.stores(cctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 307c348..b61b1a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.jetbrains.annotations.Nullable; @@ -41,16 +42,18 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { public void userCommit() throws IgniteCheckedException; /** + * @param clearThreadMap If {@code true} removes {@link GridNearTxLocal} from thread map. * @throws IgniteCheckedException If rollback failed. */ - public void userRollback() throws IgniteCheckedException; + public void userRollback(boolean clearThreadMap) throws IgniteCheckedException; /** * Finishes transaction (either commit or rollback). * * @param commit {@code True} if commit, {@code false} if rollback. + * @param clearThreadMap If {@code true} removes {@link GridNearTxLocal} from thread map. * @return {@code True} if state has been changed. * @throws IgniteCheckedException If finish failed. */ - public boolean localFinish(boolean commit) throws IgniteCheckedException; + public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException; }