ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [07/50] [abbrv] ignite git commit: IGNITE-3054 - Rework message handling.
Date Mon, 24 Apr 2017 15:03:30 GMT
IGNITE-3054 - Rework message handling.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/661efc21
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/661efc21
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/661efc21

Branch: refs/heads/ignite-3054
Commit: 661efc218038bef89c0ded7ec0c48a6631df8110
Parents: 2c8ee96
Author: dkarachentsev <dkarachentsev@gridgain.com>
Authored: Thu Nov 17 13:21:08 2016 +0300
Committer: dkarachentsev <dkarachentsev@gridgain.com>
Committed: Thu Nov 17 13:21:08 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 459 +++++--------------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  66 ++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  33 +-
 3 files changed, 203 insertions(+), 355 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/661efc21/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2a471c4..734e538 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -90,11 +90,15 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple;
+import org.apache.ignite.internal.util.nio.GridBufferedParser;
+import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
 import org.apache.ignite.internal.util.nio.GridNioFilter;
-import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
+import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
 import org.apache.ignite.internal.util.nio.GridNioFuture;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
 import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
@@ -155,7 +159,6 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
 import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
@@ -204,12 +207,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Node ID in GridNioSession. */
     private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
 
-    /** Not fully read message in GridNioSession. */
-    private static final int INCOMPLETE_MESSAGE_META = GridNioSessionMetaKey.nextUniqueKey();
-
-    /** Not fully read message length. */
-    private static final int MESSAGE_LEN_META = GridNioSessionMetaKey.nextUniqueKey();
-
     /**
      * Number of tries to reopen ServerSocketChannel on 'SocketException: Invalid argument'.
      * <p>This error may happen on simultaneous server nodes startup on the same JVM.</p>
@@ -286,11 +283,70 @@ class ServerImpl extends TcpDiscoveryImpl {
         new ConcurrentHashMap8<>();
 
     /** Nio server that serves client connections. */
-    private GridNioServer clientNioSrv;
+    private GridNioServer<byte[]> clientNioSrv;
 
     /** List of nio workers. */
     private Set<ClientNioMessageWorker> nioWorkers = new CopyOnWriteArraySet<>();
 
+    /** Listener of client nio connections. */
+    private final GridNioServerListener<byte[]> clientLsnr = new GridNioServerListenerAdapter<byte[]>() {
+        /** */
+        private final ClientNioMessageProcessor msgProc = new ClientNioMessageProcessor(log);
+
+        @Override public void onConnected(final GridNioSession ses) {
+            // No-op.
+        }
+
+        @Override public void onDisconnected(final GridNioSession ses, @Nullable final Exception e) {
+            final UUID clientNodeId = msgProc.clientNodeId(ses);
+
+            if (log.isDebugEnabled())
+                log.debug("Stopping message worker on disconnect [remoteAddr=" + ses.remoteAddress() +
+                    ", remote node ID=" + clientNodeId + ']');
+
+            final ClientMessageProcessor proc = clientMsgWorkers.get(clientNodeId);
+
+            if (proc != null && proc instanceof ClientNioMessageWorker && ((ClientNioMessageWorker)proc).ses == ses) {
+                if (clientMsgWorkers.remove(clientNodeId, proc))
+                    ((ClientNioMessageWorker)proc).nonblockingStop();
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.error("Illegal ClientMessageProcessor: " + proc);
+
+                if (ses.closeTime() == 0)
+                    ses.close();
+            }
+        }
+
+        @Override public void onMessage(final GridNioSession ses, final byte[] msg) {
+            // Release nio thread.
+            nioClientProcessingPool.submit(new Runnable() {
+                @Override public void run() {
+                    try {
+                        msgProc.processMessage(ses, msg);
+                    }
+                    catch (IgniteCheckedException e) {
+                        log.error("Failure processing message, closing connection. [ses=" + ses + ']', e);
+
+                        final UUID nodeId = ses.meta(NODE_ID_META);
+
+                        assert nodeId != null;
+
+                        final ClientMessageProcessor proc = clientMsgWorkers.get(nodeId);
+
+                        if (proc != null && proc instanceof ClientNioMessageWorker) {
+                            ClientNioMessageWorker wrk = (ClientNioMessageWorker)proc;
+
+                            if (wrk.ses == ses)
+                                clientMsgWorkers.remove(nodeId, wrk);
+                        }
+                    }
+                }
+            });
+        }
+    };
+
     /**
      * @param adapter Adapter.
      */
@@ -419,19 +475,36 @@ class ServerImpl extends TcpDiscoveryImpl {
     /**
      * @return New NIO server.
      */
-    private GridNioServer createClientNioServer() {
-        final GridNioServer srv;
+    private GridNioServer<byte[]> createClientNioServer() {
+        final GridNioServer<byte[]> srv;
 
         final ArrayList<GridNioFilter> filters = new ArrayList<>();
 
-        filters.add(new ClientNioFilter("marshallerFilter"));
+        filters.add(new GridNioCodecFilter(new GridBufferedParser(
+            spi.isClientDirectBuffer(), spi.getClientByteOrder()) {
+            @Override public ByteBuffer encode(final GridNioSession ses,
+                final Object msg) throws IOException, IgniteCheckedException {
+                // Respond to client without message length.
+                byte[] msg0 = (byte[])msg;
+
+                ByteBuffer res = ByteBuffer.allocateDirect(msg0.length);
+
+                res.put(msg0);
+
+                res.flip();
+
+                return res;
+            }
+        }, log, false));
+
+        filters.add(new GridConnectionBytesVerifyFilter(log));
 
         if (spi.isSslEnabled()) {
             if (spi.sslCtx != null) {
                 GridNioSslFilter sslFilter = new GridNioSslFilter(
                     spi.sslCtx,
-                    true,
-                    ByteOrder.nativeOrder(),
+                    spi.isClientDirectBuffer(),
+                    spi.getClientByteOrder(),
                     log
                 );
 
@@ -448,21 +521,21 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (Thread.currentThread() instanceof IgniteThread)
                 gridName = ((IgniteThread)Thread.currentThread()).getGridName();
 
-            srv = GridNioServer.builder().address(U.getLocalHost())
+            srv = GridNioServer.<byte[]>builder().address(U.getLocalHost())
                 .port(-1)
-                .listener(new ClientNioListener<>(log, writeTimeout))
+                .listener(clientLsnr)
                 .filters(filters.toArray(new GridNioFilter[filters.size()]))
                 .logger(log)
                 .selectorCount(spi.getClientNioThreads())
                 .sendQueueLimit(spi.getClientSendMessageQueueLimit())
-                .byteOrder(ByteOrder.nativeOrder())
+                .byteOrder(spi.getClientByteOrder())
                 .tcpNoDelay(spi.getTcpNodelay())
-                .directBuffer(true)
+                .directBuffer(spi.isClientDirectBuffer())
                 .directMode(false)
                 .socketReceiveBufferSize(0)
                 .socketSendBufferSize(0)
                 .idleTimeout(Long.MAX_VALUE)
-                .gridName("nio-" + gridName)
+                .gridName(gridName)
                 .daemon(false)
                 .writeTimeout(writeTimeout)
                 .build();
@@ -5662,7 +5735,18 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @return Send future.
          */
         GridNioFuture<?> addReceipt(final int receipt) {
-            return spi.sendMessage(ses, null, new byte[]{(byte) receipt});
+            try {
+                return spi.sendMessage(ses, null, new byte[]{(byte) receipt});
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed marshal message, closing connection. [receipt=" + receipt + ", ses=" + ses + ']', e);
+
+                nonblockingStop();
+
+                clientMsgWorkers.remove(clientNodeId, this);
+
+                return new GridNioFinishedFuture<>(e);
+            }
         }
 
         /**
@@ -5705,7 +5789,16 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msgBytes Message bytes to send.
          */
         public void sendMessage(final TcpDiscoveryAbstractMessage msg, @Nullable final byte[] msgBytes) {
-            spi.sendMessage(ses, msg, msgBytes);
+            try {
+                spi.sendMessage(ses, msg, msgBytes);
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed marshal message, closing connection. [msg=" + msg + ", ses=" + ses + ']', e);
+
+                nonblockingStop();
+
+                clientMsgWorkers.remove(clientNodeId, this);
+            }
         }
 
         /** {@inheritDoc} */
@@ -5760,93 +5853,12 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
     }
 
-    /**
-     * Listener for client messages.
-     */
-    private class ClientNioListener<T> implements GridNioServerListener<T> {
-        /** */
-        private final IgniteLogger log;
-
-        /** */
-        private final long writeTimeout;
-
-        /** */
-        private final ClientNioMessageProcessor<T> msgProc;
-
-        /**
-         * @param log Logger.
-         * @param writeTimeout Socket write timeout.
-         */
-        private ClientNioListener(final IgniteLogger log, final long writeTimeout) {
-            this.log = log;
-            this.writeTimeout = writeTimeout;
-
-            msgProc = new ClientNioMessageProcessor<>(log);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onConnected(final GridNioSession ses) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onDisconnected(final GridNioSession ses, @Nullable final Exception
e) {
-            final UUID clientNodeId = msgProc.clientNodeId(ses);
-
-            if (log.isDebugEnabled())
-                log.debug("Stopping message worker on disconnect [remoteAddr=" + ses.remoteAddress()
+
-                    ", remote node ID=" + clientNodeId + ']');
 
-            final ClientMessageProcessor proc = clientMsgWorkers.get(clientNodeId);
-
-            if (proc != null && proc instanceof ClientNioMessageWorker &&
((ClientNioMessageWorker)proc).ses == ses) {
-                if (clientMsgWorkers.remove(clientNodeId, proc))
-                    ((ClientNioMessageWorker)proc).nonblockingStop();
-            }
-            else {
-                if (log.isDebugEnabled())
-                    log.error("Illegal ClientMessageProcessor: " + proc);
-
-                if (ses.closeTime() == 0)
-                    ses.close();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onMessage(final GridNioSession ses, final T msg0) {
-            // Release nio thread.
-            nioClientProcessingPool.submit(new Runnable() {
-                @Override public void run() {
-                    msgProc.processMessage(ses, msg0);
-                }
-            });
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionWriteTimeout(final GridNioSession ses) {
-            final UUID clientNodeId = msgProc.clientNodeId(ses);
-
-            if (log.isDebugEnabled())
-                log.debug("Stopping message worker on write timeout [remoteAddr=" + ses.remoteAddress()
+
-                    ", writeTimeout=" + writeTimeout + ", remote node ID=" + clientNodeId
+ ']');
-
-            final ClientMessageProcessor proc = clientMsgWorkers.remove(clientNodeId);
-
-            stopClientProcessor(proc, false);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionIdleTimeout(final GridNioSession ses) {
-            // No-op.
-        }
-    }
 
     /**
      * Processes incoming nio client messages.
-     *
-     * @param <T> Message type.
      */
-    private class ClientNioMessageProcessor<T> {
+    private class ClientNioMessageProcessor {
         /** */
         private final IgniteLogger log;
 
@@ -5863,7 +5875,10 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param ses Nio session.
          * @param msg0 Incoming message.
          */
-        void processMessage(GridNioSession ses, T msg0) {
+        void processMessage(GridNioSession ses, byte[] msg0) throws IgniteCheckedException {
+            final TcpDiscoveryAbstractMessage msg = spi.marshaller().unmarshal(msg0,
+                    U.resolveClassLoader(spi.ignite().configuration()));
+
             final UUID nodeId = getConfiguredNodeId();
             final UUID clientNodeId = clientNodeId(ses);
 
@@ -5872,7 +5887,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (clientMsgWrk == null || clientMsgWrk.ses != ses) {
                 if (log.isDebugEnabled())
                     log.debug("NIO Worker has been closed, drop message. [clientNodeId="
-                        + clientNodeId + ", message=" + msg0 + ", clientMsgWrk=" + clientMsgWrk + "]");
+                        + clientNodeId + ", message=" + msg + ", clientMsgWrk=" + clientMsgWrk + "]");
 
                 if (ses.closeTime() == 0)
                     ses.close();
@@ -5880,8 +5895,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                 return;
             }
 
-            final TcpDiscoveryAbstractMessage msg = (TcpDiscoveryAbstractMessage)msg0;
-
             msg.senderNodeId(nodeId);
 
             if (log.isDebugEnabled())
@@ -6103,183 +6116,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
-     *
-     */
-    private class ClientNioFilter extends GridNioFilterAdapter {
-        /**
-         * @param name Name.
-         */
-        ClientNioFilter(final String name) {
-            super(name);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionOpened(final GridNioSession ses) throws IgniteCheckedException
{
-            proceedSessionOpened(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionClosed(final GridNioSession ses) throws IgniteCheckedException
{
-            final UUID clientId = ses.meta(NODE_ID_META);
-
-            if (clientId != null) {
-                final ClientNioMessageWorker proc = (ClientNioMessageWorker)clientMsgWorkers.get(clientId);
-
-                if (proc != null && proc.ses == ses && clientMsgWorkers.remove(clientId,
proc))
-                    proc.nonblockingStop();
-            }
-
-            proceedSessionClosed(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onExceptionCaught(final GridNioSession ses,
-            final IgniteCheckedException ex) throws IgniteCheckedException {
-            proceedExceptionCaught(ses, ex);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(final GridNioSession ses,
-            final Object msg) throws IgniteCheckedException {
-            final byte[] bytes = msg instanceof byte[] ? (byte[]) msg : spi.marshaller().marshal(msg);
-
-            return proceedSessionWrite(ses, ByteBuffer.wrap(bytes));
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onMessageReceived(final GridNioSession ses,
-            final Object msg) throws IgniteCheckedException {
-            ByteBuffer msgBuf = ses.meta(INCOMPLETE_MESSAGE_META);
-
-            final ByteBuffer buf = (ByteBuffer)msg;
-
-            ByteBufferInputStream in = null;
-
-            if (msgBuf == null || msgBuf.position() == 0) {
-                // first packet
-                final int msgLen = getMessageLength(buf, ses);
-
-                if (msgLen == -1)
-                    return;
-
-                if (msgLen <= buf.remaining())
-                    in = new ByteBufferInputStream(buf, msgLen, false);
-                else {
-                    msgBuf = enlargeIfNeed(msgBuf, msgLen);
-
-                    ses.addMeta(INCOMPLETE_MESSAGE_META, msgBuf);
-                }
-            }
-
-            if (in == null) {
-                final int oldLim = buf.limit();
-
-                if (buf.remaining() > msgBuf.remaining())
-                    buf.limit(buf.position() + msgBuf.remaining());
-
-                msgBuf.put(buf);
-
-                buf.limit(oldLim);
-
-                if (!msgBuf.hasRemaining()) {
-                    msgBuf.rewind();
-
-                    in = new ByteBufferInputStream(msgBuf, msgBuf.limit(), true);
-                }
-            }
-
-            if (in != null) {
-                // unmarshal and process
-                final Object obj;
-
-                try {
-                    obj = spi.marshaller().unmarshal(in,
-                        U.resolveClassLoader(spi.ignite().configuration()));
-                }
-                finally {
-                    U.closeQuiet(in);
-                }
-
-                proceedMessageReceived(ses, obj);
-
-                // There are left bytes not processed
-                while (buf.hasRemaining())
-                    onMessageReceived(ses, msg);
-            }
-        }
-
-        /**
-         * @param buf Byte buffer.
-         * @param len Length.
-         * @return New buffer.
-         */
-        private ByteBuffer enlargeIfNeed(ByteBuffer buf, int len) {
-            assert buf == null || buf.position() == 0 : buf;
-
-            if (buf == null || buf.capacity() < len)
-                buf = ByteBuffer.allocateDirect(len);
-
-            buf.limit(len);
-
-            return buf;
-        }
-
-        /**
-         * @param buf Input buffer.
-         * @return Message length or -1 if not all bytes of length were read.
-         */
-        private int getMessageLength(final ByteBuffer buf, final GridNioSession ses) {
-            ByteBuffer lenBuf = ses.meta(MESSAGE_LEN_META);
-
-            int len = -1;
-
-            if (lenBuf != null || buf.remaining() < 4) {
-                if (lenBuf == null) {
-                    lenBuf = ByteBuffer.allocate(4);
-
-                    ses.addMeta(MESSAGE_LEN_META, lenBuf);
-                }
-
-                buf.get(lenBuf.array(), lenBuf.position(), Math.min(buf.remaining(), lenBuf.remaining()));
-
-                if (lenBuf.remaining() == 0) {
-                    lenBuf.order(ByteOrder.BIG_ENDIAN);
-
-                    len = lenBuf.getInt();
-
-                    ses.removeMeta(MESSAGE_LEN_META);
-                }
-            }
-            else {
-                final ByteOrder curOrder = buf.order();
-
-                buf.order(ByteOrder.BIG_ENDIAN);
-
-                len = buf.getInt();
-
-                buf.order(curOrder);
-            }
-
-            return len;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridNioFuture<Boolean> onSessionClose(final GridNioSession
ses) throws IgniteCheckedException {
-            return proceedSessionClose(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionIdleTimeout(final GridNioSession ses) throws IgniteCheckedException
{
-            proceedSessionIdleTimeout(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionWriteTimeout(final GridNioSession ses) throws IgniteCheckedException
{
-            proceedSessionWriteTimeout(ses);
-        }
-    }
-
-    /**
      * Thread that reads messages from the socket created for incoming connections.
      */
     private class SocketReader extends IgniteSpiThread {
@@ -7910,61 +7746,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
-     *
-     */
-    private static class ByteBufferInputStream extends InputStream {
-        /** */
-        private final ByteBuffer buf;
-
-        /** */
-        private final int oldLimit;
-
-        /** */
-        private final boolean clearOnClose;
-
-        /**
-         * @param buf Byte buffer.
-         * @param len Length that will be set as buf.limit(buf.position() + len).
-         * @param clearOnClose Clear buffer on close.
-         */
-        private ByteBufferInputStream(final ByteBuffer buf, final int len, final boolean
clearOnClose) {
-            this.buf = buf;
-            this.oldLimit = buf.limit();
-            this.clearOnClose = clearOnClose;
-
-            buf.limit(buf.position() + len);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int read() throws IOException {
-            if (!buf.hasRemaining())
-                return -1;
-
-            return buf.get() & 0xFF;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int read(@NotNull final byte[] b, final int off, final int len)
throws IOException {
-            int toRead = Math.min(len, buf.remaining());
-
-            if (toRead == 0)
-                return -1;
-
-            buf.get(b, off, toRead);
-
-            return toRead;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IOException {
-            if (clearOnClose)
-                buf.clear();
-            else
-                buf.limit(oldLimit);
-        }
-    }
-
-    /**
      * Actually marshals client nio messages to release ring worker from that routine.
      */
     private class NioSendWorker extends GridWorker {

http://git-wip-us.apache.org/repos/asf/ignite/blob/661efc21/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index d648af1..19e5249 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -30,6 +30,7 @@ import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -294,6 +295,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi,
T
     /** Default max number of messages that could be queued to send to client. (value is
<tt>0</tt>).  */
     public static final int DFLT_CLIENT_SEND_MSG_QUEUE_LIMIT = 0;
 
+    /** Default value for use direct or heap buffer. (value is <tt>true</tt>) */
+    public static final boolean DFLT_CLIENT_NIO_DIRECT_BUF = true;
+
+    /** Default byte order for nio client buffers. (value is <tt>ByteOrder.nativeOrder()</tt>) */
+    public static final ByteOrder DFLT_CLIENT_NIO_BYTE_ORDER = ByteOrder.nativeOrder();
+
     /** Local address. */
     protected String locAddr;
 
@@ -397,6 +404,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi,
T
     /** Max number of messages that could be queued to send to client. */
     protected int clientSndMsgQueueLimit = DFLT_CLIENT_SEND_MSG_QUEUE_LIMIT;
 
+    /** Use direct or heap buffer flag. */
+    protected boolean clientNioDirectBuf = DFLT_CLIENT_NIO_DIRECT_BUF;
+
+    /** Byte order for nio client buffers. */
+    protected ByteOrder clientNioByteOrder = DFLT_CLIENT_NIO_BYTE_ORDER;
+
     /** Node authenticator. */
     protected DiscoverySpiNodeAuthenticator nodeAuth;
 
@@ -891,6 +904,55 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi,
T
     }
 
     /**
+     * Gets flag that indicates whether use direct buffer or heap buffer for nio client
+     * connections.
+     * <p>
+     *     Defaults to {@link #DFLT_CLIENT_NIO_DIRECT_BUF}
+     * </p>
+     *
+     * @return {@code True} if use direct buffer.
+     */
+    public boolean isClientDirectBuffer() {
+        return clientNioDirectBuf;
+    }
+
+    /**
+     * Sets flag that indicates whether use direct buffer or heap buffer for nio client
+     * connections.
+     *
+     * @param directBuf Direct buffer flag.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setClientDirectBuffer(boolean directBuf) {
+        this.clientNioDirectBuf = directBuf;
+    }
+
+    /**
+     * Gets byte order that is used in client nio buffers.
+     * <p>
+     *     Defaults to {@link #DFLT_CLIENT_NIO_BYTE_ORDER}
+     * </p>
+     *
+     * @return Client nio buffers byte order.
+     */
+    public ByteOrder getClientByteOrder() {
+        return clientNioByteOrder;
+    }
+
+    /**
+     * Sets byte order that is used in client nio buffers.
+     * <p>
+     *     Defaults to {@link #DFLT_CLIENT_NIO_BYTE_ORDER}
+     * </p>
+     *
+     * @param order Client nio buffers byte order.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setClientByteOrder(ByteOrder order) {
+        this.clientNioByteOrder = order;
+    }
+
+    /**
      * Gets IP finder for IP addresses sharing and storing.
      *
      * @return IP finder for IP addresses sharing and storing.
@@ -1569,7 +1631,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi,
T
      * @return Future.
      */
     protected GridNioFuture<?> sendMessage(GridNioSession ses, @Nullable TcpDiscoveryAbstractMessage
msg,
-        @Nullable byte[] msgBytes) {
+        @Nullable byte[] msgBytes) throws IgniteCheckedException {
         assert msg != null || msgBytes != null : "Null message to send";
 
         final GridNioFuture<?> fut;
@@ -1577,7 +1639,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi,
T
         if (msgBytes != null)
             fut = ses.send(msgBytes);
         else
-            fut = ses.send(msg);
+            fut = ses.send(marshaller().marshal(msg));
 
         return fut;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/661efc21/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index e87b96d..78752b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -2189,24 +2189,29 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
                         fut.onDone(e);
                     }
 
-                    TestTcpDiscoverySpi.super.sendMessage(ses, msg, msgBytes).chain(new C1<IgniteInternalFuture<?>,
Object>() {
-                        private static final long serialVersionUID = 0L;
+                    try {
+                        TestTcpDiscoverySpi.super.sendMessage(ses, msg, msgBytes).chain(new C1<IgniteInternalFuture<?>, Object>() {
+                            private static final long serialVersionUID = 0L;
 
-                        @Override public Object apply(final IgniteInternalFuture<?>
igniteInternalFut) {
-                            try {
-                                final Object obj = igniteInternalFut.get();
+                            @Override public Object apply(final IgniteInternalFuture<?> igniteInternalFut) {
+                                try {
+                                    final Object obj = igniteInternalFut.get();
 
-                                fut.onDone(obj, null);
+                                    fut.onDone(obj, null);
 
-                                return obj;
-                            }
-                            catch (IgniteCheckedException e) {
-                                fut.onDone(e);
-                            }
+                                    return obj;
+                                }
+                                catch (IgniteCheckedException e) {
+                                    fut.onDone(e);
+                                }
 
-                            return null;
-                        }
-                    });
+                                return null;
+                            }
+                        });
+                    }
+                    catch (IgniteCheckedException e) {
+                        fut.onDone(e);
+                    }
                 }
             };
 


Mime
View raw message