ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ra...@apache.org
Subject [18/50] [abbrv] ignite git commit: ignite-1758 Fixed client reconnect issues
Date Wed, 11 Nov 2015 00:10:33 GMT
ignite-1758 Fixed client reconnect issues


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/627510c3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/627510c3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/627510c3

Branch: refs/heads/ignite-1527
Commit: 627510c398e57bbe02d40cc83a970154030b5494
Parents: cb1a334
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Nov 3 17:42:01 2015 +0300
Committer: Raul Kripalani <raulk@apache.org>
Committed: Wed Nov 11 00:09:41 2015 +0000

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        |  6 ++
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 21 +++--
 .../TcpDiscoveryClientReconnectMessage.java     |  1 +
 .../messages/TcpDiscoveryDiscardMessage.java    |  1 +
 ...gniteClientReconnectMassiveShutdownTest.java | 84 +++++++++++++-------
 .../tcp/TcpDiscoveryMultiThreadedTest.java      |  5 +-
 6 files changed, 79 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/627510c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 77e47a7..cef38e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1447,6 +1447,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                                             ", old=" + oldest.id() + ", new=" + newOldest.id() + ']');
                                 }
                             }
+                            else {
+                                ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to " +
+                                    "wait for exchange future, all server nodes left.");
+
+                                onDone(err);
+                            }
 
                             if (set) {
                                 // If received any messages, process them.

http://git-wip-us.apache.org/repos/asf/ignite/blob/627510c3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index ee9f818..0fe2881 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1454,18 +1454,18 @@ class ServerImpl extends TcpDiscoveryImpl {
             tmp = U.arrayList(readers);
         }
 
-        for (ClientMessageWorker msgWorker : clientMsgWorkers.values()) {
-            U.interrupt(msgWorker);
-
-            U.join(msgWorker, log);
-        }
-
         U.interrupt(tmp);
         U.joinThreads(tmp, log);
 
         U.interrupt(msgWorker);
         U.join(msgWorker, log);
 
+        for (ClientMessageWorker msgWorker : clientMsgWorkers.values()) {
+            U.interrupt(msgWorker);
+
+            U.join(msgWorker, log);
+        }
+
         U.interrupt(statsPrinter);
         U.join(statsPrinter, log);
     }
@@ -1778,7 +1778,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     Collection<TcpDiscoveryNode> top = new ArrayList<>(allNodes.size());
 
                     for (TcpDiscoveryNode n0 : allNodes) {
-                        if (n0.internalOrder() != 0 && n0.internalOrder() < node.internalOrder())
+                        assert n0.internalOrder() > 0 : n0;
+
+                        if (n0.internalOrder() < node.internalOrder())
                             top.add(n0);
                     }
 
@@ -3239,6 +3241,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
             else {
+                if (isLocalNodeCoordinator())
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
+
                 if (isLocNodeRouter) {
                     ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
 
@@ -3249,7 +3254,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             locNodeId + ", clientNodeId=" + nodeId + ']');
                 }
                 else {
-                    if (ring.hasRemoteNodes() && !locNodeId.equals(msg.verifierNodeId()))
+                    if (ring.hasRemoteNodes() && !isLocalNodeCoordinator())
                         sendMessageAcrossRing(msg);
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/627510c3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
index c232e6c..7c0cd5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java
@@ -26,6 +26,7 @@ import org.apache.ignite.lang.IgniteUuid;
 /**
  * Message telling that client node is reconnecting to topology.
  */
+@TcpDiscoveryEnsureDelivery
 public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMessage {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/627510c3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
index 145f19e..4b4eb9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
@@ -40,6 +40,7 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
      *
      * @param creatorNodeId Creator node ID.
      * @param msgId Message ID.
+     * @param customMsgDiscard Flag indicating whether the ID to discard is for a custom message or not.
      */
     public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId, boolean customMsgDiscard) {
         super(creatorNodeId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/627510c3/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
index 6f0e887..5282cf2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
@@ -22,6 +22,7 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.CacheException;
@@ -35,12 +36,14 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -131,7 +134,7 @@ public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstract
 
         assertTrue(client.configuration().isClientMode());
 
-        CacheConfiguration<String, Integer> cfg = new CacheConfiguration<>();
+        final CacheConfiguration<String, Integer> cfg = new CacheConfiguration<>();
 
         cfg.setCacheMode(PARTITIONED);
         cfg.setAtomicityMode(TRANSACTIONAL);
@@ -141,6 +144,8 @@ public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstract
 
         IgniteCache<String, Integer> cache = client.getOrCreateCache(cfg);
 
+        assertNotNull(cache);
+
         HashMap<String, Integer> put = new HashMap<>();
 
         // Load some data.
@@ -155,59 +160,80 @@ public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstract
         for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
             clientIdx.add(i);
 
+        final CountDownLatch latch = new CountDownLatch(CLIENT_GRID_CNT);
+
         IgniteInternalFuture<?> clientsFut = multithreadedAsync(
             new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    int idx = clientIdx.take();
+                    try {
+                        int idx = clientIdx.take();
 
-                    Ignite ignite = grid(idx);
+                        Ignite ignite = grid(idx);
 
-                    Thread.currentThread().setName("client-thread-" + ignite.name());
+                        Thread.currentThread().setName("client-thread-" + ignite.name());
 
-                    assertTrue(ignite.configuration().isClientMode());
+                        assertTrue(ignite.configuration().isClientMode());
 
-                    IgniteCache<String, Integer> cache = ignite.cache(null);
+                        IgniteCache<String, Integer> cache = ignite.getOrCreateCache(cfg);
 
-                    IgniteTransactions txs = ignite.transactions();
+                        assertNotNull(cache);
 
-                    Random rand = new Random();
+                        IgniteTransactions txs = ignite.transactions();
 
-                    while (!done.get()) {
-                        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                            cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000));
+                        Random rand = new Random();
 
-                            tx.commit();
-                        }
-                        catch (ClusterTopologyException ex) {
-                            ex.retryReadyFuture().get();
-                        }
-                        catch (IgniteException | CacheException e) {
-                            if (X.hasCause(e, IgniteClientDisconnectedException.class)) {
-                                IgniteClientDisconnectedException cause = X.cause(e,
-                                    IgniteClientDisconnectedException.class);
+                        latch.countDown();
 
-                                assert cause != null;
+                        while (!done.get()) {
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000));
 
-                                cause.reconnectFuture().get();
+                                tx.commit();
                             }
-                            else if (X.hasCause(e, ClusterTopologyException.class)) {
-                                ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+                            catch (ClusterTopologyException ex) {
+                                ex.retryReadyFuture().get();
+                            }
+                            catch (IgniteException | CacheException e) {
+                                if (X.hasCause(e, IgniteClientDisconnectedException.class)) {
+                                    IgniteClientDisconnectedException cause = X.cause(e,
+                                        IgniteClientDisconnectedException.class);
+
+                                    assert cause != null;
+
+                                    cause.reconnectFuture().get();
+                                }
+                                else if (X.hasCause(e, ClusterTopologyException.class)) {
+                                    ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
 
-                                assert cause != null;
+                                    assert cause != null;
 
-                                cause.retryReadyFuture().get();
+                                    cause.retryReadyFuture().get();
+                                }
+                                else
+                                    throw e;
                             }
-                            else
-                                throw e;
                         }
+
+                        return null;
                     }
+                    catch (Throwable e) {
+                        log.error("Unexpected error: " + e, e);
 
-                    return null;
+                        throw e;
+                    }
                 }
             },
             CLIENT_GRID_CNT, "client-thread");
 
         try {
+            if (!latch.await(30, SECONDS)) {
+                log.warning("Failed to wait for for clients start.");
+
+                U.dumpThreads(log);
+
+                fail("Failed to wait for for clients start.");
+            }
+
             // Killing a half of server nodes.
             final int srvsToKill = GRID_CNT / 2;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/627510c3/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 09b3ef8..55474dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -161,8 +162,6 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
      * @throws Exception If any error occurs.
      */
     public void testMultiThreadedClientsRestart() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1123");
-
         final AtomicBoolean done = new AtomicBoolean();
 
         try {
@@ -271,6 +270,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
                                         if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) ||
                                             X.hasCause(e, IgniteClientDisconnectedException.class))
                                             log.info("Client disconnected: " + e);
+                                        else if (X.hasCause(e, ClusterTopologyCheckedException.class))
+                                            log.info("Client failed to start: " + e);
                                         else {
                                             if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class))
                                                 log.info("Client failed: " + e);


Mime
View raw message