ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: ignite-752: fixed the rest of review notes
Date Wed, 22 Jul 2015 05:54:59 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-752 c453ab8dc -> c399a828f


ignite-752: fixed the rest of review notes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c399a828
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c399a828
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c399a828

Branch: refs/heads/ignite-752
Commit: c399a828f51644dc0991a4d73b195dc900a32ec5
Parents: c453ab8
Author: Denis Magda <dmagda@gridgain.com>
Authored: Wed Jul 22 08:54:48 2015 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Wed Jul 22 08:54:48 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   7 +-
 .../IgniteSpiOperationTimeoutController.java    | 102 --------------
 .../spi/IgniteSpiOperationTimeoutException.java |   2 +-
 .../spi/IgniteSpiOperationTimeoutHelper.java    | 102 ++++++++++++++
 .../communication/tcp/TcpCommunicationSpi.java  |  20 +--
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  12 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 132 +++++++++----------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   8 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |   4 +-
 ...TcpDiscoverySpiFailureThresholdSelfTest.java |  10 +-
 10 files changed, 202 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 33a250d..5e8f061 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -599,7 +599,12 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         if (failureDetectionThresholdEnabled) {
             failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold();
 
-            assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0");
+            if (failureDetectionThreshold <= 0)
+                throw new IgniteSpiException("Invalid failure detection threshold value: " + failureDetectionThreshold);
+            else if (failureDetectionThreshold <= 10)
+                // Because U.currentTimeMillis() is updated once in 10 milliseconds.
+                log.warning("Failure detection threshold is too low, it may lead to unpredictable behaviour " +
+                    "[failureDetectionThreshold=" + failureDetectionThreshold + ']');
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
deleted file mode 100644
index 6213893..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.spi;
-
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.net.*;
-
-/**
- * Object that incorporates logic that determines a timeout value for the next network related operation and checks
- * whether a failure detection threshold is reached or not.
- *
- * A new instance of the class should be created for every complex network based operations that usually consists of
- * request and response parts.
- */
-public class IgniteSpiOperationTimeoutController {
-    /** */
-    private long lastOperStartTs;
-
-    /** */
-    private long timeout;
-
-    /** */
-    private final boolean failureDetectionThresholdEnabled;
-
-    /** */
-    private final long failureDetectionThreshold;
-
-    /**
-     * Constructor.
-     *
-     * @param adapter SPI adapter.
-     */
-    public IgniteSpiOperationTimeoutController(IgniteSpiAdapter adapter) {
-        failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled();
-        failureDetectionThreshold = adapter.failureDetectionThreshold();
-    }
-
-    /**
-     * Returns a timeout value to use for the next network operation.
-     *
-     * If failure detection threshold is enabled then the returned value is a portion of time left since the last time
-     * this method is called. If the threshold is disabled then {@code dfltTimeout} is returned.
-     *
-     * @param dfltTimeout Timeout to use if failure detection threshold is disabled.
-     * @return Timeout in milliseconds.
-     * @throws IgniteSpiOperationTimeoutException If failure detection threshold is reached for an operation that uses
-     * this {@code IgniteSpiOperationTimeoutController}.
-     */
-    public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException {
-        if (!failureDetectionThresholdEnabled)
-            return dfltTimeout;
-
-        if (lastOperStartTs == 0) {
-            timeout = failureDetectionThreshold;
-            lastOperStartTs = U.currentTimeMillis();
-        }
-        else {
-            long curTs = U.currentTimeMillis();
-
-            timeout = timeout - (curTs - lastOperStartTs);
-
-            lastOperStartTs = curTs;
-
-            if (timeout <= 0)
-                throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
-                    "'failureDetectionThreshold' configuration property or set SPI specific timeouts" +
-                    " manually. Current failure detection threshold: " + failureDetectionThreshold);
-        }
-
-        return timeout;
-    }
-
-    /**
-     * Checks whether the given {@link Exception} is generated because failure detection threshold has been reached.
-     *
-     * @param e Exception.
-     * @return {@code true} if failure detection threshold is reached, {@code false} otherwise.
-     */
-    public boolean checkFailureDetectionThresholdReached(Exception e) {
-        if (!failureDetectionThresholdEnabled)
-            return false;
-
-        return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException ||
-            X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
index 1ea05fd..235fd2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
@@ -27,7 +27,7 @@ import org.apache.ignite.configuration.*;
  * {@link TcpCommunicationSpi}.
  *
  * For more information refer to {@link IgniteConfiguration#setFailureDetectionThreshold(long)} and
- * {@link IgniteSpiOperationTimeoutController}.
+ * {@link IgniteSpiOperationTimeoutHelper}.
  */
 public class IgniteSpiOperationTimeoutException extends IgniteCheckedException {
     /** */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
new file mode 100644
index 0000000..03858d9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.net.*;
+
+/**
+ * Object that incorporates logic that determines a timeout value for the next network related operation and checks
+ * whether a failure detection threshold is reached or not.
+ *
+ * A new instance of the class should be created for every complex network based operation, which usually consists of
+ * request and response parts.
+ */
+public class IgniteSpiOperationTimeoutHelper {
+    /** */
+    private long lastOperStartTs;
+
+    /** */
+    private long timeout;
+
+    /** */
+    private final boolean failureDetectionThresholdEnabled;
+
+    /** */
+    private final long failureDetectionThreshold;
+
+    /**
+     * Constructor.
+     *
+     * @param adapter SPI adapter.
+     */
+    public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter) {
+        failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled();
+        failureDetectionThreshold = adapter.failureDetectionThreshold();
+    }
+
+    /**
+     * Returns a timeout value to use for the next network operation.
+     *
+     * If failure detection threshold is enabled then the returned value is a portion of time left since the last time
+     * this method is called. If the threshold is disabled then {@code dfltTimeout} is returned.
+     *
+     * @param dfltTimeout Timeout to use if failure detection threshold is disabled.
+     * @return Timeout in milliseconds.
+     * @throws IgniteSpiOperationTimeoutException If failure detection threshold is reached for an operation that uses
+     * this {@code IgniteSpiOperationTimeoutHelper}.
+     */
+    public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException {
+        if (!failureDetectionThresholdEnabled)
+            return dfltTimeout;
+
+        if (lastOperStartTs == 0) {
+            timeout = failureDetectionThreshold;
+            lastOperStartTs = U.currentTimeMillis();
+        }
+        else {
+            long curTs = U.currentTimeMillis();
+
+            timeout = timeout - (curTs - lastOperStartTs);
+
+            lastOperStartTs = curTs;
+
+            if (timeout <= 0)
+                throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
+                    "'failureDetectionThreshold' configuration property or set SPI specific timeouts" +
+                    " manually. Current failure detection threshold: " + failureDetectionThreshold);
+        }
+
+        return timeout;
+    }
+
+    /**
+     * Checks whether the given {@link Exception} is generated because failure detection threshold has been reached.
+     *
+     * @param e Exception.
+     * @return {@code true} if failure detection threshold is reached, {@code false} otherwise.
+     */
+    public boolean checkThresholdReached(Exception e) {
+        if (!failureDetectionThresholdEnabled)
+            return false;
+
+        return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException ||
+            X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1f09f05..b24d424 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1941,7 +1941,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         long connTimeout0 = connTimeout;
 
-        IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this);
+        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
 
         while (true) {
             GridCommunicationClient client;
@@ -1949,12 +1949,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             try {
                 client = new GridShmemCommunicationClient(metricsLsnr,
                     port,
-                    timeoutCtrl.nextTimeoutChunk(connTimeout),
+                    timeoutHelper.nextTimeoutChunk(connTimeout),
                     log,
                     getSpiContext().messageFormatter());
             }
             catch (IgniteCheckedException e) {
-                if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                if (timeoutHelper.checkThresholdReached(e))
                     throw e;
 
                 // Reconnect for the second time, if connection is not established.
@@ -1968,13 +1968,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client, null, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0));
+                safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
             }
             catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
                 client.forceClose();
 
                 if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException ||
-                    timeoutCtrl.checkFailureDetectionThresholdReached(e))) {
+                    timeoutHelper.checkThresholdReached(e))) {
                     log.debug("Handshake timed out (failure threshold reached) [failureDetectionThreshold=" +
                         failureDetectionThreshold() + ", err=" + e.getMessage() + ", client=" + client + ']');
 
@@ -2100,7 +2100,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             int attempt = 1;
 
-            IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this);
+            IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
 
             while (!conn) { // Reconnection on handshake timeout.
                 try {
@@ -2128,9 +2128,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     long rcvCnt = -1;
 
                     try {
-                        ch.socket().connect(addr, (int)timeoutCtrl.nextTimeoutChunk(connTimeout));
+                        ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout));
 
-                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0));
+                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
 
                         if (rcvCnt == -1)
                             return null;
@@ -2172,7 +2172,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
 
                     if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException ||
-                        timeoutCtrl.checkFailureDetectionThresholdReached(e))) {
+                        timeoutHelper.checkThresholdReached(e))) {
 
                         String msg = "Handshake timed out (failure detection threshold is reached) " +
                             "[failureDetectionThreshold=" + failureDetectionThreshold() + ", addr=" + addr + ']';
@@ -2240,7 +2240,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
 
-                    boolean failureDetThrReached = timeoutCtrl.checkFailureDetectionThresholdReached(e);
+                    boolean failureDetThrReached = timeoutHelper.checkThresholdReached(e);
 
                     if (failureDetThrReached)
                         LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionThreshold' " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 196c1b3..e0d1741 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -488,7 +488,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         UUID locNodeId = getLocalNodeId();
 
-        IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
 
         while (true) {
             boolean openSock = false;
@@ -498,7 +498,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             try {
                 long tstamp = U.currentTimeMillis();
 
-                sock = spi.openSocket(addr, timeoutCtrl);
+                sock = spi.openSocket(addr, timeoutHelper);
 
                 openSock = true;
 
@@ -506,7 +506,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 req.client(true);
 
-                spi.writeToSocket(sock, req, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+                spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
 
@@ -536,7 +536,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 msg.client(true);
 
-                spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+                spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
@@ -544,7 +544,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
                         ", rmtNodeId=" + rmtNodeId + ']');
 
-                return new T3<>(sock, spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0)), res.clientAck());
+                return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), res.clientAck());
             }
             catch (IOException | IgniteCheckedException e) {
                 U.closeQuiet(sock);
@@ -559,7 +559,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 errs.add(e);
 
-                if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                if (timeoutHelper.checkThresholdReached(e))
                     break;
 
                 if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2a09c62..1f98ba8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -462,7 +462,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         UUID locNodeId = getLocalNodeId();
 
-        IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
 
         if (F.contains(spi.locNodeAddrs, addr)) {
             if (clientNodeId == null)
@@ -476,7 +476,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             boolean clientPingRes;
 
             try {
-                clientPingRes = clientWorker.ping(timeoutCtrl);
+                clientPingRes = clientWorker.ping(timeoutHelper);
             }
             catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -508,12 +508,12 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         long tstamp = U.currentTimeMillis();
 
-                        sock = spi.openSocket(addr, timeoutCtrl);
+                        sock = spi.openSocket(addr, timeoutHelper);
 
                         spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId),
-                            timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+                            timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
-                        TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk(
+                        TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
                             spi.getAckTimeout()));
 
                         if (locNodeId.equals(res.creatorNodeId())) {
@@ -537,7 +537,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         errs.add(e);
 
-                        if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                        if (timeoutHelper.checkThresholdReached(e))
                             break;
                         else if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
                             break;
@@ -600,14 +600,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
     /** {@inheritDoc} */
     @Override protected void onDataReceived() {
-        if (spi.failureDetectionThresholdEnabled()) {
-            if (locNode != null)
-                locNode.lastDataReceivedTime(U.currentTimeMillis());
-
-            if (msgWorker != null)
-                // Node receives messages from remote nodes, reset this flag.
-                msgWorker.failureDetectionNotified = false;
-        }
+        if (spi.failureDetectionThresholdEnabled() && locNode != null)
+            locNode.lastDataReceivedTime(U.currentTimeMillis());
     }
 
     /**
@@ -894,7 +888,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         UUID locNodeId = getLocalNodeId();
 
-        IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+        IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
 
         int reconCnt = 0;
 
@@ -910,15 +904,15 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 long tstamp = U.currentTimeMillis();
 
-                sock = spi.openSocket(addr, timeoutCtrl);
+                sock = spi.openSocket(addr, timeoutHelper);
 
                 openSock = true;
 
                 // Handshake.
-                spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutCtrl.nextTimeoutChunk(
+                spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(
                     spi.getSocketTimeout()));
 
-                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk(
+                TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
                     ackTimeout0));
 
                 if (locNodeId.equals(res.creatorNodeId())) {
@@ -933,7 +927,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // Send message.
                 tstamp = U.currentTimeMillis();
 
-                spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+                spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
@@ -950,7 +944,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // E.g. due to class not found issue.
                 joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
 
-                return spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
+                return spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
             }
             catch (ClassCastException e) {
                 // This issue is rarely reproducible on AmazonEC2, but never
@@ -976,7 +970,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 errs.add(e);
 
-                if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                if (timeoutHelper.checkThresholdReached(e))
                     break;
 
                 if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
@@ -1774,8 +1768,8 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Time when the last status message has been sent. */
         private long lastTimeConnCheckMsgSent;
 
-        /** Whether an error message has been printed out when failure detection threshold is reached. */
-        private volatile boolean failureDetectionNotified;
+        /** Flag that keeps info on whether the threshold is reached or not. */
+        private boolean failureThresholdReached;
 
         /** Last time hearbeat message has been sent. */
         private long lastTimeHbMsgSent;
@@ -1783,7 +1777,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         /**
          */
         protected RingMessageWorker() {
-            super("tcp-disco-msg-worker");
+            super("tcp-disco-msg-worker", 10);
         }
 
         /**
@@ -1843,6 +1837,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (spi.ensured(msg))
                 msgHist.add(msg);
 
+            if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId()))
+                // Reset the flag.
+                failureThresholdReached = false;
+
             spi.stats.onMessageProcessingFinished(msg);
         }
 
@@ -1921,16 +1919,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (debugMode)
                             debugLog("No next node in topology.");
 
-                        if (ring.hasRemoteNodes()) {
+                        /*if (ring.hasRemoteNodes()) {
                             msg.senderNodeId(locNodeId);
 
-                            if (msg instanceof TcpDiscoveryConnectionCheckMessage ||
-                                (msg instanceof TcpDiscoveryStatusCheckMessage &&
-                                ((TcpDiscoveryStatusCheckMessage)msg).replacedConnCheckMsg()))
-                                break;
-
                             addMessage(msg);
-                        }
+                        }*/
 
                         break;
                     }
@@ -1975,12 +1968,12 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     int reconCnt = 0;
 
-                    IgniteSpiOperationTimeoutController timeoutCtrl = null;
+                    IgniteSpiOperationTimeoutHelper timeoutHelper = null;
 
                     while (true) {
                         if (sock == null) {
-                            if (timeoutCtrl == null)
-                                timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+                            if (timeoutHelper == null)
+                                timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
 
                             nextNodeExists = false;
 
@@ -1992,16 +1985,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                             try {
                                 long tstamp = U.currentTimeMillis();
 
-                                sock = spi.openSocket(addr, timeoutCtrl);
+                                sock = spi.openSocket(addr, timeoutHelper);
 
                                 openSock = true;
 
                                 // Handshake.
                                 writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
-                                    timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+                                    timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
-                                    timeoutCtrl.nextTimeoutChunk(ackTimeout0));
+                                    timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                 if (locNodeId.equals(res.creatorNodeId())) {
                                     if (log.isDebugEnabled())
@@ -2088,7 +2081,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
                                     break;
 
-                                if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                                if (timeoutHelper.checkThresholdReached(e))
                                     break;
                                 else if (!spi.failureDetectionThresholdEnabled() && (e instanceof
                                     SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
@@ -2111,7 +2104,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     nextNodeExists = true;
                                     // Resetting timeout control object to let the code below to use a new one
                                     // for the next bunch of operations.
-                                    timeoutCtrl = null;
+                                    timeoutHelper = null;
                                 }
                             }
                         }
@@ -2149,11 +2142,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
                                         pendingMsgs.discardId);
 
-                                    if (timeoutCtrl == null)
-                                        timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+                                    if (timeoutHelper == null)
+                                        timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
 
                                     try {
-                                        writeToSocket(sock, pendingMsg, timeoutCtrl.nextTimeoutChunk(
+                                        writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk(
                                             spi.getSocketTimeout()));
                                     }
                                     finally {
@@ -2162,7 +2155,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                     spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);
 
-                                    int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
+                                    int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                     if (log.isDebugEnabled())
                                         log.debug("Pending message has been sent to next node [msg=" + msg.id() +
@@ -2176,7 +2169,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                     // Resetting timeout control object to create a new one for the next bunch of
                                     // operations.
-                                    timeoutCtrl = null;
+                                    timeoutHelper = null;
                                 }
                             }
 
@@ -2185,14 +2178,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                             try {
                                 long tstamp = U.currentTimeMillis();
 
-                                if (timeoutCtrl == null)
-                                    timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+                                if (timeoutHelper == null)
+                                    timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
 
-                                writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+                                writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
-                                int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
+                                int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                 if (log.isDebugEnabled())
                                     log.debug("Message has been sent to next node [msg=" + msg +
@@ -2227,7 +2220,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
                                 e);
 
-                            if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+                            if (timeoutHelper.checkThresholdReached(e))
                                 break;
 
                             if (!spi.failureDetectionThresholdEnabled()) {
@@ -4045,14 +4038,17 @@ class ServerImpl extends TcpDiscoveryImpl {
          * Check connection aliveness status.
          */
         private void checkConnection() {
-            if (!failureDetectionNotified && U.currentTimeMillis() - locNode.lastDataReceivedTime()
+            if (!spi.failureDetectionThresholdEnabled())
+                return;
+
+            if (!failureThresholdReached && U.currentTimeMillis() - locNode.lastDataReceivedTime()
                 >= spi.failureDetectionThreshold() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
 
                 log.info("Local node seems to be disconnected from topology (failure detection threshold " +
                              "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
                              ", connCheckFreq=" + spi.connCheckFreq + ']');
 
-                failureDetectionNotified = true;
+                failureThresholdReached = true;
             }
 
             long elapsed = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis();
@@ -4261,17 +4257,17 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
 
-                            IgniteSpiOperationTimeoutController timeoutCtrl =
-                                new IgniteSpiOperationTimeoutController(spi);
+                            IgniteSpiOperationTimeoutHelper timeoutHelper =
+                                new IgniteSpiOperationTimeoutHelper(spi);
 
                             if (req.clientNodeId() != null) {
                                 ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
 
                                 if (clientWorker != null)
-                                    res.clientExists(clientWorker.ping(timeoutCtrl));
+                                    res.clientExists(clientWorker.ping(timeoutHelper));
                             }
 
-                            spi.writeToSocket(sock, res, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+                            spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
                         }
                         else if (log.isDebugEnabled())
                             log.debug("Ignore ping request, node is stopping.");
@@ -4831,7 +4827,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param clientNodeId Node ID.
          */
         protected ClientMessageWorker(Socket sock, UUID clientNodeId) {
-            super("tcp-disco-client-message-worker");
+            super("tcp-disco-client-message-worker", 2000);
 
             this.sock = sock;
             this.clientNodeId = clientNodeId;
@@ -4928,11 +4924,11 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * @param timeoutCtrl Timeout controller.
+         * @param timeoutHelper Timeout controller.
          * @return Ping result.
          * @throws InterruptedException If interrupted.
          */
-        public boolean ping(IgniteSpiOperationTimeoutController timeoutCtrl) throws InterruptedException {
+        public boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException {
             if (spi.isNodeStopping0())
                 return false;
 
@@ -4958,7 +4954,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             try {
-                return fut.get(timeoutCtrl.nextTimeoutChunk(spi.getAckTimeout()),
+                return fut.get(timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()),
                     TimeUnit.MILLISECONDS);
             }
             catch (IgniteInterruptedCheckedException ignored) {
@@ -4998,12 +4994,18 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Backed interrupted flag. */
         private volatile boolean interrupted;
 
+        /** Polling timeout. */
+        private final long pollingTimeout;
+
         /**
          * @param name Thread name.
+         * @param pollingTimeout Messages polling timeout.
          */
-        protected MessageWorkerAdapter(String name) {
+        protected MessageWorkerAdapter(String name, long pollingTimeout) {
             super(spi.ignite().name(), name, log);
 
+            this.pollingTimeout = pollingTimeout;
+
             setPriority(spi.threadPri);
         }
 
@@ -5013,14 +5015,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']');
 
             while (!isInterrupted()) {
-                TcpDiscoveryAbstractMessage msg = queue.poll(10, TimeUnit.MILLISECONDS);
+                TcpDiscoveryAbstractMessage msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS);
 
-                if (msg == null) {
+                if (msg == null)
                     noMessageLoop();
-                    continue;
-                }
-
-                processMessage(msg);
+                else
+                    processMessage(msg);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 87848d4..588ff98 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1178,11 +1178,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /**
      * @param sockAddr Remote address.
-     * @param timeoutCtrl Timeout controller.
+     * @param timeoutHelper Timeout helper.
      * @return Opened socket.
      * @throws IOException If failed.
      */
-    protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutController timeoutCtrl)
+    protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
         throws IOException, IgniteSpiOperationTimeoutException {
         assert sockAddr != null;
 
@@ -1199,9 +1199,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         sock.setTcpNoDelay(true);
 
-        sock.connect(resolved, (int)timeoutCtrl.nextTimeoutChunk(sockTimeout));
+        sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));
 
-        writeToSocket(sock, U.IGNITE_HEADER, timeoutCtrl.nextTimeoutChunk(sockTimeout));
+        writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
 
         return sock;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 994b7b5..a50b060 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -2123,10 +2123,10 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected Socket openSocket(InetSocketAddress sockAddr,
-            IgniteSpiOperationTimeoutController timeoutCtrl) throws IOException, IgniteSpiOperationTimeoutException {
+            IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
             waitFor(openSockLock);
 
-            return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutController(this));
+            return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutHelper(this));
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
index 362be15..1ee839c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
@@ -308,15 +308,15 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
 
 
         /** {@inheritDoc} */
-        @Override protected Socket openSocket(InetSocketAddress sockAddr,
-            IgniteSpiOperationTimeoutController timeoutCtrl) throws IOException, IgniteSpiOperationTimeoutException {
+        @Override protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
+            throws IOException, IgniteSpiOperationTimeoutException {
 
             if (openSocketTimeout) {
                 err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeout");
                 throw err;
             }
             else if (openSocketTimeoutWait) {
-                long timeout = timeoutCtrl.nextTimeoutChunk(0);
+                long timeout = timeoutHelper.nextTimeoutChunk(0);
 
                 try {
                     Thread.sleep(timeout + 1000);
@@ -326,14 +326,14 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
                 }
 
                 try {
-                    timeoutCtrl.nextTimeoutChunk(0);
+                    timeoutHelper.nextTimeoutChunk(0);
                 }
                 catch (IgniteSpiOperationTimeoutException e) {
                     throw (err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeoutWait"));
                 }
             }
 
-            Socket sock = super.openSocket(sockAddr, timeoutCtrl);
+            Socket sock = super.openSocket(sockAddr, timeoutHelper);
 
             try {
                 Thread.sleep(1500);



Mime
View raw message