ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: zk
Date Wed, 20 Dec 2017 09:49:58 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-zk d6ec00c0e -> a3a625699


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a3a62569
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a3a62569
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a3a62569

Branch: refs/heads/ignite-zk
Commit: a3a6256996586cfebe97b964b56be864277226f5
Parents: d6ec00c
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Dec 20 11:45:12 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Dec 20 12:49:35 2017 +0300

----------------------------------------------------------------------
 .../DefaultCommunicationProblemResolver.java    | 108 ++++++--
 .../communication/tcp/TcpCommunicationSpi.java  |  12 +-
 .../TcpCommunicationConnectionCheckFuture.java  |   3 +-
 .../ZookeeperDiscoverySpiBasicTest.java         | 273 ++++++++++++++++---
 4 files changed, 333 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a3a62569/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
index ed2ddb8..ca7bcd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
@@ -19,27 +19,92 @@ package org.apache.ignite.configuration;
 
 import java.util.BitSet;
 import java.util.List;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.LoggerResource;
 
 /**
  *
  */
 public class DefaultCommunicationProblemResolver implements CommunicationProblemResolver {
+    @LoggerResource
+    private IgniteLogger log;
+
     /** {@inheritDoc} */
     @Override public void resolve(CommunicationProblemContext ctx) {
-        ClusterGraph graph = new ClusterGraph(ctx);
+        ClusterGraph graph = new ClusterGraph(log, ctx);
 
-        BitSet cluster = graph.findLargestIndependentCluster();
+        ClusterSearch cluster = graph.findLargestIndependentCluster();
 
         List<ClusterNode> nodes = ctx.topologySnapshot();
 
-        if (graph.checkFullyConnected(cluster) && cluster.cardinality() < nodes.size()) {
-            for (int i = 0; i < nodes.size(); i++) {
-                if (!cluster.get(i))
-                    ctx.killNode(nodes.get(i));
+        assert nodes.size() > 0;
+        assert cluster != null;
+
+        if (graph.checkFullyConnected(cluster.nodesBitSet)) {
+            assert cluster.nodeCnt <= nodes.size();
+
+            if (cluster.nodeCnt < nodes.size()) {
+                if (log.isInfoEnabled()) {
+                    log.info("Communication problem resolver found fully connected independent cluster [" +
+                        "clusterSrvCnt=" + cluster.srvCnt +
+                        ", clusterTotalNodes=" + cluster.nodeCnt +
+                        ", totalAliveNodes=" + nodes.size() + "]");
+                }
+
+                for (int i = 0; i < nodes.size(); i++) {
+                    if (!cluster.nodesBitSet.get(i))
+                        ctx.killNode(nodes.get(i));
+                }
             }
+            else
+                U.warn(log, "All alive nodes are fully connected, this should be resolved automatically.");
+        }
+        else {
+            if (log.isInfoEnabled()) {
+                log.info("Communication problem resolver failed to find fully connected independent cluster.");
+            }
+        }
+    }
+
+    /**
+     * @param cluster Cluster nodes mask.
+     * @param nodes Nodes.
+     * @param limit IDs limit.
+     * @return Cluster node IDs string.
+     */
+    private static String clusterNodeIds(BitSet cluster, List<ClusterNode> nodes, int limit) {
+        int startIdx = 0;
+
+        StringBuilder builder = new StringBuilder();
+
+        int cnt = 0;
+
+        for (;;) {
+            int idx = cluster.nextSetBit(startIdx);
+
+            if (idx == -1)
+                break;
+
+            startIdx = idx + 1;
+
+            if (builder.length() == 0) {
+                builder.append('[');
+            }
+            else
+                builder.append(", ");
+
+            builder.append(nodes.get(idx).id());
+
+            if (cnt++ > limit)
+                builder.append(", ...");
         }
+
+        builder.append(']');
+
+        return builder.toString();
     }
 
     /**
@@ -70,13 +135,8 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem
         /** */
         private final static int WORD_IDX_SHIFT = 6;
 
-        /**
-         * @param bitIndex Bit index.
-         * @return Word index containing bit with given index.
-         */
-        private static int wordIndex(int bitIndex) {
-            return bitIndex >> WORD_IDX_SHIFT;
-        }
+        /** */
+        private final IgniteLogger log;
 
         /** */
         private final int nodeCnt;
@@ -91,9 +151,11 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem
         private final List<ClusterNode> nodes;
 
         /**
+         * @param log Logger.
          * @param ctx Context.
          */
-        ClusterGraph(CommunicationProblemContext ctx) {
+        ClusterGraph(IgniteLogger log, CommunicationProblemContext ctx) {
+            this.log = log;
             this.ctx = ctx;
 
             nodes = ctx.topologySnapshot();
@@ -106,6 +168,14 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem
         }
 
         /**
+         * @param bitIndex Bit index.
+         * @return Word index containing bit with given index.
+         */
+        private static int wordIndex(int bitIndex) {
+            return bitIndex >> WORD_IDX_SHIFT;
+        }
+
+        /**
          * @param bitCnt Number of bits.
          * @return Bit set words.
          */
@@ -116,7 +186,7 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem
         /**
          * @return Cluster nodes bit set.
          */
-        BitSet findLargestIndependentCluster() {
+        ClusterSearch findLargestIndependentCluster() {
             ClusterSearch maxCluster = null;
 
             for (int i = 0; i < nodeCnt; i++) {
@@ -127,11 +197,17 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem
 
                 search(cluster, i);
 
+                if (log.isInfoEnabled()) {
+                    log.info("Communication problem resolver found cluster [srvCnt=" + cluster.srvCnt +
+                        ", totalNodeCnt=" + cluster.nodeCnt +
+                        ", nodeIds=" + clusterNodeIds(cluster.nodesBitSet, nodes, 1000) + "]");
+                }
+
                 if (maxCluster == null || cluster.srvCnt > maxCluster.srvCnt)
                     maxCluster = cluster;
             }
 
-            return maxCluster != null ? maxCluster.nodesBitSet : null;
+            return maxCluster;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a3a62569/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 37be29f..4b7199d 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2595,7 +2595,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         sendMessage0(node, msg, null);
     }
 
-    /** {@inheritDoc} */
+    /**
+     * @param nodes Nodes to check connection with.
+     * @return Result future (each bit in result BitSet contains connection status to corresponding node).
+     */
     public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
         TcpCommunicationConnectionCheckFuture fut = new TcpCommunicationConnectionCheckFuture(
             this,
@@ -2603,7 +2606,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             nioSrvr,
             nodes);
 
-        fut.init(failureDetectionTimeoutEnabled() ? failureDetectionTimeout() : connTimeout);
+        long timeout = failureDetectionTimeoutEnabled() ? failureDetectionTimeout() : connTimeout;
+
+        if (log.isInfoEnabled())
+            log.info("Start check connection process [nodeCnt=" + nodes.size() + ", timeout=" + timeout + ']');
+
+        fut.init(timeout);
 
         return new IgniteFutureImpl<>(fut);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a3a62569/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
index 170ee44..9c161d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -334,6 +334,7 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
         /**
          *
          */
+        @SuppressWarnings("unchecked")
         void cancel() {
             if (finish(false))
                 nioSrvr.cancelConnect(ch, sesMeta);
@@ -440,7 +441,7 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit
 
                 futs[idx++] = fut;
 
-                if (done())
+                if (resCnt == Integer.MAX_VALUE)
                     return;
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a3a62569/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 1373da8..6d61ac2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -25,8 +25,10 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -49,6 +51,8 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.CommunicationProblemContext;
+import org.apache.ignite.configuration.CommunicationProblemResolver;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -77,10 +81,9 @@ import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.logger.java.JavaLogger;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.configuration.CommunicationProblemContext;
-import org.apache.ignite.configuration.CommunicationProblemResolver;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -103,6 +106,7 @@ import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
 /**
  * TODO ZK: test with max client connections limit error.
  */
+@SuppressWarnings("deprecation")
 public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /** */
     private static final String IGNITE_ZK_ROOT = ZookeeperDiscoverySpi.DFLT_ROOT_PATH;
@@ -199,6 +203,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
             @IgniteInstanceResource
             private Ignite ignite;
 
+            @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
             @Override public boolean apply(Event evt) {
                 try {
                     DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt;
@@ -773,7 +778,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
         c1.closeSocket(true);
 
         IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>()
{
-            @Override public Void call() throws Exception {
+            @Override public Void call() {
                 try {
                     startGrid(2);
                 }
@@ -864,7 +869,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
         final AtomicInteger nodeIdx = new AtomicInteger(initNodes);
 
         IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>()
{
-            @Override public Void call() throws Exception {
+            @Override public Void call() {
                 try {
                     startGrid(nodeIdx.getAndIncrement());
                 }
@@ -929,16 +934,16 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
     }
 
     /**
-     * @param node
-     * @return
+     * @param node Node.
+     * @return Corresponding znode.
      */
     private static String aliveZkNodePath(Ignite node) {
         return aliveZkNodePath(node.configuration().getDiscoverySpi());
     }
 
     /**
-     * @param spi
-     * @return
+     * @param spi SPI.
+     * @return Znode related to given SPI.
      */
     private static String aliveZkNodePath(DiscoverySpi spi) {
         String path = GridTestUtils.getFieldValue(spi, "impl", "rtState", "locNodeZkPath");
@@ -947,14 +952,15 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
     }
 
     /**
-     * @param log
-     * @param connectString
-     * @param failedZkNodes
-     * @param timeout
-     * @throws Exception
+     * @param log Logger.
+     * @param connectString Zookeeper connect string.
+     * @param failedZkNodes Znodes which should be removed.
+     * @param timeout Timeout.
+     * @throws Exception If failed.
      */
     private static void waitNoAliveZkNodes(final IgniteLogger log,
-        String connectString, final List<String> failedZkNodes,
+        String connectString,
+        final List<String> failedZkNodes,
         long timeout)
         throws Exception
     {
@@ -1563,7 +1569,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
         Ignite srv0 = startGrid(0);
 
         // Send large message, single node in topology.
-        IgniteCache cache = srv0.createCache(largeCacheConfiguration("c1"));
+        IgniteCache<Object, Object> cache = srv0.createCache(largeCacheConfiguration("c1"));
 
         for (int i = 0; i < 100; i++)
             cache.put(i, i);
@@ -1893,7 +1899,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
 
         startGridsMultiThreaded(1, 3);
 
-        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(3));
+        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(3));
 
         commSpi.pingLatch = new CountDownLatch(1);
 
@@ -1970,8 +1976,6 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
     }
 
     /**
-     * TODO ZK: kill random, kill coordinator multiple times.
-     *
      * @param startNodes Number of nodes to start.
      * @param killNodes Nodes to kill by resolve process.
      * @throws Exception If failed.
@@ -1983,7 +1987,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
 
         startGrids(startNodes);
 
-        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(0));
+        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(0));
 
         commSpi.checkRes = new BitSet(startNodes);
 
@@ -2026,15 +2030,102 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
     /**
      * @throws Exception If failed.
      */
+    public void testCommunicationErrorResolve_KillCoordinator_5() throws Exception {
+        sesTimeout = 2000;
+
+        testCommSpi = true;
+        commProblemRslvr = KillCoordinatorCommunicationProblemResolver.FACTORY;
+
+        startGrids(10);
+
+        int crd = 0;
+
+        int nodeIdx = 10;
+
+        for (int i = 0; i < 10; i++) {
+            info("Iteration: " + i);
+
+            for (Ignite node : G.allGrids())
+                ZkTestCommunicationSpi.spi(node).initCheckResult(10);
+
+            UUID crdId = ignite(crd).cluster().localNode().id();
+
+            ZookeeperDiscoverySpi spi = spi(ignite(crd + 1));
+
+            try {
+                spi.resolveCommunicationError(spi.getNode(crdId), new Exception("test"));
+
+                fail("Exception is not thrown");
+            }
+            catch (IgniteSpiException e) {
+                assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException);
+            }
+
+            waitForTopology(9);
+
+            startGrid(nodeIdx++);
+
+            waitForTopology(10);
+
+            crd++;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCommunicationErrorResolve_KillRandom() throws Exception {
+        sesTimeout = 2000;
+
+        testCommSpi = true;
+        commProblemRslvr = KillRandomCommunicationProblemResolver.FACTORY;
+
+        startGridsMultiThreaded(10);
+
+        client = true;
+
+        startGridsMultiThreaded(10, 5);
+
+        int nodeIdx = 15;
+
+        for (int i = 0; i < 10; i++) {
+            info("Iteration: " + i);
+
+            ZookeeperDiscoverySpi spi = null;
+
+            for (Ignite node : G.allGrids()) {
+                ZkTestCommunicationSpi.spi(node).initCheckResult(100);
+
+                spi = spi(node);
+            }
+
+            try {
+                spi.resolveCommunicationError(spi.getRemoteNodes().iterator().next(), new Exception("test"));
+            }
+            catch (IgniteSpiException ignore) {
+                // No-op.
+            }
+
+            client = ThreadLocalRandom.current().nextBoolean();
+
+            startGrid(nodeIdx++);
+
+            awaitPartitionMapExchange();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testDefaultCommunicationErrorResolver1() throws Exception {
         testCommSpi = true;
         sesTimeout = 5000;
 
         startGrids(3);
 
-        ZkTestCommunicationSpi.forNode(ignite(0)).initCheckResult(3, 0, 1);
-        ZkTestCommunicationSpi.forNode(ignite(1)).initCheckResult(3, 0, 1);
-        ZkTestCommunicationSpi.forNode(ignite(0)).initCheckResult(3, 2);
+        ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(3, 0, 1);
+        ZkTestCommunicationSpi.spi(ignite(1)).initCheckResult(3, 0, 1);
+        ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(3, 2);
 
         UUID killedId = nodeId(2);
 
@@ -2062,11 +2153,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
 
         startGridsMultiThreaded(3, 2);
 
-        ZkTestCommunicationSpi.forNode(ignite(0)).initCheckResult(5, 0, 1);
-        ZkTestCommunicationSpi.forNode(ignite(1)).initCheckResult(5, 0, 1);
-        ZkTestCommunicationSpi.forNode(ignite(2)).initCheckResult(5, 2, 3, 4);
-        ZkTestCommunicationSpi.forNode(ignite(3)).initCheckResult(5, 2, 3, 4);
-        ZkTestCommunicationSpi.forNode(ignite(4)).initCheckResult(5, 2, 3, 4);
+        ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(5, 0, 1);
+        ZkTestCommunicationSpi.spi(ignite(1)).initCheckResult(5, 0, 1);
+        ZkTestCommunicationSpi.spi(ignite(2)).initCheckResult(5, 2, 3, 4);
+        ZkTestCommunicationSpi.spi(ignite(3)).initCheckResult(5, 2, 3, 4);
+        ZkTestCommunicationSpi.spi(ignite(4)).initCheckResult(5, 2, 3, 4);
 
         ZookeeperDiscoverySpi spi = spi(ignite(0));
 
@@ -2079,15 +2170,53 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     public void testDefaultCommunicationErrorResolver3() throws Exception {
+        defaultCommunicationErrorResolver_BreakCommunication(3, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDefaultCommunicationErrorResolver4() throws Exception {
+        defaultCommunicationErrorResolver_BreakCommunication(3, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDefaultCommunicationErrorResolver5() throws Exception {
+        defaultCommunicationErrorResolver_BreakCommunication(10, 1, 3, 6);
+    }
+
+    /**
+     * @param startNodes Initial nodes number.
+     * @param breakNodes Node indices where communication server is closed.
+     * @throws Exception If failed.
+     */
+    private void defaultCommunicationErrorResolver_BreakCommunication(int startNodes, final int...breakNodes) throws Exception {
         sesTimeout = 5000;
 
-        startGridsMultiThreaded(3);
+        startGridsMultiThreaded(startNodes);
 
-        info("Close communication");
+        final CyclicBarrier b = new CyclicBarrier(breakNodes.length);
 
-        ((TcpCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).simulateNodeFailure();
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer threadIdx) {
+                try {
+                    b.await();
 
-        waitForTopology(2);
+                    int nodeIdx = breakNodes[threadIdx];
+
+                    info("Close communication: " + nodeIdx);
+
+                    ((TcpCommunicationSpi)ignite(nodeIdx).configuration().getCommunicationSpi()).simulateNodeFailure();
+                }
+                catch (Exception e) {
+                    fail("Unexpected error: " + e);
+                }
+            }
+        }, breakNodes.length, "break-communication");
+
+        waitForTopology(startNodes - breakNodes.length);
     }
 
     /**
@@ -2414,6 +2543,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
     /**
      *
      */
+    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
     private void checkEventsConsistency() {
         for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : evts.entrySet())
{
             UUID nodeId = nodeEvtEntry.getKey();
@@ -2517,6 +2647,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
      */
     private void checkEvents(final UUID nodeId, final DiscoveryEvent...expEvts) throws Exception
{
         assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
             @Override public boolean apply() {
                 Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId);
 
@@ -2548,17 +2679,6 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
     }
 
     /**
-     * @param node Node.
-     */
-    private static void closeZkClient(Ignite node) {
-        DiscoverySpi spi = node.configuration().getDiscoverySpi();
-
-        assertTrue(spi.getClass().getName(), spi instanceof ZookeeperDiscoverySpi);
-
-        closeZkClient((ZookeeperDiscoverySpi)spi);
-    }
-
-    /**
      * @param spi Spi instance.
      */
     private static void closeZkClient(ZookeeperDiscoverySpi spi) {
@@ -2717,7 +2837,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
      * @param cacheName Cache name.
      * @return Configuration.
      */
-    private CacheConfiguration largeCacheConfiguration(String cacheName) {
+    private CacheConfiguration<Object, Object> largeCacheConfiguration(String cacheName)
{
         CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(cacheName);
 
         ccfg.setAffinity(new TestAffinityFunction(1024 * 1024));
@@ -2802,6 +2922,71 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
     /**
      *
      */
+    static class KillCoordinatorCommunicationProblemResolver implements CommunicationProblemResolver
{
+        /** */
+        static final IgniteOutClosure<CommunicationProblemResolver> FACTORY = new IgniteOutClosure<CommunicationProblemResolver>()
{
+            @Override public CommunicationProblemResolver apply() {
+                return new KillCoordinatorCommunicationProblemResolver();
+            }
+        };
+
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public void resolve(CommunicationProblemContext ctx) {
+            List<ClusterNode> nodes = ctx.topologySnapshot();
+
+            ClusterNode node = nodes.get(0);
+
+            log.info("Resolver kills node: " + node.id());
+
+            ctx.killNode(node);
+        }
+    }
+
+    /**
+     *
+     */
+    static class KillRandomCommunicationProblemResolver implements CommunicationProblemResolver
{
+        /** */
+        static final IgniteOutClosure<CommunicationProblemResolver> FACTORY = new IgniteOutClosure<CommunicationProblemResolver>()
{
+            @Override public CommunicationProblemResolver apply() {
+                return new KillRandomCommunicationProblemResolver();
+            }
+        };
+
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public void resolve(CommunicationProblemContext ctx) {
+            List<ClusterNode> nodes = ctx.topologySnapshot();
+
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            int killNodes = rnd.nextInt(nodes.size() / 2);
+
+            log.info("Resolver kills nodes [total=" + nodes.size() + ", kill=" + killNodes + ']');
+
+            Set<Integer> idxs = new HashSet<>();
+
+            while (idxs.size() < killNodes)
+                idxs.add(rnd.nextInt(nodes.size()));
+
+            for (int idx : idxs) {
+                ClusterNode node = nodes.get(idx);
+
+                log.info("Resolver kills node: " + node.id());
+
+                ctx.killNode(node);
+            }
+        }
+    }
+
+    /**
+     *
+     */
     static class TestNodeKillCommunicationProblemResolver implements CommunicationProblemResolver
{
         /**
          * @param killOrders Killed nodes order.
@@ -2852,7 +3037,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest
{
          * @param ignite Node.
          * @return Node's communication SPI.
          */
-        static ZkTestCommunicationSpi forNode(Ignite ignite) {
+        static ZkTestCommunicationSpi spi(Ignite ignite) {
             return (ZkTestCommunicationSpi)ignite.configuration().getCommunicationSpi();
         }
 


Mime
View raw message