ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1452 Cancel cache operations on node stop
Date Wed, 16 Sep 2015 14:19:22 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1452 4ee280f02 -> b0b33f0d2


ignite-1452 Cancel cache operations on node stop


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0b33f0d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0b33f0d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0b33f0d

Branch: refs/heads/ignite-1452
Commit: b0b33f0d2d7a9787a6539ec09f9a855018e764fb
Parents: 4ee280f
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Sep 16 17:19:12 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Sep 16 17:19:12 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 42 +++++++++++++++-----
 .../colocated/GridDhtColocatedLockFuture.java   | 12 +++++-
 .../distributed/near/GridNearLockFuture.java    | 12 +++++-
 .../near/GridNearOptimisticTxPrepareFuture.java | 16 +++++++-
 4 files changed, 69 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b0b33f0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 20340d1..714017e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -147,6 +147,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     private ExchangeFutureSet exchFuts = new ExchangeFutureSet();
 
+    /** */
+    private volatile IgniteCheckedException stopErr;
+
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -381,7 +385,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
         cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
 
-        IgniteCheckedException err = cctx.kernalContext().clientDisconnected() ?
+        stopErr = cctx.kernalContext().clientDisconnected() ?
             new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
                 "Client node disconnected: " + cctx.gridName()) :
             new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
@@ -391,11 +395,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         if (exchFuts0 != null) {
             for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
-                f.onDone(err);
+                f.onDone(stopErr);
         }
 
         for (AffinityReadyFuture f : readyFuts.values())
-            f.onDone(err);
+            f.onDone(stopErr);
+
+        for (GridDhtPartitionsExchangeFuture f : pendingExchangeFuts)
+            f.onDone(stopErr);
+
+        if (locExchFut != null)
+            locExchFut.onDone(stopErr);
 
         U.cancel(exchWorker);
 
@@ -519,6 +529,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             fut.onDone(topVer);
         }
+        else if (stopErr != null)
+            fut.onDone(stopErr);
 
         return fut;
     }
@@ -791,6 +803,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (discoEvt != null)
             fut.onEvent(exchId, discoEvt);
 
+        if (stopErr != null)
+            fut.onDone(stopErr);
+
         return fut;
     }
 
@@ -799,12 +814,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param err Error.
      */
     public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable
err) {
-        if (err == null) {
-            AffinityTopologyVersion topVer = exchFut.topologyVersion();
+        AffinityTopologyVersion topVer = exchFut.topologyVersion();
 
-            if (log.isDebugEnabled())
-                log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ']');
+        if (log.isDebugEnabled())
+            log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" +
err + ']');
 
+        if (err == null) {
             while (true) {
                 AffinityTopologyVersion readyVer = readyTopVer.get();
 
@@ -825,8 +840,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
             }
         }
-        else if (log.isDebugEnabled())
-            log.debug("Exchange done with error [fut=" + exchFut + ", err=" + err + ']');
+        else {
+            for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet())
{
+                if (entry.getKey().compareTo(topVer) <= 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Completing created topology ready future with error "
+
+                            "[ver=" + topVer + ", fut=" + entry.getValue() + ']');
+
+                    entry.getValue().onDone(err);
+                }
+            }
+        }
 
         ExchangeFutureSet exchFuts0 = exchFuts;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0b33f0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 596ec77..7e07649 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -643,10 +643,15 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
             }
             else {
                 fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>()
{
-                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
t) {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
fut) {
                         try {
+                            fut.get();
+
                             mapOnTopology(remap, c);
                         }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
                         finally {
                             cctx.shared().txContextReset();
                         }
@@ -1327,8 +1332,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                         affFut.listen(new CI1<IgniteInternalFuture<?>>() {
                             @Override public void apply(IgniteInternalFuture<?> fut)
{
                                 try {
+                                    fut.get();
+
                                     remap();
                                 }
+                                catch (IgniteCheckedException e) {
+                                    onDone(e);
+                                }
                                 finally {
                                     cctx.shared().txContextReset();
                                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0b33f0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index f3e5ca3..090fe28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -777,10 +777,15 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             }
             else {
                 fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>()
{
-                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
t) {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
fut) {
                         try {
+                            fut.get();
+
                             mapOnTopology(remap);
                         }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
                         finally {
                             cctx.shared().txContextReset();
                         }
@@ -1435,8 +1440,13 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                         affFut.listen(new CI1<IgniteInternalFuture<?>>() {
                             @Override public void apply(IgniteInternalFuture<?> fut)
{
                                 try {
+                                    fut.get();
+
                                     remap();
                                 }
+                                catch (IgniteCheckedException e) {
+                                    onDone(e);
+                                }
                                 finally {
                                     cctx.shared().txContextReset();
                                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0b33f0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2048fdf..8b171a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -343,12 +343,17 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         }
         else {
             topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>()
{
-                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
t) {
+                @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion>
fut) {
                     cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
                         @Override public void run() {
                             try {
+                                fut.get();
+
                                 prepareOnTopology(remap, c);
                             }
+                            catch (IgniteCheckedException e) {
+                                onDone(e);
+                            }
                             finally {
                                 cctx.txContextReset();
                             }
@@ -841,7 +846,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                         if (affFut != null && !affFut.isDone()) {
                             affFut.listen(new CI1<IgniteInternalFuture<?>>()
{
                                 @Override public void apply(IgniteInternalFuture<?>
fut) {
-                                    remap();
+                                    try {
+                                        fut.get();
+
+                                        remap();
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        onDone(e);
+                                    }
                                 }
                             });
                         }


Mime
View raw message