Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C78B0200C38 for ; Wed, 15 Mar 2017 15:37:22 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C611E160B70; Wed, 15 Mar 2017 14:37:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8F1E9160B72 for ; Wed, 15 Mar 2017 15:37:20 +0100 (CET) Received: (qmail 7156 invoked by uid 500); 15 Mar 2017 14:37:19 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 7096 invoked by uid 99); 15 Mar 2017 14:37:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Mar 2017 14:37:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 79736DFFD7; Wed, 15 Mar 2017 14:37:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Wed, 15 Mar 2017 14:37:20 -0000 Message-Id: <82e57c0e7bb14901a9b77f9bffd874f1@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] ignite git commit: ignite-4768 txs archived-at: Wed, 15 Mar 2017 14:37:23 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 98f1140..13ca26a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -39,7 +39,6 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -91,7 +90,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; import static org.apache.ignite.transactions.TransactionState.ACTIVE; -import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; import static org.apache.ignite.transactions.TransactionState.PREPARED; @@ -192,12 +190,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /** */ protected boolean onePhaseCommit; - /** */ - protected CacheWriteSynchronizationMode syncMode; - - /** If this transaction contains transform entries. */ - protected boolean transform; - /** Commit version. */ private volatile GridCacheVersion commitVer; @@ -207,9 +199,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /** Done marker. */ protected volatile boolean isDone; - /** Preparing flag (no need for volatile modifier). */ - private boolean preparing; - /** */ @GridToStringInclude private Map> invalidParts; @@ -416,8 +405,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean storeUsed() { - return storeEnabled() && txState().storeUsed(cctx); + @Override public boolean storeWriteThrough() { + return storeEnabled() && txState().storeWriteThrough(cctx); } /** @@ -508,32 +497,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } } - /** {@inheritDoc} */ - @Override public void onRemap(AffinityTopologyVersion topVer) { - assert false : this; - } - - /** {@inheritDoc} */ - @Override public boolean hasTransforms() { - return transform; - } - - /** {@inheritDoc} */ - @Override public boolean markPreparing() { - synchronized (this) { - if (preparing) - return false; - - preparing = true; - - return true; - } - } - /** * @return {@code True} if marked. */ - @Override public boolean markFinalizing(FinalizationStatus status) { + @Override public final boolean markFinalizing(FinalizationStatus status) { boolean res; switch (status) { @@ -625,26 +592,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean replicated() { - return false; - } - - /** {@inheritDoc} */ - @Override public CacheWriteSynchronizationMode syncMode() { - if (syncMode != null) - return syncMode; - - return txState().syncMode(cctx); - } - - /** - * @param syncMode Write synchronization mode. - */ - public void syncMode(CacheWriteSynchronizationMode syncMode) { - this.syncMode = syncMode; - } - - /** {@inheritDoc} */ @Override public IgniteUuid xid() { return xidVer.asGridUuid(); } @@ -897,30 +844,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } } - /** - * - */ - @Override public void close() throws IgniteCheckedException { - TransactionState state = state(); - - if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) - rollback(); - - synchronized (this) { - try { - while (!done()) - wait(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - if (!done()) - throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " + - this, e); - } - } - } - /** {@inheritDoc} */ @Override public boolean needsCompletedVersions() { return false; @@ -1176,12 +1099,12 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean isSystemInvalidate() { + @Override public final boolean isSystemInvalidate() { return sysInvalidate; } /** {@inheritDoc} */ - @Override public void systemInvalidate(boolean sysInvalidate) { + @Override public final void systemInvalidate(boolean sysInvalidate) { this.sysInvalidate = sysInvalidate; } @@ -1950,21 +1873,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public void commit() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ - @Override public void close() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ - @Override public void rollback() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ @Override public boolean activeCachesDeploymentEnabled() { return false; } @@ -1995,7 +1903,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean storeUsed() { + @Override public boolean storeWriteThrough() { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } @@ -2029,11 +1937,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public void onRemap(AffinityTopologyVersion topVer) { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ @Override public void commitError(Throwable e) { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } @@ -2044,11 +1947,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean markPreparing() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ @Override public boolean markFinalizing(FinalizationStatus status) { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } @@ -2134,11 +2032,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean replicated() { - return false; - } - - /** {@inheritDoc} */ @Override public UUID subjectId() { return null; } @@ -2154,11 +2047,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public CacheWriteSynchronizationMode syncMode() { - return null; - } - - /** {@inheritDoc} */ @Override public boolean hasWriteKey(IgniteTxKey key) { return false; } @@ -2236,12 +2124,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public void prepare() throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture prepareAsync() { + @Override public IgniteInternalFuture salvageTx() { return null; } @@ -2371,11 +2254,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean hasTransforms() { - return false; - } - - /** {@inheritDoc} */ @Override public boolean equals(Object o) { return this == o || o instanceof IgniteInternalTx && xid.equals(((IgniteInternalTx)o).xid()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 77387b0..4a1e085 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -255,7 +255,7 @@ public class IgniteTxHandler { req.last()); if (locTx.isRollbackOnly()) - locTx.rollbackAsync(); + locTx.rollbackNearTxLocalAsync(); return fut.chain(new C1, GridNearTxPrepareResponse>() { @Override public GridNearTxPrepareResponse apply(IgniteInternalFuture f) { @@ -491,7 +491,7 @@ public class IgniteTxHandler { if (tx.isRollbackOnly() && !tx.commitOnPrepare()) { if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK) - tx.rollbackAsync(); + tx.rollbackDhtLocalAsync(); } final GridDhtTxLocal tx0 = tx; @@ -849,10 +849,11 @@ public class IgniteTxHandler { assert req.syncMode() != null : req; tx.syncMode(req.syncMode()); + tx.nearFinishFutureId(req.futureId()); + tx.nearFinishMiniId(req.miniId()); + tx.storeEnabled(req.storeEnabled()); if (req.commit()) { - tx.storeEnabled(req.storeEnabled()); - if (!tx.markFinalizing(USER_FINISH)) { if (log.isDebugEnabled()) log.debug("Will not finish transaction (it is handled by another thread): " + tx); @@ -860,10 +861,7 @@ public class IgniteTxHandler { return null; } - tx.nearFinishFutureId(req.futureId()); - tx.nearFinishMiniId(req.miniId()); - - IgniteInternalFuture commitFut = tx.commitAsync(); + IgniteInternalFuture commitFut = tx.commitDhtLocalAsync(); // Only for error logging. commitFut.listen(CU.errorLogger(log)); @@ -871,10 +869,7 @@ public class IgniteTxHandler { return commitFut; } else { - tx.nearFinishFutureId(req.futureId()); - tx.nearFinishMiniId(req.miniId()); - - IgniteInternalFuture rollbackFut = tx.rollbackAsync(); + IgniteInternalFuture rollbackFut = tx.rollbackDhtLocalAsync(); // Only for error logging. rollbackFut.listen(CU.errorLogger(log)); @@ -891,7 +886,7 @@ public class IgniteTxHandler { IgniteInternalFuture res; - IgniteInternalFuture rollbackFut = tx.rollbackAsync(); + IgniteInternalFuture rollbackFut = tx.rollbackDhtLocalAsync(); // Only for error logging. rollbackFut.listen(CU.errorLogger(log)); @@ -932,7 +927,7 @@ public class IgniteTxHandler { throw e; if (tx != null) - return tx.rollbackAsync(); + return tx.rollbackNearTxLocalAsync(); return new GridFinishedFuture<>(e); } @@ -1157,7 +1152,7 @@ public class IgniteTxHandler { if (completeFut != null) { completeFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture igniteTxIgniteFuture) { + @Override public void apply(IgniteInternalFuture fut) { sendReply(nodeId, req, true, nearTxId); } }); @@ -1561,8 +1556,6 @@ public class IgniteTxHandler { assert !F.isEmpty(req.transactionNodes()) : "Received last prepare request with empty transaction nodes: " + req; - tx.transactionNodes(req.transactionNodes()); - tx.state(PREPARED); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java index bffb295..9417e1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java @@ -154,13 +154,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ - @Override public boolean storeUsed(GridCacheSharedContext cctx) { + @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) { if (cacheCtx == null) return false; CacheStoreManager store = cacheCtx.store(); - return store.configured(); + return store.configured() && store.isWriteThrough(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index d457399..dc4e52f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -30,6 +30,7 @@ import javax.cache.expiry.Duration; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; @@ -139,6 +140,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @GridToStringInclude protected IgniteTxLocalState txState; + /** */ + protected CacheWriteSynchronizationMode syncMode; + /** * Empty constructor required for {@link Externalizable}. */ @@ -199,6 +203,23 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig txState = implicitSingle ? new IgniteTxImplicitSingleStateImpl() : new IgniteTxStateImpl(); } + /** + * @return Transaction write synchronization mode. + */ + public final CacheWriteSynchronizationMode syncMode() { + if (syncMode != null) + return syncMode; + + return txState().syncMode(cctx); + } + + /** + * @param syncMode Write synchronization mode. + */ + public void syncMode(CacheWriteSynchronizationMode syncMode) { + this.syncMode = syncMode; + } + /** {@inheritDoc} */ @Override public IgniteTxState txState() { return txState; @@ -410,21 +431,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } } - /** {@inheritDoc} */ - @Override public void commit() throws IgniteCheckedException { - try { - commitAsync().get(); - } - finally { - cctx.tm().resetContext(); - } - } - - /** {@inheritDoc} */ - @Override public void prepare() throws IgniteCheckedException { - prepareAsync().get(); - } - /** * Checks that locks are in proper state for commit. * http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 0cf1d67..307c348 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -17,15 +17,8 @@ package org.apache.ignite.internal.processors.cache.transactions; -import java.util.Collection; -import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.lang.GridInClosure3; import org.jetbrains.annotations.Nullable; /** @@ -59,5 +52,5 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { * @return {@code True} if state has been changed. * @throws IgniteCheckedException If finish failed. */ - public boolean finish(boolean commit) throws IgniteCheckedException; + public boolean localFinish(boolean commit) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index ff4a4e6..af406fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -82,7 +82,6 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; @@ -127,7 +126,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** Slow tx warn timeout (initialized to 0). */ private static final int SLOW_TX_WARN_TIMEOUT = Integer.getInteger(IGNITE_SLOW_TX_WARN_TIMEOUT, 0); - /** Tx salvage timeout (default 3s). */ + /** Tx salvage timeout. */ private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100); /** One phase commit deferred ack request timeout. */ @@ -138,9 +137,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE = Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256); - /** Version in which deadlock detection introduced. */ - public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19"); - /** Deadlock detection maximum iterations. */ static int DEADLOCK_MAX_ITERS = IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000); @@ -184,7 +180,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { PER_SEGMENT_Q); /** Pending one phase commit ack requests sender. */ - private GridDeferredAckMessageSender deferredAckMessageSender; + private GridDeferredAckMessageSender deferredAckMsgSnd; /** Transaction finish synchronizer. */ private GridCacheTxFinishSync txFinishSync; @@ -216,7 +212,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { txHnd = new IgniteTxHandler(cctx); - deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) { + deferredAckMsgSnd = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) { @Override public int getTimeout() { return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT; } @@ -256,6 +252,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { UUID nodeId = discoEvt.eventNode().id(); + // Wait some time in case there are some unprocessed messages from failed node. cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId)); if (txFinishSync != null) @@ -305,85 +302,35 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * Invalidates transaction. * * @param tx Transaction. - * @return {@code True} if transaction was salvaged by this call. */ - public boolean salvageTx(IgniteInternalTx tx) { - return salvageTx(tx, false, USER_FINISH); + public void salvageTx(IgniteInternalTx tx) { + salvageTx(tx, USER_FINISH); } /** * Invalidates transaction. * * @param tx Transaction. - * @param warn {@code True} if warning should be logged. * @param status Finalization status. - * @return {@code True} if transaction was salvaged by this call. */ - private boolean salvageTx(IgniteInternalTx tx, boolean warn, IgniteInternalTx.FinalizationStatus status) { + private void salvageTx(IgniteInternalTx tx, IgniteInternalTx.FinalizationStatus status) { assert tx != null; TransactionState state = tx.state(); - if (state == ACTIVE || state == PREPARING || state == PREPARED) { - try { - if (!tx.markFinalizing(status)) { - if (log.isDebugEnabled()) - log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx); - - return false; - } - - tx.systemInvalidate(true); - - tx.prepare(); - - if (tx.state() == PREPARING) { - if (log.isDebugEnabled()) - log.debug("Ignoring transaction in PREPARING state as it is currently handled " + - "by another thread: " + tx); - - return false; - } - - if (tx instanceof IgniteTxRemoteEx) { - IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx; - - rmtTx.doneRemote(tx.xidVersion(), Collections.emptyList(), - Collections.emptyList(), Collections.emptyList()); - } - - tx.commit(); - - if (warn) { - // This print out cannot print any peer-deployed entity either - // directly or indirectly. - U.warn(log, "Invalidated transaction because originating node either " + - "crashed or left grid: " + CU.txString(tx)); - } - } - catch (IgniteCheckedException ignore) { + if (state == ACTIVE || state == PREPARING || state == PREPARED || state == MARKED_ROLLBACK) { + if (!tx.markFinalizing(status)) { if (log.isDebugEnabled()) - log.debug("Optimistic failure while invalidating transaction (will rollback): " + - tx.xidVersion()); + log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx); - try { - tx.rollback(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e); - } - } - } - else if (state == MARKED_ROLLBACK) { - try { - tx.rollback(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e); + return; } - } - return true; + tx.salvageTx(); + + if (log.isDebugEnabled()) + log.debug("Invalidated transaction because originating node left grid: " + CU.txString(tx)); + } } /** @@ -427,7 +374,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return {@code True} if transaction has been committed or rolled back, * {@code false} otherwise. */ - public boolean isCompleted(IgniteInternalTx tx) { + private boolean isCompleted(IgniteInternalTx tx) { boolean completed = completedVersHashMap.containsKey(tx.xidVersion()); // Need check that for tx with timeout rollback message was not received before lock. @@ -1237,7 +1184,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { unlockMultiple(tx, tx.readEntries()); // 6. Notify evictions. - notifyEvitions(tx); + notifyEvictions(tx); // 7. Remove obsolete entries from cache. removeObsolete(tx); @@ -1310,7 +1257,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { unlockMultiple(tx, tx.readEntries()); // 4. Notify evictions. - notifyEvitions(tx); + notifyEvictions(tx); // 5. Remove obsolete entries. removeObsolete(tx); @@ -1360,7 +1307,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (txIdMap.remove(tx.xidVersion(), tx)) { // 1. Notify evictions. - notifyEvitions(tx); + notifyEvictions(tx); // 2. Evict near entries. if (!tx.readMap().isEmpty()) { @@ -1396,7 +1343,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * * @param tx Tx to uncommit. */ - public void uncommitTx(IgniteInternalTx tx) { + void uncommitTx(IgniteInternalTx tx) { assert tx != null; if (log.isDebugEnabled()) @@ -1413,15 +1360,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { unlockMultiple(tx, tx.readEntries()); // 3. Notify evictions. - notifyEvitions(tx); + notifyEvictions(tx); // 4. Remove from per-thread storage. clearThreadMap(tx); // 5. Unregister explicit locks. - if (!tx.alternateVersions().isEmpty()) + if (!tx.alternateVersions().isEmpty()) { for (GridCacheVersion ver : tx.alternateVersions()) idMap.remove(ver); + } // 6. Remove Near-2-DHT mappings. if (tx instanceof GridCacheMappedVersion) @@ -1477,7 +1425,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** * @param tx Transaction to notify evictions for. */ - private void notifyEvitions(IgniteInternalTx tx) { + private void notifyEvictions(IgniteInternalTx tx) { if (tx.internal()) return; @@ -2056,43 +2004,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return; } - if (supportsDeadlockDetection(node)) { - TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys); + TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys); - try { - if (!cctx.localNodeId().equals(nodeId)) - req.prepareMarshal(cctx); - - cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL); - } - catch (IgniteCheckedException e) { - if (e instanceof ClusterTopologyCheckedException) { - if (log.isDebugEnabled()) - log.debug("Failed to finish deadlock detection, node left: " + nodeId); - } - else - U.warn(log, "Failed to finish deadlock detection: " + e, e); + try { + if (!cctx.localNodeId().equals(nodeId)) + req.prepareMarshal(cctx); - fut.onDone(); - } + cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL); } - else { - if (log.isDebugEnabled()) - log.debug("Failed to finish deadlock detection, node does not support deadlock detection: " + node); + catch (IgniteCheckedException e) { + if (e instanceof ClusterTopologyCheckedException) { + if (log.isDebugEnabled()) + log.debug("Failed to finish deadlock detection, node left: " + nodeId); + } + else + U.warn(log, "Failed to finish deadlock detection: " + e, e); fut.onDone(); } } /** - * @param node Node. - * @return {@code True} if node supports deadlock detection protocol. - */ - private boolean supportsDeadlockDetection(ClusterNode node) { - return TX_DEADLOCK_DETECTION_SINCE.compareToIgnoreTimestamp(node.version()) <= 0; - } - - /** * @param tx Tx. * @param txKeys Tx keys. * @return {@code True} if key is involved into tx. @@ -2263,7 +2195,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param ver Version to ack. */ public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) { - deferredAckMessageSender.sendDeferredAckMessage(nodeId, ver); + deferredAckMsgSnd.sendDeferredAckMessage(nodeId, ver); } /** @@ -2312,9 +2244,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { ", failedNodeId=" + evtNodeId + ']'); for (final IgniteInternalTx tx : txs()) { - if ((tx.near() && !tx.local()) || (tx.storeUsed() && tx.masterNodeIds().contains(evtNodeId))) { + if ((tx.near() && !tx.local()) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) { // Invalidate transactions. - salvageTx(tx, false, RECOVERY_FINISH); + salvageTx(tx, RECOVERY_FINISH); } else { // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx. http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java index 1c2ccbe..3c27bad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java @@ -88,7 +88,7 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState } /** {@inheritDoc} */ - @Override public boolean storeUsed(GridCacheSharedContext cctx) { + @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) { return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java index c121b1b..822e44e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java @@ -104,7 +104,7 @@ public interface IgniteTxState { * @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with * store enabled. */ - public boolean storeUsed(GridCacheSharedContext cctx); + public boolean storeWriteThrough(GridCacheSharedContext cctx); /** * @param cctx Context. http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java index 76751de..399eea3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -289,14 +289,14 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ - @Override public boolean storeUsed(GridCacheSharedContext cctx) { + @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) { if (!activeCacheIds.isEmpty()) { for (int i = 0; i < activeCacheIds.size(); i++) { int cacheId = (int)activeCacheIds.get(i); CacheStoreManager store = cctx.cacheContext(cacheId).store(); - if (store.configured()) + if (store.configured() && store.isWriteThrough()) return true; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index c8c9219..0420182 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -395,7 +395,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsMap.put(key, seq); - tx.commitTopLevelTx(); + tx.commit(); return seq; } @@ -496,7 +496,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsMap.put(key, a); - tx.commitTopLevelTx(); + tx.commit(); return a; } @@ -560,7 +560,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { T dataStructure = c.applyx(); - tx.commitTopLevelTx(); + tx.commit(); return dataStructure; } @@ -641,7 +641,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { T rmvInfo = c.applyx(); - tx.commitTopLevelTx(); + tx.commit(); if (afterRmv != null && rmvInfo != null) afterRmv.applyx(rmvInfo); @@ -709,7 +709,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsMap.put(key, ref); - tx.commitTopLevelTx(); + tx.commit(); return ref; } @@ -813,7 +813,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsMap.put(key, stmp); - tx.commitTopLevelTx(); + tx.commit(); return stmp; } @@ -1048,7 +1048,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { T col = c.applyx(cacheCtx); - tx.commitTopLevelTx(); + tx.commit(); return col; } @@ -1162,7 +1162,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsMap.put(key, latch); - tx.commitTopLevelTx(); + tx.commit(); return latch; } @@ -1211,7 +1211,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsView.remove(key); - tx.commitTopLevelTx(); + tx.commit(); } else tx.setRollbackOnly(); @@ -1283,7 +1283,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsMap.put(key, sem0); - tx.commitTopLevelTx(); + tx.commit(); return sem0; } @@ -1329,7 +1329,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsView.remove(key); - tx.commitTopLevelTx(); + tx.commit(); } else tx.setRollbackOnly(); @@ -1401,7 +1401,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsMap.put(key, reentrantLock0); - tx.commitTopLevelTx(); + tx.commit(); return reentrantLock0; } @@ -1448,7 +1448,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsView.remove(key); - tx.commitTopLevelTx(); + tx.commit(); } else tx.setRollbackOnly(); @@ -1481,7 +1481,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (val != null) { dsView.remove(key); - tx.commitTopLevelTx(); + tx.commit(); } else tx.setRollbackOnly(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java index 9ebea2c..640b72d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java @@ -102,7 +102,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext atomicView.put(key, val); - tx.commitTopLevelTx(); + tx.commit(); return retVal; } @@ -129,7 +129,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext atomicView.put(key, val); - tx.commitTopLevelTx(); + tx.commit(); return retVal; } @@ -156,7 +156,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext atomicView.put(key, val); - tx.commitTopLevelTx(); + tx.commit(); return retVal; } @@ -183,7 +183,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext atomicView.put(key, val); - tx.commitTopLevelTx(); + tx.commit(); return retVal; } @@ -442,7 +442,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext atomicView.put(key, val); - tx.commitTopLevelTx(); + tx.commit(); return retVal; } @@ -476,7 +476,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext atomicView.put(key, val); - tx.commitTopLevelTx(); + tx.commit(); return retVal; } @@ -510,7 +510,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext atomicView.put(key, val); - tx.commitTopLevelTx(); + tx.commit(); return retVal; } @@ -547,7 +547,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext atomicView.getAndPut(key, val); - tx.commitTopLevelTx(); + tx.commit(); } return retVal; http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index 51568bc..6911b3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@ -223,7 +223,7 @@ public final class GridCacheAtomicReferenceImpl implements GridCacheAtomicRef atomicView.put(key, ref); - tx.commitTopLevelTx(); + tx.commit(); return true; } @@ -265,7 +265,7 @@ public final class GridCacheAtomicReferenceImpl implements GridCacheAtomicRef atomicView.getAndPut(key, ref); - tx.commitTopLevelTx(); + tx.commit(); return expVal; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 2572f19..87aae8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -545,7 +545,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc seqView.put(key, seq); - tx.commitTopLevelTx(); + tx.commit(); return curLocVal; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java index ec1e766..14f80e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java @@ -277,7 +277,7 @@ public final class GridCacheAtomicStampedImpl implements GridCacheAtomicSt atomicView.put(key, stmp); - tx.commitTopLevelTx(); + tx.commit(); return true; } @@ -321,7 +321,7 @@ public final class GridCacheAtomicStampedImpl implements GridCacheAtomicSt atomicView.getAndPut(key, stmp); - tx.commitTopLevelTx(); + tx.commit(); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index 03a7fb6..45c3677 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -292,7 +292,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc return new CountDownLatch(0); } - tx.commitTopLevelTx(); + tx.commit(); return new CountDownLatch(val.get()); } @@ -432,7 +432,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc latchView.put(key, latchVal); - tx.commitTopLevelTx(); + tx.commit(); return retVal; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java index a62b656..5f0cb44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java @@ -50,7 +50,6 @@ import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -561,7 +560,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable lockView.put(key, val); - tx.commitTopLevelTx(); + tx.commit(); return true; } @@ -629,7 +628,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable lockView.put(key, val); - tx.commitTopLevelTx(); + tx.commit(); // Keep track of all threads that are queued in global queue. // We deliberately don't use #sync.isQueued(), because AQS @@ -647,7 +646,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable lockView.put(key, val); - tx.commitTopLevelTx(); + tx.commit(); sync.waitingThreads.remove(thread.getId()); @@ -806,7 +805,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable lockView.put(key, val); - tx.commitTopLevelTx(); + tx.commit(); return true; } @@ -1099,7 +1098,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable return null; } - tx.rollbackTopLevelTx(); + tx.rollback(); return new Sync(val); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java index c3e9218..a1c0515 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java @@ -41,7 +41,6 @@ import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -320,7 +319,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter semView.put(key, val); - tx.commitTopLevelTx(); + tx.commit(); } return retVal; @@ -373,7 +372,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter Map map = val.getWaiters(); if (!map.containsKey(nodeId)) { - tx.rollbackTopLevelTx(); + tx.rollback(); return false; } @@ -391,7 +390,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter sync.nodeMap = map; - tx.commitTopLevelTx(); + tx.commit(); return true; } @@ -472,7 +471,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter final boolean failoverSafe = val.isFailoverSafe(); - tx.commitTopLevelTx(); + tx.commit(); return new Sync(cnt, waiters, failoverSafe); } @@ -687,7 +686,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter int cnt = val.getCount(); - tx.rollbackTopLevelTx(); + tx.rollback(); return cnt; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java index 4b2d6cc..846eb69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java @@ -70,7 +70,7 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter else retVal = false; - tx.commitTopLevelTx(); + tx.commit(); return retVal; } @@ -106,7 +106,7 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter retVal = (T)cache.getAndRemove(itemKey(idx)); if (retVal == null) { // Possible if data was lost. - tx.commitTopLevelTx(); + tx.commit(); continue; } @@ -114,7 +114,7 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter else retVal = null; - tx.commitTopLevelTx(); + tx.commit(); return retVal; } @@ -164,7 +164,7 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter else retVal = false; - tx.commitTopLevelTx(); + tx.commit(); return retVal; } @@ -197,7 +197,7 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter cache.remove(itemKey(idx)); } - tx.commitTopLevelTx(); + tx.commit(); } return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index dce97c7..acd0a1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -661,7 +661,7 @@ public class IgfsDataManager extends IgfsManager { if (val != null) { putBlock(fileInfo.blockSize(), key, val); - tx.commitTopLevelTx(); + tx.commit(); } else { // File is being concurrently deleted. @@ -1086,7 +1086,7 @@ public class IgfsDataManager extends IgfsManager { "[key=" + colocatedKey + ", relaxedKey=" + key + ", startOff=" + startOff + ", dataLen=" + data.length + ']'); - tx.commitTopLevelTx(); + tx.commit(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index 9ff3d40..77272e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -615,7 +615,7 @@ public class IgfsMetaManager extends IgfsManager { IgfsEntryInfo newInfo = invokeLock(fileId, del); - tx.commitTopLevelTx(); + tx.commit(); return newInfo; } @@ -1039,7 +1039,7 @@ public class IgfsMetaManager extends IgfsManager { transferEntry(srcEntry, srcParentInfo.id(), srcName, dstParentInfo.id(), dstName); - tx.commitTopLevelTx(); + tx.commit(); // Fire events. IgfsPath newPath = new IgfsPath(dstPathIds.path(), dstName); @@ -1172,7 +1172,7 @@ public class IgfsMetaManager extends IgfsManager { // Note that root directory properties and other attributes are preserved: id2InfoPrj.put(IgfsUtils.ROOT_ID, rootInfo.listing(null)); - tx.commitTopLevelTx(); + tx.commit(); signalDeleteWorker(); @@ -1310,7 +1310,7 @@ public class IgfsMetaManager extends IgfsManager { transferEntry(parentInfo.listing().get(victimName), parentId, victimName, trashId, trashName); - tx.commitTopLevelTx(); + tx.commit(); signalDeleteWorker(); @@ -1401,7 +1401,7 @@ public class IgfsMetaManager extends IgfsManager { id2InfoPrj.put(parentId, parentInfo.listing(newListing)); } - tx.commitTopLevelTx(); + tx.commit(); return res; } @@ -1454,7 +1454,7 @@ public class IgfsMetaManager extends IgfsManager { id2InfoPrj.remove(id); - tx.commitTopLevelTx(); + tx.commit(); return true; } @@ -1519,7 +1519,7 @@ public class IgfsMetaManager extends IgfsManager { try (GridNearTxLocal tx = startTx()) { IgfsEntryInfo info = updatePropertiesNonTx(fileId, props); - tx.commitTopLevelTx(); + tx.commit(); return info; } @@ -1560,7 +1560,7 @@ public class IgfsMetaManager extends IgfsManager { IgfsEntryInfo newInfo = invokeAndGet(fileId, new IgfsMetaFileReserveSpaceProcessor(space, affRange)); - tx.commitTopLevelTx(); + tx.commit(); return newInfo; } @@ -1616,7 +1616,7 @@ public class IgfsMetaManager extends IgfsManager { throw fsException("Failed to update file info (file types differ)" + " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']'); - tx.commitTopLevelTx(); + tx.commit(); return newInfo; } @@ -1679,7 +1679,7 @@ public class IgfsMetaManager extends IgfsManager { continue; // Commit TX. - tx.commitTopLevelTx(); + tx.commit(); generateCreateEvents(res.createdPaths(), false); @@ -1711,7 +1711,7 @@ public class IgfsMetaManager extends IgfsManager { try (GridNearTxLocal tx = startTx()) { Object prev = val != null ? metaCache.getAndPut(sampling, val) : metaCache.getAndRemove(sampling); - tx.commitTopLevelTx(); + tx.commit(); return !F.eq(prev, val); } @@ -2756,7 +2756,7 @@ public class IgfsMetaManager extends IgfsManager { } } - tx.commitTopLevelTx(); + tx.commit(); } catch (IgniteCheckedException e) { if (!finished) { @@ -2839,7 +2839,7 @@ public class IgfsMetaManager extends IgfsManager { modificationTime == -1 ? targetInfo.modificationTime() : modificationTime) ); - tx.commitTopLevelTx(); + tx.commit(); return; } @@ -2948,7 +2948,7 @@ public class IgfsMetaManager extends IgfsManager { // At this point we can open the stream safely. info = invokeLock(info.id(), false); - tx.commitTopLevelTx(); + tx.commit(); IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE); @@ -2963,7 +2963,7 @@ public class IgfsMetaManager extends IgfsManager { continue; // Commit. - tx.commitTopLevelTx(); + tx.commit(); // Generate events. generateCreateEvents(res.createdPaths(), true); @@ -3103,7 +3103,7 @@ public class IgfsMetaManager extends IgfsManager { newBlockSize, affKey, newLockId, evictExclude, newLen)); // Prepare result and commit. - tx.commitTopLevelTx(); + tx.commit(); IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE); @@ -3131,7 +3131,7 @@ public class IgfsMetaManager extends IgfsManager { continue; // Commit. - tx.commitTopLevelTx(); + tx.commit(); // Generate events. generateCreateEvents(res.createdPaths(), true); http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 280817c..a680a88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -1042,7 +1042,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { cache.put(key, assigns); - tx.commitTopLevelTx(); + tx.commit(); break; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index 53e6add..c4d8a79 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -88,6 +88,8 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { blockedMsgs.add(new T2<>(node, ioMsg)); + notifyAll(); + return; } } @@ -137,6 +139,33 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { } /** + * @param cls Message class. + * @param nodeName Node name. + * @throws InterruptedException If interrupted. + */ + public void waitForMessage(Class cls, String nodeName) throws InterruptedException { + synchronized (this) { + while (!hasMessage(cls, nodeName)) + wait(); + } + } + + /** + * @param cls Message class. + * @param nodeName Node name. + * @return {@code True} if has blocked message. + */ + private boolean hasMessage(Class cls, String nodeName) { + for (T2 msg : blockedMsgs) { + if (msg.get2().message().getClass() == cls && + nodeName.equals(msg.get1().attribute(ATTR_IGNITE_INSTANCE_NAME))) + return true; + } + + return false; + } + + /** * @param blockP Message block predicate. */ public void blockMessages(IgnitePredicate blockP) { http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java index 84e439f..4fd4989 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java @@ -217,7 +217,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { cache.put("key", "val"); - tx.commitTopLevelTx(); + tx.commit(); } assert cache.containsKey("key"); http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java index f79c3e7..f821a45 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java @@ -18,9 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.Map; -import java.util.concurrent.Callable; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; @@ -28,12 +26,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -92,7 +88,7 @@ public class IgniteCacheSystemTransactionsSelfTest extends GridCacheAbstractSelf utilityCache.getAndPut("3", "3"); - itx.commitTopLevelTx(); + itx.commit(); } jcache.put("2", "22"); http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java index 65fa7e0..91e3b26 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java @@ -176,7 +176,7 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri cache.putAll(map); try { - txEx.prepareTopLevelTx().get(3, TimeUnit.SECONDS); + txEx.prepareNearTxLocal().get(3, TimeUnit.SECONDS); } catch (IgniteFutureTimeoutCheckedException ignored) { info("Failed to wait for prepare future completion: " + partial); http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java index 3c1ae8e..4997b20 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java @@ -42,7 +42,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -349,7 +349,7 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest TransactionProxyImpl txProxy = (TransactionProxyImpl)tx; - IgniteInternalTx txEx = txProxy.tx(); + GridNearTxLocal txEx = txProxy.tx(); assertTrue(txEx.pessimistic()); http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java index 97385ab..7ca3914 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java @@ -215,7 +215,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2. - IgniteInternalFuture prepFut = txEx.prepareTopLevelTx(); + IgniteInternalFuture prepFut = txEx.prepareNearTxLocal(); waitPrepared(ignite(1)); @@ -376,7 +376,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2. - IgniteInternalFuture prepFut = txEx.prepareTopLevelTx(); + IgniteInternalFuture prepFut = txEx.prepareNearTxLocal(); waitPrepared(ignite(1));