ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [1/7] ignite git commit: IGNITE-2951 - Stability fixes for cluster with many clients
Date Thu, 07 Apr 2016 14:46:37 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2630 668895563 -> c180c127b


IGNITE-2951 - Stability fixes for cluster with many clients


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e266153
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e266153
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e266153

Branch: refs/heads/ignite-2630
Commit: 5e266153707021a8866e91dfa3f958066f80fc99
Parents: da47901
Author: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Authored: Wed Apr 6 18:10:45 2016 -0700
Committer: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Committed: Wed Apr 6 23:25:03 2016 -0700

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        |  2 +-
 .../continuous/GridContinuousProcessor.java     | 13 +++++-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  2 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 44 +++++++++-----------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 23 ++++------
 .../IgniteClientReconnectAbstractTest.java      |  7 ++--
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  8 ++--
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 39 +++++++++--------
 .../TcpDiscoverySpiFailureTimeoutSelfTest.java  | 23 ++++------
 .../spi/discovery/tcp/TestTcpDiscoverySpi.java  |  5 ++-
 10 files changed, 80 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index bbfc71a..82e9bda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -931,7 +931,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
 
                 m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
-                
+
                 m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index abafe85..d7838f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -393,7 +393,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
         if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
-            DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos);
+            Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size());
+
+            for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) {
+                Map<UUID, LocalRoutineInfo> copy = U.newHashMap(e.getValue().size());
+
+                for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet())
+                    copy.put(e0.getKey(), e0.getValue());
+
+                clientInfos0.put(e.getKey(), copy);
+            }
+
+            DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0);
 
             // Collect listeners information (will be sent to joining node during discovery process).
             for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 950c680..31d614f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1062,7 +1062,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 try {
                     if (ack) {
                         synchronized (mux) {
-                            assert unackedMsg == null : unackedMsg;
+                            assert unackedMsg == null : "Unacked=" + unackedMsg + ", received=" + msg;
 
                             unackedMsg = msg;
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 88e34e8..27a31c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectStreamException;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.net.ConnectException;
 import java.net.InetAddress;
@@ -74,7 +76,6 @@ import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -2134,6 +2135,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Socket. */
         private Socket sock;
 
+        /** Output stream. */
+        private OutputStream out;
+
         /** Last time status message has been sent. */
         private long lastTimeStatusMsgSent;
 
@@ -2470,10 +2474,12 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 sock = spi.openSocket(addr, timeoutHelper);
 
+                                out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize());
+
                                 openSock = true;
 
                                 // Handshake.
-                                writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
+                                spi.writeToSocket(sock, out, new TcpDiscoveryHandshakeRequest(locNodeId),
                                     timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
@@ -2627,7 +2633,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
 
                                     try {
-                                        writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk(
+                                        spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk(
                                             spi.getSocketTimeout()));
                                     }
                                     finally {
@@ -2679,7 +2685,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     }
                                 }
 
-                                writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                                spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
@@ -3999,7 +4005,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
                 else if (leftNode.equals(next) && sock != null) {
                     try {
-                        writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                        spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ?
                             spi.failureDetectionTimeout() : spi.getSocketTimeout());
 
                         if (log.isDebugEnabled())
@@ -5617,6 +5623,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Socket. */
         private final Socket sock;
 
+        /** Output stream. */
+        private final OutputStream out;
+
         /** Current client metrics. */
         private volatile ClusterMetrics metrics;
 
@@ -5630,11 +5639,13 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param sock Socket.
          * @param clientNodeId Node ID.
          */
-        protected ClientMessageWorker(Socket sock, UUID clientNodeId) {
+        protected ClientMessageWorker(Socket sock, UUID clientNodeId) throws IOException {
             super("tcp-disco-client-message-worker", 2000);
 
             this.sock = sock;
             this.clientNodeId = clientNodeId;
+
+            out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize());
         }
 
         /**
@@ -5681,7 +5692,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             log.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
                                 + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 
-                        writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                        spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ?
                             spi.failureDetectionTimeout() : spi.getSocketTimeout());
                     }
                 }
@@ -5692,7 +5703,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     assert topologyInitialized(msg) : msg;
 
-                    writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                    spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ?
                         spi.failureDetectionTimeout() : spi.getSocketTimeout());
                 }
             }
@@ -5799,9 +5810,6 @@ class ServerImpl extends TcpDiscoveryImpl {
      * Base class for message workers.
      */
     protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
-        /** Pre-allocated output stream (100K). */
-        private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);
-
         /** Message queue. */
         private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();
 
@@ -5883,20 +5891,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         protected void noMessageLoop() {
             // No-op.
         }
-
-        /**
-         * @param sock Socket.
-         * @param msg Message.
-         * @param timeout Socket timeout.
-         * @throws IOException If IO failed.
-         * @throws IgniteCheckedException If marshalling failed.
-         */
-        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
-            throws IOException, IgniteCheckedException {
-            bout.reset();
-
-            spi.writeToSocket(sock, msg, bout, timeout);
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index df152f8..d981609 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.tcp;
 
+import java.io.BufferedOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -51,7 +52,6 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -1346,45 +1346,38 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      */
     protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
         IgniteCheckedException {
-        writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K.
+        writeToSocket(sock, new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()), msg, timeout);
     }
 
     /**
      * Writes message to the socket.
      *
      * @param sock Socket.
+     * @param out Stream to write to.
      * @param msg Message.
-     * @param bout Byte array output stream.
      * @param timeout Timeout.
      * @throws IOException If IO failed or write timed out.
      * @throws IgniteCheckedException If marshalling failed.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
     protected void writeToSocket(Socket sock,
+        OutputStream out,
         TcpDiscoveryAbstractMessage msg,
-        GridByteArrayOutputStream bout,
         long timeout) throws IOException, IgniteCheckedException {
         assert sock != null;
         assert msg != null;
-        assert bout != null;
-
-        // Marshall message first to perform only write after.
-        marsh.marshal(msg, bout);
+        assert out != null;
 
         SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);
 
-        IOException err = null;
+        IgniteCheckedException err = null;
 
         try {
-            OutputStream out = sock.getOutputStream();
-
-            bout.writeTo(out);
-
-            out.flush();
+            marsh.marshal(msg, out);
         }
-        catch (IOException e) {
+        catch (IgniteCheckedException e) {
             err = e;
         }
         finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 6869d1c..4d49366 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.Socket;
 import java.util.Collection;
 import java.util.Collections;
@@ -384,7 +385,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
         volatile CountDownLatch writeLatch;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout)
             throws IOException, IgniteCheckedException {
             if (msg instanceof TcpDiscoveryJoinRequestMessage) {
                 CountDownLatch writeLatch0 = writeLatch;
@@ -396,7 +397,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
                 }
             }
 
-            super.writeToSocket(sock, msg, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -464,4 +465,4 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
                 log.error(s);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 7debb41..e01094c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.typedef.CIX2;
@@ -2158,8 +2158,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
             waitFor(writeLock);
 
             boolean fail = false;
@@ -2184,7 +2184,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 sock.close();
             }
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
 
             if (afterWrite != null)
                 afterWrite.apply(msg, sock);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 7635f0b..7efaca0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
@@ -54,7 +55,6 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
@@ -1852,9 +1852,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         private volatile boolean failed;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock,
+        @Override protected void writeToSocket(Socket sock, OutputStream out,
             TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout,
             long timeout) throws IOException, IgniteCheckedException {
             boolean add = msgIds.add(msg.id());
 
@@ -1864,7 +1863,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
                 failed = true;
             }
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -1877,8 +1876,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock,
+            OutputStream out,
             TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout,
             long timeout) throws IOException, IgniteCheckedException {
             if (stopBeforeSndAck) {
                 if (msg instanceof TcpDiscoveryCustomEventMessage) {
@@ -1908,7 +1907,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
                 }
             }
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -1940,8 +1939,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock,
+            OutputStream out,
             TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout,
             long timeout) throws IOException, IgniteCheckedException {
             if (stop)
                 return;
@@ -1986,7 +1985,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
                 return;
             }
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -2001,8 +2000,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         private boolean stop;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
             if (msg instanceof TcpDiscoveryCustomEventMessage && latch != null) {
                 log.info("Stop node on custom event: " + msg);
 
@@ -2014,7 +2013,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
             if (stop)
                 return;
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -2035,8 +2034,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         private boolean debug;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
             if (msg instanceof TcpDiscoveryNodeAddedMessage) {
                 if (nodeAdded1 != null) {
                     nodeAdded1.countDown();
@@ -2063,7 +2062,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
             if (debug && msg instanceof TcpDiscoveryCustomEventMessage)
                 log.info("--- Send custom event: " + msg);
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -2075,13 +2074,13 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         private volatile boolean stop;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
 
             if (stop)
                 throw new RuntimeException("Failing ring message worker explicitly");
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -2093,12 +2092,12 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         private volatile boolean stop;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
             if (stop)
                 throw new RuntimeException("Failing ring message worker explicitly");
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
 
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
                 stop = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
index 4cf9bd0..4ef984f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
@@ -18,12 +18,12 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
@@ -348,10 +348,14 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout)
             throws IOException, IgniteCheckedException {
             if (!(msg instanceof TcpDiscoveryPingRequest)) {
-                super.writeToSocket(sock, msg, timeout);
+                if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
+                    connCheckStatusMsgCntSent++;
+
+                super.writeToSocket(sock, out, msg, timeout);
+
                 return;
             }
 
@@ -370,16 +374,7 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf
                 }
             }
             else
-                super.writeToSocket(sock, msg, timeout);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
-            if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
-                connCheckStatusMsgCntSent++;
-
-            super.writeToSocket(sock, msg, bout, timeout);
+                super.writeToSocket(sock, out, msg, timeout);
         }
 
         /** {@inheritDoc} */
@@ -405,4 +400,4 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf
             countConnCheckMsg = false;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e266153/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
index dbc54bc..721192f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.Socket;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -31,12 +32,12 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi {
     public boolean ignorePingResponse;
 
     /** {@inheritDoc} */
-    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+    protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
         IgniteCheckedException {
         if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
             return;
         else
-            super.writeToSocket(sock, msg, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
     }
 
     /** {@inheritDoc} */


Mime
View raw message