ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held.
Date Mon, 21 Dec 2015 12:24:29 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1537 111e44743 -> 1e9d9cc17


ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock
is held.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1e9d9cc1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1e9d9cc1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1e9d9cc1

Branch: refs/heads/ignite-1537
Commit: 1e9d9cc1785afc9050c2af936a0d165f7dbafd73
Parents: 111e447
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Dec 21 15:24:20 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Dec 21 15:24:20 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      | 12 +++++
 .../processors/cache/GridCacheMvccManager.java  | 51 +++++++++++++-------
 .../dht/atomic/GridDhtAtomicCache.java          |  9 ++--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  2 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 12 +++--
 5 files changed, 59 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9d9cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index c10ebf3..0c2f67c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -244,6 +244,9 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** */
     private boolean deferredDel;
 
+    /** */
+    private boolean marshallerCache;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -361,6 +364,15 @@ public class GridCacheContext<K, V> implements Externalizable {
             expiryPlc = null;
 
         itHolder = new CacheWeakQueryIteratorsHolder(log);
+
+        marshallerCache = cacheType == CacheType.MARSHALLER;
+    }
+
+    /**
+     * @return {@code True} if marshaller cache.
+     */
+    public boolean marshallerCache() {
+        return marshallerCache;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9d9cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index dbc6992..46bd093 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -106,6 +106,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
     private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts
=
         new ConcurrentHashMap8<>();
 
+    /** Pending atomic futures. */
+    private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> sysCacheAtomicFuts
=
+        new ConcurrentHashMap8<>();
+
     /** */
     private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs = new ConcurrentHashMap8<>();
 
@@ -215,18 +219,28 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
             for (GridCacheFuture<?> fut : activeFutures())
                 fut.onNodeLeft(discoEvt.eventNode().id());
 
-            for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) {
-                cacheFut.onNodeLeft(discoEvt.eventNode().id());
+            onNodeLeft(discoEvt.eventNode().id(), sysCacheAtomicFuts);
 
-                if (cacheFut.isCancelled() || cacheFut.isDone()) {
-                    GridCacheVersion futVer = cacheFut.version();
+            onNodeLeft(discoEvt.eventNode().id(), atomicFuts);
+        }
+    };
 
-                    if (futVer != null)
-                        atomicFuts.remove(futVer, cacheFut);
-                }
+    /**
+     * @param nodeId Failed node ID.
+     * @param atomicFuts Futures collection.
+     */
+    private void onNodeLeft(UUID nodeId, ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>>
atomicFuts) {
+        for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) {
+            cacheFut.onNodeLeft(nodeId);
+
+            if (cacheFut.isCancelled() || cacheFut.isDone()) {
+                GridCacheVersion futVer = cacheFut.version();
+
+                if (futVer != null)
+                    atomicFuts.remove(futVer, cacheFut);
             }
         }
-    };
+    }
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
@@ -350,7 +364,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
         for (GridCacheFuture<?> fut : activeFutures())
             ((GridFutureAdapter)fut).onDone(err);
 
-        for (GridCacheAtomicFuture<?> future : atomicFuts.values())
+        for (GridCacheAtomicFuture<?> future : atomicFutures())
             ((GridFutureAdapter)future).onDone(err);
     }
 
@@ -390,11 +404,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
 
     /**
      * @param futVer Future ID.
+     * @param sysCache If {@code true} uses special futures collection.
      * @param fut Future.
      * @return {@code False} if future was forcibly completed with error.
      */
-    public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?>
fut) {
-        IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut);
+    public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?>
fut, boolean sysCache) {
+        IgniteInternalFuture<?> old = sysCache ? sysCacheAtomicFuts.put(futVer, fut)
: atomicFuts.put(futVer, fut);
 
         assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut
+ ", old=" + old + ']';
 
@@ -405,25 +420,27 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
      * @return Collection of pending atomic futures.
      */
     public Collection<GridCacheAtomicFuture<?>> atomicFutures() {
-        return atomicFuts.values();
+        return F.concat(false, sysCacheAtomicFuts.values(), atomicFuts.values());
     }
 
     /**
      * Gets future by given future ID.
      *
      * @param futVer Future ID.
+     * @param sysCache If {@code true} uses special futures collection.
      * @return Future.
      */
-    @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer)
{
-        return atomicFuts.get(futVer);
+    @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer,
boolean sysCache) {
+        return sysCache ? sysCacheAtomicFuts.get(futVer) : atomicFuts.get(futVer);
     }
 
     /**
      * @param futVer Future ID.
+     * @param sysCache If {@code true} uses special futures collection.
      * @return Removed future.
      */
-    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer)
{
-        return atomicFuts.remove(futVer);
+    @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer,
boolean sysCache) {
+        return sysCache ? sysCacheAtomicFuts.remove(futVer) : atomicFuts.remove(futVer);
     }
 
     /**
@@ -986,7 +1003,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
 
         res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class);
 
-        for (GridCacheAtomicFuture<?> fut : atomicFuts.values()) {
+        for (GridCacheAtomicFuture<?> fut : atomicFutures()) {
             IgniteInternalFuture<Void> complete = fut.completeFuture(topVer);
 
             if (complete != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9d9cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 481317a..8cb5249 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1339,7 +1339,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                         res.returnValue(retVal);
 
                         if (dhtFut != null)
-                            ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut);
+                            ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut, false);
                     }
                     else
                         // Should remap all keys.
@@ -2611,7 +2611,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
         res.nodeId(ctx.localNodeId());
 
-        GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+        GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion(),
+            ctx.marshallerCache());
 
         if (fut != null)
             fut.onResult(nodeId, res);
@@ -2783,7 +2784,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
             log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res="
+ res + ']');
 
         GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().
-            atomicFuture(res.futureVersion());
+            atomicFuture(res.futureVersion(), false);
 
         if (updateFut != null)
             updateFut.onResult(nodeId, res);
@@ -2802,7 +2803,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
             log.debug("Processing deferred dht atomic update response [nodeId=" + nodeId
+ ", res=" + res + ']');
 
         for (GridCacheVersion ver : res.futureVersions()) {
-            GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver);
+            GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver,
false);
 
             if (updateFut != null)
                 updateFut.onResult(nodeId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9d9cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index e31af19..e01ffc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -349,7 +349,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
         if (super.onDone(res, err)) {
-            cctx.mvcc().removeAtomicFuture(version());
+            cctx.mvcc().removeAtomicFuture(version(), false);
 
             if (err != null) {
                 if (!mappings.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9d9cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index eefdc73..9d8f4c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -327,7 +327,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             GridCacheVersion futVer = state.onFutureDone();
 
             if (futVer != null)
-                cctx.mvcc().removeAtomicFuture(futVer);
+                cctx.mvcc().removeAtomicFuture(futVer, cctx.marshallerCache());
 
             return true;
         }
@@ -582,7 +582,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     req = mappings != null ? mappings.get(nodeId) : null;
 
                 if (req != null && req.response() == null) {
-                    res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion(),
+                    res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+                        nodeId,
+                        req.futureVersion(),
                         cctx.deploymentEnabled());
 
                     ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary
node left grid " +
@@ -718,7 +720,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                         topCompleteFut = null;
 
-                        cctx.mvcc().removeAtomicFuture(futVer);
+                        cctx.mvcc().removeAtomicFuture(futVer, cctx.marshallerCache());
 
                         futVer = null;
                         topVer = AffinityTopologyVersion.ZERO;
@@ -847,7 +849,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 futVer = cctx.versions().next(topVer);
 
                 if (storeFuture()) {
-                    if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this))
{
+                    if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this,
cctx.marshallerCache())) {
                         assert isDone() : GridNearAtomicUpdateFuture.this;
 
                         return;
@@ -999,7 +1001,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                         throw new NullPointerException("Null value.");
                 }
                 else if (conflictPutVals != null) {
-                    GridCacheDrInfo conflictPutVal =  conflictPutValsIt.next();
+                    GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
 
                     val = conflictPutVal.value();
                     conflictVer = conflictPutVal.version();


Mime
View raw message