ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [25/50] [abbrv] ignite git commit: ignite-6519 Race in SplitAwareTopologyValidator on activator and server node join
Date Wed, 25 Oct 2017 09:40:59 GMT
ignite-6519 Race in SplitAwareTopologyValidator on activator and server node join


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d90b8fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d90b8fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d90b8fe

Branch: refs/heads/ignite-3478
Commit: 5d90b8feb5eb65ce190ca4106d31f386c7be42a3
Parents: 01daee6
Author: Alexandr Kuramshin <akuramshin@gridgain.com>
Authored: Mon Oct 23 15:28:28 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Oct 23 15:28:28 2017 +0300

----------------------------------------------------------------------
 .../internal/TestRecordingCommunicationSpi.java |  12 +
 ...niteTopologyValidatorGridSplitCacheTest.java | 358 +++++++++++++++----
 .../IgniteCacheTopologySplitAbstractTest.java   | 266 ++++++++++++++
 3 files changed, 564 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5d90b8fe/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index ab61687..cf4f059 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -71,6 +72,12 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     /** {@inheritDoc} */
     @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
         throws IgniteSpiException {
+        // All ignite code expects that 'send' fails after discovery listener for node fail finished.
+        if (getSpiContext().node(node.id()) == null) {
+            throw new IgniteSpiException(new ClusterTopologyCheckedException("Failed to send message" +
+                " (node left topology): " + node));
+        }
+
         if (msg instanceof GridIoMessage) {
             GridIoMessage ioMsg = (GridIoMessage)msg;
 
@@ -115,6 +122,11 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
         super.sendMessage(node, msg, ackC);
     }
 
+    /** {@inheritDoc} */
+    @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+        sendMessage(node, msg, null);
+    }
+
     /**
      * @param recordP Record predicate.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d90b8fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
index 1f3b875..1885e9a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
@@ -17,32 +17,43 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.configuration.TopologyValidator;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTopologySplitAbstractTest;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.resources.CacheNameResource;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT;
 
 /**
  * Tests complex scenario with topology validator. Grid is split between to data centers, defined by attribute {@link
  * #DC_NODE_ATTR}. If only nodes from single DC are left in topology, grid is moved into inoperative state until special
  * activator node'll enter a topology, enabling grid operations.
  */
-public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstractTest {
+public class IgniteTopologyValidatorGridSplitCacheTest extends IgniteCacheTopologySplitAbstractTest {
+
     /** */
     private static final String DC_NODE_ATTR = "dc";
 
@@ -50,10 +61,10 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
     private static final String ACTIVATOR_NODE_ATTR = "split.resolved";
 
     /** */
-    private static final int GRID_CNT = 8;
+    private static final int GRID_CNT = 32;
 
     /** */
-    private static final int CACHES_CNT = 100;
+    private static final int CACHES_CNT = 50;
 
     /** */
     private static final int RESOLVER_GRID_IDX = GRID_CNT;
@@ -62,7 +73,62 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
     private static final int CONFIGLESS_GRID_IDX = GRID_CNT + 1;
 
     /** */
-    private boolean useCacheGrp = false;
+    private static final String STATIC_IP = "127.0.0.1";
+
+    /** */
+    private static final Collection<String> SEG_FINDER_0;
+
+    /** */
+    private static final Collection<String> SEG_FINDER_1;
+
+    /** */
+    private static final Collection<String> SEG_FINDER_ALL;
+
+    static {
+        Collection<String> seg0 = new ArrayList<>();
+
+        Collection<String> seg1 = new ArrayList<>();
+
+        for (int i = 0; i < GRID_CNT; i += 2) {
+            seg0.add(STATIC_IP + ':' + (DFLT_PORT + i));
+
+            seg1.add(STATIC_IP + ':' + (DFLT_PORT + i + 1));
+        }
+        SEG_FINDER_0 = Collections.unmodifiableCollection(seg0);
+
+        SEG_FINDER_1 = Collections.unmodifiableCollection(seg1);
+
+        SEG_FINDER_ALL = F.concat(false, SEG_FINDER_0, SEG_FINDER_1);
+    }
+
+    /** */
+    private boolean useCacheGrp;
+
+    /**  */
+    private int getDiscoPort(int gridIdx) {
+        return DFLT_PORT + gridIdx;
+    }
+
+    /**  */
+    private boolean isDiscoPort(int port) {
+        return port >= DFLT_PORT &&
+            port <= (DFLT_PORT + TcpDiscoverySpi.DFLT_PORT_RANGE);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean isBlocked(int locPort, int rmtPort) {
+        return isDiscoPort(locPort) && isDiscoPort(rmtPort) && segment(locPort) != segment(rmtPort);
+    }
+
+    /**  */
+    private int segment(int discoPort) {
+        return (discoPort - DFLT_PORT) % 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int segment(ClusterNode node) {
+        return node.attribute(DC_NODE_ATTR);
+    }
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -70,17 +136,32 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
 
         int idx = getTestIgniteInstanceIndex(gridName);
 
-        cfg.setUserAttributes(F.asMap(DC_NODE_ATTR, idx % 2));
+        Map<String, Object> userAttrs = new HashMap<>(4);
+
+        int segment = idx % 2;
+
+        userAttrs.put(DC_NODE_ATTR, segment);
+
+        TcpDiscoverySpi disco = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+
+        disco.setLocalPort(getDiscoPort(idx));
+
+        disco.setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(segmented() ?
+            (segment == 0 ? SEG_FINDER_0 : SEG_FINDER_1) : SEG_FINDER_ALL));
 
         if (idx != CONFIGLESS_GRID_IDX) {
             if (idx == RESOLVER_GRID_IDX) {
                 cfg.setClientMode(true);
 
-                cfg.setUserAttributes(F.asMap(ACTIVATOR_NODE_ATTR, "true"));
+                userAttrs.put(ACTIVATOR_NODE_ATTR, "true");
             }
             else
                 cfg.setActiveOnStart(false);
         }
+        cfg.setUserAttributes(userAttrs);
+
+        cfg.setMemoryConfiguration(new MemoryConfiguration().
+            setDefaultMemoryPolicySize((50L << 20) + (100L << 20) * CACHES_CNT / GRID_CNT));
 
         return cfg;
     }
@@ -129,6 +210,12 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
         stopAllGrids();
     }
 
+    /**  */
+    protected void stopGrids(int... grids) {
+        for (int idx : grids)
+            stopGrid(idx);
+    }
+
     /**
      * Tests topology split scenario.
      *
@@ -149,8 +236,8 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
 
     /**
      * Tests topology split scenario.
-     * @param useCacheGrp Use cache group.
      *
+     * @param useCacheGrp Use cache group.
      * @throws Exception If failed.
      */
     private void testTopologyValidator0(boolean useCacheGrp) throws Exception {
@@ -161,31 +248,26 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
         grid.getOrCreateCaches(getCacheConfigurations());
 
         // Init grid index arrays
-        int[] dc1 = new int[GRID_CNT / 2];
+        int[] seg1 = new int[GRID_CNT / 2];
 
-        for (int i = 0; i < dc1.length; ++i)
-            dc1[i] = i * 2 + 1;
+        for (int i = 0; i < seg1.length; ++i)
+            seg1[i] = i * 2 + 1;
 
-        int[] dc0 = new int[GRID_CNT - dc1.length];
+        int[] seg0 = new int[GRID_CNT - seg1.length];
 
-        for (int i = 0; i < dc0.length; ++i)
-            dc0[i] = i * 2;
+        for (int i = 0; i < seg0.length; ++i)
+            seg0[i] = i * 2;
 
         // Tests what each node is able to do puts.
-        tryPut(dc0);
-
-        tryPut(dc1);
+        tryPut(seg0, seg1);
 
         clearAll();
 
         // Force segmentation.
-        for (int idx : dc1)
-            stopGrid(idx);
-
-        awaitPartitionMapExchange();
+        splitAndWait();
 
         try {
-            tryPut(dc0);
+            tryPut(seg0, seg1);
 
             fail();
         }
@@ -196,24 +278,41 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
         // Repair split by adding activator node in topology.
         resolveSplit();
 
-        tryPut(dc0);
+        tryPut(seg0);
 
         clearAll();
 
+        try {
+            tryPut(seg1);
+
+            fail();
+        }
+        catch (Exception e) {
+            // No-op.
+        }
+
+        stopGrids(seg1);
+
         // Fix split by adding node from second DC.
+        unsplit();
+
         startGrid(CONFIGLESS_GRID_IDX);
 
         awaitPartitionMapExchange();
 
+        tryPut(seg0);
+
         tryPut(CONFIGLESS_GRID_IDX);
 
+        clearAll();
+
         // Force split by removing last node from second DC.
         stopGrid(CONFIGLESS_GRID_IDX);
 
         awaitPartitionMapExchange();
 
         try {
-            tryPut(dc0);
+            tryPut(seg0);
 
             fail();
         }
@@ -221,10 +320,13 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
             // No-op.
         }
 
+        // Repair split with concurrent server node join race.
+        resolveSplitWithRace(CONFIGLESS_GRID_IDX);
+
         // Repair split by adding activator node in topology.
         resolveSplit();
 
-        tryPut(dc0);
+        tryPut(seg0);
 
         clearAll();
 
@@ -233,9 +335,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
 
         awaitPartitionMapExchange();
 
-        for (int i = 0; i < dc0.length; i++) {
-            int idx = dc0[i];
-
+        for (int idx : seg0) {
             if (idx == 0)
                 continue;
 
@@ -249,7 +349,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
 
         awaitPartitionMapExchange();
 
-        assertEquals("Expecting put count", CACHES_CNT * dc0.length, tryPut(dc0));
+        assertEquals("Expecting put count", CACHES_CNT * seg0.length, tryPut(seg0));
     }
 
     /**
@@ -277,39 +377,132 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
     }
 
     /**
-     * @param grids Grids to test.
+     * Resolves split by client node join with server node join race simulation.
+     *
+     * @param srvNode server node index to simulate join race
+     * @throws Exception If failed.
      */
-    private int tryPut(int... grids) {
+    private void resolveSplitWithRace(int srvNode) throws Exception {
+        startGrid(RESOLVER_GRID_IDX);
+
+        startGrid(srvNode);
+
+        awaitPartitionMapExchange();
+
+        tryPut(srvNode);
+
+        clearAll();
+
+        stopGrid(srvNode);
+
+        awaitPartitionMapExchange();
+
+        try {
+            tryPut(0);
+
+            fail();
+        }
+        catch (Exception e) {
+            // No-op.
+        }
+
+        stopGrid(RESOLVER_GRID_IDX);
+    }
+
+    /**
+     * @param idx Grid to test.
+     * @return number of successful puts to caches
+     * @throws IgniteException If all tries to put was failed.
+     * @throws AssertionError If some of tries to put was failed.
+     */
+    private int tryPut(int idx) {
+        IgniteEx g = grid(idx);
+
         int putCnt = 0;
 
-        for (int i = 0; i < grids.length; i++) {
-            IgniteEx g = grid(grids[i]);
-            for (int cnt = 0; cnt < CACHES_CNT; cnt++) {
-                String cacheName = testCacheName(cnt);
+        IgniteException ex = null;
 
-                for (int k = 0; k < 100; k++) {
-                    if (g.affinity(cacheName).isPrimary(g.localNode(), k)) {
-                        IgniteCache<Object, Object> cache = g.cache(cacheName);
+        for (int cnt = 0; cnt < CACHES_CNT; cnt++) {
+            String cacheName = testCacheName(cnt);
 
-                        try {
-                            cache.put(k, k);
-                        }
-                        catch (Throwable t) {
-                            log.error("Failed to put entry: [cache=" + cacheName + ", key=" + k + ", nodeId=" +
-                                g.name() + ']', t);
+            int key = -1;
 
-                            throw t;
-                        }
+            Affinity<Object> aff = g.affinity(cacheName);
+
+            for (int k = 0; k < aff.partitions(); k++) {
+                if (aff.isPrimary(g.cluster().localNode(), k)) {
+                    key = k;
 
-                        assertEquals(1, cache.localSize());
+                    break;
+                }
+            }
 
-                        putCnt++;
+            assertTrue("Failed to find affinity key [gridIdx=" + idx +", cache=" + cacheName + ']',
+                key != -1);
 
-                        break;
-                    }
+            IgniteCache<Object, Object> cache = g.cache(cacheName);
+
+            try {
+                cache.put(key, key);
+
+                assertEquals(1, cache.localSize());
+
+                if (ex != null)
+                    throw new AssertionError("Successful tryPut after failure [gridIdx=" + idx +
+                        ", cacheName=" + cacheName + ']', ex);
+
+                putCnt++;
+            }
+            catch (Throwable t) {
+                IgniteException e = new IgniteException("Failed to put entry [cache=" + cacheName + ", key=" +
+                    key + ']', t);
+
+                log.error(e.getMessage(), e.getCause());
+
+                if (ex == null)
+                    ex = new IgniteException("Failed to put entry [node=" + g.name() + ']');
+
+                ex.addSuppressed(t);
+            }
+        }
+        if (ex != null)
+            throw ex;
+
+        return putCnt;
+    }
+
+    /**
+     * @param grids Grids to test.
+     * @return number of successful puts to caches
+     * @throws IgniteException If all tries to put was failed.
+     * @throws AssertionError If some of tries to put was failed.
+     */
+    private int tryPut(int[]... grids) {
+        int putCnt = 0;
+
+        IgniteException ex = null;
+
+        for (int[] idxs : grids) {
+            for (int idx : idxs) {
+                try {
+                    int cnt = tryPut(idx);
+
+                    if (ex != null)
+                        throw new AssertionError("Successful tryPut after failure [gridIdx=" + idx +
+                            ", sucessful puts = " + cnt + ']', ex);
+
+                    putCnt += cnt;
+                }
+                catch (Exception e) {
+                    if (ex == null)
+                        ex = new IgniteException("Failed to put entry");
+
+                    ex.addSuppressed(e);
                 }
             }
         }
+        if (ex != null)
+            throw ex;
 
         return putCnt;
     }
@@ -318,20 +511,21 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
      * Prevents cache from performing any operation if only nodes from single data center are left in topology.
      */
     private static class SplitAwareTopologyValidator implements TopologyValidator {
+
         /** */
         private static final long serialVersionUID = 0L;
 
         /** */
         @CacheNameResource
-        private String cacheName;
+        private transient String cacheName;
 
         /** */
         @IgniteInstanceResource
-        private Ignite ignite;
+        private transient Ignite ignite;
 
         /** */
         @LoggerResource
-        private IgniteLogger log;
+        private transient IgniteLogger log;
 
         /** State. */
         private transient State state;
@@ -340,12 +534,13 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
         @Override public boolean validate(Collection<ClusterNode> nodes) {
             initIfNeeded(nodes);
 
-            if (!F.view(nodes, new IgnitePredicate<ClusterNode>() {
+            for (ClusterNode node : F.view(nodes, new IgnitePredicate<ClusterNode>() {
                 @Override public boolean apply(ClusterNode node) {
                     return !node.isClient() && node.attribute(DC_NODE_ATTR) == null;
                 }
-            }).isEmpty()) {
-                log.error("No valid server nodes are detected in topology: [cacheName=" + cacheName + ']');
+            })) {
+                log.error("Not valid server nodes are detected in topology: [cacheName=" + cacheName + ", node=" +
+                    node + ']');
 
                 return false;
             }
@@ -353,7 +548,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
             boolean segmented = segmented(nodes);
 
             if (!segmented)
-                state = State.VALID; // Also clears possible REPAIRED state.
+                state = State.VALID; // Also clears possible BEFORE_REPAIRED and REPAIRED states.
             else {
                 if (state == State.REPAIRED) // Any topology change in segmented grid in repaired mode is valid.
                     return true;
@@ -361,23 +556,40 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
                 // Find discovery event node.
                 ClusterNode evtNode = evtNode(nodes);
 
-                if (activator(evtNode)) {
-                    if (log.isInfoEnabled())
-                        log.info("Grid segmentation is repaired: [cacheName=" + cacheName + ']');
-
-                    state = State.REPAIRED;
-                }
+                if (activator(evtNode))
+                    state = State.BEFORE_REPARED;
                 else {
-                    if (state == State.VALID) {
-                        if (log.isInfoEnabled())
-                            log.info("Grid segmentation is detected: [cacheName=" + cacheName + ']');
+                    if (state == State.BEFORE_REPARED) {
+                        boolean activatorLeft = true;
+
+                        // Check if activator is no longer in topology.
+                        for (ClusterNode node : nodes) {
+                            if (node.isClient() && activator(node)) {
+                                activatorLeft = false;
+
+                                break;
+                            }
+                        }
+
+                        if (activatorLeft) {
+                            if (log.isInfoEnabled())
+                                log.info("Grid segmentation is repaired: [cacheName=" + cacheName + ']');
+
+                            state = State.REPAIRED; // Switch to REPAIRED state only when activator leaves.
+                        } // Else stay in BEFORE_REPARED state.
                     }
+                    else {
+                        if (state == State.VALID) {
+                            if (log.isInfoEnabled())
+                                log.info("Grid segmentation is detected: [cacheName=" + cacheName + ']');
+                        }
 
-                    state = State.NOTVALID;
+                        state = State.NOTVALID;
+                    }
                 }
             }
 
-            return state != State.NOTVALID;
+            return state == State.VALID || state == State.REPAIRED;
         }
 
         /** */
@@ -418,7 +630,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
             // Search for activator node in history on start.
             long topVer = evtNode(nodes).order();
 
-            while(topVer > 0) {
+            while (topVer > 0) {
                 Collection<ClusterNode> top = ignite.cluster().topology(topVer--);
 
                 // Stop on reaching history limit.
@@ -460,11 +672,13 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
 
         /** States. */
         private enum State {
-            /** Topology valid. */
+            /** Topology is valid. */
             VALID,
-            /** Topology not valid */
+            /** Topology is not valid */
             NOTVALID,
-            /** Topology repaired (valid) */
+            /** Before topology will be repaired (valid) */
+            BEFORE_REPARED,
+            /** Topology is repaired (valid) */
             REPAIRED;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d90b8fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
new file mode 100644
index 0000000..196681d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Collection;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Abstract class for tests over split in two half topology.
+ */
+public abstract class IgniteCacheTopologySplitAbstractTest extends GridCommonAbstractTest {
+
+    /** Segmentation state. */
+    private volatile boolean segmented;
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setFailureDetectionTimeout(3_000L);
+
+        cfg.setDiscoverySpi(new SplitTcpDiscoverySpi());
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        return cfg;
+    }
+
+    /**
+     * Trigger segmentation and wait for results. Should be called on stable topology.
+     *
+     * @throws InterruptedException If interrupted while waiting.
+     * @throws IgniteCheckedException On error.
+     */
+    protected void splitAndWait() throws InterruptedException, IgniteCheckedException {
+        if (log.isInfoEnabled())
+            log.info(">>> Simulating split");
+
+        long topVer = grid(0).cluster().topologyVersion();
+
+        // Trigger segmentation.
+        segmented = true;
+
+        for (Ignite ignite : G.allGrids()) {
+            TestRecordingCommunicationSpi comm = (TestRecordingCommunicationSpi)
+                ignite.configuration().getCommunicationSpi();
+
+            comm.blockMessages(new SegmentBlocker(ignite.cluster().localNode()));
+        }
+
+        Collection<Ignite> seg0 = F.view(G.allGrids(), new IgnitePredicate<Ignite>() {
+            @Override public boolean apply(Ignite ignite) {
+                return segment(ignite.cluster().localNode()) == 0;
+            }
+        });
+
+        Collection<Ignite> seg1 = F.view(G.allGrids(), new IgnitePredicate<Ignite>() {
+            @Override public boolean apply(Ignite ignite) {
+                return segment(ignite.cluster().localNode()) == 1;
+            }
+        });
+
+        for (Ignite grid : seg0)
+            ((IgniteKernal)grid).context().discovery().topologyFuture(topVer + seg1.size()).get();
+
+        for (Ignite grid : seg1)
+            ((IgniteKernal)grid).context().discovery().topologyFuture(topVer + seg0.size()).get();
+
+        // awaitPartitionMapExchange won't work because coordinator is wrong for second segment.
+        for (Ignite grid : G.allGrids())
+            ((IgniteKernal)grid).context().cache().context().exchange().lastTopologyFuture().get();
+
+        if (log.isInfoEnabled())
+            log.info(">>> Finished waiting for split");
+    }
+
+    /**
+     * Restore initial state
+     */
+    protected void unsplit() {
+        if (log.isInfoEnabled())
+            log.info(">>> Restoring from split");
+
+        segmented = false;
+
+        for (Ignite ignite : G.allGrids()) {
+            TestRecordingCommunicationSpi comm = (TestRecordingCommunicationSpi)
+                ignite.configuration().getCommunicationSpi();
+
+            comm.stopBlock();
+        }
+    }
+
+    /**
+     * @return Segmented status.
+     */
+    protected boolean segmented() {
+        return segmented;
+    }
+
+    /**
+     * Defines split matrix.
+     *
+     * @param locPort Local port.
+     * @param rmtPort Rmt port.
+     * @return {@code true} is link is blocked.
+     */
+    protected abstract boolean isBlocked(int locPort, int rmtPort);
+
+    /**
+     * Defines instance segment: 0 or 1.
+     *
+     * @param node Node.
+     * @return Index of instance segment.
+     */
+    protected abstract int segment(ClusterNode node);
+
+    /**
+     * Discovery SPI which can simulate network split.
+     */
+    protected class SplitTcpDiscoverySpi extends TcpDiscoverySpi {
+        /**
+         * @param sockAddr Remote socket address.
+         * @return Segmented status.
+         */
+        protected boolean segmented(InetSocketAddress sockAddr) {
+            if (!segmented)
+                return false;
+
+            int rmtPort = sockAddr.getPort();
+
+            boolean b = isBlocked(getLocalPort(), rmtPort);
+
+            if (b && log.isDebugEnabled())
+                log.debug("Block cross-segment communication [locPort=" + getLocalPort() + ", rmtPort=" + rmtPort + ']');
+
+            return b;
+        }
+
+        /**
+         * @param sockAddr Socket address.
+         * @param timeout Socket timeout.
+         * @throws SocketTimeoutException If segmented.
+         */
+        protected void checkSegmented(InetSocketAddress sockAddr, long timeout) throws SocketTimeoutException {
+            if (segmented(sockAddr)) {
+                if (timeout > 0) {
+                    try {
+                        Thread.sleep(timeout);
+                    }
+                    catch (InterruptedException e) {
+                        // No-op.
+                    }
+                }
+
+                throw new SocketTimeoutException("Fake socket timeout.");
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr,
+            IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
+            checkSegmented(remAddr, timeoutHelper.nextTimeoutChunk(getSocketTimeout()));
+
+            return super.openSocket(sock, remAddr, timeoutHelper);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(
+            Socket sock,
+            TcpDiscoveryAbstractMessage msg,
+            byte[] data,
+            long timeout
+        ) throws IOException {
+            checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout);
+
+            super.writeToSocket(sock, msg, data, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock,
+            OutputStream out,
+            TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout);
+
+            super.writeToSocket(sock, out, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(
+            Socket sock,
+            TcpDiscoveryAbstractMessage msg,
+            long timeout
+        ) throws IOException, IgniteCheckedException {
+            checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout);
+
+            super.writeToSocket(sock, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
+            long timeout) throws IOException {
+            checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout);
+
+            super.writeToSocket(msg, sock, res, timeout);
+        }
+    }
+
+    /**  */
+    protected class SegmentBlocker implements IgniteBiPredicate<ClusterNode, Message> {
+        /**  */
+        private final ClusterNode locNode;
+
+        /**
+         * @param locNode Local node.
+         */
+        SegmentBlocker(ClusterNode locNode) {
+            assert locNode != null;
+
+            this.locNode = locNode;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node, Message message) {
+            return segment(locNode) != segment(node);
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message