ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [09/54] [abbrv] ignite git commit: IGNITE-7222 Added ZooKeeper discovery SPI
Date Fri, 13 Apr 2018 09:33:21 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
new file mode 100644
index 0000000..c42fa57
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.util.GridLeanMap;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
+/**
+ * Tcp Communication Connection Check Future.
+ */
+public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject, GridLocalEventListener {
+    /** Session meta key under which a {@link SingleAddressConnectFuture} is stored in its session meta map. */
+    public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Atomic updater for the {@code done} flag of {@link SingleAddressConnectFuture}. */
+    private static final AtomicIntegerFieldUpdater<SingleAddressConnectFuture> connFutDoneUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(SingleAddressConnectFuture.class, "done");
+
+    /** Atomic updater for the {@code resCnt} counter of {@link MultipleAddressesConnectFuture}. */
+    private static final AtomicIntegerFieldUpdater<MultipleAddressesConnectFuture> connResCntUpdater =
+        AtomicIntegerFieldUpdater.newUpdater(MultipleAddressesConnectFuture.class, "resCnt");
+
+    /** Number of nodes for which a connection status has been received so far. */
+    private final AtomicInteger resCntr = new AtomicInteger();
+
+    /** Nodes to check; the index in this list is the bit index in the result. */
+    private final List<ClusterNode> nodes;
+
+    /** Per-node connect futures, assigned in {@code init()}; a slot is {@code null} if no connect was started. */
+    private volatile ConnectFuture[] futs;
+
+    /** NIO server used to create and cancel connection-check sessions. */
+    private final GridNioServer nioSrvr;
+
+    /** SPI instance. */
+    private final TcpCommunicationSpi spi;
+
+    /** Timeout object ID returned by {@link #id()}. */
+    private final IgniteUuid timeoutObjId = IgniteUuid.randomUuid();
+
+    /** Collected result: bit {@code i} holds the status received for {@code nodes.get(i)}. */
+    private final BitSet resBitSet;
+
+    /** Timeout end time in milliseconds, returned by {@link #endTime()}; set in {@code init()}. */
+    private long endTime;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * @param spi SPI instance, used for its context (local node, event listeners, timeouts) and node addresses.
+     * @param log Logger.
+     * @param nioSrvr NIO server used to open connection-check sessions.
+     * @param nodes Nodes to check; bit {@code i} of the result corresponds to {@code nodes.get(i)}.
+     */
+    public TcpCommunicationConnectionCheckFuture(TcpCommunicationSpi spi,
+        IgniteLogger log,
+        GridNioServer nioSrvr,
+        List<ClusterNode> nodes)
+    {
+        this.spi = spi;
+        this.log = log;
+        this.nioSrvr = nioSrvr;
+        this.nodes = nodes;
+
+        resBitSet = new BitSet(nodes.size()); // One bit per node to check.
+    }
+
+    /**
+     * @param timeout Connect timeout in milliseconds; the end time is {@code System.currentTimeMillis() + timeout}.
+     */
+    public void init(long timeout) {
+        ConnectFuture[] futs = new ConnectFuture[nodes.size()];
+
+        UUID locId = spi.getSpiContext().localNode().id();
+
+        for (int i = 0; i < nodes.size(); i++) {
+            ClusterNode node = nodes.get(i);
+
+            if (!node.id().equals(locId)) {
+                if (spi.getSpiContext().node(node.id()) == null) { // Node is not known to the SPI context.
+                    receivedConnectionStatus(i, false);
+
+                    continue;
+                }
+
+                Collection<InetSocketAddress> addrs;
+
+                try {
+                    addrs = spi.nodeAddresses(node, false);
+                }
+                catch (Exception e) {
+                    U.error(log, "Failed to get node addresses: " + node, e);
+
+                    receivedConnectionStatus(i, false);
+
+                    continue;
+                }
+
+                if (addrs.size() == 1) {
+                    SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i);
+
+                    fut.init(addrs.iterator().next(), node.id());
+
+                    futs[i] = fut;
+                }
+                else { // Several addresses: try all of them, the first successful one wins.
+                    MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i);
+
+                    fut.init(addrs, node.id());
+
+                    futs[i] = fut;
+                }
+            }
+            else
+                receivedConnectionStatus(i, true); // Local node is reported as reachable without connecting.
+        }
+
+        this.futs = futs;
+        endTime = System.currentTimeMillis() + timeout;
+        spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
+        spi.getSpiContext().addTimeoutObject(this);
+
+        if (isDone()) { // Close race: the cleanup in onDone() may have run before the registrations above.
+            spi.getSpiContext().removeLocalEventListener(this);
+            spi.getSpiContext().removeTimeoutObject(this);
+        }
+    }
+
+    /**
+     * @param idx Node index in {@link #nodes}; also the bit index in the result.
+     * @param res Success flag stored for that node.
+     */
+    private void receivedConnectionStatus(int idx, boolean res) {
+        assert resCntr.get() < nodes.size(); // At most one status per node is expected.
+
+        synchronized (resBitSet) {
+            resBitSet.set(idx, res);
+        }
+
+        if (resCntr.incrementAndGet() == nodes.size()) // Last outstanding status: complete with the collected bits.
+            onDone(resBitSet);
+    }
+
+    /**
+     * @param nodeIdx Node index.
+     * @return Node ID.
+     */
+    private UUID nodeId(int nodeIdx) {
+        return nodes.get(nodeIdx).id();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return timeoutObjId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        return endTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onEvent(Event evt) {
+        if (isDone())
+            return;
+
+        assert evt instanceof DiscoveryEvent : evt;
+        assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
+
+        UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
+
+        for (int i = 0; i < nodes.size(); i++) { // Find the checked node the event refers to, if any.
+            if (nodes.get(i).id().equals(nodeId)) {
+                ConnectFuture fut = futs[i];
+
+                if (fut != null) // Null slot: no connect attempt was started for this node in init().
+                    fut.onNodeFailed();
+
+                return; // Only the first node with a matching ID is notified.
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        if (isDone())
+            return;
+
+        ConnectFuture[] futs = this.futs; // Read the volatile field once and iterate over the local copy.
+
+        for (int i = 0; i < futs.length; i++) {
+            ConnectFuture fut = futs[i];
+
+            if (fut != null) // Null slot: no connect attempt was started for this node in init().
+                fut.onTimeout();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable BitSet res, @Nullable Throwable err) {
+        if (super.onDone(res, err)) { // Unregister from the SPI context only when super.onDone() returned true.
+            spi.getSpiContext().removeTimeoutObject(this);
+
+            spi.getSpiContext().removeLocalEventListener(this);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Callbacks shared by the per-node connect futures kept in {@code futs}.
+     */
+    private interface ConnectFuture {
+        /**
+         * Called from the enclosing future's {@code onTimeout()} while the check is not done yet.
+         */
+        void onTimeout();
+
+        /**
+         * Called from {@code onEvent()} when a node-left or node-failed event refers to the checked node.
+         */
+        void onNodeFailed();
+    }
+
+    /**
+     *
+     */
+    private class SingleAddressConnectFuture implements TcpCommunicationNodeConnectionCheckFuture, ConnectFuture {
+        /** */
+        final int nodeIdx;
+
+        /** */
+        volatile int done;
+
+        /** */
+        Map<Integer, Object> sesMeta;
+
+        /** */
+        private SocketChannel ch;
+
+        /**
+         * @param nodeIdx Node index.
+         */
+        SingleAddressConnectFuture(int nodeIdx) {
+            this.nodeIdx = nodeIdx;
+        }
+
+        /**
+         * @param addr Node address.
+         * @param rmtNodeId Id of node to open connection check session with.
+         */
+        public void init(InetSocketAddress addr, UUID rmtNodeId) {
+            boolean connect;
+
+            try {
+                ch = SocketChannel.open();
+
+                ch.configureBlocking(false);
+
+                ch.socket().setTcpNoDelay(true);
+                ch.socket().setKeepAlive(false);
+
+                connect = ch.connect(addr);
+            }
+            catch (Exception e) {
+                finish(false);
+
+                return;
+            }
+
+            if (!connect) {
+                sesMeta = new GridLeanMap<>(3);
+
+                // Set dummy key to identify connection-check outgoing connection.
+                sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, new ConnectionKey(rmtNodeId, -1, -1, true));
+                sesMeta.put(SES_FUT_META, this);
+
+                nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+                    @Override public void apply(IgniteInternalFuture<GridNioSession> fut) {
+                        if (fut.error() != null)
+                            finish(false);
+                    }
+                });
+            }
+        }
+
+        /**
+         *
+         */
+        @SuppressWarnings("unchecked")
+        void cancel() {
+            if (finish(false))
+                nioSrvr.cancelConnect(ch, sesMeta);
+        }
+
+        /** {@inheritDoc} */
+        public void onTimeout() {
+            cancel();
+        }
+
+        /** {@inheritDoc} */
+        public void onConnected(UUID rmtNodeId) {
+            finish(nodeId(nodeIdx).equals(rmtNodeId));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onNodeFailed() {
+            cancel();
+        }
+
+        /**
+         * @param res Result.
+         * @return {@code True} if result was set by this call.
+         */
+        public boolean finish(boolean res) {
+            if (connFutDoneUpdater.compareAndSet(this, 0, 1)) {
+                onStatusReceived(res);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * @param res Result.
+         */
+        void onStatusReceived(boolean res) {
+            receivedConnectionStatus(nodeIdx, res);
+        }
+    }
+
+    /**
+     * Connection check for a node with several addresses: succeeds on the first successful address attempt.
+     */
+    private class MultipleAddressesConnectFuture implements ConnectFuture {
+        /** Number of failed address attempts, or {@code Integer.MAX_VALUE} once an attempt has succeeded. */
+        volatile int resCnt;
+
+        /** Per-address futures, one per address passed to init(); assigned in init(). */
+        volatile SingleAddressConnectFuture[] futs;
+
+        /** Index of the checked node in the enclosing future's node list. */
+        final int nodeIdx;
+
+        /**
+         * @param nodeIdx Node index.
+         */
+        MultipleAddressesConnectFuture(int nodeIdx) {
+            this.nodeIdx = nodeIdx;
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onNodeFailed() {
+            SingleAddressConnectFuture[] futs = this.futs;
+
+            for (int i = 0; i < futs.length; i++) {
+                ConnectFuture fut = futs[i];
+
+                if (fut != null)
+                    fut.onNodeFailed();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            SingleAddressConnectFuture[] futs = this.futs;
+
+            for (int i = 0; i < futs.length; i++) {
+                ConnectFuture fut = futs[i];
+
+                if (fut != null)
+                    fut.onTimeout();
+            }
+        }
+
+        /**
+         * @param addrs Node addresses.
+         * @param rmtNodeId Id of node to open connection check session with.
+         */
+        void init(Collection<InetSocketAddress> addrs, UUID rmtNodeId) {
+            SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()];
+
+            for (int i = 0; i < addrs.size(); i++) {
+                SingleAddressConnectFuture fut = new SingleAddressConnectFuture(nodeIdx) {
+                    @Override void onStatusReceived(boolean res) {
+                        receivedAddressStatus(res); // Aggregate per-address results instead of reporting directly.
+                    }
+                };
+
+                futs[i] = fut;
+            }
+
+            this.futs = futs;
+
+            if (futs.length == 0) // No addresses: no attempt will ever report, so fail the node right away.
+                receivedConnectionStatus(nodeIdx, false);
+
+            int idx = 0;
+
+            for (InetSocketAddress addr : addrs) {
+                futs[idx++].init(addr, rmtNodeId);
+
+                if (resCnt == Integer.MAX_VALUE) // An attempt already succeeded: skip the remaining addresses.
+                    return;
+            }
+
+            // Close race.
+            if (done())
+                cancelFutures();
+        }
+
+        /**
+         * @return {@code True} if an attempt has succeeded or every address attempt has reported a failure.
+         */
+        private boolean done() {
+            int resCnt0 = resCnt;
+
+            return resCnt0 == Integer.MAX_VALUE || resCnt0 == futs.length;
+        }
+
+        /**
+         * Cancels every per-address future; a future that already has a result ignores the call.
+         */
+        private void cancelFutures() {
+            SingleAddressConnectFuture[] futs = this.futs;
+
+            if (futs != null) {
+                for (int i = 0; i < futs.length; i++)
+                    futs[i].cancel();
+            }
+        }
+
+        /**
+         * @param res Result.
+         */
+        void receivedAddressStatus(boolean res) {
+            if (res) {
+                for (;;) {
+                    int resCnt0 = resCnt;
+
+                    if (resCnt0 == Integer.MAX_VALUE) // Success was already reported by another address.
+                        return;
+
+                    if (connResCntUpdater.compareAndSet(this, resCnt0, Integer.MAX_VALUE)) {
+                        receivedConnectionStatus(nodeIdx, true);
+
+                        cancelFutures(); // Cancel others connects if they are still in progress.
+
+                        return;
+                    }
+                }
+            }
+            else {
+                for (;;) {
+                    int resCnt0 = resCnt;
+
+                    if (resCnt0 == Integer.MAX_VALUE) // Success was already reported: ignore late failures.
+                        return;
+
+                    int resCnt1 = resCnt0 + 1;
+
+                    if (connResCntUpdater.compareAndSet(this, resCnt0, resCnt1)) {
+                        if (resCnt1 == futs.length) // Every address failed: the node is unreachable.
+                            receivedConnectionStatus(nodeIdx, false);
+
+                        return;
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java
new file mode 100644
index 0000000..cbf27b5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.util.UUID;
+
+/**
+ * Tcp Communication Node Connection Check Future: receives the ID of the remote node of a check connection.
+ */
+public interface TcpCommunicationNodeConnectionCheckFuture {
+    /**
+     * @param nodeId Remote node ID (presumably the ID reported by the connected peer; confirm against the caller).
+     */
+    public void onConnected(UUID nodeId);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index a0f9b75..f26ad33 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -30,12 +30,23 @@ import org.jetbrains.annotations.Nullable;
  */
 public interface DiscoverySpiCustomMessage extends Serializable {
     /**
-     * Called when message passed the ring.
+     * Called when custom message has been handled by all nodes.
+     *
+     * @return Ack message or {@code null} if ack is not required.
      */
     @Nullable public DiscoverySpiCustomMessage ackMessage();
 
     /**
-     * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
+     * @return {@code True} if message can be modified during listener notification. Changes will be sent to next nodes.
      */
     public boolean isMutable();
+
+    /**
+     * Called on discovery coordinator node after listener is notified. If returns {@code true}
+     * then message is not passed to other nodes; if after this method {@link #ackMessage()} returns non-null ack
+     * message, it is sent to all nodes.
+     *
+     * @return {@code True} if message should not be sent to all nodes.
+     */
+    public boolean stopProcess();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
new file mode 100644
index 0000000..37aa323
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation is for all implementations of {@link DiscoverySpi} that support
+ * topology mutable {@link DiscoverySpiCustomMessage}s.
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface DiscoverySpiMutableCustomMessageSupport {
+    /**
+     * @return Whether or not target SPI supports mutable {@link DiscoverySpiCustomMessage}s.
+     */
+    public boolean value();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 2d9a314..f0a5186 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -55,6 +55,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -88,6 +90,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
 import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
 import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
@@ -103,6 +106,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessag
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryEnsureDelivery;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT;
@@ -223,7 +227,8 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 @IgniteSpiMultipleInstancesSupport(true)
 @DiscoverySpiOrderSupport(true)
 @DiscoverySpiHistorySupport(true)
-public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
+@DiscoverySpiMutableCustomMessageSupport(true)
+public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscoverySpi {
     /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
     public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
 
@@ -409,6 +414,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
     /** */
     protected IgniteSpiContext spiCtx;
 
+    /** */
+    private IgniteDiscoverySpiInternalListener internalLsnr;
+
     /**
      * Gets current SPI state.
      *
@@ -473,6 +481,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
 
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+        IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr;
+
+        if (internalLsnr != null) {
+            if (!internalLsnr.beforeSendCustomEvent(this, log, msg))
+                return;
+        }
+
         impl.sendCustomEvent(msg);
     }
 
@@ -1559,6 +1574,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
         OutputStream out,
         TcpDiscoveryAbstractMessage msg,
         long timeout) throws IOException, IgniteCheckedException {
+        if (internalLsnr != null && msg instanceof TcpDiscoveryJoinRequestMessage)
+            internalLsnr.beforeJoin(locNode, log);
+
         assert sock != null;
         assert msg != null;
         assert out != null;
@@ -2118,15 +2136,31 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
         return ignite().configuration().getSslContextFactory() != null;
     }
 
-    /**
-     * Force reconnect to cluster.
-     *
-     * @throws IgniteSpiException If failed.
-     */
-    public void reconnect() throws IgniteSpiException {
+    /** {@inheritDoc} */
+    public void clientReconnect() throws IgniteSpiException {
         impl.reconnect();
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean knownNode(UUID nodeId) {
+        return getNode0(nodeId) != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean clientReconnectSupported() {
+        return !clientReconnectDisabled;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean supportsCommunicationFailureResolve() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * <strong>FOR TEST ONLY!!!</strong>
      */
@@ -2148,6 +2182,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
         sndMsgLsnrs.add(lsnr);
     }
 
+    /** {@inheritDoc} */
+    @Override public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
+        this.internalLsnr = lsnr;
+    }
+
     /**
      * <strong>FOR TEST ONLY!!!</strong>
      */
@@ -2185,7 +2224,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
      * <p>
      * This method is intended for test purposes only.
      */
-    protected void simulateNodeFailure() {
+    public void simulateNodeFailure() {
         impl.simulateNodeFailure();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 01534f7..55fe4e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -33,9 +33,9 @@ import java.util.UUID;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.ClusterMetricsSnapshot;
 import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.managers.discovery.IgniteClusterNode;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -58,7 +58,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTE
  * <strong>This class is not intended for public use</strong> and has been made
  * <tt>public</tt> due to certain limitations of Java technology.
  */
-public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements ClusterNode,
+public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements IgniteClusterNode,
     Comparable<TcpDiscoveryNode>, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
@@ -291,26 +291,14 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
         return metrics;
     }
 
-    /**
-     * Sets node metrics.
-     *
-     * @param metrics Node metrics.
-     */
+    /** {@inheritDoc} */
     public void setMetrics(ClusterMetrics metrics) {
         assert metrics != null;
 
         this.metrics = metrics;
     }
 
-    /**
-     * Gets collections of cache metrics for this node. Note that node cache metrics are constantly updated
-     * and provide up to date information about caches.
-     * <p>
-     * Cache metrics are updated with some delay which is directly related to metrics update
-     * frequency. For example, by default the update will happen every {@code 2} seconds.
-     *
-     * @return Runtime metrics snapshots for this node.
-     */
+    /** {@inheritDoc} */
     public Map<Integer, CacheMetrics> cacheMetrics() {
         if (metricsProvider != null) {
             Map<Integer, CacheMetrics> cacheMetrics0 = metricsProvider.cacheMetrics();
@@ -323,11 +311,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
         return cacheMetrics;
     }
 
-    /**
-     * Sets node cache metrics.
-     *
-     * @param cacheMetrics Cache metrics.
-     */
+    /** {@inheritDoc} */
     public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics) {
         this.cacheMetrics = cacheMetrics != null ? cacheMetrics : Collections.<Integer, CacheMetrics>emptyMap();
     }
@@ -544,11 +528,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
         return node;
     }
 
-    /**
-     * Whether this node is cache client (see {@link IgniteConfiguration#isClientMode()}).
-     *
-     * @return {@code True if client}.
-     */
+    /** {@inheritDoc} */
     public boolean isCacheClient() {
         if (!cacheCliInit) {
             cacheCli = CU.clientNodeDirect(this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index f0f143d..6dc3d85 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -832,6 +832,7 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPar
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader$1
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader$2
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments
+org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap2
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap
@@ -1129,6 +1130,7 @@ org.apache.ignite.internal.processors.closure.GridClosureProcessor$T8
 org.apache.ignite.internal.processors.closure.GridClosureProcessor$T9
 org.apache.ignite.internal.processors.closure.GridClosureProcessor$TaskNoReduceAdapter
 org.apache.ignite.internal.processors.closure.GridPeerDeployAwareTaskAdapter
+org.apache.ignite.internal.processors.cluster.ClusterNodeMetrics
 org.apache.ignite.internal.processors.cluster.BaselineTopology
 org.apache.ignite.internal.processors.cluster.BaselineTopologyHistory
 org.apache.ignite.internal.processors.cluster.BaselineTopologyHistoryItem

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
index 900d4f5..eee47c7 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -124,12 +123,9 @@ public abstract class AffinityFunctionExcludeNeighborsAbstractSelfTest extends G
 
                 Affinity<Object> aff = g.affinity(DEFAULT_CACHE_NAME);
 
-                List<TcpDiscoveryNode> top = new ArrayList<>();
+                List<ClusterNode> top = new ArrayList<>(g.cluster().nodes());
 
-                for (ClusterNode node : g.cluster().nodes())
-                    top.add((TcpDiscoveryNode) node);
-
-                Collections.sort(top);
+                Collections.sort((List)top);
 
                 assertEquals(grids, top.size());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
index 4e4d75a..5eca7d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
@@ -120,6 +120,10 @@ public class FailureHandlerTriggeredTest extends GridCommonAbstractTest {
                 @Override public boolean isMutable() {
                     return false;
                 }
+
+                @Override public boolean stopProcess() {
+                    return false;
+                }
             });
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java
index 2328c84..141f4af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java
@@ -61,6 +61,9 @@ public class ClusterGroupHostsSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testForHosts() throws Exception {
+        if (!tcpDiscovery())
+            return;
+
         Ignite ignite = grid();
 
         assertEquals(1, ignite.cluster().forHost("h_1").nodes().size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
index 9df561a..99006d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
@@ -68,6 +68,8 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
                 if (i == 0)
                     ignite = g;
             }
+
+            waitForTopology(NODES_CNT);
         }
         finally {
             Ignition.setClientMode(false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
new file mode 100644
index 0000000..6e6b4a4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import junit.framework.AssertionFailedError;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that cluster-wide and per-node metrics observed on every node match expected values.
+ */
+public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configuration. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Client mode flag copied into each node configuration. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setMetricsUpdateFrequency(500);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMetrics() throws Exception {
+        int NODES = 6;
+
+        Ignite srv0 = startGridsMultiThreaded(NODES / 2);
+
+        client = true;
+
+        startGridsMultiThreaded(NODES / 2, NODES / 2);
+
+        Map<UUID, Integer> expJobs = new HashMap<>();
+
+        for (int i = 0; i < NODES; i++)
+            expJobs.put(nodeId(i), 0);
+
+        checkMetrics(NODES, expJobs);
+
+        for (int i = 0; i < NODES; i++) {
+            UUID nodeId = nodeId(i);
+
+            IgniteCompute c = srv0.compute(srv0.cluster().forNodeId(nodeId(i)));
+
+            c.call(new DummyCallable(null));
+
+            expJobs.put(nodeId, 1); // NOTE(review): expJobs is never re-checked after this loop; confirm intent.
+        }
+    }
+
+    /**
+     * @param expNodes Expected nodes.
+     * @param expJobs Expected jobs number per node.
+     */
+    private void checkMetrics0(int expNodes, Map<UUID, Integer> expJobs) {
+        List<Ignite> nodes = Ignition.allGrids();
+
+        assertEquals(expNodes, nodes.size());
+        assertEquals(expNodes, expJobs.size());
+
+        int totalJobs = 0;
+
+        for (Integer c : expJobs.values())
+            totalJobs += c;
+
+        for (final Ignite ignite : nodes) {
+            ClusterMetrics m = ignite.cluster().metrics();
+
+            assertEquals(expNodes, m.getTotalNodes());
+            assertEquals(totalJobs, m.getTotalExecutedJobs());
+
+            for (Map.Entry<UUID, Integer> e : expJobs.entrySet()) {
+                UUID nodeId = e.getKey();
+
+                ClusterGroup g = ignite.cluster().forNodeId(nodeId);
+
+                ClusterMetrics nodeM = g.metrics();
+
+                assertEquals(e.getValue(), (Integer)nodeM.getTotalExecutedJobs());
+            }
+        }
+    }
+
+    /**
+     * @param expNodes Expected nodes.
+     * @param expJobs Expected jobs number per node.
+     * @throws Exception If failed.
+     */
+    private void checkMetrics(final int expNodes, final Map<UUID, Integer> expJobs) throws Exception {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    checkMetrics0(expNodes, expJobs);
+                }
+                catch (AssertionFailedError e) {
+                    return false; // Expected values not observed yet; the condition is polled again.
+                }
+
+                return true;
+            }
+        }, 5000);
+
+        checkMetrics0(expNodes, expJobs); // Final check outside the predicate, so a mismatch is not swallowed.
+    }
+
+    /**
+     * Callable that returns the byte array passed to its constructor.
+     */
+    private static class DummyCallable implements IgniteCallable<Object> {
+        /** Payload returned from {@link #call()}. */
+        private byte[] data;
+
+        /**
+         * @param data Data.
+         */
+        DummyCallable(byte[] data) {
+            this.data = data;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return data;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
new file mode 100644
index 0000000..46d9edc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Test callback for discovery SPI.
+ * <p>
+ * Allows block/delay node join and custom event sending.
+ */
+public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListener {
+    /** Latch awaited by {@link #beforeJoin}; {@code null} until {@link #startBlockJoin()} is called. */
+    private volatile CountDownLatch joinLatch;
+
+    /** Classes of custom messages to block; {@code null} when blocking is off. Guarded by {@link #mux}. */
+    private Set<Class<?>> blockCustomEvtCls;
+
+    /** Monitor guarding {@link #blockCustomEvtCls} and {@link #blockedMsgs}. */
+    private final Object mux = new Object();
+
+    /** Messages held back by {@link #beforeSendCustomEvent}. Guarded by {@link #mux}. */
+    private List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>();
+
+    /** SPI captured from the last {@link #beforeSendCustomEvent} call; used to resend blocked messages. */
+    private volatile DiscoverySpi spi;
+
+    /** Logger captured from the last {@link #beforeSendCustomEvent} call. */
+    private volatile IgniteLogger log;
+
+    /**
+     * Makes subsequent {@link #beforeJoin} calls wait until {@link #stopBlockJoin()} is invoked.
+     */
+    public void startBlockJoin() {
+        joinLatch = new CountDownLatch(1);
+    }
+
+    /**
+     * Releases joins blocked by {@link #startBlockJoin()}, which must have been called before.
+     */
+    public void stopBlockJoin() {
+        joinLatch.countDown();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeJoin(ClusterNode locNode, IgniteLogger log) {
+        try {
+            CountDownLatch writeLatch0 = joinLatch;
+
+            if (writeLatch0 != null) {
+                log.info("Block join");
+
+                U.await(writeLatch0);
+            }
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg) {
+        this.spi = spi;
+        this.log = log;
+
+        synchronized (mux) {
+            if (blockCustomEvtCls != null) {
+                DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate");
+
+                if (blockCustomEvtCls.contains(msg0.getClass())) {
+                    log.info("Block custom message: " + msg0);
+
+                    blockedMsgs.add(msg);
+
+                    mux.notifyAll();
+
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * @param blockCustomEvtCls Additional event classes to block, on top of the mandatory {@code cls0}.
+     */
+    public void blockCustomEvent(Class<?> cls0, Class<?> ... blockCustomEvtCls) {
+        synchronized (mux) {
+            assert blockedMsgs.isEmpty() : blockedMsgs;
+
+            this.blockCustomEvtCls = new HashSet<>();
+
+            this.blockCustomEvtCls.add(cls0);
+
+            Collections.addAll(this.blockCustomEvtCls, blockCustomEvtCls);
+        }
+    }
+
+    /**
+     * @throws InterruptedException If interrupted.
+     */
+    public void waitCustomEvent() throws InterruptedException {
+        synchronized (mux) {
+            while (blockedMsgs.isEmpty())
+                mux.wait();
+        }
+    }
+
+    /**
+     * Stops blocking custom events and resends every message blocked so far.
+     */
+    public void stopBlockCustomEvents() {
+        // No early return when spi is unset: the class filter must still be cleared. Messages are only
+        // queued after spi and log are assigned, so the list is empty then and the loop below is a no-op.
+
+        List<DiscoverySpiCustomMessage> msgs;
+
+        synchronized (mux) {
+            msgs = new ArrayList<>(blockedMsgs);
+
+            blockCustomEvtCls = null;
+
+            blockedMsgs.clear();
+        }
+
+        for (DiscoverySpiCustomMessage msg : msgs) {
+            log.info("Resend blocked message: " + msg);
+
+            spi.sendCustomEvent(msg);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
index e6b678b..883d677 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.lang.IgniteProductVersion.fromString;
@@ -158,10 +159,10 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest {
 
         final AtomicInteger cnt = new AtomicInteger();
 
-        /** Joined nodes counter. */
+        // Joined nodes counter.
         final CountDownLatch joinedCnt = new CountDownLatch(NODES_CNT);
 
-        /** Left nodes counter. */
+        // Left nodes counter.
         final CountDownLatch leftCnt = new CountDownLatch(NODES_CNT);
 
         IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
@@ -171,7 +172,7 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest {
 
                     joinedCnt.countDown();
                 }
-                else if (EVT_NODE_LEFT == evt.type()) {
+                else if (EVT_NODE_LEFT == evt.type() || EVT_NODE_FAILED == evt.type()) {
                     int i = cnt.decrementAndGet();
 
                     assert i >= 0;
@@ -185,7 +186,10 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest {
             }
         };
 
-        ignite.events().localListen(lsnr, EVT_NODE_LEFT, EVT_NODE_JOINED);
+        int[] evts = tcpDiscovery() ? new int[]{EVT_NODE_LEFT, EVT_NODE_JOINED} :
+            new int[]{EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED};
+
+        ignite.events().localListen(lsnr, evts);
 
         try {
             for (int i = 0; i < NODES_CNT; i++)
@@ -242,6 +246,8 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest {
             for (int i = 0; i < NODES_CNT; i++)
                 stopGrid(i);
 
+            waitForTopology(1);
+
             final long topVer = discoMgr.topologyVersion();
 
             assert topVer == topVer0 + NODES_CNT * 2 : "Unexpected topology version: " + topVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
index cd6b2c0..a8be541 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
@@ -259,6 +259,8 @@ public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest {
         // Now we stop master grid.
         stopGrid(lastGridIdx, true);
 
+        waitForTopology(GRID_CNT - 1);
+
         // Release communication SPI wait latches. As master node is stopped, job worker will receive and exception.
         for (int i = 0; i < lastGridIdx; i++)
             ((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).releaseWaitLatch();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
index f3a19aa..6824d51 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
@@ -187,6 +187,8 @@ public class GridJobStealingSelfTest extends GridCommonAbstractTest {
     public void testProjectionPredicateInternalStealing() throws Exception {
         final Ignite ignite3 = startGrid(3);
 
+        waitForTopology(3);
+
         final UUID node1 = ignite1.cluster().localNode().id();
         final UUID node3 = ignite3.cluster().localNode().id();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
index 66e9cf4..a04c38e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -75,8 +76,10 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
 
             ignite2.events().localListen(new IgnitePredicate<Event>() {
                 @Override public boolean apply(Event evt) {
-                    assert evt.type() != EVT_NODE_FAILED :
-                        "Node1 did not exit gracefully.";
+                    boolean tcpDiscovery = tcpDiscovery();
+
+                    if (tcpDiscovery)
+                        assert evt.type() != EVT_NODE_FAILED : "Node1 did not exit gracefully.";
 
                     if (evt instanceof DiscoveryEvent) {
                         // Local node can send METRICS_UPDATED event.
@@ -86,8 +89,14 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
                                 ((DiscoveryEvent) evt).eventNode().id() + ", expected=" + grid1LocNodeId +
                                 ", type=" + evt.type() + ']';
 
-                        if (evt.type() == EVT_NODE_LEFT)
-                            latch.countDown();
+                        if (tcpDiscovery) {
+                            if (evt.type() == EVT_NODE_LEFT)
+                                latch.countDown();
+                        }
+                        else {
+                            if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
+                                latch.countDown();
+                        }
                     }
 
                     return true;
@@ -96,7 +105,7 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest {
 
             stopGrid(1);
 
-            latch.await();
+            assertTrue(latch.await(10, TimeUnit.SECONDS));
 
             Collection<ClusterNode> top2 = ignite2.cluster().forRemotes().nodes();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
index 7e368cb..f71ffb0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
@@ -45,6 +45,8 @@ public class GridSelfTest extends ClusterGroupAbstractTest {
 
         for (int i = 0; i < NODES_CNT; i++)
             startGrid(i);
+
+        waitForTopology(NODES_CNT);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index fa9cc35..e68ea13 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -38,6 +39,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
@@ -143,6 +145,14 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
 
     /**
      * @param ignite Node.
+     * @return Discovery SPI.
+     */
+    protected static IgniteDiscoverySpi spi0(Ignite ignite) {
+        return ((IgniteDiscoverySpi)ignite.configuration().getDiscoverySpi());
+    }
+
+    /**
+     * @param ignite Node.
      * @return Communication SPI.
      */
     protected BlockTcpCommunicationSpi commSpi(Ignite ignite) {
@@ -185,16 +195,28 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
      * @return Server node client connected to.
      */
     protected Ignite clientRouter(Ignite client) {
-        TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode();
+        if (tcpDiscovery()) {
+            TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode();
+
+            assertTrue(node.isClient());
+            assertNotNull(node.clientRouterNodeId());
 
-        assertTrue(node.isClient());
-        assertNotNull(node.clientRouterNodeId());
+            Ignite srv = G.ignite(node.clientRouterNodeId());
 
-        Ignite srv = G.ignite(node.clientRouterNodeId());
+            assertNotNull(srv);
+
+            return srv;
+        }
+        else {
+            for (Ignite node : G.allGrids()) {
+                if (!node.cluster().localNode().isClient())
+                    return node;
+            }
 
-        assertNotNull(srv);
+            fail();
 
-        return srv;
+            return null;
+        }
     }
 
     /**
@@ -251,15 +273,24 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
         List<Ignite> clients, Ignite srv,
         @Nullable Runnable disconnectedC)
         throws Exception {
-        final TestTcpDiscoverySpi srvSpi = spi(srv);
+        final IgniteDiscoverySpi srvSpi = spi0(srv);
 
         final CountDownLatch disconnectLatch = new CountDownLatch(clients.size());
         final CountDownLatch reconnectLatch = new CountDownLatch(clients.size());
 
         log.info("Block reconnect.");
 
-        for (Ignite client : clients)
-            spi(client).writeLatch = new CountDownLatch(1);
+        List<DiscoverySpiTestListener> blockLsnrs = new ArrayList<>();
+
+        for (Ignite client : clients) {
+            DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+            lsnr.startBlockJoin();
+
+            blockLsnrs.add(lsnr);
+
+            spi0(client).setInternalListener(lsnr);
+        }
 
         IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
             @Override public boolean apply(Event evt) {
@@ -291,8 +322,8 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
 
         log.info("Allow reconnect.");
 
-        for (Ignite client : clients)
-            spi(client).writeLatch.countDown();
+        for (DiscoverySpiTestListener blockLsnr : blockLsnrs)
+            blockLsnr.stopBlockJoin();
 
         waitReconnectEvent(log, reconnectLatch);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
index 06bde99..43da2d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.CollectionConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -51,6 +52,7 @@ import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -99,7 +101,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    public void dataStructureOperationsTest() throws Exception {
+    private void dataStructureOperationsTest() throws Exception {
         clientMode = true;
 
         final Ignite client = startGrid(serverCount());
@@ -219,7 +221,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    public void cacheOperationsTest() throws Exception {
+    private void cacheOperationsTest() throws Exception {
         clientMode = true;
 
         final Ignite client = startGrid(serverCount());
@@ -537,7 +539,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    public void igniteOperationsTest() throws Exception {
+    private void igniteOperationsTest() throws Exception {
         clientMode = true;
 
         final Ignite client = startGrid(serverCount());
@@ -775,11 +777,11 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
         throws Exception {
         assertNotNull(client.cache(DEFAULT_CACHE_NAME));
 
-        final TestTcpDiscoverySpi clientSpi = spi(client);
+        final IgniteDiscoverySpi clientSpi = spi0(client);
 
         Ignite srv = clientRouter(client);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
+        DiscoverySpi srvSpi = spi0(srv);
 
         final CountDownLatch disconnectLatch = new CountDownLatch(1);
 
@@ -787,7 +789,10 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
 
         log.info("Block reconnect.");
 
-        clientSpi.writeLatch = new CountDownLatch(1);
+        DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+        clientSpi.setInternalListener(lsnr);
+        lsnr.startBlockJoin();
 
         final List<IgniteInternalFuture> futs = new ArrayList<>();
 
@@ -832,7 +837,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
 
             log.info("Allow reconnect.");
 
-            clientSpi.writeLatch.countDown();
+            lsnr.stopBlockJoin();
 
             waitReconnectEvent(reconnectLatch);
 
@@ -857,7 +862,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect
             }
         }
         finally {
-            clientSpi.writeLatch.countDown();
+            lsnr.stopBlockJoin();
 
             for (IgniteInternalFuture fut : futs)
                 fut.cancel();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 00daf5f..d1e3ade 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -111,7 +111,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeq", 0, true);
 
@@ -144,7 +144,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true);
 
@@ -192,7 +192,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         BlockTcpCommunicationSpi commSpi = commSpi(srv);
 
@@ -253,7 +253,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         IgniteAtomicReference<String> clientAtomicRef = client.atomicReference("atomicRef", "1st value", true);
 
@@ -294,7 +294,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         final IgniteAtomicReference<String> clientAtomicRef =
             client.atomicReference("atomicRefRemoved", "1st value", true);
@@ -347,7 +347,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         final IgniteAtomicReference<String> clientAtomicRef =
             client.atomicReference("atomicRefInProg", "1st value", true);
@@ -414,7 +414,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStamped", 0, 0, true);
 
@@ -455,7 +455,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true);
 
@@ -506,7 +506,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedInProgress", 0, 0, true);
 
@@ -574,7 +574,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLong", 0, true);
 
@@ -605,7 +605,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongRmv", 0, true);
 
@@ -646,7 +646,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         BlockTcpCommunicationSpi commSpi = commSpi(srv);
 
@@ -701,7 +701,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         IgniteCountDownLatch clientLatch = client.countDownLatch("latch1", 3, false, true);
 
@@ -742,7 +742,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         IgniteSemaphore clientSemaphore = client.semaphore("semaphore1", 3, false, true);
 
@@ -789,7 +789,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         IgniteLock clientLock = client.reentrantLock("lock1", true, fair, true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 518e674..3cb82e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -67,6 +68,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
@@ -155,11 +157,11 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
 
         IgniteEx client = startGrid(SRV_CNT);
 
-        final TestTcpDiscoverySpi clientSpi = spi(client);
+        final IgniteDiscoverySpi clientSpi = spi0(client);
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
+        DiscoverySpi srvSpi = ignite(0).configuration().getDiscoverySpi();
 
         final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
 
@@ -188,7 +190,11 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
 
         log.info("Block reconnect.");
 
-        clientSpi.writeLatch = new CountDownLatch(1);
+        DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+        clientSpi.setInternalListener(lsnr);
+
+        lsnr.startBlockJoin();
 
         final AtomicReference<IgniteInternalFuture> blockPutRef = new AtomicReference<>();
 
@@ -254,7 +260,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
 
         log.info("Allow reconnect.");
 
-        clientSpi.writeLatch.countDown();
+        lsnr.stopBlockJoin();
 
         assertTrue(reconnectLatch.await(5000, MILLISECONDS));
 
@@ -319,7 +325,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
 
         IgniteEx client = startGrid(SRV_CNT);
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
 
@@ -412,17 +418,21 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
         final TransactionConcurrency txConcurrency,
         final IgniteCache<Object, Object> cache)
         throws Exception {
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
-        final TestTcpDiscoverySpi clientSpi = spi(client);
-        final TestTcpDiscoverySpi srvSpi = spi(srv);
+        final IgniteDiscoverySpi clientSpi = spi0(client);
+        final DiscoverySpi srvSpi = spi0(srv);
 
         final CountDownLatch disconnectLatch = new CountDownLatch(1);
         final CountDownLatch reconnectLatch = new CountDownLatch(1);
 
         log.info("Block reconnect.");
 
-        clientSpi.writeLatch = new CountDownLatch(1);
+        DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
+
+        clientSpi.setInternalListener(lsnr);
+
+        lsnr.startBlockJoin();
 
         client.events().localListen(new IgnitePredicate<Event>() {
             @Override public boolean apply(Event evt) {
@@ -530,7 +540,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
 
         assertTrue(putFailed.await(5000, MILLISECONDS));
 
-        clientSpi.writeLatch.countDown();
+        lsnr.stopBlockJoin();
 
         waitReconnectEvent(reconnectLatch);
 
@@ -604,9 +614,9 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
 
         IgniteEx client = startGrid(SRV_CNT);
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
+        DiscoverySpi srvSpi = spi0(srv);
 
         TestCommunicationSpi coordCommSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi();
 
@@ -691,7 +701,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
         IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
                 try {
-                    Ignition.start(optimize(getConfiguration(getTestIgniteInstanceName(SRV_CNT))));
+                    startGrid(optimize(getConfiguration(getTestIgniteInstanceName(SRV_CNT))));
 
                     // Commented due to IGNITE-4473, because
                     // IgniteClientDisconnectedException won't
@@ -722,7 +732,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
             }
         });
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
+        DiscoverySpi srvSpi = spi0(srv);
 
         try {
             if (!joinLatch.await(10_000, MILLISECONDS)) {
@@ -1256,30 +1266,35 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
      *
      */
     static class TestClass1 implements Serializable {
+        // No-op.
     }
 
     /**
      *
      */
     static class TestClass2 implements Serializable {
+        // No-op.
     }
 
     /**
      *
      */
     static class TestClass3 implements Serializable {
+        // No-op.
     }
 
     /**
      *
      */
     static class TestClass4 implements Serializable {
+        // No-op.
     }
 
     /**
      *
      */
     static class TestClass5 implements Serializable {
+        // No-op.
     }
 
     /**
@@ -1294,11 +1309,11 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
         Class<?> msgToBlock,
         final IgniteInClosure<IgniteCache<Object, Object>> c)
         throws Exception {
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         final UUID id = client.localNode().id();
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
+        DiscoverySpi srvSpi = spi0(srv);
 
         final IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
index 3f0e33d..5be59b0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -180,7 +180,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
     private void serverNodeReconnect(CollectionConfiguration colCfg) throws Exception {
         final Ignite client = grid(serverCount());
 
-        final Ignite srv = clientRouter(client);
+        final Ignite srv = ignite(0);
 
         assertNotNull(srv.queue("q", 0, colCfg));
         assertNotNull(srv.set("s", colCfg));
@@ -201,7 +201,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         final String setName = "set-" + colCfg.getAtomicityMode();
 
@@ -235,7 +235,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
 
         assertTrue(client.cluster().localNode().isClient());
 
-        final Ignite srv = clientRouter(client);
+        final Ignite srv = ignite(0);
 
         final String setName = "set-rm-" + colCfg.getAtomicityMode();
 
@@ -281,7 +281,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
 
         assertTrue(client.cluster().localNode().isClient());
 
-        final Ignite srv = clientRouter(client);
+        final Ignite srv = ignite(0);
 
         final String setName = "set-in-progress-" + colCfg.getAtomicityMode();
 
@@ -347,7 +347,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         final String setName = "queue-" + colCfg.getAtomicityMode();
 
@@ -379,7 +379,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         final String setName = "queue-rmv" + colCfg.getAtomicityMode();
 
@@ -423,7 +423,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         final String setName = "queue-rmv" + colCfg.getAtomicityMode();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
index cce0c7e..57d3188 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
@@ -49,7 +49,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         IgniteCache<Integer, Integer> cache = client.getOrCreateCache("test-cache");
 
@@ -103,7 +103,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         BlockTcpCommunicationSpi commSpi = commSpi(srv);
 
@@ -152,7 +152,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
         BlockTcpCommunicationSpi commSpi = commSpi(srv);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
index ca0d889..d68fc1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteRunnable;
@@ -61,9 +62,9 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
+        IgniteDiscoverySpi srvSpi = spi0(srv);
 
         EventListener lsnr = new EventListener();
 
@@ -133,9 +134,9 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
 
         assertTrue(client.cluster().localNode().isClient());
 
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
+        IgniteDiscoverySpi srvSpi = spi0(srv);
 
         final String topic = "testTopic";
 
@@ -309,9 +310,9 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
         CacheEventListener lsnr)
         throws Exception
     {
-        Ignite srv = clientRouter(client);
+        Ignite srv = ignite(0);
 
-        TestTcpDiscoverySpi srvSpi = spi(srv);
+        IgniteDiscoverySpi srvSpi = spi0(srv);
 
         final CountDownLatch reconnectLatch = new CountDownLatch(1);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
index c071ee2..6e77742 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
 
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
@@ -64,20 +65,23 @@ public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconne
         nodeCnt.put(1, 1);
         nodeCnt.put(2, 2);
         nodeCnt.put(3, 3);
-        nodeCnt.put(4, 4);
 
-        for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {
-            Collection<ClusterNode> nodes = cluster.topology(e.getKey());
+        if (tcpDiscovery()) {
+            nodeCnt.put(4, 4);
 
-            assertNotNull("No nodes for topology: " + e.getKey(), nodes);
-            assertEquals((int)e.getValue(), nodes.size());
+            for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {
+                Collection<ClusterNode> nodes = cluster.topology(e.getKey());
+
+                assertNotNull("No nodes for topology: " + e.getKey(), nodes);
+                assertEquals((int)e.getValue(), nodes.size());
+            }
         }
 
         ClusterNode locNode = cluster.localNode();
 
         assertEquals(topVer, locNode.order());
 
-        TestTcpDiscoverySpi srvSpi = spi(clientRouter(client));
+        DiscoverySpi srvSpi = ignite(0).configuration().getDiscoverySpi();
 
         final CountDownLatch reconnectLatch = new CountDownLatch(1);
 
@@ -112,7 +116,11 @@ public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconne
         assertEquals(topVer, locNode.order());
         assertEquals(topVer, cluster.topologyVersion());
 
-        nodeCnt.put(5, 3);
+        if (tcpDiscovery())
+            nodeCnt.put(5, 3);
+        else
+            nodeCnt.clear();
+
         nodeCnt.put(6, 4);
 
         for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {


Mime
View raw message