ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [01/17] ignite git commit: debugging slowdowns
Date Mon, 23 Nov 2015 14:02:12 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1282 df859c0a1 -> 3ca58d7b5


http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 76f2fbe..6f92204 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -57,17 +58,14 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 
@@ -109,7 +107,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
     private boolean retval;
 
     /** Error. */
-    private AtomicReference<Throwable> err = new AtomicReference<>(null);
+    private volatile Throwable err;
 
     /** Timed out flag. */
     private volatile boolean timedOut;
@@ -129,8 +127,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
     private GridNearTxLocal tx;
 
     /** Topology snapshot to operate on. */
-    private AtomicReference<AffinityTopologyVersion> topVer =
-        new AtomicReference<>();
+    private volatile AffinityTopologyVersion topVer;
 
     /** Map of current values. */
     private Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, CacheObject>> valMap;
@@ -138,9 +135,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
     /** Trackable flag. */
     private boolean trackable = true;
 
-    /** Mutex. */
-    private final Object mux = new Object();
-
     /** Keys locked so far. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     @GridToStringExclude
@@ -152,6 +146,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
     /** Skip store flag. */
     private final boolean skipStore;
 
+    /** Mappings to proceed. */
+    @GridToStringExclude
+    private Queue<GridNearLockMapping> mappings;
+
     /**
      * @param cctx Registry.
      * @param keys Keys to lock.
@@ -206,7 +204,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             cctx.time().addTimeoutObject(timeoutObj);
         }
 
-        valMap = new ConcurrentHashMap8<>(keys.size(), 1f);
+        valMap = new ConcurrentHashMap8<>();
     }
 
     /** {@inheritDoc} */
@@ -217,10 +215,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
     /**
      * @return Entries.
      */
-    public List<GridDistributedCacheEntry> entriesCopy() {
-        synchronized (mux) {
-            return new ArrayList<>(entries);
-        }
+    public synchronized List<GridDistributedCacheEntry> entriesCopy() {
+        return new ArrayList<>(entries);
     }
 
     /**
@@ -313,6 +309,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         GridNearCacheEntry entry,
         UUID dhtNodeId
     ) throws GridCacheEntryRemovedException {
+        assert Thread.holdsLock(this);
+
         // Check if lock acquisition is timed out.
         if (timedOut)
             return null;
@@ -335,9 +333,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             txEntry.cached(entry);
         }
 
-        synchronized (mux) {
-            entries.add(entry);
-        }
+        entries.add(entry);
 
         if (c == null && timeout < 0) {
             if (log.isDebugEnabled())
@@ -525,7 +521,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
      * @param t Error.
      */
     private void onError(Throwable t) {
-        err.compareAndSet(null, t instanceof GridCacheLockTimeoutException ? null : t);
+        synchronized (this) {
+            if (err == null)
+                err = t;
+        }
     }
 
     /**
@@ -572,35 +571,39 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
      */
     private boolean checkLocks() {
         if (!isDone() && initialized() && !hasPending()) {
-            for (int i = 0; i < entries.size(); i++) {
-                while (true) {
-                    GridCacheEntryEx cached = entries.get(i);
+            synchronized (this) {
+                for (int i = 0; i < entries.size(); i++) {
+                    while (true) {
+                        GridCacheEntryEx cached = entries.get(i);
 
-                    try {
-                        if (!locked(cached)) {
-                            if (log.isDebugEnabled())
-                                log.debug("Lock is still not acquired for entry (will keep waiting) [entry=" +
-                                    cached + ", fut=" + this + ']');
+                        try {
+                            if (!locked(cached)) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Lock is still not acquired for entry (will keep waiting) [entry=" +
+                                        cached + ", fut=" + this + ']');
 
-                            return false;
-                        }
+                                return false;
+                            }
 
-                        break;
-                    }
-                    // Possible in concurrent cases, when owner is changed after locks
-                    // have been released or cancelled.
-                    catch (GridCacheEntryRemovedException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got removed entry in onOwnerChanged method (will retry): " + cached);
+                            break;
+                        }
+                        // Possible in concurrent cases, when owner is changed after locks
+                        // have been released or cancelled.
+                        catch (GridCacheEntryRemovedException ignore) {
+                            if (log.isDebugEnabled())
+                                log.debug("Got removed entry in onOwnerChanged method (will retry): " + cached);
 
-                        // Replace old entry with new one.
-                        entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(cached.key()));
+                            // Replace old entry with new one.
+                            entries.set(
+                                i,
+                                (GridDistributedCacheEntry)cctx.cache().entryEx(cached.key()));
+                        }
                     }
                 }
-            }
 
-            if (log.isDebugEnabled())
-                log.debug("Local lock acquired for entries [fut=" + this + ", entries=" + entries + "]");
+                if (log.isDebugEnabled())
+                    log.debug("Local lock acquired for entries [fut=" + this + ", entries=" + entries + "]");
+            }
 
             onComplete(true, true);
 
@@ -627,7 +630,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         if (isDone() || (err == null && success && !checkLocks()))
             return false;
 
-        this.err.compareAndSet(null, err instanceof GridCacheLockTimeoutException ? null : err);
+        if (err != null && !(err instanceof GridCacheLockTimeoutException))
+            onError(err);
 
         if (err != null)
             success = false;
@@ -653,7 +657,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         if (tx != null)
             cctx.tm().txContext(tx);
 
-        if (super.onDone(success, err.get())) {
+        if (super.onDone(success, err)) {
             if (log.isDebugEnabled())
                 log.debug("Completing future: " + this);
 
@@ -730,7 +734,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             }
 
             // Continue mapping on the same topology version as it was before.
-            this.topVer.compareAndSet(null, topVer);
+            if (this.topVer == null)
+                this.topVer = topVer;
 
             map(keys, false, true);
 
@@ -749,7 +754,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
      *
      * @param remap Remap flag.
      */
-    void mapOnTopology(final boolean remap) {
+    synchronized void mapOnTopology(final boolean remap) {
         // We must acquire topology snapshot from the topology version future.
         cctx.topology().readLock();
 
@@ -778,13 +783,14 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                     if (tx != null)
                         tx.onRemap(topVer);
 
-                    this.topVer.set(topVer);
+                    this.topVer = topVer;
                 }
                 else {
                     if (tx != null)
                         tx.topologyVersion(topVer);
 
-                    this.topVer.compareAndSet(null, topVer);
+                    if (this.topVer == null)
+                        this.topVer = topVer;
                 }
 
                 map(keys, remap, false);
@@ -825,7 +831,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
      */
     private void map(Iterable<KeyCacheObject> keys, boolean remap, boolean topLocked) {
         try {
-            AffinityTopologyVersion topVer = this.topVer.get();
+            AffinityTopologyVersion topVer = this.topVer;
 
             assert topVer != null;
 
@@ -842,204 +848,227 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
 
             assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
 
-            ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
+            synchronized (this) {
+                mappings = new ArrayDeque<>();
 
-            // Assign keys to primary nodes.
-            GridNearLockMapping map = null;
+                // Assign keys to primary nodes.
+                GridNearLockMapping map = null;
 
-            for (KeyCacheObject key : keys) {
-                GridNearLockMapping updated = map(key, map, topVer);
+                for (KeyCacheObject key : keys) {
+                    GridNearLockMapping updated = map(
+                        key,
+                        map,
+                        topVer);
 
-                // If new mapping was created, add to collection.
-                if (updated != map) {
-                    mappings.add(updated);
+                    // If new mapping was created, add to collection.
+                    if (updated != map) {
+                        mappings.add(updated);
 
-                    if (tx != null && updated.node().isLocal())
-                        tx.nearLocallyMapped(true);
+                        if (tx != null && updated.node().isLocal())
+                            tx.nearLocallyMapped(true);
+                    }
+
+                    map = updated;
                 }
 
-                map = updated;
-            }
+                if (isDone()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Abandoning (re)map because future is done: " + this);
 
-            if (isDone()) {
-                if (log.isDebugEnabled())
-                    log.debug("Abandoning (re)map because future is done: " + this);
+                    return;
+                }
 
-                return;
-            }
+                if (log.isDebugEnabled())
+                    log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
 
-            if (log.isDebugEnabled())
-                log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
+                boolean first = true;
 
-            boolean first = true;
+                // Create mini futures.
+                for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
+                    GridNearLockMapping mapping = iter.next();
 
-            // Create mini futures.
-            for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
-                GridNearLockMapping mapping = iter.next();
+                    ClusterNode node = mapping.node();
+                    Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
 
-                ClusterNode node = mapping.node();
-                Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
+                    assert !mappedKeys.isEmpty();
 
-                assert !mappedKeys.isEmpty();
+                    GridNearLockRequest req = null;
 
-                GridNearLockRequest req = null;
+                    Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
 
-                Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
+                    boolean explicit = false;
 
-                boolean explicit = false;
+                    for (KeyCacheObject key : mappedKeys) {
+                        IgniteTxKey txKey = cctx.txKey(key);
 
-                for (KeyCacheObject key : mappedKeys) {
-                    IgniteTxKey txKey = cctx.txKey(key);
+                        while (true) {
+                            GridNearCacheEntry entry = null;
 
-                    while (true) {
-                        GridNearCacheEntry entry = null;
+                            try {
+                                entry = cctx.near().entryExx(
+                                    key,
+                                    topVer);
 
-                        try {
-                            entry = cctx.near().entryExx(key, topVer);
+                                if (!cctx.isAll(
+                                    entry,
+                                    filter)) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Entry being locked did not pass filter (will not lock): " + entry);
 
-                            if (!cctx.isAll(entry, filter)) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Entry being locked did not pass filter (will not lock): " + entry);
+                                    onComplete(
+                                        false,
+                                        false);
 
-                                onComplete(false, false);
+                                    return;
+                                }
 
-                                return;
-                            }
+                                // Removed exception may be thrown here.
+                                GridCacheMvccCandidate cand = addEntry(
+                                    topVer,
+                                    entry,
+                                    node.id());
 
-                            // Removed exception may be thrown here.
-                            GridCacheMvccCandidate cand = addEntry(topVer, entry, node.id());
+                                if (isDone()) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Abandoning (re)map because future is done after addEntry attempt " +
+                                            "[fut=" + this + ", entry=" + entry + ']');
 
-                            if (isDone()) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Abandoning (re)map because future is done after addEntry attempt " +
-                                        "[fut=" + this + ", entry=" + entry + ']');
+                                    return;
+                                }
 
-                                return;
-                            }
+                                if (cand != null) {
+                                    if (tx == null && !cand.reentry())
+                                        cctx.mvcc().addExplicitLock(
+                                            threadId,
+                                            cand,
+                                            topVer);
 
-                            if (cand != null) {
-                                if (tx == null && !cand.reentry())
-                                    cctx.mvcc().addExplicitLock(threadId, cand, topVer);
+                                    IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.versionedValue();
 
-                                IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.versionedValue();
+                                    if (val == null) {
+                                        GridDhtCacheEntry dhtEntry = dht().peekExx(key);
 
-                                if (val == null) {
-                                    GridDhtCacheEntry dhtEntry = dht().peekExx(key);
+                                        try {
+                                            if (dhtEntry != null)
+                                                val = dhtEntry.versionedValue(topVer);
+                                        }
+                                        catch (GridCacheEntryRemovedException ignored) {
+                                            assert dhtEntry.obsolete() : dhtEntry;
 
-                                    try {
-                                        if (dhtEntry != null)
-                                            val = dhtEntry.versionedValue(topVer);
+                                            if (log.isDebugEnabled())
+                                                log.debug("Got removed exception for DHT entry in map (will ignore): "
+                                                    + dhtEntry);
+                                        }
                                     }
-                                    catch (GridCacheEntryRemovedException ignored) {
-                                        assert dhtEntry.obsolete() : " Got removed exception for non-obsolete entry: "
-                                            + dhtEntry;
 
-                                        if (log.isDebugEnabled())
-                                            log.debug("Got removed exception for DHT entry in map (will ignore): "
-                                                + dhtEntry);
-                                    }
-                                }
+                                    GridCacheVersion dhtVer = null;
 
-                                GridCacheVersion dhtVer = null;
+                                    if (val != null) {
+                                        dhtVer = val.get1();
 
-                                if (val != null) {
-                                    dhtVer = val.get1();
+                                        valMap.put(
+                                            key,
+                                            val);
+                                    }
 
-                                    valMap.put(key, val);
-                                }
+                                    if (!cand.reentry()) {
+                                        if (req == null) {
+                                            boolean clientFirst = false;
 
-                                if (!cand.reentry()) {
-                                    if (req == null) {
-                                        boolean clientFirst = false;
+                                            if (first) {
+                                                clientFirst = clientNode &&
+                                                    !topLocked &&
+                                                    (tx == null || !tx.hasRemoteLocks());
 
-                                        if (first) {
-                                            clientFirst = clientNode &&
-                                                !topLocked &&
-                                                (tx == null || !tx.hasRemoteLocks());
+                                                first = false;
+                                            }
 
-                                            first = false;
+                                            req = new GridNearLockRequest(
+                                                cctx.cacheId(),
+                                                topVer,
+                                                cctx.nodeId(),
+                                                threadId,
+                                                futId,
+                                                lockVer,
+                                                inTx(),
+                                                implicitTx(),
+                                                implicitSingleTx(),
+                                                read,
+                                                retval,
+                                                isolation(),
+                                                isInvalidate(),
+                                                timeout,
+                                                mappedKeys.size(),
+                                                inTx() ? tx.size() : mappedKeys.size(),
+                                                inTx() && tx.syncCommit(),
+                                                inTx() ? tx.subjectId() : null,
+                                                inTx() ? tx.taskNameHash() : 0,
+                                                read ? accessTtl : -1L,
+                                                skipStore,
+                                                clientFirst,
+                                                cctx.deploymentEnabled());
+
+                                            mapping.request(req);
                                         }
 
-                                        req = new GridNearLockRequest(
-                                            cctx.cacheId(),
-                                            topVer,
-                                            cctx.nodeId(),
-                                            threadId,
-                                            futId,
-                                            lockVer,
-                                            inTx(),
-                                            implicitTx(),
-                                            implicitSingleTx(),
-                                            read,
-                                            retval,
-                                            isolation(),
-                                            isInvalidate(),
-                                            timeout,
-                                            mappedKeys.size(),
-                                            inTx() ? tx.size() : mappedKeys.size(),
-                                            inTx() && tx.syncCommit(),
-                                            inTx() ? tx.subjectId() : null,
-                                            inTx() ? tx.taskNameHash() : 0,
-                                            read ? accessTtl : -1L,
-                                            skipStore,
-                                            clientFirst,
-                                            cctx.deploymentEnabled());
-
-                                        mapping.request(req);
-                                    }
+                                        distributedKeys.add(key);
 
-                                    distributedKeys.add(key);
+                                        if (tx != null)
+                                            tx.addKeyMapping(
+                                                txKey,
+                                                mapping.node());
 
-                                    if (tx != null)
-                                        tx.addKeyMapping(txKey, mapping.node());
+                                        req.addKeyBytes(
+                                            key,
+                                            retval && dhtVer == null,
+                                            dhtVer,
+                                            // Include DHT version to match remote DHT entry.
+                                            cctx);
+                                    }
 
-                                    req.addKeyBytes(
-                                        key,
-                                        retval && dhtVer == null,
-                                        dhtVer, // Include DHT version to match remote DHT entry.
-                                        cctx);
+                                    if (cand.reentry())
+                                        explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
                                 }
-
-                                if (cand.reentry())
+                                else
+                                    // Ignore reentries within transactions.
                                     explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
-                            }
-                            else
-                                // Ignore reentries within transactions.
-                                explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
 
-                            if (explicit)
-                                tx.addKeyMapping(txKey, mapping.node());
+                                if (explicit)
+                                    tx.addKeyMapping(
+                                        txKey,
+                                        mapping.node());
 
-                            break;
-                        }
-                        catch (GridCacheEntryRemovedException ignored) {
-                            assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry;
+                                break;
+                            }
+                            catch (GridCacheEntryRemovedException ignored) {
+                                assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry;
 
-                            if (log.isDebugEnabled())
-                                log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+                                if (log.isDebugEnabled())
+                                    log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+                            }
                         }
-                    }
 
-                    // Mark mapping explicit lock flag.
-                    if (explicit) {
-                        boolean marked = tx != null && tx.markExplicit(node.id());
+                        // Mark mapping explicit lock flag.
+                        if (explicit) {
+                            boolean marked = tx != null && tx.markExplicit(node.id());
 
-                        assert tx == null || marked;
+                            assert tx == null || marked;
+                        }
                     }
-                }
 
-                if (!distributedKeys.isEmpty())
-                    mapping.distributedKeys(distributedKeys);
-                else {
-                    assert mapping.request() == null;
+                    if (!distributedKeys.isEmpty())
+                        mapping.distributedKeys(distributedKeys);
+                    else {
+                        assert mapping.request() == null;
 
-                    iter.remove();
+                        iter.remove();
+                    }
                 }
             }
 
             cctx.mvcc().recheckPendingLocks();
 
-            proceedMapping(mappings);
+            proceedMapping();
         }
         catch (IgniteCheckedException ex) {
             onError(ex);
@@ -1050,13 +1079,16 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
      * Gets next near lock mapping and either acquires dht locks locally or sends near lock request to
      * remote primary node.
      *
-     * @param mappings Queue of mappings.
      * @throws IgniteCheckedException If mapping can not be completed.
      */
     @SuppressWarnings("unchecked")
-    private void proceedMapping(final ConcurrentLinkedDeque8<GridNearLockMapping> mappings)
+    private void proceedMapping()
         throws IgniteCheckedException {
-        GridNearLockMapping map = mappings.poll();
+        GridNearLockMapping map;
+
+        synchronized (this) {
+            map = mappings.poll();
+        }
 
         // If there are no more mappings to process, complete the future.
         if (map == null)
@@ -1139,7 +1171,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
 
                                         // Lock is held at this point, so we can set the
                                         // returned value if any.
-                                        entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer.get());
+                                        entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
 
                                         entry.readyNearLock(lockVer, mappedVer, res.committedVersions(),
                                             res.rolledbackVersions(), res.pending());
@@ -1181,9 +1213,11 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                                             log.debug("Failed to add candidates because entry was " +
                                                 "removed (will renew).");
 
-                                        // Replace old entry with new one.
-                                        entries.set(i, (GridDistributedCacheEntry)
-                                            cctx.cache().entryEx(entry.key()));
+                                        synchronized (GridNearLockFuture.this) {
+                                            // Replace old entry with new one.
+                                            entries.set(i,
+                                                (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+                                        }
                                     }
                                 }
 
@@ -1191,7 +1225,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                             }
 
                             // Proceed and add new future (if any) before completing embedded future.
-                            proceedMapping(mappings);
+                            proceedMapping();
                         }
                         catch (IgniteCheckedException ex) {
                             onError(ex);
@@ -1205,7 +1239,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                 fut));
         }
         else {
-            final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings);
+            final MiniFuture fut = new MiniFuture(node, mappedKeys);
 
             req.miniId(fut.futureId());
 
@@ -1302,7 +1336,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
             "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
 
-        topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get()));
+        topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
 
         return topEx;
     }
@@ -1354,23 +1388,19 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         @GridToStringInclude
         private Collection<KeyCacheObject> keys;
 
-        /** Mappings to proceed. */
-        @GridToStringExclude
-        private ConcurrentLinkedDeque8<GridNearLockMapping> mappings;
-
         /** */
-        private AtomicBoolean rcvRes = new AtomicBoolean(false);
+        private boolean rcvRes;
 
         /**
          * @param node Node.
          * @param keys Keys.
-         * @param mappings Mappings to proceed.
          */
-        MiniFuture(ClusterNode node, Collection<KeyCacheObject> keys,
-            ConcurrentLinkedDeque8<GridNearLockMapping> mappings) {
+        MiniFuture(
+            ClusterNode node,
+            Collection<KeyCacheObject> keys
+        ) {
             this.node = node;
             this.keys = keys;
-            this.mappings = mappings;
         }
 
         /**
@@ -1395,197 +1425,194 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         }
 
         /**
-         * @param e Error.
-         */
-        void onResult(Throwable e) {
-            if (rcvRes.compareAndSet(false, true)) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
-                // Fail.
-                onDone(e);
-            }
-            else
-                U.warn(log, "Received error after another result has been processed [fut=" + GridNearLockFuture.this +
-                    ", mini=" + this + ']', e);
-        }
-
-        /**
          * @param e Node left exception.
          */
         void onResult(ClusterTopologyCheckedException e) {
             if (isDone())
                 return;
 
-            if (rcvRes.compareAndSet(false, true)) {
-                if (log.isDebugEnabled())
-                    log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
+            synchronized (this) {
+                if (!rcvRes)
+                    rcvRes = true;
+                else
+                    return;
+            }
 
-                if (tx != null)
-                    tx.removeMapping(node.id());
+            if (log.isDebugEnabled())
+                log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
 
-                // Primary node left the grid, so fail the future.
-                GridNearLockFuture.this.onDone(newTopologyException(e, node.id()));
+            if (tx != null)
+                tx.removeMapping(node.id());
 
-                onDone(true);
-            }
+            // Primary node left the grid, so fail the future.
+            GridNearLockFuture.this.onDone(newTopologyException(e, node.id()));
+
+            onDone(true);
         }
 
         /**
          * @param res Result callback.
          */
         void onResult(GridNearLockResponse res) {
-            if (rcvRes.compareAndSet(false, true)) {
-                if (res.error() != null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
-                            ", res=" + res + ']');
+            synchronized (this) {
+                if (!rcvRes)
+                    rcvRes = true;
+                else
+                    return;
+            }
 
-                    // Fail.
-                    if (res.error() instanceof GridCacheLockTimeoutException)
-                        onDone(false);
-                    else
-                        onDone(res.error());
+            if (res.error() != null) {
+                if (log.isDebugEnabled())
+                    log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
+                        ", res=" + res + ']');
 
-                    return;
-                }
+                // Fail.
+                if (res.error() instanceof GridCacheLockTimeoutException)
+                    onDone(false);
+                else
+                    onDone(res.error());
 
-                if (res.clientRemapVersion() != null) {
-                    assert cctx.kernalContext().clientNode();
+                return;
+            }
 
-                    IgniteInternalFuture<?> affFut =
-                        cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
+            if (res.clientRemapVersion() != null) {
+                assert cctx.kernalContext().clientNode();
 
-                    if (affFut != null && !affFut.isDone()) {
-                        affFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                            @Override public void apply(IgniteInternalFuture<?> fut) {
-                                try {
-                                    fut.get();
+                IgniteInternalFuture<?> affFut =
+                    cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
 
-                                    remap();
-                                }
-                                catch (IgniteCheckedException e) {
-                                    onDone(e);
-                                }
-                                finally {
-                                    cctx.shared().txContextReset();
-                                }
+                if (affFut != null && !affFut.isDone()) {
+                    affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> fut) {
+                            try {
+                                fut.get();
+
+                                remap();
                             }
-                        });
-                    }
-                    else
-                        remap();
+                            catch (IgniteCheckedException e) {
+                                onDone(e);
+                            }
+                            finally {
+                                cctx.shared().txContextReset();
+                            }
+                        }
+                    });
                 }
-                else {
-                    int i = 0;
+                else
+                    remap();
+            }
+            else {
+                int i = 0;
 
-                    AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
+                AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer;
 
-                    for (KeyCacheObject k : keys) {
-                        while (true) {
-                            GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
+                for (KeyCacheObject k : keys) {
+                    while (true) {
+                        GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
 
-                            try {
-                                if (res.dhtVersion(i) == null) {
-                                    onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
-                                        "(will fail the lock): " + res));
+                        try {
+                            if (res.dhtVersion(i) == null) {
+                                onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+                                    "(will fail the lock): " + res));
 
-                                    return;
-                                }
+                                return;
+                            }
 
-                                IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
+                            IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
 
-                                CacheObject oldVal = entry.rawGet();
-                                boolean hasOldVal = false;
-                                CacheObject newVal = res.value(i);
+                            CacheObject oldVal = entry.rawGet();
+                            boolean hasOldVal = false;
+                            CacheObject newVal = res.value(i);
 
-                                boolean readRecordable = false;
+                            boolean readRecordable = false;
 
-                                if (retval) {
-                                    readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+                            if (retval) {
+                                readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
 
-                                    if (readRecordable)
-                                        hasOldVal = entry.hasValue();
-                                }
+                                if (readRecordable)
+                                    hasOldVal = entry.hasValue();
+                            }
 
-                                GridCacheVersion dhtVer = res.dhtVersion(i);
-                                GridCacheVersion mappedVer = res.mappedVersion(i);
+                            GridCacheVersion dhtVer = res.dhtVersion(i);
+                            GridCacheVersion mappedVer = res.mappedVersion(i);
 
-                                if (newVal == null) {
-                                    if (oldValTup != null) {
-                                        if (oldValTup.get1().equals(dhtVer))
-                                            newVal = oldValTup.get2();
+                            if (newVal == null) {
+                                if (oldValTup != null) {
+                                    if (oldValTup.get1().equals(dhtVer))
+                                        newVal = oldValTup.get2();
 
-                                        oldVal = oldValTup.get2();
-                                    }
+                                    oldVal = oldValTup.get2();
                                 }
+                            }
 
-                                // Lock is held at this point, so we can set the
-                                // returned value if any.
-                                entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
+                            // Lock is held at this point, so we can set the
+                            // returned value if any.
+                            entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
 
-                                if (inTx()) {
-                                    tx.hasRemoteLocks(true);
+                            if (inTx()) {
+                                tx.hasRemoteLocks(true);
 
-                                    if (implicitTx() && tx.onePhaseCommit()) {
-                                        boolean pass = res.filterResult(i);
+                                if (implicitTx() && tx.onePhaseCommit()) {
+                                    boolean pass = res.filterResult(i);
 
-                                        tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
-                                    }
+                                    tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
                                 }
+                            }
 
-                                entry.readyNearLock(lockVer,
-                                    mappedVer,
-                                    res.committedVersions(),
-                                    res.rolledbackVersions(),
-                                    res.pending());
-
-                                if (retval) {
-                                    if (readRecordable)
-                                        cctx.events().addEvent(
-                                            entry.partition(),
-                                            entry.key(),
-                                            tx,
-                                            null,
-                                            EVT_CACHE_OBJECT_READ,
-                                            newVal,
-                                            newVal != null,
-                                            oldVal,
-                                            hasOldVal,
-                                            CU.subjectId(tx, cctx.shared()),
-                                            null,
-                                            inTx() ? tx.resolveTaskName() : null);
-
-                                    if (cctx.cache().configuration().isStatisticsEnabled())
-                                        cctx.cache().metrics0().onRead(false);
-                                }
+                            entry.readyNearLock(lockVer,
+                                mappedVer,
+                                res.committedVersions(),
+                                res.rolledbackVersions(),
+                                res.pending());
+
+                            if (retval) {
+                                if (readRecordable)
+                                    cctx.events().addEvent(
+                                        entry.partition(),
+                                        entry.key(),
+                                        tx,
+                                        null,
+                                        EVT_CACHE_OBJECT_READ,
+                                        newVal,
+                                        newVal != null,
+                                        oldVal,
+                                        hasOldVal,
+                                        CU.subjectId(tx, cctx.shared()),
+                                        null,
+                                        inTx() ? tx.resolveTaskName() : null);
+
+                                if (cctx.cache().configuration().isStatisticsEnabled())
+                                    cctx.cache().metrics0().onRead(false);
+                            }
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
 
-                                break; // Inner while loop.
-                            }
-                            catch (GridCacheEntryRemovedException ignored) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to add candidates because entry was removed (will renew).");
+                            break; // Inner while loop.
+                        }
+                        catch (GridCacheEntryRemovedException ignored) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to add candidates because entry was removed (will renew).");
 
+                            synchronized (GridNearLockFuture.this) {
                                 // Replace old entry with new one.
-                                entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+                                entries.set(i,
+                                    (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
                             }
                         }
-
-                        i++;
                     }
 
-                    try {
-                        proceedMapping(mappings);
-                    }
-                    catch (IgniteCheckedException e) {
-                        onDone(e);
-                    }
+                    i++;
+                }
 
-                    onDone(true);
+                try {
+                    proceedMapping();
+                }
+                catch (IgniteCheckedException e) {
+                    onDone(e);
                 }
+
+                onDone(true);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java
index b4f689c..6c8e388 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -35,7 +35,7 @@ public class GridNearLockMapping {
 
     /** Collection of mapped keys. */
     @GridToStringInclude
-    private Collection<KeyCacheObject> mappedKeys = new LinkedList<>();
+    private final Collection<KeyCacheObject> mappedKeys = new ArrayList<>();
 
     /** Near lock request. */
     @GridToStringExclude
@@ -115,4 +115,4 @@ public class GridNearLockMapping {
     public String toString() {
         return S.toString(GridNearLockMapping.class, this);
     }
-}
\ No newline at end of file
+}


Mime
View raw message