ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-3212 Fixed issue with message send failure and late discovery event.
Date Thu, 02 Jun 2016 10:26:58 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3212 bbb15cc83 -> 17999d6f3


ignite-3212 Fixed issue with message send failure and late discovery event.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17999d6f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17999d6f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17999d6f

Branch: refs/heads/ignite-3212
Commit: 17999d6f3e58c1227562b06f3fb4561d2417b56b
Parents: bbb15cc
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Jun 2 13:26:43 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Jun 2 13:26:43 2016 +0300

----------------------------------------------------------------------
 .../distributed/GridCacheTxRecoveryFuture.java  | 36 ++++++++++++--------
 .../cache/transactions/IgniteTxManager.java     |  9 ++---
 2 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/17999d6f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 29e4c5b..7525114 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.processors.cache.distributed;
 
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
@@ -69,8 +71,8 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
     /** All involved nodes. */
     private final Map<UUID, ClusterNode> nodes;
 
-    /** ID of failed node started transaction. */
-    private final UUID failedNodeId;
+    /** ID of failed nodes started transaction. */
+    private final Set<UUID> failedNodeIds;
 
     /** Transaction nodes mapping. */
     private final Map<UUID, Collection<UUID>> txNodes;
@@ -81,13 +83,13 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
     /**
      * @param cctx Context.
      * @param tx Transaction.
-     * @param failedNodeId ID of failed node started transaction.
+     * @param failedNodeIds ID of failed nodes started transaction.
      * @param txNodes Transaction mapping.
      */
     @SuppressWarnings("ConstantConditions")
     public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx,
         IgniteInternalTx tx,
-        UUID failedNodeId,
+        Set<UUID> failedNodeIds,
         Map<UUID, Collection<UUID>> txNodes)
     {
         super(CU.boolReducer());
@@ -95,7 +97,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
         this.cctx = cctx;
         this.tx = tx;
         this.txNodes = txNodes;
-        this.failedNodeId = failedNodeId;
+        this.failedNodeIds = failedNodeIds;
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridCacheTxRecoveryFuture.class);
@@ -105,7 +107,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
         UUID locNodeId = cctx.localNodeId();
 
         for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet())
{
-            if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey())
&& !nodes.containsKey(e.getKey())) {
+            if (!locNodeId.equals(e.getKey()) && !failedNodeIds.contains(e.getKey())
&& !nodes.containsKey(e.getKey())) {
                 ClusterNode node = cctx.discovery().node(e.getKey());
 
                 if (node != null)
@@ -115,7 +117,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
             }
 
             for (UUID nodeId : e.getValue()) {
-                if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) &&
!nodes.containsKey(nodeId)) {
+                if (!locNodeId.equals(nodeId) && !failedNodeIds.contains(nodeId)
&& !nodes.containsKey(nodeId)) {
                     ClusterNode node = cctx.discovery().node(nodeId);
 
                     if (node != null)
@@ -128,7 +130,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
 
         UUID nearNodeId = tx.eventNodeId();
 
-        nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
+        nearTxCheck = !failedNodeIds.contains(nearNodeId) && cctx.discovery().alive(nearNodeId);
     }
 
     /**
@@ -170,7 +172,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
                     cctx.io().send(nearNodeId, req, tx.ioPolicy());
                 }
                 catch (ClusterTopologyCheckedException ignore) {
-                    fut.onNodeLeft();
+                    fut.onNodeLeft(nearNodeId);
                 }
                 catch (IgniteCheckedException e) {
                     fut.onError(e);
@@ -255,7 +257,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
              * send message only to primary node.
              */
 
-            if (nodeId.equals(failedNodeId)) {
+            if (failedNodeIds.contains(nodeId)) {
                 for (UUID id : entry.getValue()) {
                     // Skip backup node if it is local node or if it is also was mapped as
primary.
                     if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
@@ -276,7 +278,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
                         cctx.io().send(id, req, tx.ioPolicy());
                     }
                     catch (ClusterTopologyCheckedException ignored) {
-                        fut.onNodeLeft();
+                        fut.onNodeLeft(id);
                     }
                     catch (IgniteCheckedException e) {
                         fut.onError(e);
@@ -302,7 +304,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
                     cctx.io().send(nodeId, req, tx.ioPolicy());
                 }
                 catch (ClusterTopologyCheckedException ignored) {
-                    fut.onNodeLeft();
+                    fut.onNodeLeft(nodeId);
                 }
                 catch (IgniteCheckedException e) {
                     fut.onError(e);
@@ -401,7 +403,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
                 MiniFuture f = (MiniFuture)fut;
 
                 if (f.nodeId().equals(nodeId))
-                    f.onNodeLeft();
+                    f.onNodeLeft(nodeId);
             }
 
         return true;
@@ -514,14 +516,18 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
         }
 
         /**
+         * @param nodeId Failed node ID.
          */
-        private void onNodeLeft() {
+        private void onNodeLeft(UUID nodeId) {
             if (log.isDebugEnabled())
                 log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
 
             if (nearTxCheck) {
+                Set<UUID> failedNodeIds0 = new HashSet<>(failedNodeIds);
+                failedNodeIds0.add(nodeId);
+
                 // Near and originating nodes left, need initiate tx check.
-                cctx.tm().commitIfPrepared(tx);
+                cctx.tm().commitIfPrepared(tx, failedNodeIds0);
 
                 onDone(new ClusterTopologyCheckedException("Transaction node left grid (will
ignore)."));
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/17999d6f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 789ef8d..4ec280f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1872,8 +1872,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * transactions were prepared (invalidates transaction if it is not fully prepared).
      *
      * @param tx Transaction.
+     * @param failedNodeIds Failed nodes IDs.
      */
-    public void commitIfPrepared(IgniteInternalTx tx) {
+    public void commitIfPrepared(IgniteInternalTx tx, Set<UUID> failedNodeIds) {
         assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote  : tx;
         assert !F.isEmpty(tx.transactionNodes()) : tx;
         assert tx.nearXidVersion() != null : tx;
@@ -1881,7 +1882,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture(
             cctx,
             tx,
-            tx.originatingNodeId(),
+            failedNodeIds,
             tx.transactionNodes());
 
         cctx.mvcc().addFuture(fut, fut.futureId());
@@ -2147,7 +2148,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                         // Check prepare only if originating node ID failed. Otherwise parent
node will finish this tx.
                         if (tx.originatingNodeId().equals(evtNodeId)) {
                             if (tx.state() == PREPARED)
-                                commitIfPrepared(tx);
+                                commitIfPrepared(tx, Collections.singleton(evtNodeId));
                             else {
                                 IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
 
@@ -2155,7 +2156,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                                     prepFut.listen(new CI1<IgniteInternalFuture<?>>()
{
                                         @Override public void apply(IgniteInternalFuture<?>
fut) {
                                             if (tx.state() == PREPARED)
-                                                commitIfPrepared(tx);
+                                                commitIfPrepared(tx, Collections.singleton(evtNodeId));
                                             else if (tx.setRollbackOnly())
                                                 tx.rollbackAsync();
                                         }


Mime
View raw message