ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [32/50] [abbrv] ignite git commit: IGNITE-2951 - Stability fixes for cluster with many clients
Date Tue, 26 Apr 2016 01:07:39 GMT
IGNITE-2951 - Stability fixes for cluster with many clients


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d92e617a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d92e617a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d92e617a

Branch: refs/heads/ignite-testing-discovery
Commit: d92e617a71c31ccaa3fd6e4954aa4ccaf494733c
Parents: e7ab8ee
Author: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Authored: Wed Apr 6 18:10:45 2016 -0700
Committer: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Committed: Wed Apr 6 18:10:45 2016 -0700

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 11 ++---
 .../GridDhtPartitionsExchangeFuture.java        |  4 +-
 .../continuous/GridContinuousProcessor.java     | 13 +++++-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  2 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 44 +++++++++-----------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 23 ++++------
 .../IgniteClientReconnectAbstractTest.java      |  7 ++--
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  8 ++--
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 39 +++++++++--------
 .../TcpDiscoverySpiFailureTimeoutSelfTest.java  | 23 ++++------
 .../spi/discovery/tcp/TestTcpDiscoverySpi.java  |  5 ++-
 11 files changed, 87 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 7e13f26..6181d9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -859,7 +859,7 @@ public class GridDhtPartitionDemander {
                     log.debug("Rebalancing is not required [cache=" + cctx.name() +
                         ", topology=" + topVer + "]");
 
-                checkIsDone(cancelled);
+                checkIsDone(cancelled, true);
             }
         }
 
@@ -883,7 +883,7 @@ public class GridDhtPartitionDemander {
 
                 remaining.clear();
 
-                checkIsDone(true /* cancelled */);
+                checkIsDone(true /* cancelled */, false);
             }
 
             return true;
@@ -1014,13 +1014,13 @@ public class GridDhtPartitionDemander {
          *
          */
         private void checkIsDone() {
-            checkIsDone(false);
+            checkIsDone(false, false);
         }
 
         /**
          * @param cancelled Is cancelled.
          */
-        private void checkIsDone(boolean cancelled) {
+        private void checkIsDone(boolean cancelled, boolean wasEmpty) {
             if (remaining.isEmpty()) {
                 if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sndStoppedEvnt))
                     preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
@@ -1028,7 +1028,8 @@ public class GridDhtPartitionDemander {
                 if (log.isDebugEnabled())
                     log.debug("Completed rebalance future: " + this);
 
-                cctx.shared().exchange().scheduleResendPartitions();
+                if (!wasEmpty)
+                    cctx.shared().exchange().scheduleResendPartitions();
 
                 Collection<Integer> m = new HashSet<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 22fb59e..be346b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -929,7 +929,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         U.warn(log, "Failed to wait for partition release future [topVer=" + topologyVersion() +
             ", node=" + cctx.localNodeId() + "]. Dumping pending objects that might be the cause: ");
 
-        cctx.exchange().dumpPendingObjects();
+        cctx.exchange().dumpDebugInfo();
     }
 
     /**
@@ -993,7 +993,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
 
                 m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
-                
+
                 m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 99e0bb5..c6503e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -394,7 +394,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
         if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
-            DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos);
+            Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size());
+
+            for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) {
+                Map<UUID, LocalRoutineInfo> copy = U.newHashMap(e.getValue().size());
+
+                for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet())
+                    copy.put(e0.getKey(), e0.getValue());
+
+                clientInfos0.put(e.getKey(), copy);
+            }
+
+            DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0);
 
             // Collect listeners information (will be sent to joining node during discovery process).
             for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 65b94ca..834922c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1062,7 +1062,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 try {
                     if (ack) {
                         synchronized (mux) {
-                            assert unackedMsg == null : unackedMsg;
+                            assert unackedMsg == null : "Unacked=" + unackedMsg + ", received=" + msg;
 
                             unackedMsg = msg;
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f0de546..5c7ffbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectStreamException;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.net.ConnectException;
 import java.net.InetAddress;
@@ -73,7 +75,6 @@ import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -2120,6 +2121,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Socket. */
         private Socket sock;
 
+        /** Output stream. */
+        private OutputStream out;
+
         /** Last time status message has been sent. */
         private long lastTimeStatusMsgSent;
 
@@ -2456,10 +2460,12 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 sock = spi.openSocket(addr, timeoutHelper);
 
+                                out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize());
+
                                 openSock = true;
 
                                 // Handshake.
-                                writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
+                                spi.writeToSocket(sock, out, new TcpDiscoveryHandshakeRequest(locNodeId),
                                     timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
@@ -2613,7 +2619,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
 
                                     try {
-                                        writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk(
+                                        spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk(
                                             spi.getSocketTimeout()));
                                     }
                                     finally {
@@ -2665,7 +2671,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     }
                                 }
 
-                                writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                                spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
@@ -3956,7 +3962,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
                 else if (leftNode.equals(next) && sock != null) {
                     try {
-                        writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                        spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ?
                             spi.failureDetectionTimeout() : spi.getSocketTimeout());
 
                         if (log.isDebugEnabled())
@@ -5569,6 +5575,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Socket. */
         private final Socket sock;
 
+        /** Output stream. */
+        private final OutputStream out;
+
         /** Current client metrics. */
         private volatile ClusterMetrics metrics;
 
@@ -5582,11 +5591,13 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param sock Socket.
          * @param clientNodeId Node ID.
          */
-        protected ClientMessageWorker(Socket sock, UUID clientNodeId) {
+        protected ClientMessageWorker(Socket sock, UUID clientNodeId) throws IOException {
             super("tcp-disco-client-message-worker", 2000);
 
             this.sock = sock;
             this.clientNodeId = clientNodeId;
+
+            out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize());
         }
 
         /**
@@ -5633,7 +5644,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             log.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
                                 + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 
-                        writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                        spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ?
                             spi.failureDetectionTimeout() : spi.getSocketTimeout());
                     }
                 }
@@ -5644,7 +5655,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     assert topologyInitialized(msg) : msg;
 
-                    writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                    spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ?
                         spi.failureDetectionTimeout() : spi.getSocketTimeout());
                 }
             }
@@ -5751,9 +5762,6 @@ class ServerImpl extends TcpDiscoveryImpl {
      * Base class for message workers.
      */
     protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
-        /** Pre-allocated output stream (100K). */
-        private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);
-
         /** Message queue. */
         private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();
 
@@ -5835,20 +5843,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         protected void noMessageLoop() {
             // No-op.
         }
-
-        /**
-         * @param sock Socket.
-         * @param msg Message.
-         * @param timeout Socket timeout.
-         * @throws IOException If IO failed.
-         * @throws IgniteCheckedException If marshalling failed.
-         */
-        protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
-            throws IOException, IgniteCheckedException {
-            bout.reset();
-
-            spi.writeToSocket(sock, msg, bout, timeout);
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 277055a..abe380c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.tcp;
 
+import java.io.BufferedOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -51,7 +52,6 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -1346,45 +1346,38 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      */
     protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
         IgniteCheckedException {
-        writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K.
+        writeToSocket(sock, new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()), msg, timeout);
     }
 
     /**
      * Writes message to the socket.
      *
      * @param sock Socket.
+     * @param out Stream to write to.
      * @param msg Message.
-     * @param bout Byte array output stream.
      * @param timeout Timeout.
      * @throws IOException If IO failed or write timed out.
      * @throws IgniteCheckedException If marshalling failed.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
     protected void writeToSocket(Socket sock,
+        OutputStream out,
         TcpDiscoveryAbstractMessage msg,
-        GridByteArrayOutputStream bout,
         long timeout) throws IOException, IgniteCheckedException {
         assert sock != null;
         assert msg != null;
-        assert bout != null;
-
-        // Marshall message first to perform only write after.
-        marsh.marshal(msg, bout);
+        assert out != null;
 
         SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);
 
-        IOException err = null;
+        IgniteCheckedException err = null;
 
         try {
-            OutputStream out = sock.getOutputStream();
-
-            bout.writeTo(out);
-
-            out.flush();
+            marsh.marshal(msg, out);
         }
-        catch (IOException e) {
+        catch (IgniteCheckedException e) {
             err = e;
         }
         finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 0c005e9..96ca0dd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.Socket;
 import java.util.Collection;
 import java.util.Collections;
@@ -384,7 +385,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
         volatile CountDownLatch writeLatch;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout)
             throws IOException, IgniteCheckedException {
             if (msg instanceof TcpDiscoveryJoinRequestMessage) {
                 CountDownLatch writeLatch0 = writeLatch;
@@ -396,7 +397,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
                 }
             }
 
-            super.writeToSocket(sock, msg, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -464,4 +465,4 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
                 log.error(s);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 7debb41..e01094c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.typedef.CIX2;
@@ -2158,8 +2158,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
             waitFor(writeLock);
 
             boolean fail = false;
@@ -2184,7 +2184,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 sock.close();
             }
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
 
             if (afterWrite != null)
                 afterWrite.apply(msg, sock);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 0df7da6..b45fe12 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
@@ -54,7 +55,6 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
@@ -1849,9 +1849,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         private volatile boolean failed;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock,
+        @Override protected void writeToSocket(Socket sock, OutputStream out,
             TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout,
             long timeout) throws IOException, IgniteCheckedException {
             boolean add = msgIds.add(msg.id());
 
@@ -1861,7 +1860,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
                 failed = true;
             }
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -1874,8 +1873,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock,
+            OutputStream out,
             TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout,
             long timeout) throws IOException, IgniteCheckedException {
             if (stopBeforeSndAck) {
                 if (msg instanceof TcpDiscoveryCustomEventMessage) {
@@ -1905,7 +1904,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
                 }
             }
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -1937,8 +1936,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock,
+            OutputStream out,
             TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout,
             long timeout) throws IOException, IgniteCheckedException {
             if (stop)
                 return;
@@ -1983,7 +1982,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
                 return;
             }
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -1998,8 +1997,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         private boolean stop;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
             if (msg instanceof TcpDiscoveryCustomEventMessage && latch != null) {
                 log.info("Stop node on custom event: " + msg);
 
@@ -2011,7 +2010,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
             if (stop)
                 return;
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -2032,8 +2031,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         private boolean debug;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
             if (msg instanceof TcpDiscoveryNodeAddedMessage) {
                 if (nodeAdded1 != null) {
                     nodeAdded1.countDown();
@@ -2060,7 +2059,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
             if (debug && msg instanceof TcpDiscoveryCustomEventMessage)
                 log.info("--- Send custom event: " + msg);
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -2072,13 +2071,13 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         private volatile boolean stop;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
 
             if (stop)
                 throw new RuntimeException("Failing ring message worker explicitly");
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
         }
     }
 
@@ -2090,12 +2089,12 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         private volatile boolean stop;
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
             if (stop)
                 throw new RuntimeException("Failing ring message worker explicitly");
 
-            super.writeToSocket(sock, msg, bout, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
 
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
                 stop = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
index 4cf9bd0..4ef984f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
@@ -18,12 +18,12 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
@@ -348,10 +348,14 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout)
             throws IOException, IgniteCheckedException {
             if (!(msg instanceof TcpDiscoveryPingRequest)) {
-                super.writeToSocket(sock, msg, timeout);
+                if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
+                    connCheckStatusMsgCntSent++;
+
+                super.writeToSocket(sock, out, msg, timeout);
+
                 return;
             }
 
@@ -370,16 +374,7 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf
                 }
             }
             else
-                super.writeToSocket(sock, msg, timeout);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
-            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
-            if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
-                connCheckStatusMsgCntSent++;
-
-            super.writeToSocket(sock, msg, bout, timeout);
+                super.writeToSocket(sock, out, msg, timeout);
         }
 
         /** {@inheritDoc} */
@@ -405,4 +400,4 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf
             countConnCheckMsg = false;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92e617a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
index dbc54bc..721192f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.Socket;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -31,12 +32,12 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi {
     public boolean ignorePingResponse;
 
     /** {@inheritDoc} */
-    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+    protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
         IgniteCheckedException {
         if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
             return;
         else
-            super.writeToSocket(sock, msg, timeout);
+            super.writeToSocket(sock, out, msg, timeout);
     }
 
     /** {@inheritDoc} */


Mime
View raw message