ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject incubator-ignite git commit: # IGNITE-709 Revert ping() improvement.
Date Wed, 13 May 2015 16:36:11 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-709_0 [created] 4c021e3dd


# IGNITE-709 Revert ping() improvement.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4c021e3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4c021e3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4c021e3d

Branch: refs/heads/ignite-709_0
Commit: 4c021e3dd2c541c644b9664aa48efcd2e56e33e0
Parents: ebc1a98
Author: sevdokimov <sevdokimov@gridgain.com>
Authored: Wed May 13 19:35:49 2015 +0300
Committer: sevdokimov <sevdokimov@gridgain.com>
Committed: Wed May 13 19:35:49 2015 +0300

----------------------------------------------------------------------
 .../internal/util/future/SettableFuture.java    |  94 --------
 .../discovery/tcp/TcpClientDiscoverySpi.java    |  77 +------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 228 +++----------------
 .../messages/TcpDiscoveryClientPingRequest.java |  56 -----
 .../TcpDiscoveryClientPingResponse.java         |  67 ------
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  92 ++------
 6 files changed, 49 insertions(+), 565 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c021e3d/modules/core/src/main/java/org/apache/ignite/internal/util/future/SettableFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/SettableFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/SettableFuture.java
deleted file mode 100644
index 7fe094d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/SettableFuture.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.future;
-
-import java.util.concurrent.*;
-
-/**
- * Simple implementation of {@link Future}
- */
-public class SettableFuture<T> implements Future<T> {
-    /** */
-    private final CountDownLatch latch = new CountDownLatch(1);
-
-    /** Result of computation. */
-    private T res;
-
-    /** Exception threw during the computation. */
-    private ExecutionException err;
-
-    /** {@inheritDoc} */
-    @Override public boolean cancel(boolean mayInterruptIfRunning) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isCancelled() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isDone() {
-        return latch.getCount() == 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public T get() throws InterruptedException, ExecutionException {
-        latch.await();
-
-        if (err != null)
-            throw err;
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
-        TimeoutException {
-
-        if (!latch.await(timeout, unit))
-            throw new TimeoutException();
-
-        if (err != null)
-            throw err;
-
-        return res;
-    }
-
-    /**
-     * Computation is done successful.
-     *
-     * @param res Result of computation.
-     */
-    public void set(T res) {
-        this.res = res;
-
-        latch.countDown();
-    }
-
-    /**
-     * Computation failed.
-     *
-     * @param throwable Error.
-     */
-    public void setException(Throwable throwable) {
-        err = new ExecutionException(throwable);
-
-        latch.countDown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c021e3d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 3c46515..5d13298 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -20,7 +20,6 @@ package org.apache.ignite.spi.discovery.tcp;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -76,9 +75,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements
Tcp
     /** Remote nodes. */
     private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
 
-    /** Remote nodes. */
-    private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts =
new ConcurrentHashMap8<>();
-
     /** Socket writer. */
     private SocketWriter sockWriter;
 
@@ -320,9 +316,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements
Tcp
             }
         }
 
-        for (GridFutureAdapter<Boolean> fut : pingFuts.values())
-            fut.onDone(false);
-
         rmtNodes.clear();
 
         U.interrupt(sockTimeoutWorker);
@@ -366,51 +359,15 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements
Tcp
     }
 
     /** {@inheritDoc} */
-    @Override public boolean pingNode(@NotNull final UUID nodeId) {
+    @Override public boolean pingNode(UUID nodeId) {
+        assert nodeId != null;
+
         if (nodeId.equals(getLocalNodeId()))
             return true;
 
         TcpDiscoveryNode node = rmtNodes.get(nodeId);
 
-        if (node == null || !node.visible())
-            return false;
-
-        GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId);
-
-        if (fut == null) {
-            fut = new GridFutureAdapter<>();
-
-            GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, fut);
-
-            if (oldFut != null)
-                fut = oldFut;
-            else {
-                if (getSpiContext().isStopping()) {
-                    if (pingFuts.remove(nodeId, fut))
-                        fut.onDone(false);
-
-                    return false;
-                }
-
-                final GridFutureAdapter<Boolean> finalFut = fut;
-
-                timer.schedule(new TimerTask() {
-                    @Override public void run() {
-                        if (pingFuts.remove(nodeId, finalFut))
-                            finalFut.onDone(false);
-                    }
-                }, netTimeout);
-
-                sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(),
nodeId));
-            }
-        }
-
-        try {
-            return fut.get();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteSpiException(e); // Should newer occur
-        }
+        return node != null && node.visible();
     }
 
     /** {@inheritDoc} */
@@ -597,8 +554,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements
Tcp
         U.join(sockWriter, log);
         U.join(msgWorker, log);
         U.join(sockTimeoutWorker, log);
-
-        timer.cancel();
     }
 
     /**
@@ -1112,10 +1067,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements
Tcp
                 processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
             else if (msg instanceof TcpDiscoveryCustomEventMessage)
                 processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
-            else if (msg instanceof TcpDiscoveryClientPingResponse)
-                processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
-            else if (msg instanceof TcpDiscoveryPingRequest)
-                processPingRequest((TcpDiscoveryPingRequest)msg);
 
             stats.onMessageProcessingFinished(msg);
         }
@@ -1291,7 +1242,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements
Tcp
                         leaveLatch.countDown();
                     }
                 }
-
                 return;
             }
 
@@ -1414,25 +1364,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements
Tcp
         }
 
         /**
-         * @param msg Message.
-         */
-        private void processClientPingResponse(TcpDiscoveryClientPingResponse msg) {
-            GridFutureAdapter<Boolean> fut = pingFuts.remove(msg.nodeToPing());
-
-            if (fut != null)
-                fut.onDone(msg.result());
-        }
-
-        /**
-         * Router want to ping this client.
-         *
-         * @param msg Message.
-         */
-        private void processPingRequest(TcpDiscoveryPingRequest msg) {
-            sockWriter.sendMessage(new TcpDiscoveryPingResponse(getLocalNodeId()));
-        }
-
-        /**
          * @param nodeId Node ID.
          * @param metrics Metrics.
          * @param cacheMetrics Cache metrics.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c021e3d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index e4f0ba6..3624791 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -51,7 +51,6 @@ import java.net.*;
 import java.text.*;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.IgniteNodeAttributes.*;
@@ -204,10 +203,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private int reconCnt = DFLT_RECONNECT_CNT;
 
-    /** */
-    private final Executor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<Runnable>());
-
     /** Nodes ring. */
     @GridToStringExclude
     private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
@@ -290,10 +285,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
     private final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>>
sendMsgLsnrs =
         new CopyOnWriteArrayList<>();
 
-    /** */
-    private final CopyOnWriteArrayList<IgniteInClosure<Socket>> incomeConnLsnrs
=
-        new CopyOnWriteArrayList<>();
-
     /** {@inheritDoc} */
     @IgniteInstanceResource
     @Override public void injectResources(Ignite ignite) {
@@ -1068,7 +1059,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
         assert nodeId != null;
 
         if (log.isDebugEnabled())
-            log.debug("Pinging node: " + nodeId + "].");
+            log.debug("Ping node. NodeId: [" + nodeId + "].");
 
         if (nodeId == getLocalNodeId())
             return true;
@@ -1132,10 +1123,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
     }
 
     /**
-     * Pings the node by its address to see if it's alive.
+     * Pings the remote node by its address to see if it's alive.
      *
      * @param addr Address of the node.
-     * @return ID of the remote node and "client exists" flag if node alive.
+     * @return ID of the remote node if node alive.
      * @throws IgniteSpiException If an error occurs.
      */
     private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable
UUID clientNodeId)
@@ -1144,28 +1135,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
 
         UUID locNodeId = getLocalNodeId();
 
-        if (F.contains(locNodeAddrs, addr)) {
-            if (clientNodeId == null)
-                return F.t(getLocalNodeId(), false);
-
-            ClientMessageWorker clientWorker = clientMsgWorkers.get(clientNodeId);
-
-            if (clientWorker == null)
-                return F.t(getLocalNodeId(), false);
-
-            boolean clientPingRes;
-
-            try {
-                clientPingRes = clientWorker.ping();
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                throw new IgniteInterruptedCheckedException(e);
-            }
-
-            return F.t(getLocalNodeId(), clientPingRes);
-        }
+        if (F.contains(locNodeAddrs, addr))
+            return F.t(getLocalNodeId(), clientNodeId != null && clientMsgWorkers.containsKey(clientNodeId));
 
         GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>();
 
@@ -2063,29 +2034,15 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
     /**
      * <strong>FOR TEST ONLY!!!</strong>
      */
-    public void addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage>
lsnr) {
-        sendMsgLsnrs.add(lsnr);
-    }
-
-    /**
-     * <strong>FOR TEST ONLY!!!</strong>
-     */
-    public void removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage>
lsnr) {
-        sendMsgLsnrs.remove(lsnr);
+    public void addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage>
msg) {
+        sendMsgLsnrs.add(msg);
     }
 
     /**
      * <strong>FOR TEST ONLY!!!</strong>
      */
-    public void addIncomeConnectionListener(IgniteInClosure<Socket> lsnr) {
-        incomeConnLsnrs.add(lsnr);
-    }
-
-    /**
-     * <strong>FOR TEST ONLY!!!</strong>
-     */
-    public void removeIncomeConnectionListener(IgniteInClosure<Socket> lsnr) {
-        incomeConnLsnrs.remove(lsnr);
+    public void removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage>
msg) {
+        sendMsgLsnrs.remove(msg);
     }
 
     /**
@@ -2677,12 +2634,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
             else if (msg instanceof TcpDiscoveryCustomEventMessage)
                 processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
 
-            else if (msg instanceof TcpDiscoveryClientPingRequest)
-                processClientPingRequest((TcpDiscoveryClientPingRequest)msg);
-
-            else if (msg instanceof TcpDiscoveryPingResponse)
-                processPingResponse((TcpDiscoveryPingResponse)msg);
-
             else
                 assert false : "Unknown message type: " + msg.getClass().getSimpleName();
 
@@ -2704,28 +2655,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
                 msgLsnr.apply(msg);
 
             if (redirectToClients(msg)) {
-                byte[] marshalledMsg = null;
-
-                for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
-                    // Send a clone to client to avoid ConcurrentModificationException
-                    TcpDiscoveryAbstractMessage msgClone;
-
-                    try {
-                        if (marshalledMsg == null)
-                            marshalledMsg = marsh.marshal(msg);
-
-                        msgClone = marsh.unmarshal(marshalledMsg, null);
-
-                        clientMsgWorker.addMessage(msgClone);
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to marshal message: " + msg, e);
-
-                        msgClone = msg;
-                    }
-
-                    clientMsgWorker.addMessage(msgClone);
-                }
+                for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values())
+                    clientMsgWorker.addMessage(msg);
             }
 
             Collection<TcpDiscoveryNode> failedNodes;
@@ -4517,42 +4448,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
         /**
          * @param msg Message.
          */
-        private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) {
-            utilityPool.execute(new Runnable() {
-                @Override public void run() {
-                    boolean res = pingNode(msg.nodeToPing());
-
-                    final ClientMessageWorker worker = clientMsgWorkers.get(msg.creatorNodeId());
-
-                    if (worker == null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Ping request from dead client node, will be skipped:
" + msg.creatorNodeId());
-                    }
-                    else {
-                        TcpDiscoveryClientPingResponse pingRes = new TcpDiscoveryClientPingResponse(
-                            getLocalNodeId(), msg.nodeToPing(), res);
-
-                        pingRes.verify(getLocalNodeId());
-
-                        worker.addMessage(pingRes);
-                    }
-                }
-            });
-        }
-
-        /**
-         * @param msg Message.
-         */
-        private void processPingResponse(final TcpDiscoveryPingResponse msg) {
-            ClientMessageWorker clientWorker = clientMsgWorkers.get(msg.creatorNodeId());
-
-            if (clientWorker != null)
-                clientWorker.pingResult(true);
-        }
-
-        /**
-         * @param msg Message.
-         */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
             if (isLocalNodeCoordinator()) {
                 if (msg.verified()) {
@@ -4714,6 +4609,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
         /** */
         private volatile UUID nodeId;
 
+        /** */
+        private volatile boolean client;
+
         /**
          * Constructor.
          *
@@ -4733,8 +4631,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
         @Override protected void body() throws InterruptedException {
             UUID locNodeId = getLocalNodeId();
 
-            ClientMessageWorker clientMsgWrk = null;
-
             try {
                 InputStream in;
 
@@ -4747,9 +4643,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
 
                     sock.setSoTimeout((int)netTimeout);
 
-                    for (IgniteInClosure<Socket> connLsnr : incomeConnLsnrs)
-                        connLsnr.apply(sock);
-
                     in = new BufferedInputStream(sock.getInputStream());
 
                     byte[] buf = new byte[4];
@@ -4799,12 +4692,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
 
                             TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
 
-                            if (req.clientNodeId() != null) {
-                                ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
-
-                                if (clientWorker != null)
-                                    res.clientExists(clientWorker.ping());
-                            }
+                            if (req.clientNodeId() != null)
+                                res.clientExists(clientMsgWorkers.containsKey(req.clientNodeId()));
 
                             writeToSocket(sock, res);
                         }
@@ -4818,8 +4707,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
                     TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;
 
                     UUID nodeId = req.creatorNodeId();
+                    boolean client = req.client();
 
                     this.nodeId = nodeId;
+                    this.client = client;
 
                     TcpDiscoveryHandshakeResponse res =
                         new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
@@ -4829,7 +4720,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
                     // It can happen if a remote node is stopped and it has a loopback address
in the list of addresses,
                     // the local node sends a handshake request message on the loopback address,
so we get here.
                     if (locNodeId.equals(nodeId)) {
-                        assert !req.client();
+                        assert !client;
 
                         if (log.isDebugEnabled())
                             log.debug("Handshake request from local node: " + req);
@@ -4837,12 +4728,12 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
                         return;
                     }
 
-                    if (req.client()) {
+                    if (client) {
                         if (log.isDebugEnabled())
                             log.debug("Created client message worker [locNodeId=" + locNodeId
+
                                 ", rmtNodeId=" + nodeId + ", sock=" + sock + ']');
 
-                        clientMsgWrk = new ClientMessageWorker(sock, nodeId);
+                        ClientMessageWorker clientMsgWrk = new ClientMessageWorker(sock,
nodeId);
 
                         clientMsgWrk.start();
 
@@ -4851,11 +4742,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
 
                     if (log.isDebugEnabled())
                         log.debug("Initialized connection with remote node [nodeId=" + nodeId
+
-                            ", client=" + req.client() + ']');
+                            ", client=" + client + ']');
 
                     if (debugMode)
                         debugLog("Initialized connection with remote node [nodeId=" + nodeId
+
-                            ", client=" + req.client() + ']');
+                            ", client=" + client + ']');
                 }
                 catch (IOException e) {
                     if (log.isDebugEnabled())
@@ -4936,7 +4827,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
                             if (!req.responded()) {
                                 boolean ok = processJoinRequestMessage(req);
 
-                                if (clientMsgWrk != null && ok)
+                                if (client && ok)
                                     continue;
                                 else
                                     // Direct join request - no need to handle this socket
anymore.
@@ -4944,7 +4835,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
                             }
                         }
                         else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
-                            if (clientMsgWrk != null) {
+                            if (client) {
                                 TcpDiscoverySpiState state = spiStateCopy();
 
                                 if (state == CONNECTED) {
@@ -5081,7 +4972,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
                         msgWorker.addMessage(msg);
 
                         // Send receipt back.
-                        if (clientMsgWrk == null)
+                        if (!client)
                             writeToSocket(sock, RES_OK);
                     }
                     catch (IgniteCheckedException e) {
@@ -5135,14 +5026,12 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
                 }
             }
             finally {
-                if (clientMsgWrk != null) {
+                if (client) {
                     if (log.isDebugEnabled())
                         log.debug("Client connection failed [sock=" + sock + ", locNodeId="
+ locNodeId +
                             ", rmtNodeId=" + nodeId + ']');
 
-                    clientMsgWorkers.remove(nodeId, clientMsgWrk);
-
-                    U.interrupt(clientMsgWrk);
+                    U.interrupt(clientMsgWorkers.remove(nodeId));
                 }
 
                 U.closeQuiet(sock);
@@ -5295,9 +5184,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
         /** Current client metrics. */
         private volatile ClusterMetrics metrics;
 
-        /** */
-        private final AtomicReference<SettableFuture<Boolean>> pingFut = new
AtomicReference<>();
-
         /**
          * @param sock Socket.
          * @param nodeId Node ID.
@@ -5360,72 +5246,16 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements
TcpDiscov
                 onException("Client connection failed [sock=" + sock + ", locNodeId="
                     + getLocalNodeId() + ", rmtNodeId=" + nodeId + ", msg=" + msg + ']',
e);
 
-                clientMsgWorkers.remove(nodeId, this);
-
-                U.interrupt(this);
+                U.interrupt(clientMsgWorkers.remove(nodeId));
 
                 U.closeQuiet(sock);
             }
         }
 
-        /**
-         *
-         */
-        public void pingResult(boolean res) {
-            SettableFuture<Boolean> fut = pingFut.getAndSet(null);
-
-            if (fut != null)
-                fut.set(res);
-        }
-
-        /**
-         *
-         */
-        public boolean ping() throws InterruptedException {
-            if (isNodeStopping())
-                return false;
-
-            SettableFuture<Boolean> fut;
-
-            while (true) {
-                fut = pingFut.get();
-
-                if (fut != null)
-                    break;
-
-                fut = new SettableFuture<>();
-
-                if (pingFut.compareAndSet(null, fut)) {
-                    TcpDiscoveryPingRequest pingReq = new TcpDiscoveryPingRequest(getLocalNodeId(),
nodeId);
-
-                    pingReq.verify(getLocalNodeId());
-
-                    addMessage(pingReq);
-
-                    break;
-                }
-            }
-
-            try {
-                return fut.get(ackTimeout, TimeUnit.MILLISECONDS);
-            }
-            catch (ExecutionException e) {
-                throw new IgniteSpiException("Internal error: ping future cannot be done
with exception", e);
-            }
-            catch (TimeoutException ignored) {
-                if (pingFut.compareAndSet(fut, null))
-                    fut.set(false);
-
-                return false;
-            }
-        }
-
         /** {@inheritDoc} */
         @Override protected void cleanup() {
             super.cleanup();
 
-            pingResult(false);
-
             U.closeQuiet(sock);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c021e3d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
deleted file mode 100644
index f9f164d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp.messages;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Ping request.
- */
-public class TcpDiscoveryClientPingRequest extends TcpDiscoveryAbstractMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Pinged client node ID. */
-    private final UUID nodeToPing;
-
-    /**
-     * @param creatorNodeId Creator node ID.
-     * @param nodeToPing Pinged client node ID.
-     */
-    public TcpDiscoveryClientPingRequest(UUID creatorNodeId, @Nullable UUID nodeToPing) {
-        super(creatorNodeId);
-
-        this.nodeToPing = nodeToPing;
-    }
-
-    /**
-     * @return Pinged client node ID.
-     */
-    @Nullable public UUID nodeToPing() {
-        return nodeToPing;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(TcpDiscoveryClientPingRequest.class, this, "super", super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c021e3d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
deleted file mode 100644
index 26a2b00..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp.messages;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Ping request.
- */
-public class TcpDiscoveryClientPingResponse extends TcpDiscoveryAbstractMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Pinged client node ID. */
-    private final UUID nodeToPing;
-
-    /** */
-    private final boolean res;
-
-    /**
-     * @param creatorNodeId Creator node ID.
-     * @param nodeToPing Pinged client node ID.
-     */
-    public TcpDiscoveryClientPingResponse(UUID creatorNodeId, @Nullable UUID nodeToPing,
boolean res) {
-        super(creatorNodeId);
-
-        this.nodeToPing = nodeToPing;
-        this.res = res;
-    }
-
-    /**
-     * @return Pinged client node ID.
-     */
-    @Nullable public UUID nodeToPing() {
-        return nodeToPing;
-    }
-
-    /**
-     * @return Result of ping.
-     */
-    public boolean result() {
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(TcpDiscoveryClientPingResponse.class, this, "super", super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c021e3d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 1268a23..fe4f80f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -128,7 +128,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
                 if (addr.startsWith("/"))
                     addr = addr.substring(1);
 
-                ipFinder.setAddresses(Collections.singletonList(addr));
+                ipFinder.setAddresses(Arrays.asList(addr));
             }
 
             disco.setIpFinder(ipFinder);
@@ -246,22 +246,22 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     public void testClientNodeFail() throws Exception {
-        startServerNodes(3);
-        startClientNodes(3);
+        startServerNodes(1);
+        startClientNodes(2);
 
-        checkNodes(3, 3);
+        checkNodes(1, 2);
 
-        srvFailedLatch = new CountDownLatch(3);
-        clientFailedLatch = new CountDownLatch(2);
+        srvFailedLatch = new CountDownLatch(1);
+        clientFailedLatch = new CountDownLatch(1);
 
-        attachListeners(3, 3);
+        attachListeners(1, 1);
 
-        failClient(2);
+        failClient(1);
 
         await(srvFailedLatch);
         await(clientFailedLatch);
 
-        checkNodes(3, 2);
+        checkNodes(1, 1);
     }
 
     /**
@@ -353,60 +353,6 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
     /**
      * @throws Exception If failed.
      */
-    public void testPingFailedNodeFromClient() throws Exception {
-        startServerNodes(2);
-        startClientNodes(1);
-
-        Ignite srv0 = G.ignite("server-0");
-        Ignite srv1 = G.ignite("server-1");
-        Ignite client = G.ignite("client-0");
-
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new
IgniteInClosure<Socket>() {
-            @Override public void apply(Socket sock) {
-                try {
-                    latch.await();
-                }
-                catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
-
-        assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
-        assert !((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
-
-        latch.countDown();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPingFailedClientNode() throws Exception {
-        startServerNodes(2);
-        startClientNodes(1);
-
-        Ignite srv0 = G.ignite("server-0");
-        Ignite srv1 = G.ignite("server-1");
-        Ignite client = G.ignite("client-0");
-
-        ((TcpDiscoverySpiAdapter)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
-
-        ((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).pauseSocketWrite();
-
-        assert !((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
-        assert !((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
-
-        ((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).resumeAll();
-
-        assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
-        assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testClientReconnectOnRouterFail() throws Exception {
         clientsPerSrv = 1;
 
@@ -485,7 +431,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
 
         clientLeftLatch = new CountDownLatch(1);
 
-        ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resumeAll();
+        ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resume();
 
         await(clientLeftLatch);
 
@@ -503,6 +449,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
 
         checkNodes(3, 3);
 
+//        setClientRouter(2, 2);
+
         srvFailedLatch = new CountDownLatch(2 + 2);
         clientFailedLatch = new CountDownLatch(2 + 2);
 
@@ -774,7 +722,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
         long startTime = -1;
 
         for (Ignite g : G.allGrids()) {
-            IgniteEx kernal = (IgniteEx)g;
+            IgniteEx kernal = (IgniteKernal)g;
 
             assertTrue(kernal.context().discovery().gridStartTime() > 0);
 
@@ -800,7 +748,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
         if (addr.startsWith("/"))
             addr = addr.substring(1);
 
-        ipFinder.setAddresses(Collections.singletonList(addr));
+        ipFinder.setAddresses(Arrays.asList(addr));
     }
 
     /**
@@ -990,7 +938,6 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
     /**
      */
     private static class MessageListener implements IgniteBiPredicate<UUID, Object>
{
-        /** */
         @IgniteInstanceResource
         private Ignite ignite;
 
@@ -1065,14 +1012,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
         /**
          *
          */
-        public void pauseSocketWrite() {
-            pauseResumeOperation(true, writeLock);
-        }
-
-        /**
-         *
-         */
-        public void pauseAll() {
+        private void pauseAll() {
             pauseResumeOperation(true, openSockLock, writeLock);
 
             brokeConnection();
@@ -1081,7 +1021,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
         /**
          *
          */
-        public void resumeAll() {
+        private void resume() {
             pauseResumeOperation(false, openSockLock, writeLock);
         }
     }



Mime
View raw message