ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [01/24] ignite git commit: ignite-3116 Cancel force keys futures on node stop (cherry picked from commit 0428018)
Date Tue, 17 May 2016 04:02:28 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 223708a89 -> b52edcaae


ignite-3116 Cancel force keys futures on node stop (cherry picked from commit 0428018)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/db8a9b2b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/db8a9b2b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/db8a9b2b

Branch: refs/heads/master
Commit: db8a9b2b68ba505376c3be4abe789c84fc8e47cd
Parents: ea909bc
Author: sboikov <sboikov@gridgain.com>
Authored: Mon May 16 09:08:14 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon May 16 09:44:28 2016 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/GridDhtGetFuture.java | 77 ++++++++++----------
 .../distributed/dht/GridDhtGetSingleFuture.java |  4 +
 .../dht/GridDhtTransactionalCacheAdapter.java   | 57 ++++++++++++++-
 .../dht/atomic/GridDhtAtomicCache.java          | 52 ++++++++++++-
 .../dht/preloader/GridDhtForceKeysFuture.java   |  6 +-
 .../dht/preloader/GridDhtPreloader.java         | 28 ++++++-
 .../communication/tcp/TcpCommunicationSpi.java  | 32 ++++----
 7 files changed, 198 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index fbfca82..e12e1ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -44,6 +45,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.typedef.C2;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -167,9 +169,44 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
      * Initializes future.
      */
     void init() {
-        map(keys);
+        GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(),
topVer);
+
+        if (fut != null) {
+            if (!F.isEmpty(fut.invalidPartitions())) {
+                if (retries == null)
+                    retries = new HashSet<>();
+
+                retries.addAll(fut.invalidPartitions());
+            }
+
+            fut.listen(new CI1<IgniteInternalFuture<Object>>() {
+                @Override public void apply(IgniteInternalFuture<Object> fut) {
+                    try {
+                        fut.get();
+                    }
+                    catch (NodeStoppingException e) {
+                        return;
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to request keys from preloader [keys=" + keys
+ ", err=" + e + ']');
+
+                        onDone(e);
+
+                        return;
+                    }
+
+                    map0(keys);
+
+                    markInitialized();
+                }
+            });
+        }
+        else {
+            map0(keys);
 
-        markInitialized();
+            markInitialized();
+        }
     }
 
     /** {@inheritDoc} */
@@ -205,42 +242,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     }
 
     /**
-     * @param keys Keys.
-     */
-    private void map(final Map<KeyCacheObject, Boolean> keys) {
-        GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(),
topVer);
-
-        if (fut != null) {
-            if (!F.isEmpty(fut.invalidPartitions())) {
-                if (retries == null)
-                    retries = new HashSet<>();
-
-                retries.addAll(fut.invalidPartitions());
-            }
-
-            add(new GridEmbeddedFuture<>(
-                new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>()
{
-                    @Override public Collection<GridCacheEntryInfo> apply(Object o,
Exception e) {
-                        if (e != null) { // Check error first.
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to request keys from preloader [keys="
+ keys + ", err=" + e + ']');
-
-                            onDone(e);
-                        }
-                        else
-                            map0(keys);
-
-                        // Finish this one.
-                        return Collections.emptyList();
-                    }
-                },
-                fut));
-        }
-        else
-            map0(keys);
-    }
-
-    /**
      * @param keys Keys to map.
      */
     private void map0(Map<KeyCacheObject, Boolean> keys) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 2de92b1..9394937 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -220,6 +221,9 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
                                     log.debug("Failed to request keys from preloader " +
                                         "[keys=" + key + ", err=" + e + ']');
 
+                                if (e instanceof NodeStoppingException)
+                                    return;
+
                                 onDone(e);
                             }
                             else

http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 7bace73..0a99621 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -378,11 +379,38 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends
GridDhtCach
         IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null :
             ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion());
 
-        if (keyFut == null || keyFut.isDone())
+        if (keyFut == null || keyFut.isDone()) {
+            if (keyFut != null) {
+                try {
+                    keyFut.get();
+                }
+                catch (NodeStoppingException e) {
+                    return;
+                }
+                catch (IgniteCheckedException e) {
+                    onForceKeysError(nodeId, req, e);
+
+                    return;
+                }
+            }
+
             processDhtLockRequest0(nodeId, req);
+        }
         else {
             keyFut.listen(new CI1<IgniteInternalFuture<Object>>() {
-                @Override public void apply(IgniteInternalFuture<Object> t) {
+                @Override public void apply(IgniteInternalFuture<Object> fut) {
+                    try {
+                        fut.get();
+                    }
+                    catch (NodeStoppingException e) {
+                        return;
+                    }
+                    catch (IgniteCheckedException e) {
+                        onForceKeysError(nodeId, req, e);
+
+                        return;
+                    }
+
                     processDhtLockRequest0(nodeId, req);
                 }
             });
@@ -392,6 +420,31 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends
GridDhtCach
     /**
      * @param nodeId Node ID.
      * @param req Request.
+     * @param e Error.
+     */
+    private void onForceKeysError(UUID nodeId, GridDhtLockRequest req, IgniteCheckedException
e) {
+        GridDhtLockResponse res = new GridDhtLockResponse(ctx.cacheId(),
+            req.version(),
+            req.futureId(),
+            req.miniId(),
+            e,
+            ctx.deploymentEnabled());
+
+        try {
+            ctx.io().send(nodeId, res, ctx.ioPolicy());
+        }
+        catch (ClusterTopologyCheckedException e0) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send lock reply to remote node because it left grid:
" + nodeId);
+        }
+        catch (IgniteCheckedException e0) {
+            U.error(log, "Failed to send lock reply to node: " + nodeId, e);
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
      */
     protected final void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) {
         assert nodeId != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1f01a76..6a30f7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -39,6 +39,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -1466,11 +1467,37 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
     ) {
         IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
 
-        if (forceFut == null || forceFut.isDone())
+        if (forceFut == null || forceFut.isDone()) {
+            try {
+                if (forceFut != null)
+                    forceFut.get();
+            }
+            catch (NodeStoppingException e) {
+                return;
+            }
+            catch (IgniteCheckedException e) {
+                onForceKeysError(nodeId, req, completionCb, e);
+
+                return;
+            }
+
             updateAllAsyncInternal0(nodeId, req, completionCb);
+        }
         else {
             forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
-                @Override public void apply(IgniteInternalFuture<Object> t) {
+                @Override public void apply(IgniteInternalFuture<Object> fut) {
+                    try {
+                        fut.get();
+                    }
+                    catch (NodeStoppingException e) {
+                        return;
+                    }
+                    catch (IgniteCheckedException e) {
+                        onForceKeysError(nodeId, req, completionCb, e);
+
+                        return;
+                    }
+
                     updateAllAsyncInternal0(nodeId, req, completionCb);
                 }
             });
@@ -1478,6 +1505,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param req Update request.
+     * @param completionCb Completion callback.
+     * @param e Error.
+     */
+    private void onForceKeysError(final UUID nodeId,
+        final GridNearAtomicUpdateRequest req,
+        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        IgniteCheckedException e
+    ) {
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+            nodeId,
+            req.futureVersion(),
+            ctx.deploymentEnabled());
+
+        res.addFailedKeys(req.keys(), e);
+
+        completionCb.apply(req, res);
+    }
+
+    /**
      * Executes local update after preloader fetched values.
      *
      * @param nodeId Node ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 7970a44..4da1f38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -246,7 +246,11 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
 
             int curTopVer = topCntr.get();
 
-            preloader.addFuture(this);
+            if (!preloader.addFuture(this)) {
+                assert isDone() : this;
+
+                return false;
+            }
 
             trackable = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 0de3197..09aec81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -115,6 +116,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** */
     private final AtomicInteger partsEvictOwning = new AtomicInteger();
 
+    /** */
+    private volatile boolean stopping;
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -210,6 +214,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         if (log.isDebugEnabled())
             log.debug("DHT rebalancer onKernalStop callback.");
 
+        stopping = true;
+
         cctx.events().removeListener(discoLsnr);
 
         // Acquire write busy lock.
@@ -221,8 +227,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         if (demander != null)
             demander.stop();
 
+        IgniteCheckedException err = stopError();
+
+        for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
+            fut.onDone(err);
+
         top = null;
     }
+    /**
+     * @return Node stop exception.
+     */
+    private IgniteCheckedException stopError() {
+        return new NodeStoppingException("Operation has been cancelled (cache or node is
stopping).");
+    }
 
     /** {@inheritDoc} */
     @Override public void onInitialExchangeComplete(@Nullable Throwable err) {
@@ -711,9 +728,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
      * Adds future to future map.
      *
      * @param fut Future to add.
+     * @return {@code False} if node cache is stopping and future was completed with error.
      */
-    void addFuture(GridDhtForceKeysFuture<?, ?> fut) {
+    boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) {
         forceKeyFuts.put(fut.futureId(), fut);
+
+        if (stopping) {
+            fut.onDone(stopError());
+
+            return false;
+        }
+
+        return true;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a9b2b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 904047e..875131d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1381,25 +1381,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public void dumpStats() {
-        StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl());
+        IgniteLogger log = this.log;
 
-        for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet())
{
-            GridNioRecoveryDescriptor desc = entry.getValue();
+        if (log != null) {
+            StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors:
").append(U.nl());
 
-            sb.append("    [key=").append(entry.getKey())
-                .append(", msgsSent=").append(desc.sent())
-                .append(", msgsAckedByRmt=").append(desc.acked())
-                .append(", msgsRcvd=").append(desc.received())
-                .append(", descIdHash=").append(System.identityHashCode(desc))
-                .append(']').append(U.nl());
-        }
+            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet())
{
+                GridNioRecoveryDescriptor desc = entry.getValue();
 
-        U.warn(log, sb.toString());
+                sb.append("    [key=").append(entry.getKey())
+                    .append(", msgsSent=").append(desc.sent())
+                    .append(", msgsAckedByRmt=").append(desc.acked())
+                    .append(", msgsRcvd=").append(desc.received())
+                    .append(", descIdHash=").append(System.identityHashCode(desc))
+                    .append(']').append(U.nl());
+            }
 
-        GridNioServer<Message> nioSrvr1 = nioSrvr;
+            U.warn(log, sb.toString());
+        }
 
-        if (nioSrvr1 != null)
-            nioSrvr1.dumpStats();
+        GridNioServer<Message> nioSrvr = this.nioSrvr;
+
+        if (nioSrvr != null)
+            nioSrvr.dumpStats();
     }
 
     /** {@inheritDoc} */


Mime
View raw message