ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held.
Date Tue, 22 Dec 2015 13:47:29 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1537 5b2f375d5 -> 7cf8ed26b


ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7cf8ed26
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7cf8ed26
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7cf8ed26

Branch: refs/heads/ignite-1537
Commit: 7cf8ed26b6ba692b7860cd9e258064baf0bd554c
Parents: 5b2f375
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Dec 22 16:47:19 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Dec 22 16:47:19 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      |  12 --
 .../processors/cache/GridCacheMvccManager.java  |  51 +++----
 .../dht/atomic/GridDhtAtomicCache.java          |  10 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   2 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 137 ++++++++++++-------
 5 files changed, 106 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf8ed26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 0c2f67c..c10ebf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -244,9 +244,6 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** */
     private boolean deferredDel;
 
-    /** */
-    private boolean marshallerCache;
-
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -364,15 +361,6 @@ public class GridCacheContext<K, V> implements Externalizable {
             expiryPlc = null;
 
         itHolder = new CacheWeakQueryIteratorsHolder(log);
-
-        marshallerCache = cacheType == CacheType.MARSHALLER;
-    }
-
-    /**
-     * @return {@code True} if marshaller cache.
-     */
-    public boolean marshallerCache() {
-        return marshallerCache;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf8ed26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 46bd093..dbc6992 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -106,10 +106,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
     private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts =
         new ConcurrentHashMap8<>();
 
-    /** Pending atomic futures. */
-    private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> sysCacheAtomicFuts
=
-        new ConcurrentHashMap8<>();
-
     /** */
     private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs = new ConcurrentHashMap8<>();
 
@@ -219,28 +215,18 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
             for (GridCacheFuture<?> fut : activeFutures())
                 fut.onNodeLeft(discoEvt.eventNode().id());
 
-            onNodeLeft(discoEvt.eventNode().id(), sysCacheAtomicFuts);
-
-            onNodeLeft(discoEvt.eventNode().id(), atomicFuts);
-        }
-    };
-
-    /**
-     * @param nodeId Failed node ID.
-     * @param atomicFuts Futures collection.
-     */
-    private void onNodeLeft(UUID nodeId, ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>>
atomicFuts) {
-        for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) {
-            cacheFut.onNodeLeft(nodeId);
+            for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) {
+                cacheFut.onNodeLeft(discoEvt.eventNode().id());
 
-            if (cacheFut.isCancelled() || cacheFut.isDone()) {
-                GridCacheVersion futVer = cacheFut.version();
+                if (cacheFut.isCancelled() || cacheFut.isDone()) {
+                    GridCacheVersion futVer = cacheFut.version();
 
-                if (futVer != null)
-                    atomicFuts.remove(futVer, cacheFut);
+                    if (futVer != null)
+                        atomicFuts.remove(futVer, cacheFut);
+                }
             }
         }
-    }
+    };
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
@@ -364,7 +350,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
         for (GridCacheFuture<?> fut : activeFutures())
             ((GridFutureAdapter)fut).onDone(err);
 
-        for (GridCacheAtomicFuture<?> future : atomicFutures())
+        for (GridCacheAtomicFuture<?> future : atomicFuts.values())
             ((GridFutureAdapter)future).onDone(err);
     }
 
@@ -404,12 +390,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
 
     /**
      * @param futVer Future ID.
-     * @param sysCache If {@code true} uses special futures collection.
      * @param fut Future.
      * @return {@code False} if future was forcibly completed with error.
      */
-    public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut, boolean sysCache) {
-        IgniteInternalFuture<?> old = sysCache ? sysCacheAtomicFuts.put(futVer, fut) : atomicFuts.put(futVer, fut);
+    public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) {
+        IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut);
 
         assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
 
@@ -420,27 +405,25 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
      * @return Collection of pending atomic futures.
      */
     public Collection<GridCacheAtomicFuture<?>> atomicFutures() {
-        return F.concat(false, sysCacheAtomicFuts.values(), atomicFuts.values());
+        return atomicFuts.values();
     }
 
     /**
      * Gets future by given future ID.
      *
      * @param futVer Future ID.
-     * @param sysCache If {@code true} uses special futures collection.
      * @return Future.
      */
-    @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer,
boolean sysCache) {
-        return sysCache ? sysCacheAtomicFuts.get(futVer) : atomicFuts.get(futVer);
+    @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer)
{
+        return atomicFuts.get(futVer);
     }
 
     /**
      * @param futVer Future ID.
-     * @param sysCache If {@code true} uses special futures collection.
      * @return Removed future.
      */
-    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer,
boolean sysCache) {
-        return sysCache ? sysCacheAtomicFuts.remove(futVer) : atomicFuts.remove(futVer);
+    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer)
{
+        return atomicFuts.remove(futVer);
     }
 
     /**
@@ -1003,7 +986,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
 
         res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class);
 
-        for (GridCacheAtomicFuture<?> fut : atomicFutures()) {
+        for (GridCacheAtomicFuture<?> fut : atomicFuts.values()) {
             IgniteInternalFuture<Void> complete = fut.completeFuture(topVer);
 
             if (complete != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf8ed26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 3c8b7d4..6942d87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1350,7 +1350,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                         res.returnValue(retVal);
 
                         if (dhtFut != null)
-                            ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut, false);
+                            ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut);
                     }
                     else
                         // Should remap all keys.
@@ -2622,8 +2622,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
         res.nodeId(ctx.localNodeId());
 
-        GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion(),
-            ctx.marshallerCache());
+        GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
 
         if (fut != null)
             fut.onResult(nodeId, res);
@@ -2794,8 +2793,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         if (log.isDebugEnabled())
             log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res="
+ res + ']');
 
-        GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().
-            atomicFuture(res.futureVersion(), false);
+        GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
 
         if (updateFut != null)
             updateFut.onResult(nodeId, res);
@@ -2814,7 +2812,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
             log.debug("Processing deferred dht atomic update response [nodeId=" + nodeId
+ ", res=" + res + ']');
 
         for (GridCacheVersion ver : res.futureVersions()) {
-            GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver,
false);
+            GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver);
 
             if (updateFut != null)
                 updateFut.onResult(nodeId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf8ed26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index e01ffc9..e31af19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -349,7 +349,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
         if (super.onDone(res, err)) {
-            cctx.mvcc().removeAtomicFuture(version(), false);
+            cctx.mvcc().removeAtomicFuture(version());
 
             if (err != null) {
                 if (!mappings.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf8ed26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 9aa6fa6..e617f43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
@@ -288,7 +287,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             // Cannot remap.
             remapCnt = 1;
 
-            state.map(topVer);
+            state.map(topVer, null);
         }
     }
 
@@ -327,7 +326,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             GridCacheVersion futVer = state.onFutureDone();
 
             if (futVer != null)
-                cctx.mvcc().removeAtomicFuture(futVer, cctx.marshallerCache());
+                cctx.mvcc().removeAtomicFuture(futVer);
 
             return true;
         }
@@ -415,7 +414,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             cache.topology().readUnlock();
         }
 
-        state.map(topVer);
+        state.map(topVer, null);
     }
 
     /**
@@ -721,7 +720,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                         topCompleteFut = null;
 
-                        cctx.mvcc().removeAtomicFuture(futVer, cctx.marshallerCache());
+                        cctx.mvcc().removeAtomicFuture(futVer);
 
                         futVer = null;
                         topVer = AffinityTopologyVersion.ZERO;
@@ -790,7 +789,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                                 try {
                                     AffinityTopologyVersion topVer = fut.get();
 
-                                    map(topVer);
+                                    map(topVer, remapKeys);
                                 }
                                 catch (IgniteCheckedException e) {
                                     onDone(e);
@@ -826,8 +825,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
         /**
          * @param topVer Topology version.
+         * @param remapKeys Keys to remap.
          */
-        void map(AffinityTopologyVersion topVer) {
+        void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject>
remapKeys) {
             Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
             if (F.isEmpty(topNodes)) {
@@ -839,68 +839,86 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
             Exception err = null;
             GridNearAtomicUpdateRequest singleReq0 = null;
-            Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
+            Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
 
             int size = keys.size();
 
-            synchronized (this) {
-                assert futVer == null : this;
-                assert this.topVer == AffinityTopologyVersion.ZERO : this;
+            GridCacheVersion futVer = cctx.versions().next(topVer);
+
+            if (storeFuture()) {
+                if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this))
{
+                    assert isDone() : GridNearAtomicUpdateFuture.this;
 
-                resCnt = 0;
+                    return;
+                }
+            }
 
-                this.topVer = topVer;
+            GridCacheVersion updVer;
 
-                futVer = cctx.versions().next(topVer);
+            // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+            if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+                updVer = this.updVer;
 
-                if (storeFuture()) {
-                    if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this,
cctx.marshallerCache())) {
-                        assert isDone() : GridNearAtomicUpdateFuture.this;
+                if (updVer == null) {
+                    updVer = cctx.versions().next(topVer);
 
-                        return;
-                    }
+                    if (log.isDebugEnabled())
+                        log.debug("Assigned fast-map version for update on near node: " +
updVer);
                 }
+            }
+            else
+                updVer = null;
 
-                // Assign version on near node in CLOCK ordering mode even if fastMap is false.
-                if (updVer == null)
-                    updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
+            try {
+                if (size == 1 && !fastMap) {
+                    assert remapKeys == null || remapKeys.size() == 1;
 
-                if (updVer != null && log.isDebugEnabled())
-                    log.debug("Assigned fast-map version for update on near node: " + updVer);
+                    singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+                }
+                else {
+                    Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+                        topVer,
+                        futVer,
+                        updVer,
+                        remapKeys);
+
+                    if (pendingMappings.size() == 1)
+                        singleReq0 = F.firstValue(pendingMappings);
+                    else {
+                        if (syncMode == PRIMARY_SYNC) {
+                            mappings0 = U.newHashMap(pendingMappings.size());
 
-                try {
-                    if (size == 1 && !fastMap) {
-                        assert remapKeys == null || remapKeys.size() == 1;
+                            for (GridNearAtomicUpdateRequest req : pendingMappings.values())
{
+                                if (req.hasPrimary())
+                                    mappings0.put(req.nodeId(), req);
+                            }
+                        }
+                        else
+                            mappings0 = pendingMappings;
 
-                        singleReq0 = singleReq = mapSingleUpdate();
+                        assert !mappings0.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
                     }
-                    else {
-                        pendingMappings = mapUpdate(topNodes);
+                }
 
-                        if (pendingMappings.size() == 1)
-                            singleReq0 = singleReq = F.firstValue(pendingMappings);
-                        else {
-                            if (syncMode == PRIMARY_SYNC) {
-                                mappings = U.newHashMap(pendingMappings.size());
+                synchronized (this) {
+                    assert this.futVer == null : this;
+                    assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
-                                for (GridNearAtomicUpdateRequest req : pendingMappings.values())
{
-                                    if (req.hasPrimary())
-                                        mappings.put(req.nodeId(), req);
-                                }
-                            }
-                            else
-                                mappings = new HashMap<>(pendingMappings);
+                    this.topVer = topVer;
+                    this.updVer = updVer;
+                    this.futVer = futVer;
 
-                            assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
-                        }
-                    }
+                    resCnt = 0;
 
-                    remapKeys = null;
-                }
-                catch (Exception e) {
-                    err = e;
+                    singleReq = singleReq0;
+                    mappings = mappings0;
+
+                    this.remapKeys = null;
                 }
             }
+            catch (Exception e) {
+                err = e;
+            }
 
             if (err != null) {
                 onDone(err);
@@ -912,12 +930,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             if (singleReq0 != null)
                 mapSingle(singleReq0.nodeId(), singleReq0);
             else {
-                assert pendingMappings != null;
+                assert mappings0 != null;
 
                 if (size == 0)
                     onDone(new GridCacheReturn(cctx, true, true, null, true));
                 else
-                    doUpdate(pendingMappings);
+                    doUpdate(mappings0);
             }
         }
 
@@ -965,10 +983,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
         /**
          * @param topNodes Cache nodes.
+         * @param topVer Topology version.
+         * @param futVer Future version.
+         * @param updVer Update version.
+         * @param remapKeys Keys to remap.
          * @return Mapping.
          * @throws Exception If failed.
          */
-        private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode>
topNodes) throws Exception {
+        private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode>
topNodes,
+            AffinityTopologyVersion topVer,
+            GridCacheVersion futVer,
+            @Nullable GridCacheVersion updVer,
+            @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
             Iterator<?> it = null;
 
             if (vals != null)
@@ -1089,10 +1115,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         }
 
         /**
+         * @param topVer Topology version.
+         * @param futVer Future version.
+         * @param updVer Update version.
          * @return Request.
          * @throws Exception If failed.
          */
-        private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception {
+        private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+            GridCacheVersion futVer,
+            @Nullable GridCacheVersion updVer) throws Exception {
             Object key = F.first(keys);
 
             Object val;


Mime
View raw message