ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [02/27] ignite git commit: debugging slowdowns
Date Mon, 23 Nov 2015 19:07:11 GMT
debugging slowdowns


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e6d0ffe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e6d0ffe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e6d0ffe

Branch: refs/heads/ignite-single-op-get
Commit: 5e6d0ffefb7d58cbc21bea651671d4de02abf622
Parents: 8e7e330
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Fri Nov 20 19:03:40 2015 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Fri Nov 20 19:03:40 2015 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/GridDhtGetFuture.java |  24 +-
 .../distributed/dht/GridDhtLockFuture.java      |  78 +-
 .../colocated/GridDhtColocatedLockFuture.java   | 618 ++++++++--------
 .../distributed/near/GridNearGetFuture.java     |   2 -
 .../distributed/near/GridNearLockFuture.java    | 719 ++++++++++---------
 .../distributed/near/GridNearLockMapping.java   |   6 +-
 6 files changed, 748 insertions(+), 699 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 7108da6..6b696b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -37,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
@@ -83,7 +83,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     private LinkedHashMap<KeyCacheObject, Boolean> keys;
 
     /** Reserved partitions. */
-    private Collection<GridDhtLocalPartition> parts = new GridLeanSet<>(5);
+    private Collection<GridDhtLocalPartition> parts = new HashSet<>();
 
     /** Future ID. */
     private IgniteUuid futId;
@@ -98,7 +98,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     private IgniteTxLocalEx tx;
 
     /** Retries because ownership changed. */
-    private Collection<Integer> retries = new GridLeanSet<>();
+    private Collection<Integer> retries;
 
     /** Subject ID. */
     private UUID subjId;
@@ -174,7 +174,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
 
     /** {@inheritDoc} */
     @Override public Collection<Integer> invalidPartitions() {
-        return retries;
+        return retries == null ? Collections.<Integer>emptyList() : retries;
     }
 
     /**
@@ -210,8 +210,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     private void map(final LinkedHashMap<KeyCacheObject, Boolean> keys) {
         GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
 
-        if (!F.isEmpty(fut.invalidPartitions()))
+        if (!F.isEmpty(fut.invalidPartitions())) {
+            if (retries == null)
+                retries = new HashSet<>();
+
             retries.addAll(fut.invalidPartitions());
+        }
 
         add(new GridEmbeddedFuture<>(
             new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() {
@@ -229,9 +233,13 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                     for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) {
                         int part = cctx.affinity().partition(key.getKey());
 
-                        if (!retries.contains(part)) {
-                            if (!map(key.getKey(), parts))
+                        if (retries == null || !retries.contains(part)) {
+                            if (!map(key.getKey(), parts)) {
+                                if (retries == null)
+                                    retries = new HashSet<>();
+
                                 retries.add(part);
+                            }
                             else
                                 mappedKeys.put(key.getKey(), key.getValue());
                         }
@@ -441,4 +449,4 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     private GridDhtCacheAdapter<K, V> cache() {
         return (GridDhtCacheAdapter<K, V>)cctx.cache();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index a7978c9..543acb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -19,13 +19,14 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -51,8 +52,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -63,7 +62,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.NotNull;
@@ -123,7 +121,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     private boolean read;
 
     /** Error. */
-    private AtomicReference<Throwable> err = new AtomicReference<>(null);
+    private Throwable err;
 
     /** Timed out flag. */
     private volatile boolean timedOut;
@@ -142,19 +140,16 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     private GridDhtTxLocalAdapter tx;
 
     /** All replies flag. */
-    private AtomicBoolean mapped = new AtomicBoolean(false);
+    private boolean mapped;
 
     /** */
-    private Collection<Integer> invalidParts = new GridLeanSet<>();
+    private Collection<Integer> invalidParts;
 
     /** Trackable flag. */
     private boolean trackable = true;
 
-    /** Mutex. */
-    private final Object mux = new Object();
-
     /** Pending locks. */
-    private final Collection<KeyCacheObject> pendingLocks = new GridConcurrentHashSet<>();
+    private final Collection<KeyCacheObject> pendingLocks;
 
     /** TTL for read operation. */
     private long accessTtl;
@@ -231,6 +226,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
         futId = IgniteUuid.randomUuid();
 
         entries = new ArrayList<>(cnt);
+        pendingLocks = U.newHashSet(cnt);
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridDhtLockFuture.class);
@@ -244,7 +240,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
 
     /** {@inheritDoc} */
     @Override public Collection<Integer> invalidPartitions() {
-        return invalidParts;
+        return invalidParts == null ? Collections.<Integer>emptyList() : invalidParts;
     }
 
     /**
@@ -252,6 +248,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
      * @param invalidPart Partition to retry.
      */
     void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int invalidPart) {
+        if (invalidParts == null)
+            invalidParts = new HashSet<>();
+
         invalidParts.add(invalidPart);
 
         // Register invalid partitions with transaction.
@@ -287,10 +286,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     /**
      * @return Entries.
      */
-    public Collection<GridDhtCacheEntry> entriesCopy() {
-        synchronized (mux) {
-            return new ArrayList<>(entries());
-        }
+    public synchronized Collection<GridDhtCacheEntry> entriesCopy() {
+        return new ArrayList<>(entries());
     }
 
     /**
@@ -403,12 +400,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
             return null;
         }
 
-        synchronized (mux) {
+        synchronized (this) {
             entries.add(c == null || c.reentry() ? null : entry);
-        }
 
-        if (c != null && !c.reentry())
-            pendingLocks.add(entry.key());
+            if (c != null && !c.reentry())
+                pendingLocks.add(entry.key());
+        }
 
         // Double check if the future has already timed out.
         if (timedOut) {
@@ -615,19 +612,17 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     }
 
     /**
-     * @param e Error.
-     */
-    public void onError(GridDistributedLockCancelledException e) {
-        if (err.compareAndSet(null, e))
-            onComplete(false);
-    }
-
-    /**
      * @param t Error.
      */
     public void onError(Throwable t) {
-        if (err.compareAndSet(null, t))
-            onComplete(false);
+        synchronized (this) {
+            if (err != null)
+                return;
+
+            err = t;
+        }
+
+        onComplete(false);
     }
 
     /**
@@ -667,7 +662,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
             log.debug("Received onOwnerChanged() callback [entry=" + entry + ", owner=" + owner + "]");
 
         if (owner != null && owner.version().equals(lockVer)) {
-            pendingLocks.remove(entry.key());
+            synchronized (this) {
+                pendingLocks.remove(entry.key());
+            }
 
             if (checkLocks())
                 map(entries());
@@ -681,7 +678,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     /**
      * @return {@code True} if locks have been acquired.
      */
-    private boolean checkLocks() {
+    private synchronized boolean checkLocks() {
         return pendingLocks.isEmpty();
     }
 
@@ -713,7 +710,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
         if (isDone() || (err == null && success && !checkLocks()))
             return false;
 
-        this.err.compareAndSet(null, err);
+        synchronized (this) {
+            if (this.err == null)
+                this.err = err;
+        }
 
         return onComplete(success);
     }
@@ -734,10 +734,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
         if (tx != null)
             cctx.tm().txContext(tx);
 
-        if (err.get() == null)
+        if (err == null)
             loadMissingFromStore();
 
-        if (super.onDone(success, err.get())) {
+        if (super.onDone(success, err)) {
             if (log.isDebugEnabled())
                 log.debug("Completing future: " + this);
 
@@ -778,11 +778,11 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
      * @param entries Entries.
      */
     private void map(Iterable<GridDhtCacheEntry> entries) {
-        if (!mapped.compareAndSet(false, true)) {
-            if (log.isDebugEnabled())
-                log.debug("Will not map DHT lock future (other thread is mapping): " + this);
+        synchronized (this) {
+            if (mapped)
+                return;
 
-            return;
+            mapped = true;
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 8245d88..7e17efe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -17,13 +17,13 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -66,12 +66,10 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 
@@ -113,7 +111,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
     private boolean retval;
 
     /** Error. */
-    private AtomicReference<Throwable> err = new AtomicReference<>(null);
+    private volatile Throwable err;
 
     /** Timeout object. */
     @GridToStringExclude
@@ -130,7 +128,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
     private GridNearTxLocal tx;
 
     /** Topology snapshot to operate on. */
-    private AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>();
+    private volatile AffinityTopologyVersion topVer;
 
     /** Map of current values. */
     private Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, CacheObject>> valMap;
@@ -144,6 +142,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
     /** Skip store flag. */
     private final boolean skipStore;
 
+    /** */
+    private Deque<GridNearLockMapping> mappings;
+
     /**
      * @param cctx Registry.
      * @param keys Keys to lock.
@@ -196,7 +197,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
             cctx.time().addTimeoutObject(timeoutObj);
         }
 
-        valMap = new ConcurrentHashMap8<>(keys.size(), 1f);
+        valMap = new ConcurrentHashMap8<>();
     }
 
     /** {@inheritDoc} */
@@ -318,7 +319,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                     false,
                     null);
 
-                cand.topologyVersion(topVer.get());
+                cand.topologyVersion(topVer);
             }
         }
         else {
@@ -338,12 +339,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                     false,
                     null);
 
-                cand.topologyVersion(topVer.get());
+                cand.topologyVersion(topVer);
             }
             else
                 cand = cand.reenter();
 
-            cctx.mvcc().addExplicitLock(threadId, cand, topVer.get());
+            cctx.mvcc().addExplicitLock(threadId, cand, topVer);
         }
 
         return cand;
@@ -479,8 +480,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
     /**
      * @param t Error.
      */
-    private void onError(Throwable t) {
-        err.compareAndSet(null, t instanceof GridCacheLockTimeoutException ? null : t);
+    private synchronized void onError(Throwable t) {
+        if (err == null && !(t instanceof GridCacheLockTimeoutException))
+            err = t;
     }
 
     /** {@inheritDoc} */
@@ -499,7 +501,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         if (isDone())
             return false;
 
-        this.err.compareAndSet(null, err instanceof GridCacheLockTimeoutException ? null : err);
+        if (err != null)
+            onError(err);
 
         if (err != null)
             success = false;
@@ -525,7 +528,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         if (tx != null)
             cctx.tm().txContext(tx);
 
-        if (super.onDone(success, err.get())) {
+        if (super.onDone(success, err)) {
             if (log.isDebugEnabled())
                 log.debug("Completing future: " + this);
 
@@ -617,7 +620,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
             }
 
             // Continue mapping on the same topology version as it was before.
-            this.topVer.compareAndSet(null, topVer);
+            synchronized (this) {
+                if (this.topVer == null)
+                    this.topVer = topVer;
+            }
 
             map(keys, false, true);
 
@@ -666,13 +672,18 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                     if (tx != null)
                         tx.onRemap(topVer);
 
-                    this.topVer.set(topVer);
+                    synchronized (this) {
+                        this.topVer = topVer;
+                    }
                 }
                 else {
                     if (tx != null)
                         tx.topologyVersion(topVer);
 
-                    this.topVer.compareAndSet(null, topVer);
+                    synchronized (this) {
+                        if (this.topVer == null)
+                            this.topVer = topVer;
+                    }
                 }
 
                 map(keys, remap, false);
@@ -716,242 +727,256 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
      */
     private void map(Collection<KeyCacheObject> keys, boolean remap, boolean topLocked) {
         try {
-            AffinityTopologyVersion topVer = this.topVer.get();
+            map0(
+                keys,
+                remap,
+                topLocked);
+        }
+        catch (IgniteCheckedException ex) {
+            onDone(false, ex);
+        }
+    }
 
-            assert topVer != null;
+    private synchronized void map0(
+        Collection<KeyCacheObject> keys,
+        boolean remap,
+        boolean topLocked
+    ) throws IgniteCheckedException {
+        AffinityTopologyVersion topVer = this.topVer;
 
-            assert topVer.topologyVersion() > 0;
+        assert topVer != null;
 
-            if (CU.affinityNodes(cctx, topVer).isEmpty()) {
-                onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                    "(all partition nodes left the grid): " + cctx.name()));
+        assert topVer.topologyVersion() > 0;
 
-                return;
-            }
+        if (CU.affinityNodes(cctx, topVer).isEmpty()) {
+            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                "(all partition nodes left the grid): " + cctx.name()));
 
-            boolean clientNode = cctx.kernalContext().clientNode();
+            return;
+        }
 
-            assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
+        boolean clientNode = cctx.kernalContext().clientNode();
 
-            // First assume this node is primary for all keys passed in.
-            if (!clientNode && mapAsPrimary(keys, topVer))
-                return;
+        assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
 
-            Deque<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
+        // First assume this node is primary for all keys passed in.
+        if (!clientNode && mapAsPrimary(keys, topVer))
+            return;
 
-            // Assign keys to primary nodes.
-            GridNearLockMapping map = null;
+        mappings = new ArrayDeque<>();
 
-            for (KeyCacheObject key : keys) {
-                GridNearLockMapping updated = map(key, map, topVer);
+        // Assign keys to primary nodes.
+        GridNearLockMapping map = null;
 
-                // If new mapping was created, add to collection.
-                if (updated != map) {
-                    mappings.add(updated);
+        for (KeyCacheObject key : keys) {
+            GridNearLockMapping updated = map(key, map, topVer);
 
-                    if (tx != null && updated.node().isLocal())
-                        tx.colocatedLocallyMapped(true);
-                }
+            // If new mapping was created, add to collection.
+            if (updated != map) {
+                mappings.add(updated);
 
-                map = updated;
+                if (tx != null && updated.node().isLocal())
+                    tx.colocatedLocallyMapped(true);
             }
 
-            if (isDone()) {
-                if (log.isDebugEnabled())
-                    log.debug("Abandoning (re)map because future is done: " + this);
-
-                return;
-            }
+            map = updated;
+        }
 
+        if (isDone()) {
             if (log.isDebugEnabled())
-                log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
+                log.debug("Abandoning (re)map because future is done: " + this);
+
+            return;
+        }
 
-            boolean hasRmtNodes = false;
+        if (log.isDebugEnabled())
+            log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
 
-            boolean first = true;
+        boolean hasRmtNodes = false;
 
-            // Create mini futures.
-            for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
-                GridNearLockMapping mapping = iter.next();
+        boolean first = true;
 
-                ClusterNode node = mapping.node();
-                Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
+        // Create mini futures.
+        for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
+            GridNearLockMapping mapping = iter.next();
 
-                boolean loc = node.equals(cctx.localNode());
+            ClusterNode node = mapping.node();
+            Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
 
-                assert !mappedKeys.isEmpty();
+            boolean loc = node.equals(cctx.localNode());
 
-                GridNearLockRequest req = null;
+            assert !mappedKeys.isEmpty();
 
-                Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
+            GridNearLockRequest req = null;
 
-                for (KeyCacheObject key : mappedKeys) {
-                    IgniteTxKey txKey = cctx.txKey(key);
+            Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
 
-                    GridDistributedCacheEntry entry = null;
+            for (KeyCacheObject key : mappedKeys) {
+                IgniteTxKey txKey = cctx.txKey(key);
 
-                    if (tx != null) {
-                        IgniteTxEntry txEntry = tx.entry(txKey);
+                GridDistributedCacheEntry entry = null;
 
-                        if (txEntry != null) {
-                            entry = (GridDistributedCacheEntry)txEntry.cached();
+                if (tx != null) {
+                    IgniteTxEntry txEntry = tx.entry(txKey);
 
-                            if (entry != null && !(loc ^ entry.detached())) {
-                                entry = cctx.colocated().entryExx(key, topVer, true);
+                    if (txEntry != null) {
+                        entry = (GridDistributedCacheEntry)txEntry.cached();
 
-                                txEntry.cached(entry);
-                            }
+                        if (entry != null && !(loc ^ entry.detached())) {
+                            entry = cctx.colocated().entryExx(key, topVer, true);
+
+                            txEntry.cached(entry);
                         }
                     }
+                }
 
-                    boolean explicit;
+                boolean explicit;
 
-                    while (true) {
-                        try {
-                            if (entry == null)
-                                entry = cctx.colocated().entryExx(key, topVer, true);
+                while (true) {
+                    try {
+                        if (entry == null)
+                            entry = cctx.colocated().entryExx(key, topVer, true);
 
-                            if (!cctx.isAll(entry, filter)) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Entry being locked did not pass filter (will not lock): " + entry);
+                        if (!cctx.isAll(entry, filter)) {
+                            if (log.isDebugEnabled())
+                                log.debug("Entry being locked did not pass filter (will not lock): " + entry);
 
-                                onComplete(false, false);
+                            onComplete(false, false);
 
-                                return;
-                            }
+                            return;
+                        }
 
-                            assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']';
+                        assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']';
 
-                            GridCacheMvccCandidate cand = addEntry(entry);
+                        GridCacheMvccCandidate cand = addEntry(entry);
 
-                            // Will either return value from dht cache or null if this is a miss.
-                            IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.detached() ? null :
-                                ((GridDhtCacheEntry)entry).versionedValue(topVer);
+                        // Will either return value from dht cache or null if this is a miss.
+                        IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.detached() ? null :
+                            ((GridDhtCacheEntry)entry).versionedValue(topVer);
 
-                            GridCacheVersion dhtVer = null;
+                        GridCacheVersion dhtVer = null;
 
-                            if (val != null) {
-                                dhtVer = val.get1();
+                        if (val != null) {
+                            dhtVer = val.get1();
 
-                                valMap.put(key, val);
-                            }
+                            valMap.put(key, val);
+                        }
 
-                            if (cand != null && !cand.reentry()) {
-                                if (req == null) {
-                                    boolean clientFirst = false;
-
-                                    if (first) {
-                                        clientFirst = clientNode &&
-                                            !topLocked &&
-                                            (tx == null || !tx.hasRemoteLocks());
-
-                                        first = false;
-                                    }
-
-                                    req = new GridNearLockRequest(
-                                        cctx.cacheId(),
-                                        topVer,
-                                        cctx.nodeId(),
-                                        threadId,
-                                        futId,
-                                        lockVer,
-                                        inTx(),
-                                        implicitTx(),
-                                        implicitSingleTx(),
-                                        read,
-                                        retval,
-                                        isolation(),
-                                        isInvalidate(),
-                                        timeout,
-                                        mappedKeys.size(),
-                                        inTx() ? tx.size() : mappedKeys.size(),
-                                        inTx() && tx.syncCommit(),
-                                        inTx() ? tx.subjectId() : null,
-                                        inTx() ? tx.taskNameHash() : 0,
-                                        read ? accessTtl : -1L,
-                                        skipStore,
-                                        clientFirst,
-                                        cctx.deploymentEnabled());
-
-                                    mapping.request(req);
-                                }
+                        if (cand != null && !cand.reentry()) {
+                            if (req == null) {
+                                boolean clientFirst = false;
 
-                                distributedKeys.add(key);
+                                if (first) {
+                                    clientFirst = clientNode &&
+                                        !topLocked &&
+                                        (tx == null || !tx.hasRemoteLocks());
 
-                                if (tx != null)
-                                    tx.addKeyMapping(txKey, mapping.node());
+                                    first = false;
+                                }
 
-                                req.addKeyBytes(
-                                    key,
+                                req = new GridNearLockRequest(
+                                    cctx.cacheId(),
+                                    topVer,
+                                    cctx.nodeId(),
+                                    threadId,
+                                    futId,
+                                    lockVer,
+                                    inTx(),
+                                    implicitTx(),
+                                    implicitSingleTx(),
+                                    read,
                                     retval,
-                                    dhtVer, // Include DHT version to match remote DHT entry.
-                                    cctx);
+                                    isolation(),
+                                    isInvalidate(),
+                                    timeout,
+                                    mappedKeys.size(),
+                                    inTx() ? tx.size() : mappedKeys.size(),
+                                    inTx() && tx.syncCommit(),
+                                    inTx() ? tx.subjectId() : null,
+                                    inTx() ? tx.taskNameHash() : 0,
+                                    read ? accessTtl : -1L,
+                                    skipStore,
+                                    clientFirst,
+                                    cctx.deploymentEnabled());
+
+                                mapping.request(req);
                             }
 
-                            explicit = inTx() && cand == null;
+                            distributedKeys.add(key);
 
-                            if (explicit)
+                            if (tx != null)
                                 tx.addKeyMapping(txKey, mapping.node());
 
-                            break;
+                            req.addKeyBytes(
+                                key,
+                                retval,
+                                dhtVer, // Include DHT version to match remote DHT entry.
+                                cctx);
                         }
-                        catch (GridCacheEntryRemovedException ignored) {
-                            if (log.isDebugEnabled())
-                                log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
 
-                            entry = null;
-                        }
-                    }
+                        explicit = inTx() && cand == null;
 
-                    // Mark mapping explicit lock flag.
-                    if (explicit) {
-                        boolean marked = tx != null && tx.markExplicit(node.id());
+                        if (explicit)
+                            tx.addKeyMapping(txKey, mapping.node());
 
-                        assert tx == null || marked;
+                        break;
                     }
-                }
-
-                if (inTx() && req != null)
-                    req.hasTransforms(tx.hasTransforms());
+                    catch (GridCacheEntryRemovedException ignored) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
 
-                if (!distributedKeys.isEmpty()) {
-                    mapping.distributedKeys(distributedKeys);
-
-                    hasRmtNodes |= !mapping.node().isLocal();
+                        entry = null;
+                    }
                 }
-                else {
-                    assert mapping.request() == null;
 
-                    iter.remove();
+                // Mark mapping explicit lock flag.
+                if (explicit) {
+                    boolean marked = tx != null && tx.markExplicit(node.id());
+
+                    assert tx == null || marked;
                 }
             }
 
-            if (hasRmtNodes) {
-                trackable = true;
+            if (inTx() && req != null)
+                req.hasTransforms(tx.hasTransforms());
 
-                if (!remap && !cctx.mvcc().addFuture(this))
-                    throw new IllegalStateException("Duplicate future ID: " + this);
+            if (!distributedKeys.isEmpty()) {
+                mapping.distributedKeys(distributedKeys);
+
+                hasRmtNodes |= !mapping.node().isLocal();
             }
-            else
-                trackable = false;
+            else {
+                assert mapping.request() == null;
 
-            proceedMapping(mappings);
+                iter.remove();
+            }
         }
-        catch (IgniteCheckedException ex) {
-            onDone(false, ex);
+
+        if (hasRmtNodes) {
+            trackable = true;
+
+            if (!remap && !cctx.mvcc().addFuture(this))
+                throw new IllegalStateException("Duplicate future ID: " + this);
         }
+        else
+            trackable = false;
+
+        proceedMapping();
     }
 
     /**
      * Gets next near lock mapping and either acquires dht locks locally or sends near lock request to
      * remote primary node.
      *
-     * @param mappings Queue of mappings.
      * @throws IgniteCheckedException If mapping can not be completed.
      */
-    private void proceedMapping(final Deque<GridNearLockMapping> mappings)
+    private void proceedMapping()
         throws IgniteCheckedException {
-        GridNearLockMapping map = mappings.poll();
+        GridNearLockMapping map;
+
+        synchronized (this) {
+            map = mappings.poll();
+        }
 
         // If there are no more mappings to process, complete the future.
         if (map == null)
@@ -965,9 +990,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
             req.filter(filter, cctx);
 
         if (node.isLocal())
-            lockLocally(mappedKeys, req.topologyVersion(), mappings);
+            lockLocally(mappedKeys, req.topologyVersion());
         else {
-            final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings);
+            final MiniFuture fut = new MiniFuture(node, mappedKeys);
 
             req.miniId(fut.futureId());
 
@@ -1016,15 +1041,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
 
     /**
      * Locks given keys directly through dht cache.
-     *
-     * @param keys Collection of keys.
+     *  @param keys Collection of keys.
      * @param topVer Topology version to lock on.
-     * @param mappings Optional collection of mappings to proceed locking.
      */
     private void lockLocally(
         final Collection<KeyCacheObject> keys,
-        AffinityTopologyVersion topVer,
-        @Nullable final Deque<GridNearLockMapping> mappings
+        AffinityTopologyVersion topVer
     ) {
         if (log.isDebugEnabled())
             log.debug("Before locally locking keys : " + keys);
@@ -1078,7 +1100,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                     try {
                         // Proceed and add new future (if any) before completing embedded future.
                         if (mappings != null)
-                            proceedMapping(mappings);
+                            proceedMapping();
                     }
                     catch (IgniteCheckedException ex) {
                         onError(ex);
@@ -1101,7 +1123,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
      * @return {@code True} if all keys were mapped locally, {@code false} if full mapping should be performed.
      * @throws IgniteCheckedException If key cannot be added to mapping.
      */
-    private boolean mapAsPrimary(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) throws IgniteCheckedException {
+    private boolean mapAsPrimary(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer)
+        throws IgniteCheckedException {
         // Assign keys to primary nodes.
         Collection<KeyCacheObject> distributedKeys = new ArrayList<>(keys.size());
 
@@ -1137,7 +1160,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                     tx.addKeyMapping(cctx.txKey(key), cctx.localNode());
             }
 
-            lockLocally(distributedKeys, topVer, null);
+            lockLocally(distributedKeys, topVer);
         }
 
         return true;
@@ -1221,7 +1244,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
             "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
 
-        topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get()));
+        topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
 
         return topEx;
     }
@@ -1275,19 +1298,18 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         private Deque<GridNearLockMapping> mappings;
 
         /** */
-        private AtomicBoolean rcvRes = new AtomicBoolean(false);
+        private boolean rcvRes;
 
         /**
          * @param node Node.
          * @param keys Keys.
-         * @param mappings Mappings to proceed.
          */
-        MiniFuture(ClusterNode node,
-            Collection<KeyCacheObject> keys,
-            Deque<GridNearLockMapping> mappings) {
+        MiniFuture(
+            ClusterNode node,
+            Collection<KeyCacheObject> keys
+        ) {
             this.node = node;
             this.keys = keys;
-            this.mappings = mappings;
         }
 
         /**
@@ -1312,159 +1334,153 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         }
 
         /**
-         * @param e Error.
-         */
-        void onResult(Throwable e) {
-            if (rcvRes.compareAndSet(false, true)) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
-                // Fail.
-                onDone(e);
-            }
-            else
-                U.warn(log, "Received error after another result has been processed [fut=" +
-                    GridDhtColocatedLockFuture.this + ", mini=" + this + ']', e);
-        }
-
-        /**
          * @param e Node left exception.
          */
         void onResult(ClusterTopologyCheckedException e) {
             if (isDone())
                 return;
 
-            if (rcvRes.compareAndSet(false, true)) {
-                if (log.isDebugEnabled())
-                    log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
+            synchronized (this) {
+                if (rcvRes)
+                    return;
+
+                rcvRes = true;
+            }
 
-                if (tx != null)
-                    tx.removeMapping(node.id());
+            if (log.isDebugEnabled())
+                log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
 
-                // Primary node left the grid, so fail the future.
-                GridDhtColocatedLockFuture.this.onDone(newTopologyException(e, node.id()));
+            if (tx != null)
+                tx.removeMapping(node.id());
 
-                onDone(true);
-            }
+            // Primary node left the grid, so fail the future.
+            GridDhtColocatedLockFuture.this.onDone(newTopologyException(e, node.id()));
+
+            onDone(true);
         }
 
         /**
          * @param res Result callback.
          */
         void onResult(GridNearLockResponse res) {
-            if (rcvRes.compareAndSet(false, true)) {
-                if (res.error() != null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
-                            ", res=" + res + ']');
-
-                    // Fail.
-                    if (res.error() instanceof GridCacheLockTimeoutException)
-                        onDone(false);
-                    else
-                        onDone(res.error());
-
+            synchronized (this) {
+                if (rcvRes)
                     return;
-                }
 
-                if (res.clientRemapVersion() != null) {
-                    assert cctx.kernalContext().clientNode();
+                rcvRes = true;
+            }
 
-                    IgniteInternalFuture<?> affFut =
-                        cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
+            if (res.error() != null) {
+                if (log.isDebugEnabled())
+                    log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
+                        ", res=" + res + ']');
 
-                    if (affFut != null && !affFut.isDone()) {
-                        affFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                            @Override public void apply(IgniteInternalFuture<?> fut) {
-                                try {
-                                    fut.get();
+                // Fail.
+                if (res.error() instanceof GridCacheLockTimeoutException)
+                    onDone(false);
+                else
+                    onDone(res.error());
 
-                                    remap();
-                                }
-                                catch (IgniteCheckedException e) {
-                                    onDone(e);
-                                }
-                                finally {
-                                    cctx.shared().txContextReset();
-                                }
-                            }
-                        });
-                    }
-                    else
-                        remap();
-                }
-                else  {
-                    int i = 0;
+                return;
+            }
 
-                    for (KeyCacheObject k : keys) {
-                        IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
+            if (res.clientRemapVersion() != null) {
+                assert cctx.kernalContext().clientNode();
 
-                        CacheObject newVal = res.value(i);
+                IgniteInternalFuture<?> affFut =
+                    cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
 
-                        GridCacheVersion dhtVer = res.dhtVersion(i);
+                if (affFut != null && !affFut.isDone()) {
+                    affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> fut) {
+                            try {
+                                fut.get();
 
-                        if (newVal == null) {
-                            if (oldValTup != null) {
-                                if (oldValTup.get1().equals(dhtVer))
-                                    newVal = oldValTup.get2();
+                                remap();
+                            }
+                            catch (IgniteCheckedException e) {
+                                onDone(e);
+                            }
+                            finally {
+                                cctx.shared().txContextReset();
                             }
                         }
+                    });
+                }
+                else
+                    remap();
+            }
+            else  {
+                int i = 0;
 
-                        if (inTx()) {
-                            IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
+                for (KeyCacheObject k : keys) {
+                    IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
 
-                            // In colocated cache we must receive responses only for detached entries.
-                            assert txEntry.cached().detached() : txEntry;
+                    CacheObject newVal = res.value(i);
 
-                            txEntry.markLocked();
+                    GridCacheVersion dhtVer = res.dhtVersion(i);
 
-                            GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
+                    if (newVal == null) {
+                        if (oldValTup != null) {
+                            if (oldValTup.get1().equals(dhtVer))
+                                newVal = oldValTup.get2();
+                        }
+                    }
 
-                            if (res.dhtVersion(i) == null) {
-                                onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
-                                    "(will fail the lock): " + res));
+                    if (inTx()) {
+                        IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
 
-                                return;
-                            }
+                        // In colocated cache we must receive responses only for detached entries.
+                        assert txEntry.cached().detached() : txEntry;
 
-                            // Set value to detached entry.
-                            entry.resetFromPrimary(newVal, dhtVer);
+                        txEntry.markLocked();
 
-                            tx.hasRemoteLocks(true);
+                        GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
 
-                            if (log.isDebugEnabled())
-                                log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
-                        }
-                        else
-                            cctx.mvcc().markExplicitOwner(cctx.txKey(k), threadId);
-
-                        if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
-                            cctx.events().addEvent(cctx.affinity().partition(k),
-                                k,
-                                tx,
-                                null,
-                                EVT_CACHE_OBJECT_READ,
-                                newVal,
-                                newVal != null,
-                                null,
-                                false,
-                                CU.subjectId(tx, cctx.shared()),
-                                null,
-                                tx == null ? null : tx.resolveTaskName());
+                        if (res.dhtVersion(i) == null) {
+                            onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+                                "(will fail the lock): " + res));
+
+                            return;
                         }
 
-                        i++;
-                    }
+                        // Set value to detached entry.
+                        entry.resetFromPrimary(newVal, dhtVer);
 
-                    try {
-                        proceedMapping(mappings);
+                        tx.hasRemoteLocks(true);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
                     }
-                    catch (IgniteCheckedException e) {
-                        onDone(e);
+                    else
+                        cctx.mvcc().markExplicitOwner(cctx.txKey(k), threadId);
+
+                    if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+                        cctx.events().addEvent(cctx.affinity().partition(k),
+                            k,
+                            tx,
+                            null,
+                            EVT_CACHE_OBJECT_READ,
+                            newVal,
+                            newVal != null,
+                            null,
+                            false,
+                            CU.subjectId(tx, cctx.shared()),
+                            null,
+                            tx == null ? null : tx.resolveTaskName());
                     }
 
-                    onDone(true);
+                    i++;
+                }
+
+                try {
+                    proceedMapping();
                 }
+                catch (IgniteCheckedException e) {
+                    onDone(e);
+                }
+
+                onDone(true);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index dfaa44e..4a030b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -60,7 +59,6 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 


Mime
View raw message