Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C947618F72 for ; Sat, 26 Dec 2015 10:30:46 +0000 (UTC) Received: (qmail 88611 invoked by uid 500); 26 Dec 2015 10:30:46 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 88518 invoked by uid 500); 26 Dec 2015 10:30:46 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 88452 invoked by uid 99); 26 Dec 2015 10:30:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 26 Dec 2015 10:30:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A70A7DFC8A; Sat, 26 Dec 2015 10:30:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Sat, 26 Dec 2015 10:30:46 -0000 Message-Id: <36a952e73b054229b34c3c15fad2abdd@git.apache.org> In-Reply-To: <9bcfb43469b24ed9857da8f83fcd59ab@git.apache.org> References: <9bcfb43469b24ed9857da8f83fcd59ab@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] ignite git commit: IGNITE-2265: WIP (2) IGNITE-2265: WIP (2) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/47d2fa37 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/47d2fa37 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/47d2fa37 Branch: refs/heads/ignite-2265 Commit: 47d2fa37916d694ac2df69b5de34f921ea4865af Parents: 79beb7e Author: vozerov-gridgain Authored: Sat Dec 26 13:20:26 2015 +0300 Committer: vozerov-gridgain Committed: Sat Dec 26 13:20:26 2015 +0300 ---------------------------------------------------------------------- .../GridDistributedTxRemoteAdapter.java | 12 +++- .../dht/CacheDistributedGetFutureAdapter.java | 10 ++- .../cache/distributed/dht/GridDhtTxLocal.java | 31 ++++++---- .../distributed/dht/GridDhtTxPrepareFuture.java | 26 +++++--- .../dht/GridPartitionedGetFuture.java | 2 +- .../distributed/near/GridNearGetFuture.java | 2 +- .../near/GridNearOptimisticTxPrepareFuture.java | 5 +- .../cache/distributed/near/GridNearTxLocal.java | 65 ++++++++++++-------- .../cache/transactions/IgniteTxAdapter.java | 24 +++++--- .../cache/transactions/IgniteTxEntry.java | 12 ++-- .../transactions/IgniteTxLocalAdapter.java | 16 +++-- 11 files changed, 130 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/47d2fa37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 1fd0b2e..8e9d4a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -25,7 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -87,6 +88,10 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** */ private static final long serialVersionUID = 0L; + /** Commit allowed field updater. */ + private static final AtomicIntegerFieldUpdater COMMIT_ALLOWED_UPD = + AtomicIntegerFieldUpdater.newUpdater(GridDistributedTxRemoteAdapter.class, "commitAllowed"); + /** Explicit versions. */ @GridToStringInclude private List explicitVers; @@ -96,8 +101,9 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter private boolean started; /** {@code True} only if all write entries are locked by this transaction. */ + @SuppressWarnings("UnusedDeclaration") @GridToStringInclude - private AtomicBoolean commitAllowed = new AtomicBoolean(false); + private volatile int commitAllowed; /** */ @GridToStringInclude @@ -440,7 +446,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } // Only one thread gets to commit. - if (commitAllowed.compareAndSet(false, true)) { + if (COMMIT_ALLOWED_UPD.compareAndSet(this, 0, 1)) { IgniteCheckedException err = null; Map writeMap = txState.writeMap(); http://git-wip-us.apache.org/repos/asf/ignite/blob/47d2fa37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index cfbc21b..c43cce9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -21,7 +21,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -50,6 +51,10 @@ public abstract class CacheDistributedGetFutureAdapter extends GridCompoun /** Maximum number of attempts to remap key to the same primary node. */ protected static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT); + /** Remap count updater. */ + protected static final AtomicIntegerFieldUpdater REMAP_CNT_UPD = + AtomicIntegerFieldUpdater.newUpdater(CacheDistributedGetFutureAdapter.class, "remapCnt"); + /** Context. */ protected final GridCacheContext cctx; @@ -69,7 +74,8 @@ public abstract class CacheDistributedGetFutureAdapter extends GridCompoun protected boolean trackable; /** Remap count. */ - protected AtomicInteger remapCnt = new AtomicInteger(); + @SuppressWarnings("UnusedDeclaration") + protected volatile int remapCnt; /** Subject ID. */ protected UUID subjId; http://git-wip-us.apache.org/repos/asf/ignite/blob/47d2fa37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index f344d48..e026b4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -22,7 +22,8 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -83,10 +84,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** Near XID. */ private GridCacheVersion nearXidVer; + /** Future updater. */ + private static final AtomicReferenceFieldUpdater PREP_FUT_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridDhtTxLocal.class, GridDhtTxPrepareFuture.class, "prepFut"); + /** Future. */ + @SuppressWarnings("UnusedDeclaration") @GridToStringExclude - private final AtomicReference prepFut = - new AtomicReference<>(); + private volatile GridDhtTxPrepareFuture prepFut; /** * Empty constructor required for {@link Externalizable}. @@ -306,18 +311,18 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } // For pessimistic mode we don't distribute prepare request. - GridDhtTxPrepareFuture fut = prepFut.get(); + GridDhtTxPrepareFuture fut = prepFut; if (fut == null) { // Future must be created before any exception can be thrown. - if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture( + if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture( cctx, this, nearMiniId, Collections.emptyMap(), true, needReturnValue()))) - return prepFut.get(); + return prepFut; } else // Prepare was called explicitly. @@ -383,20 +388,20 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa boolean last ) { // In optimistic mode prepare still can be called explicitly from salvageTx. - GridDhtTxPrepareFuture fut = prepFut.get(); + GridDhtTxPrepareFuture fut = prepFut; if (fut == null) { init(); // Future must be created before any exception can be thrown. - if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture( + if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture( cctx, this, nearMiniId, verMap, last, needReturnValue()))) { - GridDhtTxPrepareFuture f = prepFut.get(); + GridDhtTxPrepareFuture f = prepFut; assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; @@ -492,7 +497,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa cctx.mvcc().addFuture(fut, fut.futureId()); - GridDhtTxPrepareFuture prep = prepFut.get(); + GridDhtTxPrepareFuture prep = prepFut; if (prep != null) { if (prep.isDone()) { @@ -571,12 +576,12 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) { assert optimistic(); - prepFut.compareAndSet(fut, null); + PREP_FUT_UPD.compareAndSet(this, fut, null); } /** {@inheritDoc} */ @Override public IgniteInternalFuture rollbackAsync() { - GridDhtTxPrepareFuture prepFut = this.prepFut.get(); + GridDhtTxPrepareFuture prepFut = this.prepFut; final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false); @@ -687,7 +692,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override public IgniteInternalFuture currentPrepareFuture() { - return prepFut.get(); + return prepFut; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/47d2fa37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index fed5824..d8b2f37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.Duration; @@ -119,6 +119,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture REPLIED_UPD = + AtomicIntegerFieldUpdater.newUpdater(GridDhtTxPrepareFuture.class, "replied"); + + /** Mapped flag updater. */ + private static final AtomicIntegerFieldUpdater MAPPED_UPD = + AtomicIntegerFieldUpdater.newUpdater(GridDhtTxPrepareFuture.class, "mapped"); + /** Logger. */ private static IgniteLogger log; @@ -143,10 +151,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture reads; @@ -577,9 +587,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture>() { @Override public void applyx(IgniteInternalFuture fut) { try { - if (replied.compareAndSet(false, true)) + if (REPLIED_UPD.compareAndSet(GridDhtTxPrepareFuture.this, 0, 1)) sendPrepareResponse(res); } catch (IgniteCheckedException e) { @@ -676,7 +687,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture extends CacheDistributedGetFutureAda LinkedHashMap keys = mapped.get(node); if (keys != null && keys.containsKey(key)) { - if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) { + if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) { onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" + U.toShortString(node) + ", mappings=" + mapped + ']')); http://git-wip-us.apache.org/repos/asf/ignite/blob/47d2fa37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index a121af9..c547a88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -577,7 +577,7 @@ public final class GridNearGetFuture extends CacheDistributedGetFutureAdap LinkedHashMap keys = mapped.get(affNode); if (keys != null && keys.containsKey(key)) { - if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) { + if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) { onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + MAX_REMAP_CNT + " attempts (key got remapped to the same node) " + "[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']')); http://git-wip-us.apache.org/repos/asf/ignite/blob/47d2fa37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 7c72056..de19c95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -632,10 +632,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param m Mapping. * @param mappings Queue of mappings to proceed with. */ - MiniFuture( - GridDistributedTxMapping m, - Queue mappings - ) { + MiniFuture(GridDistributedTxMapping m, Queue mappings) { this.m = m; this.mappings = mappings; } http://git-wip-us.apache.org/repos/asf/ignite/blob/47d2fa37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 894a305..aa4e929f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; @@ -84,20 +85,35 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** */ private static final long serialVersionUID = 0L; + /** Prepare future updater. */ + private static final AtomicReferenceFieldUpdater PREP_FUT_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, IgniteInternalFuture.class, "prepFut"); + + /** Prepare future updater. */ + private static final AtomicReferenceFieldUpdater COMMIT_FUT_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "commitFut"); + + /** Rollback future updater. */ + private static final AtomicReferenceFieldUpdater ROLLBACK_FUT_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "rollbackFut"); + /** DHT mappings. */ private IgniteTxMappings mappings; - /** Future. */ + /** Prepare future. */ + @SuppressWarnings("UnusedDeclaration") @GridToStringExclude - private final AtomicReference> prepFut = new AtomicReference<>(); + private volatile IgniteInternalFuture prepFut; - /** */ + /** Commit future. */ + @SuppressWarnings("UnusedDeclaration") @GridToStringExclude - private final AtomicReference commitFut = new AtomicReference<>(); + private volatile GridNearTxFinishFuture commitFut; - /** */ + /** Rollback future. */ + @SuppressWarnings("UnusedDeclaration") @GridToStringExclude - private final AtomicReference rollbackFut = new AtomicReference<>(); + private volatile GridNearTxFinishFuture rollbackFut; /** Entries to lock on next step of prepare stage. */ private Collection optimisticLockEntries = Collections.emptyList(); @@ -225,7 +241,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) { - prepFut.compareAndSet(fut, null); + PREP_FUT_UPD.compareAndSet(this, fut, null); } /** {@inheritDoc} */ @@ -630,7 +646,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { - GridCacheMvccFuture fut = (GridCacheMvccFuture)prepFut.get(); + GridCacheMvccFuture fut = (GridCacheMvccFuture)prepFut; return fut != null && fut.onOwnerChanged(entry, owner); } @@ -784,7 +800,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture prepareAsync() { - GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut.get(); + GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut; if (fut == null) { // Future must be created before any exception can be thrown. @@ -796,8 +812,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { else fut = new GridNearPessimisticTxPrepareFuture(cctx, this); - if (!prepFut.compareAndSet(null, fut)) - return prepFut.get(); + if (!PREP_FUT_UPD.compareAndSet(this, null, fut)) + return prepFut; } else // Prepare was called explicitly. @@ -818,18 +834,19 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { prepareAsync(); - GridNearTxFinishFuture fut = commitFut.get(); + GridNearTxFinishFuture fut = commitFut; - if (fut == null && !commitFut.compareAndSet(null, fut = new GridNearTxFinishFuture<>(cctx, this, true))) - return commitFut.get(); + if (fut == null && + !COMMIT_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, this, true))) + return commitFut; cctx.mvcc().addFuture(fut, fut.futureId()); - final IgniteInternalFuture prepareFut = prepFut.get(); + final IgniteInternalFuture prepareFut = prepFut; prepareFut.listen(new CI1>() { @Override public void apply(IgniteInternalFuture f) { - GridNearTxFinishFuture fut0 = commitFut.get(); + GridNearTxFinishFuture fut0 = commitFut; try { // Make sure that here are no exceptions. @@ -860,17 +877,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (log.isDebugEnabled()) log.debug("Rolling back near tx: " + this); - GridNearTxFinishFuture fut = rollbackFut.get(); + GridNearTxFinishFuture fut = rollbackFut; if (fut != null) return fut; - if (!rollbackFut.compareAndSet(null, fut = new GridNearTxFinishFuture<>(cctx, this, false))) - return rollbackFut.get(); + if (!ROLLBACK_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, this, false))) + return rollbackFut; cctx.mvcc().addFuture(fut, fut.futureId()); - IgniteInternalFuture prepFut = this.prepFut.get(); + IgniteInternalFuture prepFut = this.prepFut; if (prepFut == null || prepFut.isDone()) { try { @@ -897,7 +914,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']'); } - GridNearTxFinishFuture fut0 = rollbackFut.get(); + GridNearTxFinishFuture fut0 = rollbackFut; fut0.finish(); } @@ -997,7 +1014,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (pessimistic()) prepareAsync(); - IgniteInternalFuture prep = prepFut.get(); + IgniteInternalFuture prep = prepFut; // Do not create finish future if there are no remote nodes. if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) { @@ -1070,7 +1087,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { cctx.mvcc().addFuture(fut, fut.futureId()); - IgniteInternalFuture prep = prepFut.get(); + IgniteInternalFuture prep = prepFut; if (prep == null || prep.isDone()) { try { @@ -1279,7 +1296,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Nullable @Override public IgniteInternalFuture currentPrepareFuture() { - return prepFut.get(); + return prepFut; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/47d2fa37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 53f4f56..22e27c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; @@ -95,14 +96,17 @@ import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; /** * Managed transaction adapter. */ -public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter - implements IgniteInternalTx, Externalizable { +public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implements IgniteInternalTx, Externalizable { /** */ private static final long serialVersionUID = 0L; /** Static logger to avoid re-creation. */ private static final AtomicReference logRef = new AtomicReference<>(); + /** Finalizing status updater. */ + private static final AtomicReferenceFieldUpdater FINALIZING_UPD = + AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, FinalizationStatus.class, "finalizing"); + /** Logger. */ protected static IgniteLogger log; @@ -191,8 +195,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** Commit version. */ private volatile GridCacheVersion commitVer; - /** */ - private AtomicReference finalizing = new AtomicReference<>(FinalizationStatus.NONE); + /** Finalizing status. */ + private volatile FinalizationStatus finalizing = FinalizationStatus.NONE; /** Done marker. */ protected volatile boolean isDone; @@ -524,23 +528,23 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter switch (status) { case USER_FINISH: - res = finalizing.compareAndSet(FinalizationStatus.NONE, FinalizationStatus.USER_FINISH); + res = FINALIZING_UPD.compareAndSet(this, FinalizationStatus.NONE, FinalizationStatus.USER_FINISH); break; case RECOVERY_WAIT: - finalizing.compareAndSet(FinalizationStatus.NONE, FinalizationStatus.RECOVERY_WAIT); + FINALIZING_UPD.compareAndSet(this, FinalizationStatus.NONE, FinalizationStatus.RECOVERY_WAIT); - FinalizationStatus cur = finalizing.get(); + FinalizationStatus cur = finalizing; res = cur == FinalizationStatus.RECOVERY_WAIT || cur == FinalizationStatus.RECOVERY_FINISH; break; case RECOVERY_FINISH: - FinalizationStatus old = finalizing.get(); + FinalizationStatus old = finalizing; - res = old != FinalizationStatus.USER_FINISH && finalizing.compareAndSet(old, status); + res = old != FinalizationStatus.USER_FINISH && FINALIZING_UPD.compareAndSet(this, old, status); break; @@ -565,7 +569,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter * @return Finalization status. */ protected FinalizationStatus finalizationStatus() { - return finalizing.get(); + return finalizing; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/47d2fa37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 2c6c3df..c42bc7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -22,7 +22,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.LinkedList; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; @@ -73,6 +73,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Dummy version for any existing entry read in SERIALIZABLE transaction. */ public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 0, 1); + /** Prepared flag updater. */ + private static final AtomicIntegerFieldUpdater PREPARED_UPD = + AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared"); + /** Owning transaction. */ @GridToStringExclude @GridDirectTransient @@ -149,9 +153,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { private GridCacheContext ctx; /** Prepared flag to prevent multiple candidate add. */ - @SuppressWarnings({"TransientFieldNotInitialized"}) + @SuppressWarnings("UnusedDeclaration") @GridDirectTransient - private AtomicBoolean prepared = new AtomicBoolean(); + private transient volatile int prepared; /** Lock flag for collocated cache. */ @GridDirectTransient @@ -441,7 +445,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * @return True if entry was marked prepared by this call. */ boolean markPrepared() { - return prepared.compareAndSet(false, true); + return PREPARED_UPD.compareAndSet(this, 0, 1); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/47d2fa37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index ec996c4..21ff0cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -29,8 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.Cache; import javax.cache.CacheException; @@ -124,11 +123,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig protected static final AtomicReferenceFieldUpdater COMMIT_ERR_UPD = AtomicReferenceFieldUpdater.newUpdater(IgniteTxLocalAdapter.class, Throwable.class, "commitErr"); + /** Done flag updater. */ + protected static final AtomicIntegerFieldUpdater DONE_FLAG_UPD = + AtomicIntegerFieldUpdater.newUpdater(IgniteTxLocalAdapter.class, "doneFlag"); + /** Minimal version encountered (either explicit lock or XID of this transaction). */ protected GridCacheVersion minVer; /** Flag indicating with TM commit happened. */ - protected AtomicBoolean doneFlag = new AtomicBoolean(false); + @SuppressWarnings("UnusedDeclaration") + protected volatile int doneFlag; /** Committed versions, relative to base. */ private Collection committedVers = Collections.emptyList(); @@ -1198,7 +1202,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig // Do not unlock transaction entries if one-phase commit. if (!onePhaseCommit()) { - if (doneFlag.compareAndSet(false, true)) { + if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) { // Unlock all locks. cctx.tm().commitTx(this); @@ -1219,7 +1223,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig public void tmFinish(boolean commit) { assert onePhaseCommit(); - if (doneFlag.compareAndSet(false, true)) { + if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) { // Unlock all locks. if (commit) cctx.tm().commitTx(this); @@ -1287,7 +1291,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig evictNearEntry(e, false); } - if (doneFlag.compareAndSet(false, true)) { + if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) { try { cctx.tm().rollbackTx(this);