ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [4/4] ignite git commit: temp
Date Tue, 07 Mar 2017 16:15:59 GMT
temp


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e3df4c7e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e3df4c7e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e3df4c7e

Branch: refs/heads/ignite-4473-1
Commit: e3df4c7e01f14a4cf44cefc6016373d3386095a5
Parents: 741ecef
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Tue Mar 7 19:16:08 2017 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Tue Mar 7 19:16:08 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  14 ++
 .../GridDhtPartitionsExchangeFuture.java        |  25 +-
 .../service/GridServiceProcessor.java           |  86 +++----
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  15 +-
 .../ignite/internal/IgniteClientRejoinTest.java | 250 +++++++++++++++----
 5 files changed, 289 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e3df4c7e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 0dd1a58..9661525 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1682,8 +1682,18 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
 
                             while (true) {
                                 try {
+                                    log().error("Start to wait fut: " + exchFut.topologyVersion());
+
+                                    if (cctx.localNode().isClient() && exchFut.topologyVersion().topologyVersion()
== 4) {
+                                        int z = 0;
+
+                                        ++z;
+                                    }
+
                                     exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(),
TimeUnit.MILLISECONDS);
 
+                                    log().error("Exch fut completed: " + exchFut.topologyVersion());
+
                                     break;
                                 }
                                 catch (IgniteFutureTimeoutCheckedException ignored) {
@@ -1705,8 +1715,12 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
 
                                         dumpedObjects++;
                                     }
+
+                                    log().error("Exch fut failed: " + exchFut.topologyVersion(),
ignored);
                                 }
                                 catch (Exception e) {
+                                    log().error("Exch fut failed: " + exchFut.topologyVersion(),
e);
+
                                     if (cctx.localNode().isClient() && X.hasCause(e,
IOException.class))
                                         throw new IgniteCouldReconnectCheckedException("Reconnect",
e);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3df4c7e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 96389d0..11c1462 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteCouldReconnectCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
@@ -72,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.jetbrains.annotations.Nullable;
@@ -513,9 +515,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             if (cctx.localNode().isClient() && X.hasCause(e,
                 IOException.class,
-                IgniteClientDisconnectedCheckedException.class))
-                onDone(new IgniteCouldReconnectCheckedException("Local node could be reconnected.
[locNodeId="
-                    + cctx.localNodeId() + ']', e));
+                IgniteClientDisconnectedCheckedException.class)) {
+                IgniteInternalFuture<Object> rejoin = cctx.discovery().rejoin();
+
+                try {
+                    rejoin.get();
+                }
+                catch (IgniteCheckedException e0) {
+                    log.error("Failed to rejoin.", e0);
+                }
+
+                IgniteFuture<?> reconnectFut = cctx.kernalContext().cluster().clientReconnectFuture();
+
+                onDone(new IgniteClientDisconnectedException(reconnectFut,
+                    "Client node reconnected [locNodeId=" + cctx.localNodeId() + ']', e));
+            }
             else
                 onDone(e);
 
@@ -691,9 +705,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 }
             }
             else {
-                if (!centralizedAff)
+                if (!centralizedAff) {
                     sendLocalPartitions(crd);
 
+                    log.error("!!!! Successfully sent to coordinator: " + topologyVersion());
+                }
+
                 initDone();
 
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3df4c7e/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 43918fc..d26242d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1508,60 +1508,60 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         }
     }
 
-    /**
-     * Deployment callback.
-     *
-     * @param dep Service deployment.
-     * @param topVer Topology version.
-     */
-    private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion
topVer) {
-        // Retry forever.
-        try {
-            AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
-
-            // If topology version changed, reassignment will happen from topology event.
-            if (newTopVer.equals(topVer))
-                reassign(dep, topVer);
-        }
-        catch (IgniteCheckedException e) {
-            if (!(e instanceof ClusterTopologyCheckedException))
-                log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(),
e);
-
-            AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
-
-            if (!newTopVer.equals(topVer)) {
-                assert newTopVer.compareTo(topVer) > 0;
+        /**
+         * Deployment callback.
+         *
+         * @param dep Service deployment.
+         * @param topVer Topology version.
+         */
+        private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion
topVer) {
+            // Retry forever.
+            try {
+                AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
 
-                // Reassignment will happen from topology event.
-                return;
+                // If topology version changed, reassignment will happen from topology event.
+                if (newTopVer.equals(topVer))
+                    reassign(dep, topVer);
             }
+            catch (IgniteCheckedException e) {
+                if (!(e instanceof ClusterTopologyCheckedException))
+                    log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(),
e);
 
-            ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
-                private IgniteUuid id = IgniteUuid.randomUuid();
+                AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
 
-                private long start = System.currentTimeMillis();
+                if (!newTopVer.equals(topVer)) {
+                    assert newTopVer.compareTo(topVer) > 0;
 
-                @Override public IgniteUuid timeoutId() {
-                    return id;
+                    // Reassignment will happen from topology event.
+                    return;
                 }
 
-                @Override public long endTime() {
-                    return start + RETRY_TIMEOUT;
-                }
+                ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+                    private IgniteUuid id = IgniteUuid.randomUuid();
 
-                @Override public void onTimeout() {
-                    if (!busyLock.enterBusy())
-                        return;
+                    private long start = System.currentTimeMillis();
 
-                    try {
-                        // Try again.
-                        onDeployment(dep, topVer);
+                    @Override public IgniteUuid timeoutId() {
+                        return id;
                     }
-                    finally {
-                        busyLock.leaveBusy();
+
+                    @Override public long endTime() {
+                        return start + RETRY_TIMEOUT;
                     }
-                }
-            });
+
+                    @Override public void onTimeout() {
+                        if (!busyLock.enterBusy())
+                            return;
+
+                        try {
+                            // Try again.
+                            onDeployment(dep, topVer);
+                        }
+                        finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+                });
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3df4c7e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index bbbd4c7..5ca9794 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1155,6 +1155,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         msg,
                         sockTimeout);
 
+                    // TODO: need to rethinking
                     if (forceFut != null)
                         throw new IgniteCheckedException("Force fail local node.");
 
@@ -1885,10 +1886,16 @@ class ClientImpl extends TcpDiscoveryImpl {
                 if (joining()) {
                     Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData();
 
-                    if (dataMap != null) {
-                        for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
-                            spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(),
-                                U.resolveClassLoader(spi.ignite().configuration()));
+                    try {
+                        if (dataMap != null) {
+                            for (Map.Entry<UUID, Map<Integer, byte[]>> entry
: dataMap.entrySet())
+                                spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(),
+                                    U.resolveClassLoader(spi.ignite().configuration()));
+                        }
+                    }
+                    catch (Throwable e) {
+                        int z = 0;
+                        ++z;
                     }
 
                     locNode.setAttributes(msg.clientNodeAttributes());

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3df4c7e/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
index d91bed6..b34ee89 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
@@ -28,14 +28,17 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteCouldReconnectCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -43,6 +46,8 @@ import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -51,6 +56,12 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
  * Tests client to be able restore connection to cluster if coordination is not available.
  */
 public class IgniteClientRejoinTest extends GridCommonAbstractTest {
+    /** Keys. */
+    public static final int KEYS = 100;
+
+    /** Static IP finder. */
+    public static final TcpDiscoveryIpFinder finder = new TcpDiscoveryVmIpFinder(true);
+
     /** Block. */
     private volatile boolean block;
 
@@ -58,16 +69,6 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
     private volatile ClusterNode crd;
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        System.setProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK", "true");
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        System.clearProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK");
-    }
-
-    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
     }
@@ -76,18 +77,24 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        if (gridName.contains("client")) {
-            cfg.setCommunicationSpi(new TcpCommunicationSpi());
+        boolean client = gridName.contains("client");
 
-            TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
-            DiscoverySpi dspi = new DiscoverySpi();
+        log.error("Grid name: " + gridName + ", client: " + client);
 
-            dspi.setIpFinder(spi.getIpFinder());
+        TcpCommunicationSpi comSpi = new TcpCommunicationSpi(client);
 
-            cfg.setDiscoverySpi(dspi);
+        comSpi.setIdleConnectionTimeout(100);
+        comSpi.setReconnectCount(1);
+        cfg.setCommunicationSpi(comSpi);
 
-            cfg.setClientMode(true);
-        }
+        DiscoverySpi dspi = new DiscoverySpi(client);
+        dspi.setIpFinder(finder);
+
+        if (client)
+            dspi.setJoinTimeout(TimeUnit.MINUTES.toMillis(5));
+
+        cfg.setDiscoverySpi(dspi);
+        cfg.setClientMode(client);
 
         return cfg;
     }
@@ -95,24 +102,27 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testReconnect() throws Exception {
+    public void testReconnectOnStart() throws Exception {
         Ignite srv1 = startGrid("server1");
 
         crd = ((IgniteKernal)srv1).localNode();
 
+        log.info("Coordinator node: " + crd);
+
         Ignite srv2 = startGrid("server2");
 
+        // Block sending messages to coordinator.
         block = true;
 
         IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>()
{
             @Override public Boolean call() throws Exception {
                 Random rnd = new Random();
 
-                U.sleep((rnd.nextInt(15) + 30) * 1000);
+                U.sleep((rnd.nextInt(15) + 30) * 10_000);
 
                 block = false;
 
-                System.out.println("ALLOW connection to coordinator.");
+                log.info("ALLOW connection to coordinator.");
 
                 return true;
             }
@@ -120,16 +130,113 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest
{
 
         Ignite client = startGrid("client");
 
-
-        assert fut.get();
+        assertTrue(fut.get());
 
         IgniteCache<Integer, Integer> cache = client.getOrCreateCache("some");
 
-        for (int i = 0; i < 100; i++)
+        for (int i = 0; i < 100; i++) {
             cache.put(i, i);
 
-        for (int i = 0; i < 100; i++)
-            assert i == cache.get(i);
+            assertEquals(i, (int)cache.get(i));
+        }
+
+        Collection<ClusterNode> clients = client.cluster().forClients().nodes();
+
+        assertEquals("Clients: " + clients, 1, clients.size());
+        assertEquals(1, srv1.cluster().forClients().nodes().size());
+        assertEquals(1, srv2.cluster().forClients().nodes().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnect() throws Exception {
+        final String CACHE_NAME = "some";
+
+        final Ignite srv1 = startGrid("server1");
+
+        crd = ((IgniteKernal)srv1).localNode();
+
+        Ignite srv2 = startGrid("server2");
+
+        Ignite client = startGrid("client");
+
+        final IgniteCache<Integer, Integer> cache = client.getOrCreateCache(CACHE_NAME);
+
+        final Integer keyOnCrd = primaryKey(srv1.cache(CACHE_NAME));
+
+        for (int i = 0; i < 100; i++) {
+            cache.put(i, i);
+
+            assertEquals(i, (int)cache.get(i));
+        }
+
+        awaitPartitionMapExchange();
+
+        log.error("!!!! Blocked message to coordinator.");
+
+        // Block sending messages to coordinator.
+        block = true;
+
+//        IgniteInternalFuture<Object> f = GridTestUtils.runAsync(new Callable<Object>()
{
+//            @Override public Object call() throws Exception {
+//                try {
+//                    cache.put(keyOnCrd, -42);
+//                }
+//                catch (Exception ignore) {
+//                    ignore.printStackTrace();
+//                }
+//
+//                return null;
+//            }
+//        });
+//
+//        try {
+//            f.get(100);
+//        }
+//        catch (Exception e) {
+//            e.printStackTrace();
+//        }
+
+        IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>()
{
+            @Override public Boolean call() throws Exception {
+                Random rnd = new Random();
+
+                int ms = 30_000;
+
+                log.error("Block time: " + ms);
+
+                U.sleep(ms);
+
+                block = false;
+
+                log.info("ALLOW connection to coordinator.");
+
+                return true;
+            }
+        });
+
+        U.sleep(5_000);
+
+        startGrid("server3");
+
+        fut.get();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return srv1.cluster().forClients().nodes().size() == 1;
+            }
+        }, 100_000);
+
+        IgniteCache<Object, Object> cache0 = client.cache(CACHE_NAME);
+
+        int cnt = 0;
+
+        cache0.put(1, 42);
+
+        log.error("Retries count: " + cnt);
+
+        assertTrue(fut.get());
 
         Collection<ClusterNode> clients = client.cluster().forClients().nodes();
 
@@ -141,7 +248,9 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testManyClientsReconnect() throws Exception {
+    public void testManyClientsReconnectOnStart() throws Exception {
+        final int SERVERS_CNT = 2;
+
         Ignite srv1 = startGrid("server1");
 
         crd = ((IgniteKernal)srv1).localNode();
@@ -154,9 +263,9 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
 
         final CountDownLatch latch = new CountDownLatch(1);
 
-        final int CLIENTS_NUM = 5;
+        final int CLIENTS_CNT = 5;
 
-        for (int i = 0; i < CLIENTS_NUM; i++) {
+        for (int i = 0; i < CLIENTS_CNT; i++) {
             final int idx = i;
 
             IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>()
{
@@ -191,15 +300,17 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
 
             IgniteCache<Integer, Integer> cache = client.getOrCreateCache(client.name());
 
-            for (int i = 0; i < 100; i++)
+            for (int i = 0; i < KEYS; i++) {
                 cache.put(i, i);
 
-            for (int i = 0; i < 100; i++)
-                assert i == cache.get(i);
+                assertEquals(i, (int)cache.get(i));
+            }
+
+            assertEquals(SERVERS_CNT, client.cluster().forServers().nodes().size());
         }
 
-        assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
-        assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
+        assertEquals(CLIENTS_CNT, srv1.cluster().forClients().nodes().size());
+        assertEquals(CLIENTS_CNT, srv2.cluster().forClients().nodes().size());
     }
 
     /** {@inheritDoc} */
@@ -207,14 +318,31 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
         return 2 * 60_000;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean isDebug() {
+        return true;
+    }
+
     /**
      *
      */
     private class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi
{
+        /** Server. */
+        private final boolean client;
+
+        /**
+         * @param client {@code True} if server.
+         */
+        public TcpCommunicationSpi(boolean client) {
+            this.client = client;
+        }
+
         /** {@inheritDoc} */
         @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException
{
-            if (block && node.id().equals(crd.id()))
-                throw new IgniteSpiException(new SocketException("Test communication exception"));
+            if (client) {
+                if (block && node.id().equals(crd.id())) // Failed send to coordinator.
+                    throw new IgniteSpiException(new SocketException("Test communication
exception"));
+            }
 
             super.sendMessage(node, msg);
         }
@@ -222,8 +350,10 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
         /** {@inheritDoc} */
         @Override public void sendMessage(ClusterNode node, Message msg,
             IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
-            if (block && node.id().equals(crd.id()))
-                throw new IgniteSpiException(new SocketException("Test communication exception"));
+            if (client) {
+                if (block && node.id().equals(crd.id()))
+                    throw new IgniteSpiException(new SocketException("Test communication
exception"));
+            }
 
             super.sendMessage(node, msg, ackC);
         }
@@ -233,11 +363,23 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
      *
      */
     private class DiscoverySpi extends TcpDiscoverySpi {
+        /** Server. */
+        private final boolean client;
+
+        /**
+         * @param client {@code True} if server.
+         */
+        public DiscoverySpi(boolean client) {
+            this.client = client;
+        }
+
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
byte[] data,
             long timeout) throws IOException {
-            if (block && sock.getPort() == 47500)
-                throw new SocketException("Test discovery exception");
+            if (client) {
+                if (block && sock.getPort() == 47500) // Failed send to coordinator
+                    throw new SocketException("Test discovery exception");
+            }
 
             super.writeToSocket(sock, msg, data, timeout);
         }
@@ -245,8 +387,10 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
-            if (block && sock.getPort() == 47500)
-                throw new SocketException("Test discovery exception");
+            if (client) {
+                if (block && sock.getPort() == 47500)
+                    throw new SocketException("Test discovery exception");
+            }
 
             super.writeToSocket(sock, msg, timeout);
         }
@@ -254,8 +398,10 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage
msg,
             long timeout) throws IOException, IgniteCheckedException {
-            if (block && sock.getPort() == 47500)
-                throw new SocketException("Test discovery exception");
+            if (client) {
+                if (block && sock.getPort() == 47500)
+                    throw new SocketException("Test discovery exception");
+            }
 
             super.writeToSocket(sock, out, msg, timeout);
         }
@@ -263,8 +409,10 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
         /** {@inheritDoc} */
         @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock,
int res,
             long timeout) throws IOException {
-            if (block && sock.getPort() == 47500)
-                throw new SocketException("Test discovery exception");
+            if (client) {
+                if (block && sock.getPort() == 47500)
+                    throw new SocketException("Test discovery exception");
+            }
 
             super.writeToSocket(msg, sock, res, timeout);
         }
@@ -272,8 +420,10 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
         /** {@inheritDoc} */
         @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr,
             IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException
{
-            if (block && sock.getPort() == 47500)
-                throw new SocketException("Test discovery exception");
+            if (client) {
+                if (block && sock.getPort() == 47500)
+                    throw new SocketException("Test discovery exception");
+            }
 
             return super.openSocket(sock, remAddr, timeoutHelper);
         }


Mime
View raw message