ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [4/4] ignite git commit: ignite-3547
Date Fri, 29 Jul 2016 14:12:08 GMT
ignite-3547


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53d50901
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53d50901
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53d50901

Branch: refs/heads/ignite-3547
Commit: 53d5090187639177e9e483de4a4a40bd4d99e62e
Parents: e8f8104
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jul 29 15:09:34 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jul 29 16:27:58 2016 +0300

----------------------------------------------------------------------
 .../communication/tcp/TcpCommunicationSpi.java  | 43 +++++----
 .../IgniteCacheConnectionRecoveryTest.java      | 92 ++++++++++++++++----
 2 files changed, 93 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/53d50901/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7c7da6c..f847bd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -353,6 +353,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 UUID id = ses.meta(NODE_ID_META);
 
                 if (id != null) {
+                    GridCommunicationClient client = clients.get(id);
+
+                    if (client instanceof GridTcpNioCommunicationClient &&
+                        ((GridTcpNioCommunicationClient) client).session() == ses) {
+                        client.close();
+
+                        clients.remove(id, client);
+                    }
+
                     if (!stopping) {
                         boolean reconnect = false;
 
@@ -372,9 +381,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                 recoveryData.onNodeLeft();
                         }
 
-                        DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(id,
-                            ses,
-                            recoveryData,
+                        DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData,
                             reconnect);
 
                         commWorker.addProcessDisconnectRequest(disconnectData);
@@ -1400,6 +1407,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(']').append(U.nl());
             }
 
+            sb.append("Communication SPI clients: ").append(U.nl());
+
+            for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
+                sb.append("    [node=").append(entry.getKey())
+                    .append(", client=").append(entry.getValue())
+                    .append(']').append(U.nl());
+            }
+
             U.warn(log, sb.toString());
         }
 
@@ -3179,12 +3194,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
          * @param sesInfo Disconnected session information.
          */
         private void processDisconnect(DisconnectedSessionInfo sesInfo) {
-            GridCommunicationClient client = clients.get(sesInfo.nodeId);
-
-            if (client instanceof GridTcpNioCommunicationClient &&
-                ((GridTcpNioCommunicationClient) client).session() == sesInfo.ses)
-                    clients.remove(sesInfo.nodeId, client);
-
             if (sesInfo.reconnect) {
                 GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
 
@@ -3197,7 +3206,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
 
-                    client = reserveClient(node);
+                    GridCommunicationClient client = reserveClient(node);
 
                     client.release();
                 }
@@ -3748,29 +3757,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     private static class DisconnectedSessionInfo {
         /** */
-        private final UUID nodeId;
-
-        /** */
-        private final GridNioSession ses;
-
-        /** */
         private final GridNioRecoveryDescriptor recoveryDesc;
 
         /** */
         private final boolean reconnect;
 
         /**
-         * @param nodeId Node ID.
-         * @param ses Session.
          * @param recoveryDesc Recovery descriptor.
          * @param reconnect Reconnect flag.
          */
-        public DisconnectedSessionInfo(UUID nodeId,
-            GridNioSession ses,
-            @Nullable GridNioRecoveryDescriptor recoveryDesc,
+        DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc,
             boolean reconnect) {
-            this.nodeId = nodeId;
-            this.ses = ses;
             this.recoveryDesc = recoveryDesc;
             this.reconnect = reconnect;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53d50901/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
index 826b0e7..4a07674 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
@@ -17,16 +17,21 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -51,6 +56,12 @@ public class IgniteCacheConnectionRecoveryTest extends GridCommonAbstractTest {
     /** */
     private boolean client;
 
+    /** */
+    private static final int SRVS = 5;
+
+    /** */
+    private static final int CLIENTS = 5;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -59,6 +70,10 @@ public class IgniteCacheConnectionRecoveryTest extends GridCommonAbstractTest {
 
         cfg.setClientMode(client);
 
+        cfg.setCacheConfiguration(
+            cacheConfiguration("cache1", TRANSACTIONAL),
+            cacheConfiguration("cache2", ATOMIC));
+
         return cfg;
     }
 
@@ -66,11 +81,11 @@ public class IgniteCacheConnectionRecoveryTest extends GridCommonAbstractTest {
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        startGridsMultiThreaded(5);
+        startGridsMultiThreaded(SRVS);
 
         client = true;
 
-        startGridsMultiThreaded(5, 5);
+        startGridsMultiThreaded(SRVS, CLIENTS);
     }
 
     /** {@inheritDoc} */
@@ -84,47 +99,86 @@ public class IgniteCacheConnectionRecoveryTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testConnectionRecovery() throws Exception {
-        ignite(0).createCache(cacheConfiguration("cache1", TRANSACTIONAL));
-        ignite(0).createCache(cacheConfiguration("cache2", ATOMIC));
-
-        final Map<Integer, Integer> data = new HashMap<>();
+        final Map<Integer, Integer> data = new TreeMap<>();
 
         for (int i = 0; i < 500; i++)
             data.put(i, i);
 
         final AtomicInteger idx = new AtomicInteger();
 
-        final long stopTime = U.currentTimeMillis() + 10_000;
+        final long stopTime = U.currentTimeMillis() + 30_000;
+
+        final AtomicReference<CyclicBarrier> barrierRef = new AtomicReference<>();
+
+        final int TEST_THREADS = (CLIENTS + SRVS) * 2;
 
         IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                Ignite node = ignite(idx.incrementAndGet() % 10);
+                int idx0 = idx.getAndIncrement();
+                Ignite node = ignite(idx0 % (SRVS + CLIENTS));
+
+                Thread.currentThread().setName("test-thread-" + idx0 + "-" + node.name());
 
                 IgniteCache cache1 = node.cache("cache1").withAsync();
                 IgniteCache cache2 = node.cache("cache2").withAsync();
 
-                int cnt = 0;
+                int iter = 0;
 
                 while (U.currentTimeMillis() < stopTime) {
-                    cache1.putAll(data);
-                    cache1.future().get(30, SECONDS);
+                    try {
+                        cache1.putAll(data);
+                        cache1.future().get(15, SECONDS);
+
+                        cache2.putAll(data);
+                        cache2.future().get(15, SECONDS);
 
+                        CyclicBarrier b = barrierRef.get();
 
-                    cache2.putAll(data);
-                    cache2.future().get(5, SECONDS);
+                        if (b != null)
+                            b.await(15, SECONDS);
+                    }
+                    catch (Exception e) {
+                        synchronized (IgniteCacheConnectionRecoveryTest.class) {
+                            log.error("Failed to execute update, will dump debug information" +
+                                " [err=" + e+ ", iter=" + iter + ']', e);
 
-                    log.info("Iteration: " + cnt++);
+                            List<Ignite> nodes = IgnitionEx.allGridsx();
+
+                            for (Ignite node0 : nodes)
+                                ((IgniteKernal)node0).dumpDebugInfo();
+
+                            U.dumpThreads(log);
+                        }
+
+                        throw e;
+                    }
                 }
 
                 return null;
             }
-        }, 30, "test-thread");
+        }, TEST_THREADS, "test-thread");
 
         while  (System.currentTimeMillis() < stopTime) {
-            for (Ignite node : G.allGrids())
-                IgniteCacheMessageRecoveryAbstractTest.closeSessions(node);
+            boolean closed = false;
+
+            for (Ignite node : G.allGrids()) {
+                if (IgniteCacheMessageRecoveryAbstractTest.closeSessions(node))
+                    closed = true;
+            }
+
+            if (closed) {
+                CyclicBarrier b = new CyclicBarrier(TEST_THREADS + 1, new Runnable() {
+                    @Override public void run() {
+                        barrierRef.set(null);
+                    }
+                });
+
+                barrierRef.set(b);
+
+                b.await();
+            }
 
-            U.sleep(500);
+            U.sleep(50);
         }
 
         fut.get();


Mime
View raw message