Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 92CA71852D for ; Mon, 23 Nov 2015 19:07:10 +0000 (UTC) Received: (qmail 42707 invoked by uid 500); 23 Nov 2015 19:07:10 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 42660 invoked by uid 500); 23 Nov 2015 19:07:10 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 42630 invoked by uid 99); 23 Nov 2015 19:07:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Nov 2015 19:07:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 23B72E0946; Mon, 23 Nov 2015 19:07:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 23 Nov 2015 19:07:11 -0000 Message-Id: In-Reply-To: <558777b8adeb41a89dbaf1c6379bf097@git.apache.org> References: <558777b8adeb41a89dbaf1c6379bf097@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/27] ignite git commit: debugging slowdowns debugging slowdowns Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e6d0ffe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e6d0ffe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e6d0ffe Branch: refs/heads/ignite-single-op-get Commit: 5e6d0ffefb7d58cbc21bea651671d4de02abf622 Parents: 8e7e330 Author: Yakov Zhdanov Authored: Fri Nov 20 19:03:40 2015 +0300 Committer: Yakov Zhdanov Committed: Fri Nov 20 19:03:40 2015 +0300 ---------------------------------------------------------------------- .../cache/distributed/dht/GridDhtGetFuture.java | 24 +- .../distributed/dht/GridDhtLockFuture.java | 78 +- .../colocated/GridDhtColocatedLockFuture.java | 618 ++++++++-------- .../distributed/near/GridNearGetFuture.java | 2 - .../distributed/near/GridNearLockFuture.java | 719 ++++++++++--------- .../distributed/near/GridNearLockMapping.java | 6 +- 6 files changed, 748 insertions(+), 699 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 7108da6..6b696b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; @@ -37,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; @@ -83,7 +83,7 @@ public final class GridDhtGetFuture extends GridCompoundIdentityFuture keys; /** Reserved partitions. */ - private Collection parts = new GridLeanSet<>(5); + private Collection parts = new HashSet<>(); /** Future ID. */ private IgniteUuid futId; @@ -98,7 +98,7 @@ public final class GridDhtGetFuture extends GridCompoundIdentityFuture retries = new GridLeanSet<>(); + private Collection retries; /** Subject ID. */ private UUID subjId; @@ -174,7 +174,7 @@ public final class GridDhtGetFuture extends GridCompoundIdentityFuture invalidPartitions() { - return retries; + return retries == null ? Collections.emptyList() : retries; } /** @@ -210,8 +210,12 @@ public final class GridDhtGetFuture extends GridCompoundIdentityFuture keys) { GridDhtFuture fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer); - if (!F.isEmpty(fut.invalidPartitions())) + if (!F.isEmpty(fut.invalidPartitions())) { + if (retries == null) + retries = new HashSet<>(); + retries.addAll(fut.invalidPartitions()); + } add(new GridEmbeddedFuture<>( new IgniteBiClosure>() { @@ -229,9 +233,13 @@ public final class GridDhtGetFuture extends GridCompoundIdentityFuture key : keys.entrySet()) { int part = cctx.affinity().partition(key.getKey()); - if (!retries.contains(part)) { - if (!map(key.getKey(), parts)) + if (retries == null || !retries.contains(part)) { + if (!map(key.getKey(), parts)) { + if (retries == null) + retries = new HashSet<>(); + retries.add(part); + } else mappedKeys.put(key.getKey(), key.getValue()); } @@ -441,4 +449,4 @@ public final class GridDhtGetFuture extends GridCompoundIdentityFuture cache() { return (GridDhtCacheAdapter)cctx.cache(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index a7978c9..543acb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -19,13 +19,14 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -51,8 +52,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; -import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -63,7 +62,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.NotNull; @@ -123,7 +121,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture private boolean read; /** Error. */ - private AtomicReference err = new AtomicReference<>(null); + private Throwable err; /** Timed out flag. */ private volatile boolean timedOut; @@ -142,19 +140,16 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture private GridDhtTxLocalAdapter tx; /** All replies flag. */ - private AtomicBoolean mapped = new AtomicBoolean(false); + private boolean mapped; /** */ - private Collection invalidParts = new GridLeanSet<>(); + private Collection invalidParts; /** Trackable flag. */ private boolean trackable = true; - /** Mutex. */ - private final Object mux = new Object(); - /** Pending locks. */ - private final Collection pendingLocks = new GridConcurrentHashSet<>(); + private final Collection pendingLocks; /** TTL for read operation. */ private long accessTtl; @@ -231,6 +226,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture futId = IgniteUuid.randomUuid(); entries = new ArrayList<>(cnt); + pendingLocks = U.newHashSet(cnt); if (log == null) log = U.logger(cctx.kernalContext(), logRef, GridDhtLockFuture.class); @@ -244,7 +240,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture /** {@inheritDoc} */ @Override public Collection invalidPartitions() { - return invalidParts; + return invalidParts == null ? Collections.emptyList() : invalidParts; } /** @@ -252,6 +248,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture * @param invalidPart Partition to retry. */ void addInvalidPartition(GridCacheContext cacheCtx, int invalidPart) { + if (invalidParts == null) + invalidParts = new HashSet<>(); + invalidParts.add(invalidPart); // Register invalid partitions with transaction. @@ -287,10 +286,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture /** * @return Entries. */ - public Collection entriesCopy() { - synchronized (mux) { - return new ArrayList<>(entries()); - } + public synchronized Collection entriesCopy() { + return new ArrayList<>(entries()); } /** @@ -403,12 +400,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture return null; } - synchronized (mux) { + synchronized (this) { entries.add(c == null || c.reentry() ? null : entry); - } - if (c != null && !c.reentry()) - pendingLocks.add(entry.key()); + if (c != null && !c.reentry()) + pendingLocks.add(entry.key()); + } // Double check if the future has already timed out. if (timedOut) { @@ -615,19 +612,17 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture } /** - * @param e Error. - */ - public void onError(GridDistributedLockCancelledException e) { - if (err.compareAndSet(null, e)) - onComplete(false); - } - - /** * @param t Error. */ public void onError(Throwable t) { - if (err.compareAndSet(null, t)) - onComplete(false); + synchronized (this) { + if (err != null) + return; + + err = t; + } + + onComplete(false); } /** @@ -667,7 +662,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture log.debug("Received onOwnerChanged() callback [entry=" + entry + ", owner=" + owner + "]"); if (owner != null && owner.version().equals(lockVer)) { - pendingLocks.remove(entry.key()); + synchronized (this) { + pendingLocks.remove(entry.key()); + } if (checkLocks()) map(entries()); @@ -681,7 +678,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture /** * @return {@code True} if locks have been acquired. */ - private boolean checkLocks() { + private synchronized boolean checkLocks() { return pendingLocks.isEmpty(); } @@ -713,7 +710,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture if (isDone() || (err == null && success && !checkLocks())) return false; - this.err.compareAndSet(null, err); + synchronized (this) { + if (this.err == null) + this.err = err; + } return onComplete(success); } @@ -734,10 +734,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture if (tx != null) cctx.tm().txContext(tx); - if (err.get() == null) + if (err == null) loadMissingFromStore(); - if (super.onDone(success, err.get())) { + if (super.onDone(success, err)) { if (log.isDebugEnabled()) log.debug("Completing future: " + this); @@ -778,11 +778,11 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture * @param entries Entries. */ private void map(Iterable entries) { - if (!mapped.compareAndSet(false, true)) { - if (log.isDebugEnabled()) - log.debug("Will not map DHT lock future (other thread is mapping): " + this); + synchronized (this) { + if (mapped) + return; - return; + mapped = true; } try { http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 8245d88..7e17efe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Deque; import java.util.Iterator; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -66,12 +66,10 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; @@ -113,7 +111,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture private boolean retval; /** Error. */ - private AtomicReference err = new AtomicReference<>(null); + private volatile Throwable err; /** Timeout object. */ @GridToStringExclude @@ -130,7 +128,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture private GridNearTxLocal tx; /** Topology snapshot to operate on. */ - private AtomicReference topVer = new AtomicReference<>(); + private volatile AffinityTopologyVersion topVer; /** Map of current values. */ private Map> valMap; @@ -144,6 +142,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** Skip store flag. */ private final boolean skipStore; + /** */ + private Deque mappings; + /** * @param cctx Registry. * @param keys Keys to lock. @@ -196,7 +197,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture cctx.time().addTimeoutObject(timeoutObj); } - valMap = new ConcurrentHashMap8<>(keys.size(), 1f); + valMap = new ConcurrentHashMap8<>(); } /** {@inheritDoc} */ @@ -318,7 +319,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture false, null); - cand.topologyVersion(topVer.get()); + cand.topologyVersion(topVer); } } else { @@ -338,12 +339,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture false, null); - cand.topologyVersion(topVer.get()); + cand.topologyVersion(topVer); } else cand = cand.reenter(); - cctx.mvcc().addExplicitLock(threadId, cand, topVer.get()); + cctx.mvcc().addExplicitLock(threadId, cand, topVer); } return cand; @@ -479,8 +480,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** * @param t Error. */ - private void onError(Throwable t) { - err.compareAndSet(null, t instanceof GridCacheLockTimeoutException ? null : t); + private synchronized void onError(Throwable t) { + if (err == null && !(t instanceof GridCacheLockTimeoutException)) + err = t; } /** {@inheritDoc} */ @@ -499,7 +501,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture if (isDone()) return false; - this.err.compareAndSet(null, err instanceof GridCacheLockTimeoutException ? null : err); + if (err != null) + onError(err); if (err != null) success = false; @@ -525,7 +528,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture if (tx != null) cctx.tm().txContext(tx); - if (super.onDone(success, err.get())) { + if (super.onDone(success, err)) { if (log.isDebugEnabled()) log.debug("Completing future: " + this); @@ -617,7 +620,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture } // Continue mapping on the same topology version as it was before. - this.topVer.compareAndSet(null, topVer); + synchronized (this) { + if (this.topVer == null) + this.topVer = topVer; + } map(keys, false, true); @@ -666,13 +672,18 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture if (tx != null) tx.onRemap(topVer); - this.topVer.set(topVer); + synchronized (this) { + this.topVer = topVer; + } } else { if (tx != null) tx.topologyVersion(topVer); - this.topVer.compareAndSet(null, topVer); + synchronized (this) { + if (this.topVer == null) + this.topVer = topVer; + } } map(keys, remap, false); @@ -716,242 +727,256 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture */ private void map(Collection keys, boolean remap, boolean topLocked) { try { - AffinityTopologyVersion topVer = this.topVer.get(); + map0( + keys, + remap, + topLocked); + } + catch (IgniteCheckedException ex) { + onDone(false, ex); + } + } - assert topVer != null; + private synchronized void map0( + Collection keys, + boolean remap, + boolean topLocked + ) throws IgniteCheckedException { + AffinityTopologyVersion topVer = this.topVer; - assert topVer.topologyVersion() > 0; + assert topVer != null; - if (CU.affinityNodes(cctx, topVer).isEmpty()) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid): " + cctx.name())); + assert topVer.topologyVersion() > 0; - return; - } + if (CU.affinityNodes(cctx, topVer).isEmpty()) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid): " + cctx.name())); - boolean clientNode = cctx.kernalContext().clientNode(); + return; + } - assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); + boolean clientNode = cctx.kernalContext().clientNode(); - // First assume this node is primary for all keys passed in. - if (!clientNode && mapAsPrimary(keys, topVer)) - return; + assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); - Deque mappings = new ConcurrentLinkedDeque8<>(); + // First assume this node is primary for all keys passed in. + if (!clientNode && mapAsPrimary(keys, topVer)) + return; - // Assign keys to primary nodes. - GridNearLockMapping map = null; + mappings = new ArrayDeque<>(); - for (KeyCacheObject key : keys) { - GridNearLockMapping updated = map(key, map, topVer); + // Assign keys to primary nodes. + GridNearLockMapping map = null; - // If new mapping was created, add to collection. - if (updated != map) { - mappings.add(updated); + for (KeyCacheObject key : keys) { + GridNearLockMapping updated = map(key, map, topVer); - if (tx != null && updated.node().isLocal()) - tx.colocatedLocallyMapped(true); - } + // If new mapping was created, add to collection. + if (updated != map) { + mappings.add(updated); - map = updated; + if (tx != null && updated.node().isLocal()) + tx.colocatedLocallyMapped(true); } - if (isDone()) { - if (log.isDebugEnabled()) - log.debug("Abandoning (re)map because future is done: " + this); - - return; - } + map = updated; + } + if (isDone()) { if (log.isDebugEnabled()) - log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); + log.debug("Abandoning (re)map because future is done: " + this); + + return; + } - boolean hasRmtNodes = false; + if (log.isDebugEnabled()) + log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); - boolean first = true; + boolean hasRmtNodes = false; - // Create mini futures. - for (Iterator iter = mappings.iterator(); iter.hasNext(); ) { - GridNearLockMapping mapping = iter.next(); + boolean first = true; - ClusterNode node = mapping.node(); - Collection mappedKeys = mapping.mappedKeys(); + // Create mini futures. + for (Iterator iter = mappings.iterator(); iter.hasNext(); ) { + GridNearLockMapping mapping = iter.next(); - boolean loc = node.equals(cctx.localNode()); + ClusterNode node = mapping.node(); + Collection mappedKeys = mapping.mappedKeys(); - assert !mappedKeys.isEmpty(); + boolean loc = node.equals(cctx.localNode()); - GridNearLockRequest req = null; + assert !mappedKeys.isEmpty(); - Collection distributedKeys = new ArrayList<>(mappedKeys.size()); + GridNearLockRequest req = null; - for (KeyCacheObject key : mappedKeys) { - IgniteTxKey txKey = cctx.txKey(key); + Collection distributedKeys = new ArrayList<>(mappedKeys.size()); - GridDistributedCacheEntry entry = null; + for (KeyCacheObject key : mappedKeys) { + IgniteTxKey txKey = cctx.txKey(key); - if (tx != null) { - IgniteTxEntry txEntry = tx.entry(txKey); + GridDistributedCacheEntry entry = null; - if (txEntry != null) { - entry = (GridDistributedCacheEntry)txEntry.cached(); + if (tx != null) { + IgniteTxEntry txEntry = tx.entry(txKey); - if (entry != null && !(loc ^ entry.detached())) { - entry = cctx.colocated().entryExx(key, topVer, true); + if (txEntry != null) { + entry = (GridDistributedCacheEntry)txEntry.cached(); - txEntry.cached(entry); - } + if (entry != null && !(loc ^ entry.detached())) { + entry = cctx.colocated().entryExx(key, topVer, true); + + txEntry.cached(entry); } } + } - boolean explicit; + boolean explicit; - while (true) { - try { - if (entry == null) - entry = cctx.colocated().entryExx(key, topVer, true); + while (true) { + try { + if (entry == null) + entry = cctx.colocated().entryExx(key, topVer, true); - if (!cctx.isAll(entry, filter)) { - if (log.isDebugEnabled()) - log.debug("Entry being locked did not pass filter (will not lock): " + entry); + if (!cctx.isAll(entry, filter)) { + if (log.isDebugEnabled()) + log.debug("Entry being locked did not pass filter (will not lock): " + entry); - onComplete(false, false); + onComplete(false, false); - return; - } + return; + } - assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']'; + assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']'; - GridCacheMvccCandidate cand = addEntry(entry); + GridCacheMvccCandidate cand = addEntry(entry); - // Will either return value from dht cache or null if this is a miss. - IgniteBiTuple val = entry.detached() ? null : - ((GridDhtCacheEntry)entry).versionedValue(topVer); + // Will either return value from dht cache or null if this is a miss. + IgniteBiTuple val = entry.detached() ? null : + ((GridDhtCacheEntry)entry).versionedValue(topVer); - GridCacheVersion dhtVer = null; + GridCacheVersion dhtVer = null; - if (val != null) { - dhtVer = val.get1(); + if (val != null) { + dhtVer = val.get1(); - valMap.put(key, val); - } + valMap.put(key, val); + } - if (cand != null && !cand.reentry()) { - if (req == null) { - boolean clientFirst = false; - - if (first) { - clientFirst = clientNode && - !topLocked && - (tx == null || !tx.hasRemoteLocks()); - - first = false; - } - - req = new GridNearLockRequest( - cctx.cacheId(), - topVer, - cctx.nodeId(), - threadId, - futId, - lockVer, - inTx(), - implicitTx(), - implicitSingleTx(), - read, - retval, - isolation(), - isInvalidate(), - timeout, - mappedKeys.size(), - inTx() ? tx.size() : mappedKeys.size(), - inTx() && tx.syncCommit(), - inTx() ? tx.subjectId() : null, - inTx() ? tx.taskNameHash() : 0, - read ? accessTtl : -1L, - skipStore, - clientFirst, - cctx.deploymentEnabled()); - - mapping.request(req); - } + if (cand != null && !cand.reentry()) { + if (req == null) { + boolean clientFirst = false; - distributedKeys.add(key); + if (first) { + clientFirst = clientNode && + !topLocked && + (tx == null || !tx.hasRemoteLocks()); - if (tx != null) - tx.addKeyMapping(txKey, mapping.node()); + first = false; + } - req.addKeyBytes( - key, + req = new GridNearLockRequest( + cctx.cacheId(), + topVer, + cctx.nodeId(), + threadId, + futId, + lockVer, + inTx(), + implicitTx(), + implicitSingleTx(), + read, retval, - dhtVer, // Include DHT version to match remote DHT entry. - cctx); + isolation(), + isInvalidate(), + timeout, + mappedKeys.size(), + inTx() ? tx.size() : mappedKeys.size(), + inTx() && tx.syncCommit(), + inTx() ? tx.subjectId() : null, + inTx() ? tx.taskNameHash() : 0, + read ? accessTtl : -1L, + skipStore, + clientFirst, + cctx.deploymentEnabled()); + + mapping.request(req); } - explicit = inTx() && cand == null; + distributedKeys.add(key); - if (explicit) + if (tx != null) tx.addKeyMapping(txKey, mapping.node()); - break; + req.addKeyBytes( + key, + retval, + dhtVer, // Include DHT version to match remote DHT entry. + cctx); } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry); - entry = null; - } - } + explicit = inTx() && cand == null; - // Mark mapping explicit lock flag. - if (explicit) { - boolean marked = tx != null && tx.markExplicit(node.id()); + if (explicit) + tx.addKeyMapping(txKey, mapping.node()); - assert tx == null || marked; + break; } - } - - if (inTx() && req != null) - req.hasTransforms(tx.hasTransforms()); + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry); - if (!distributedKeys.isEmpty()) { - mapping.distributedKeys(distributedKeys); - - hasRmtNodes |= !mapping.node().isLocal(); + entry = null; + } } - else { - assert mapping.request() == null; - iter.remove(); + // Mark mapping explicit lock flag. + if (explicit) { + boolean marked = tx != null && tx.markExplicit(node.id()); + + assert tx == null || marked; } } - if (hasRmtNodes) { - trackable = true; + if (inTx() && req != null) + req.hasTransforms(tx.hasTransforms()); - if (!remap && !cctx.mvcc().addFuture(this)) - throw new IllegalStateException("Duplicate future ID: " + this); + if (!distributedKeys.isEmpty()) { + mapping.distributedKeys(distributedKeys); + + hasRmtNodes |= !mapping.node().isLocal(); } - else - trackable = false; + else { + assert mapping.request() == null; - proceedMapping(mappings); + iter.remove(); + } } - catch (IgniteCheckedException ex) { - onDone(false, ex); + + if (hasRmtNodes) { + trackable = true; + + if (!remap && !cctx.mvcc().addFuture(this)) + throw new IllegalStateException("Duplicate future ID: " + this); } + else + trackable = false; + + proceedMapping(); } /** * Gets next near lock mapping and either acquires dht locks locally or sends near lock request to * remote primary node. * - * @param mappings Queue of mappings. * @throws IgniteCheckedException If mapping can not be completed. */ - private void proceedMapping(final Deque mappings) + private void proceedMapping() throws IgniteCheckedException { - GridNearLockMapping map = mappings.poll(); + GridNearLockMapping map; + + synchronized (this) { + map = mappings.poll(); + } // If there are no more mappings to process, complete the future. if (map == null) @@ -965,9 +990,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture req.filter(filter, cctx); if (node.isLocal()) - lockLocally(mappedKeys, req.topologyVersion(), mappings); + lockLocally(mappedKeys, req.topologyVersion()); else { - final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings); + final MiniFuture fut = new MiniFuture(node, mappedKeys); req.miniId(fut.futureId()); @@ -1016,15 +1041,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** * Locks given keys directly through dht cache. - * - * @param keys Collection of keys. + * @param keys Collection of keys. * @param topVer Topology version to lock on. - * @param mappings Optional collection of mappings to proceed locking. */ private void lockLocally( final Collection keys, - AffinityTopologyVersion topVer, - @Nullable final Deque mappings + AffinityTopologyVersion topVer ) { if (log.isDebugEnabled()) log.debug("Before locally locking keys : " + keys); @@ -1078,7 +1100,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture try { // Proceed and add new future (if any) before completing embedded future. if (mappings != null) - proceedMapping(mappings); + proceedMapping(); } catch (IgniteCheckedException ex) { onError(ex); @@ -1101,7 +1123,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * @return {@code True} if all keys were mapped locally, {@code false} if full mapping should be performed. * @throws IgniteCheckedException If key cannot be added to mapping. */ - private boolean mapAsPrimary(Collection keys, AffinityTopologyVersion topVer) throws IgniteCheckedException { + private boolean mapAsPrimary(Collection keys, AffinityTopologyVersion topVer) + throws IgniteCheckedException { // Assign keys to primary nodes. Collection distributedKeys = new ArrayList<>(keys.size()); @@ -1137,7 +1160,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture tx.addKeyMapping(cctx.txKey(key), cctx.localNode()); } - lockLocally(distributedKeys, topVer, null); + lockLocally(distributedKeys, topVer); } return true; @@ -1221,7 +1244,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " + "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested); - topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get())); + topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer)); return topEx; } @@ -1275,19 +1298,18 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture private Deque mappings; /** */ - private AtomicBoolean rcvRes = new AtomicBoolean(false); + private boolean rcvRes; /** * @param node Node. * @param keys Keys. - * @param mappings Mappings to proceed. */ - MiniFuture(ClusterNode node, - Collection keys, - Deque mappings) { + MiniFuture( + ClusterNode node, + Collection keys + ) { this.node = node; this.keys = keys; - this.mappings = mappings; } /** @@ -1312,159 +1334,153 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture } /** - * @param e Error. - */ - void onResult(Throwable e) { - if (rcvRes.compareAndSet(false, true)) { - if (log.isDebugEnabled()) - log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); - - // Fail. - onDone(e); - } - else - U.warn(log, "Received error after another result has been processed [fut=" + - GridDhtColocatedLockFuture.this + ", mini=" + this + ']', e); - } - - /** * @param e Node left exception. */ void onResult(ClusterTopologyCheckedException e) { if (isDone()) return; - if (rcvRes.compareAndSet(false, true)) { - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this); + synchronized (this) { + if (rcvRes) + return; + + rcvRes = true; + } - if (tx != null) - tx.removeMapping(node.id()); + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this); - // Primary node left the grid, so fail the future. - GridDhtColocatedLockFuture.this.onDone(newTopologyException(e, node.id())); + if (tx != null) + tx.removeMapping(node.id()); - onDone(true); - } + // Primary node left the grid, so fail the future. + GridDhtColocatedLockFuture.this.onDone(newTopologyException(e, node.id())); + + onDone(true); } /** * @param res Result callback. */ void onResult(GridNearLockResponse res) { - if (rcvRes.compareAndSet(false, true)) { - if (res.error() != null) { - if (log.isDebugEnabled()) - log.debug("Finishing mini future with an error due to error in response [miniFut=" + this + - ", res=" + res + ']'); - - // Fail. - if (res.error() instanceof GridCacheLockTimeoutException) - onDone(false); - else - onDone(res.error()); - + synchronized (this) { + if (rcvRes) return; - } - if (res.clientRemapVersion() != null) { - assert cctx.kernalContext().clientNode(); + rcvRes = true; + } - IgniteInternalFuture affFut = - cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion()); + if (res.error() != null) { + if (log.isDebugEnabled()) + log.debug("Finishing mini future with an error due to error in response [miniFut=" + this + + ", res=" + res + ']'); - if (affFut != null && !affFut.isDone()) { - affFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture fut) { - try { - fut.get(); + // Fail. + if (res.error() instanceof GridCacheLockTimeoutException) + onDone(false); + else + onDone(res.error()); - remap(); - } - catch (IgniteCheckedException e) { - onDone(e); - } - finally { - cctx.shared().txContextReset(); - } - } - }); - } - else - remap(); - } - else { - int i = 0; + return; + } - for (KeyCacheObject k : keys) { - IgniteBiTuple oldValTup = valMap.get(k); + if (res.clientRemapVersion() != null) { + assert cctx.kernalContext().clientNode(); - CacheObject newVal = res.value(i); + IgniteInternalFuture affFut = + cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion()); - GridCacheVersion dhtVer = res.dhtVersion(i); + if (affFut != null && !affFut.isDone()) { + affFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + try { + fut.get(); - if (newVal == null) { - if (oldValTup != null) { - if (oldValTup.get1().equals(dhtVer)) - newVal = oldValTup.get2(); + remap(); + } + catch (IgniteCheckedException e) { + onDone(e); + } + finally { + cctx.shared().txContextReset(); } } + }); + } + else + remap(); + } + else { + int i = 0; - if (inTx()) { - IgniteTxEntry txEntry = tx.entry(cctx.txKey(k)); + for (KeyCacheObject k : keys) { + IgniteBiTuple oldValTup = valMap.get(k); - // In colocated cache we must receive responses only for detached entries. - assert txEntry.cached().detached() : txEntry; + CacheObject newVal = res.value(i); - txEntry.markLocked(); + GridCacheVersion dhtVer = res.dhtVersion(i); - GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached(); + if (newVal == null) { + if (oldValTup != null) { + if (oldValTup.get1().equals(dhtVer)) + newVal = oldValTup.get2(); + } + } - if (res.dhtVersion(i) == null) { - onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + - "(will fail the lock): " + res)); + if (inTx()) { + IgniteTxEntry txEntry = tx.entry(cctx.txKey(k)); - return; - } + // In colocated cache we must receive responses only for detached entries. + assert txEntry.cached().detached() : txEntry; - // Set value to detached entry. - entry.resetFromPrimary(newVal, dhtVer); + txEntry.markLocked(); - tx.hasRemoteLocks(true); + GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached(); - if (log.isDebugEnabled()) - log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); - } - else - cctx.mvcc().markExplicitOwner(cctx.txKey(k), threadId); - - if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { - cctx.events().addEvent(cctx.affinity().partition(k), - k, - tx, - null, - EVT_CACHE_OBJECT_READ, - newVal, - newVal != null, - null, - false, - CU.subjectId(tx, cctx.shared()), - null, - tx == null ? null : tx.resolveTaskName()); + if (res.dhtVersion(i) == null) { + onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + + "(will fail the lock): " + res)); + + return; } - i++; - } + // Set value to detached entry. + entry.resetFromPrimary(newVal, dhtVer); - try { - proceedMapping(mappings); + tx.hasRemoteLocks(true); + + if (log.isDebugEnabled()) + log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); } - catch (IgniteCheckedException e) { - onDone(e); + else + cctx.mvcc().markExplicitOwner(cctx.txKey(k), threadId); + + if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + cctx.events().addEvent(cctx.affinity().partition(k), + k, + tx, + null, + EVT_CACHE_OBJECT_READ, + newVal, + newVal != null, + null, + false, + CU.subjectId(tx, cctx.shared()), + null, + tx == null ? null : tx.resolveTaskName()); } - onDone(true); + i++; + } + + try { + proceedMapping(); } + catch (IgniteCheckedException e) { + onDone(e); + } + + onDone(true); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index dfaa44e..4a030b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -60,7 +59,6 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable;