ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4705
Date Wed, 01 Mar 2017 15:32:39 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4705 8baff9a6b -> 8b3b9db46


ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8b3b9db4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8b3b9db4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8b3b9db4

Branch: refs/heads/ignite-4705
Commit: 8b3b9db46614ebff0257aaf4c9c668a4047fab2a
Parents: 8baff9a
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Mar 1 18:12:52 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Mar 1 18:32:32 2017 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 21 +++--
 .../GridDhtAtomicDeferredUpdateResponse.java    |  3 +-
 .../atomic/IgniteCacheAtomicProtocolTest.java   | 85 ++++++++++++++++++++
 3 files changed, 103 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8b3b9db4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c6a6a15..a58f1ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -3490,18 +3490,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
     private void sendDhtNearResponse(final UUID primaryId,
         final GridDhtAtomicAbstractUpdateRequest req,
         GridDhtAtomicNearResponse nearRes) {
+        DeferredResponseClosure c = primaryId != null ?
+            new DeferredResponseClosure(req.partition(), primaryId, req.futureId()) : null;
+
         try {
             ClusterNode node = ctx.discovery().node(req.nearNodeId());
 
             if (node == null)
                 throw new ClusterTopologyCheckedException("Node left: " + req.nearNodeId());
 
-            if (primaryId != null) {
+            if (c != null) {
                 ctx.gridIO().send(node,
                     TOPIC_CACHE,
                     nearRes,
                     ctx.ioPolicy(),
-                    new DeferredResponseClosure(req.partition(), primaryId, req.futureId()));
+                    c);
+
+                c = null;
             }
             else
                 ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy());
@@ -3514,9 +3519,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
             }
         }
         catch (ClusterTopologyCheckedException ignored) {
-            U.warn(msgLog, "Failed to send DHT near response, node left [futId=" + req.futureId()
+
-                ", nearFutId=" + req.nearFutureId() +
-                ", node=" + req.nearNodeId() + ']');
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Failed to send DHT near response, node left [futId=" + req.futureId()
+
+                    ", nearFutId=" + req.nearFutureId() +
+                    ", node=" + req.nearNodeId() + ']');
+            }
         }
         catch (IgniteCheckedException e) {
             U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() +
@@ -3524,6 +3531,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                 ", node=" + req.nearNodeId() +
                 ", res=" + nearRes + ']', e);
         }
+        finally {
+            if (c != null)
+                c.apply(null);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8b3b9db4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index 4e9ee86..9fe183f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -46,6 +47,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage
implem
 
     /** */
     @GridDirectTransient
+    @GridToStringExclude
     private GridTimeoutObject timeoutSnd;
 
     /**
@@ -64,7 +66,6 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage
implem
     public GridDhtAtomicDeferredUpdateResponse(int cacheId, GridLongList futIds) {
         this.cacheId = cacheId;
         this.futIds = futIds;
-        this.timeoutSnd = timeoutSnd;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8b3b9db4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 8aeb903..69e9348 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
@@ -329,6 +330,90 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testPutNearNodeFailure() throws Exception {
+        startGrids(2);
+
+        client = true;
+
+        Ignite clientNode = startGrid(2);
+
+        final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1,
FULL_SYNC));
+        IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+        awaitPartitionMapExchange();
+
+        final Ignite srv0 = grid(0);
+        final Ignite srv1 = grid(1);
+
+        final Integer key = primaryKey(srv0.cache(TEST_CACHE));
+
+        nearAsyncCache.put(key, key);
+
+        testSpi(srv1).blockMessages(GridDhtAtomicNearResponse.class, clientNode.name());
+
+        stopGrid(2);
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return ((IgniteKernal)srv0).context().cache().context().mvcc().atomicFuturesCount()
== 0;
+            }
+        }, 5000);
+
+        assertEquals(0, ((IgniteKernal)srv0).context().cache().context().mvcc().atomicFuturesCount());
+        assertEquals(0, ((IgniteKernal)srv1).context().cache().context().mvcc().atomicFuturesCount());
+
+        checkData(F.asMap(key, key));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllNearNodeFailure() throws Exception {
+        final int SRVS = 4;
+
+        startGrids(SRVS);
+
+        client = true;
+
+        Ignite clientNode = startGrid(SRVS);
+
+        final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1,
FULL_SYNC));
+        IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+        awaitPartitionMapExchange();
+
+        for (int i = 0; i < SRVS; i++)
+            testSpi(grid(i)).blockMessages(GridDhtAtomicNearResponse.class, clientNode.name());
+
+        Map<Integer, Integer> map = new HashMap<>();
+
+        for (int i = 0; i < 100; i++)
+            map.put(i, i);
+
+        nearAsyncCache.putAll(map);
+
+        stopGrid(SRVS);
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                for (int i = 0; i < SRVS; i++) {
+                    if (grid(i).context().cache().context().mvcc().atomicFuturesCount() !=
0)
+                        return false;
+                }
+
+                return true;
+            }
+        }, 5000);
+
+        for (int i = 0; i < SRVS; i++)
+            assertEquals(0, grid(i).context().cache().context().mvcc().atomicFuturesCount());
+
+        checkData(map);
+    }
+
+    /**
      * @param expData Expected cache data.
      */
     private void checkData(Map<Integer, Integer> expData) {


Mime
View raw message