ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [45/46] ignite git commit: 1093
Date Fri, 02 Oct 2015 10:58:59 GMT
1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0f7c32c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0f7c32c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0f7c32c

Branch: refs/heads/ignite-1093-2
Commit: f0f7c32caa1a9566c77d0a66bb533a4a07a2338a
Parents: 9abfc60
Author: Anton Vinogradov <av@apache.org>
Authored: Tue Sep 29 18:20:01 2015 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Tue Sep 29 18:20:01 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java |   2 +-
 .../dht/preloader/GridDhtPartitionSupplier.java | 196 +++++++++++++------
 .../GridCacheRebalancingSyncSelfTest.java       |  30 ++-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |  17 --
 .../spi/discovery/tcp/TestTcpDiscoverySpi.java  |  46 +++++
 .../testframework/junits/GridAbstractTest.java  |   3 +-
 6 files changed, 194 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 5d4db40..d1d475c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -818,7 +818,7 @@ public class GridDhtPartitionDemander {
                         @Override public void apply(IgniteInternalFuture<Long> future) {
                             SyncFuture.this.cancel();
                         }
-                    });
+                    }); // todo: is it necessary?
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 81e2fa4..b5bb25d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -24,7 +24,6 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -41,15 +40,12 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.T4;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jsr166.ConcurrentHashMap8;
 
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
@@ -71,8 +67,8 @@ class GridDhtPartitionSupplier {
     /** Preload predicate. */
     private IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
-    /** Supply context map. T4: nodeId, idx, topologyVersion, updateSequence. */
-    private final ConcurrentHashMap8<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> scMap =
+    /** Supply context map. T2: nodeId, idx. */
+    private final ConcurrentHashMap8<T2<UUID, Integer>, SupplyContext> scMap =
         new ConcurrentHashMap8<>();
 
     /** Rebalancing listener. */
@@ -100,26 +96,18 @@ class GridDhtPartitionSupplier {
         lsnr = new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
                 if (evt instanceof DiscoveryEvent) {
-                    for (Map.Entry<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> entry : scMap.entrySet()) {
-                        T4<UUID, Integer, AffinityTopologyVersion, Long> t = entry.getKey();
+                    for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) {
+                        T2<UUID, Integer> t = entry.getKey();
 
-                        SupplyContext sc = entry.getValue();
+                        if (t.get1().equals(((DiscoveryEvent)evt).eventNode().id())) {
+                            SupplyContext sctx = entry.getValue();
 
-                        if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null)
-                            clearContext(scMap, t, sc, log);
-                    }
-                }
-                else if (evt instanceof CacheRebalancingEvent) {
-                    CacheRebalancingEvent e = (CacheRebalancingEvent)evt;
+                            clearContext(sctx, log);
 
-                    if (cctx.name().equals(e.cacheName())) {
-                        UUID id = e.discoveryNode().id();
+                            U.log(log, "Supply context removed for failed node [node=" + t.get1() + "]");
 
-                        for (Map.Entry<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> entry : scMap.entrySet()) {
-                            if (id.equals(entry.getKey().get1()))
-                                clearContext(scMap, entry.getKey(), entry.getValue(), log);
+                            scMap.remove(t, sctx);
                         }
-
                     }
                 }
                 else {
@@ -128,7 +116,7 @@ class GridDhtPartitionSupplier {
             }
         };
 
-        cctx.events().addListener(lsnr, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED);
+        cctx.events().addListener(lsnr, EVT_NODE_FAILED);
 
         startOldListeners();
     }
@@ -145,32 +133,38 @@ class GridDhtPartitionSupplier {
     /**
      * Clear context.
      *
-     * @param map Context map.
-     * @param t id.
      * @param sc Supply context.
      * @param log Logger.
      * @return true in case context was removed.
      */
-    private static boolean clearContext(
-        final ConcurrentHashMap8<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> map,
-        final T4<UUID, Integer, AffinityTopologyVersion, Long> t,
+    private static void clearContext(
         final SupplyContext sc,
         final IgniteLogger log) {
-        final Iterator it = sc.entryIt;
+        if (sc != null) {
+            final Iterator it = sc.entryIt;
 
-        if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) {
-            try {
-                synchronized (it) {
-                    if (!((GridCloseableIterator)it).isClosed())
-                        ((GridCloseableIterator)it).close();
+            if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) {
+                try {
+                    synchronized (it) {
+                        if (!((GridCloseableIterator)it).isClosed())
+                            ((GridCloseableIterator)it).close();
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Iterator close failed.", e);
                 }
             }
-            catch (IgniteCheckedException e) {
-                log.error("Iterator close failed.", e);
+
+            final GridDhtLocalPartition loc = sc.loc;
+
+            if (loc != null && loc.reservations() > 0) {
+                synchronized (loc) {
+                    if (loc.reservations() > 0)
+                        loc.release();
+                }
+
             }
         }
-
-        return map.remove(t, sc);
     }
 
     /**
@@ -199,10 +193,16 @@ class GridDhtPartitionSupplier {
 
         ClusterNode node = cctx.discovery().node(id);
 
-        T4<UUID, Integer, AffinityTopologyVersion, Long> scId = new T4<>(id, idx, d.topologyVersion(), d.updateSequence());
+        T2<UUID, Integer> scId = new T2<>(id, idx);
 
         try {
-            SupplyContext sctx = scMap.get(scId);
+            SupplyContext sctx = scMap.remove(scId);
+
+            if (sctx != null && (!d.topologyVersion().equals(sctx.topVer) || d.updateSequence() != sctx.updateSeq)) {
+                clearContext(sctx, log);
+
+                sctx = null;
+            }
 
             if (sctx == null && d.partitions() == null)
                 return;
@@ -230,18 +230,27 @@ class GridDhtPartitionSupplier {
 
                 newReq = false;
 
-                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+                GridDhtLocalPartition loc;
 
-                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
-                    // Reply with partition of "-1" to let sender know that
-                    // this node is no longer an owner.
-                    s.missed(part);
+                if (sctx != null && sctx.loc != null) {
+                    loc = sctx.loc;
 
-                    if (log.isDebugEnabled())
-                        log.debug("Requested partition is not owned by local node [part=" + part +
-                            ", demander=" + id + ']');
+                    assert loc.reservations() > 0;
+                }
+                else {
+                    loc = top.localPartition(part, d.topologyVersion(), false);
 
-                    continue;
+                    if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                        // Reply with partition of "-1" to let sender know that
+                        // this node is no longer an owner.
+                        s.missed(part);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Requested partition is not owned by local node [part=" + part +
+                                ", demander=" + id + ']');
+
+                        continue;
+                    }
                 }
 
                 GridCacheEntryInfoCollectSwapListener swapLsnr = null;
@@ -279,9 +288,18 @@ class GridDhtPartitionSupplier {
 
                             if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
                                 if (++bCnt >= maxBatchesCnt) {
-                                    saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
+                                    saveSupplyContext(scId,
+                                        phase,
+                                        partIt,
+                                        part,
+                                        entIt,
+                                        swapLsnr,
+                                        loc,
+                                        d.topologyVersion(),
+                                        d.updateSequence());
 
                                     swapLsnr = null;
+                                    loc = null;
 
                                     reply(node, d, s);
 
@@ -323,7 +341,10 @@ class GridDhtPartitionSupplier {
                                 partIt,
                                 null,
                                 swapLsnr,
-                                part);
+                                part,
+                                loc,
+                                d.topologyVersion(),
+                                d.updateSequence());
                         }
                     }
 
@@ -354,9 +375,18 @@ class GridDhtPartitionSupplier {
 
                                 if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
                                     if (++bCnt >= maxBatchesCnt) {
-                                        saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
+                                        saveSupplyContext(scId,
+                                            phase,
+                                            partIt,
+                                            part,
+                                            iter,
+                                            swapLsnr,
+                                            loc,
+                                            d.topologyVersion(),
+                                            d.updateSequence());
 
                                         swapLsnr = null;
+                                        loc = null;
 
                                         reply(node, d, s);
 
@@ -437,7 +467,10 @@ class GridDhtPartitionSupplier {
                                 partIt,
                                 null,
                                 null,
-                                part);
+                                part,
+                                loc,
+                                d.topologyVersion(),
+                                d.updateSequence());
                         }
                     }
 
@@ -465,9 +498,17 @@ class GridDhtPartitionSupplier {
 
                             if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
                                 if (++bCnt >= maxBatchesCnt) {
-                                    saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
-
-                                    swapLsnr = null;
+                                    saveSupplyContext(scId,
+                                        phase,
+                                        partIt,
+                                        part,
+                                        lsnrIt,
+                                        swapLsnr,
+                                        loc,
+                                        d.topologyVersion(),
+                                        d.updateSequence());
+
+                                    loc = null;
 
                                     reply(node, d, s);
 
@@ -500,7 +541,8 @@ class GridDhtPartitionSupplier {
                     sctx = null;
                 }
                 finally {
-                    loc.release();
+                    if (loc != null)
+                        loc.release();
 
                     if (swapLsnr != null) {
                         cctx.swap().removeOffHeapListener(part, swapLsnr);
@@ -561,12 +603,24 @@ class GridDhtPartitionSupplier {
      * @param swapLsnr Swap listener.
      */
     private void saveSupplyContext(
-        T4 t,
+        T2<UUID, Integer> t,
         int phase,
         Iterator<Integer> partIt,
         int part,
-        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) {
-        scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
+        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr,
+        GridDhtLocalPartition loc,
+        AffinityTopologyVersion topVer,
+        long updateSeq) {
+        SupplyContext old = scMap.putIfAbsent(t, new SupplyContext(phase,
+            partIt,
+            entryIt,
+            swapLsnr,
+            part,
+            loc,
+            topVer,
+            updateSeq));
+
+        assert old == null;
     }
 
     /**
@@ -588,6 +642,15 @@ class GridDhtPartitionSupplier {
         /** Partition. */
         private final int part;
 
+        /** Local partition. */
+        GridDhtLocalPartition loc;
+
+        /** Topology version. */
+        AffinityTopologyVersion topVer;
+
+        /** Update seq. */
+        long updateSeq;
+
         /**
          * @param phase Phase.
          * @param partIt Partition iterator.
@@ -595,13 +658,22 @@ class GridDhtPartitionSupplier {
          * @param swapLsnr Swap listener.
          * @param part Partition.
          */
-        public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
-            GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
+        public SupplyContext(int phase,
+            Iterator<Integer> partIt,
+            Iterator<?> entryIt,
+            GridCacheEntryInfoCollectSwapListener swapLsnr,
+            int part,
+            GridDhtLocalPartition loc,
+            AffinityTopologyVersion topVer,
+            long updateSeq) {
             this.phase = phase;
             this.partIt = partIt;
             this.entryIt = entryIt;
             this.swapLsnr = swapLsnr;
             this.part = part;
+            this.loc = loc;
+            this.topVer = topVer;
+            this.updateSeq = updateSeq;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index be8e24b..bb40f31 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -59,18 +60,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     protected static String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
 
     /** */
-    private volatile boolean concurrentStartFinished = false;
+    private volatile boolean concurrentStartFinished;
 
     /** */
-    private volatile boolean concurrentStartFinished2 = false;
-
-    private volatile FailableTcpDiscoverySpi spi;
-
-    public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi {
-        public void fail() {
-            simulateNodeFailure();
-        }
-    }
+    private volatile boolean concurrentStartFinished2;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -78,11 +71,6 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         iCfg.setRebalanceThreadPoolSize(4);
 
-        iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi());
-
-        if (getTestGridName(20).equals(gridName))
-            spi = (FailableTcpDiscoverySpi)iCfg.getDiscoverySpi();
-
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
 
@@ -96,7 +84,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
         cachePCfg.setBackups(1);
         cachePCfg.setRebalanceBatchSize(1);
-        cachePCfg.setRebalanceBatchesCount(1);
+        //cachePCfg.setRebalanceBatchesCount(1);
+        cachePCfg.setRebalanceBatchesCount(Integer.MAX_VALUE);
 
         CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>();
 
@@ -285,6 +274,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         long start = System.currentTimeMillis();
 
+        concurrentStartFinished = false;
+        concurrentStartFinished2 = false;
+
         Thread t1 = new Thread() {
             @Override public void run() {
                 try {
@@ -409,11 +401,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         waitForRebalancing(1, 2);
 
-        startGrid(20);
+        startGrid(2);
 
-        waitForRebalancing(20, 3);
+        waitForRebalancing(2, 3);
 
-        spi.fail();
+        ((TestTcpDiscoverySpi)grid(2).configuration().getDiscoverySpi()).simulateNodeFailure();
 
         waitForRebalancing(0, 4);
         waitForRebalancing(1, 4);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 0280e9c..51d8a2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -511,23 +511,6 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
-     */
-    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
-        /** */
-        private boolean ignorePingResponse;
-
-        /** {@inheritDoc} */
-        protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
-            IgniteCheckedException {
-            if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
-                return;
-            else
-                super.writeToSocket(sock, msg, timeout);
-        }
-    }
-
-    /**
      * @throws Exception If any error occurs.
      */
     public void testNodeAdded() throws Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
new file mode 100644
index 0000000..dbc54bc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -0,0 +1,46 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.net.Socket;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
+
+/**
+ *
+ */
+public class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+    /** */
+    public boolean ignorePingResponse;
+
+    /** {@inheritDoc} */
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+        IgniteCheckedException {
+        if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
+            return;
+        else
+            super.writeToSocket(sock, msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void simulateNodeFailure() {
+        super.simulateNodeFailure();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index f54fe06..546549b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -76,6 +76,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -1227,7 +1228,7 @@ public abstract class GridAbstractTest extends TestCase {
 
         cfg.setCommunicationSpi(commSpi);
 
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        TcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
 
         if (isDebug()) {
             discoSpi.setMaxMissedHeartbeats(Integer.MAX_VALUE);


Mime
View raw message