ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [46/51] ignite git commit: Direct marshalling backward compatibility
Date Thu, 19 Nov 2015 04:32:58 GMT
Direct marshalling backward compatibility


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e6911884
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e6911884
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e6911884

Branch: refs/heads/ignite-direct-marsh-opt
Commit: e69118848d5e041e01dd83186e3c7fb55f47162a
Parents: a73c67f
Author: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Authored: Wed Nov 18 16:42:25 2015 -0800
Committer: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Committed: Wed Nov 18 16:42:25 2015 -0800

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  8 +-
 .../internal/util/ipc/IpcToNioAdapter.java      | 14 ++--
 .../util/nio/GridCommunicationClient.java       |  4 +-
 .../internal/util/nio/GridDirectParser.java     | 16 ++--
 .../util/nio/GridNioMessageReaderFactory.java   | 35 +++++++++
 .../util/nio/GridNioMessageWriterFactory.java   | 33 ++++++++
 .../ignite/internal/util/nio/GridNioServer.java | 31 ++++----
 .../util/nio/GridShmemCommunicationClient.java  |  6 +-
 .../communication/MessageFormatter.java         | 11 ++-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  6 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 82 ++++++++++++--------
 .../testframework/GridSpiTestContext.java       |  6 +-
 12 files changed, 174 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e6911884/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 3d4fbef..c3a708c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -276,11 +276,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         }
         else {
             formatter = new MessageFormatter() {
-                @Override public MessageWriter writer() {
+                @Override public MessageWriter writer(UUID rmtNodeId) {
+                    assert rmtNodeId != null;
+
                     return new DirectMessageWriter();
                 }
 
-                @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
+                @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory, Class<? extends Message> msgCls) {
+                    assert rmtNodeId != null;
+
                     return new DirectMessageReader(msgFactory);
                 }
             };

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6911884/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index e732a79..6820dc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -30,13 +30,13 @@ import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
 import org.apache.ignite.internal.util.nio.GridNioFilterChain;
 import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
 import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
 import org.apache.ignite.internal.util.nio.GridNioMetricsListener;
 import org.apache.ignite.internal.util.nio.GridNioServerListener;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionImpl;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 
 /**
  * Allows to re-use existing {@link GridNioFilter}s on IPC (specifically shared memory IPC)
@@ -65,23 +65,23 @@ public class IpcToNioAdapter<T> {
     private final GridNioMetricsListener metricsLsnr;
 
     /** */
-    private final MessageFormatter formatter;
+    private final GridNioMessageWriterFactory writerFactory;
 
     /**
      * @param metricsLsnr Metrics listener.
      * @param log Log.
      * @param endp Endpoint.
      * @param lsnr Listener.
-     * @param formatter Message formatter.
+     * @param writerFactory Writer factory.
      * @param filters Filters.
      */
     public IpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger log, IpcEndpoint endp,
-        GridNioServerListener<T> lsnr, MessageFormatter formatter, GridNioFilter... filters) {
+        GridNioServerListener<T> lsnr, GridNioMessageWriterFactory writerFactory, GridNioFilter... filters) {
         assert metricsLsnr != null;
 
         this.metricsLsnr = metricsLsnr;
         this.endp = endp;
-        this.formatter = formatter;
+        this.writerFactory = writerFactory;
 
         chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
         ses = new GridNioSessionImpl(chain, null, null, true);
@@ -163,7 +163,7 @@ public class IpcToNioAdapter<T> {
         assert writeBuf.hasArray();
 
         try {
-            int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf, formatter.writer());
+            int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf, writerFactory.writer(ses));
 
             metricsLsnr.onBytesSent(cnt);
         }
@@ -255,4 +255,4 @@ public class IpcToNioAdapter<T> {
             proceedSessionWriteTimeout(ses);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6911884/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index a933916..0de54e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -94,7 +94,7 @@ public interface GridCommunicationClient {
     public void sendMessage(byte[] data, int len) throws IgniteCheckedException;
 
     /**
-     * @param nodeId Node ID (provided only if versions of local and remote nodes are different).
+     * @param nodeId Remote node ID. Provided only for sync clients.
      * @param msg Message to send.
      * @param closure Ack closure.
      * @throws IgniteCheckedException If failed.
@@ -107,4 +107,4 @@ public interface GridCommunicationClient {
      * @return {@code True} if send is asynchronous.
      */
     public boolean async();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6911884/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index 5140cce..a4c9b02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.jetbrains.annotations.Nullable;
 
@@ -41,18 +40,18 @@ public class GridDirectParser implements GridNioParser {
     private final MessageFactory msgFactory;
 
     /** */
-    private final MessageFormatter formatter;
+    private final GridNioMessageReaderFactory readerFactory;
 
     /**
      * @param msgFactory Message factory.
-     * @param formatter Formatter.
+     * @param readerFactory Message reader factory.
      */
-    public GridDirectParser(MessageFactory msgFactory, MessageFormatter formatter) {
+    public GridDirectParser(MessageFactory msgFactory, GridNioMessageReaderFactory readerFactory) {
         assert msgFactory != null;
-        assert formatter != null;
+        assert readerFactory != null;
 
         this.msgFactory = msgFactory;
-        this.formatter = formatter;
+        this.readerFactory = readerFactory;
     }
 
     /** {@inheritDoc} */
@@ -61,7 +60,7 @@ public class GridDirectParser implements GridNioParser {
         MessageReader reader = ses.meta(READER_META_KEY);
 
         if (reader == null)
-            ses.addMeta(READER_META_KEY, reader = formatter.reader(msgFactory, null)); // TODO: class in null
+            ses.addMeta(READER_META_KEY, reader = readerFactory.reader(ses, msgFactory));
 
         Message msg = ses.removeMeta(MSG_META_KEY);
 
@@ -74,7 +73,8 @@ public class GridDirectParser implements GridNioParser {
             finished = msg.readFrom(buf, reader);
 
         if (finished) {
-            reader.reset();
+            if (reader != null)
+                reader.reset();
 
             return msg;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6911884/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java
new file mode 100644
index 0000000..68c887f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+
+/**
+ * Message reader factory.
+ */
+public interface GridNioMessageReaderFactory {
+    /**
+     * Creates new reader.
+     *
+     * @param ses Current session.
+     * @param msgFactory Message factory.
+     * @return Reader.
+     */
+    public MessageReader reader(GridNioSession ses, MessageFactory msgFactory);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6911884/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java
new file mode 100644
index 0000000..0c251c2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message writer factory.
+ */
+public interface GridNioMessageWriterFactory {
+    /**
+     * Creates new writer.
+     *
+     * @param ses Current session.
+     * @return Writer.
+     */
+    public MessageWriter writer(GridNioSession ses);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6911884/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 5bd08e7..c4710ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -62,7 +62,6 @@ import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
@@ -176,7 +175,7 @@ public class GridNioServer<T> {
 
     /** */
     @GridToStringExclude
-    private MessageFormatter formatter;
+    private GridNioMessageWriterFactory writerFactory;
 
     /** */
     @GridToStringExclude
@@ -212,7 +211,7 @@ public class GridNioServer<T> {
      * @param directMode Whether direct mode is used.
      * @param daemon Daemon flag to create threads.
      * @param metricsLsnr Metrics listener.
-     * @param formatter Message formatter.
+     * @param writerFactory Writer factory.
      * @param skipRecoveryPred Skip recovery predicate.
      * @param msgQueueLsnr Message queue size listener.
      * @param filters Filters for this server.
@@ -234,7 +233,7 @@ public class GridNioServer<T> {
         boolean directMode,
         boolean daemon,
         GridNioMetricsListener metricsLsnr,
-        MessageFormatter formatter,
+        GridNioMessageWriterFactory writerFactory,
         IgnitePredicate<Message> skipRecoveryPred,
         IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr,
         GridNioFilter... filters
@@ -304,7 +303,7 @@ public class GridNioServer<T> {
 
         this.directMode = directMode;
         this.metricsLsnr = metricsLsnr;
-        this.formatter = formatter;
+        this.writerFactory = writerFactory;
 
         this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.<Message>alwaysFalse();
     }
@@ -935,7 +934,7 @@ public class GridNioServer<T> {
             MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
 
             if (writer == null)
-                ses.addMeta(MSG_WRITER.ordinal(), writer = formatter.writer());
+                ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses));
 
             boolean handshakeFinished = sslFilter.lock(ses);
 
@@ -1130,7 +1129,7 @@ public class GridNioServer<T> {
             MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
 
             if (writer == null)
-                ses.addMeta(MSG_WRITER.ordinal(), writer = formatter.writer());
+                ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses));
 
             if (req == null) {
                 req = (NioOperationFuture<?>)ses.pollFuture();
@@ -1152,7 +1151,7 @@ public class GridNioServer<T> {
 
                 finished = msg.writeTo(buf, writer);
 
-                if (finished)
+                if (finished && writer != null)
                     writer.reset();
             }
 
@@ -1176,7 +1175,7 @@ public class GridNioServer<T> {
 
                 finished = msg.writeTo(buf, writer);
 
-                if (finished)
+                if (finished && writer != null)
                     writer.reset();
             }
 
@@ -2221,8 +2220,8 @@ public class GridNioServer<T> {
         /** Daemon flag. */
         private boolean daemon;
 
-        /** Message formatter. */
-        private MessageFormatter formatter;
+        /** Writer factory. */
+        private GridNioMessageWriterFactory writerFactory;
 
         /** Skip recovery predicate. */
         private IgnitePredicate<Message> skipRecoveryPred;
@@ -2253,7 +2252,7 @@ public class GridNioServer<T> {
                 directMode,
                 daemon,
                 metricsLsnr,
-                formatter,
+                writerFactory,
                 skipRecoveryPred,
                 msgQueueLsnr,
                 filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
@@ -2450,11 +2449,11 @@ public class GridNioServer<T> {
         }
 
         /**
-         * @param formatter Message formatter.
+         * @param writerFactory Writer factory.
          * @return This for chaining.
          */
-        public Builder<T> messageFormatter(MessageFormatter formatter) {
-            this.formatter = formatter;
+        public Builder<T> writerFactory(GridNioMessageWriterFactory writerFactory) {
+            this.writerFactory = writerFactory;
 
             return this;
         }
@@ -2479,4 +2478,4 @@ public class GridNioServer<T> {
             return this;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6911884/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index 93e789d..0405b61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -123,13 +123,15 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
     @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg,
         IgniteInClosure<IgniteException> closure)
         throws IgniteCheckedException {
+        assert nodeId != null;
+
         if (closed())
             throw new IgniteCheckedException("Communication client was closed: " + this);
 
         assert writeBuf.hasArray();
 
         try {
-            int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer());
+            int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer(nodeId));
 
             metricsLsnr.onBytesSent(cnt);
         }
@@ -154,4 +156,4 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
     @Override public String toString() {
         return S.toString(GridShmemCommunicationClient.class, this, super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6911884/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
index 4c9e0ee..67dd0d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.plugin.extensions.communication;
 
+import java.util.UUID;
 import org.apache.ignite.plugin.Extension;
 
 /**
@@ -33,16 +34,18 @@ public interface MessageFormatter extends Extension {
     /**
      * Creates new message writer instance.
      *
+     * @param rmtNodeId Remote node ID.
      * @return Message writer.
      */
-    public MessageWriter writer();
+    public MessageWriter writer(UUID rmtNodeId);
 
     /**
      * Creates new message reader instance.
      *
-     * @param factory Message factory.
+     * @param rmtNodeId Remote node ID.
+     * @param msgFactory Message factory.
      * @param msgCls Message class to read.
      * @return Message reader.
      */
-    public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls);
-}
\ No newline at end of file
+    public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory, Class<? extends Message> msgCls);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6911884/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index cd46de4..0830bad 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -696,11 +696,11 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
             if (msgFormatter0 == null) {
                 msgFormatter0 = new MessageFormatter() {
-                    @Override public MessageWriter writer() {
+                    @Override public MessageWriter writer(UUID rmtNodeId) {
                         throw new IgniteException("Failed to write message, node is not started.");
                     }
 
-                    @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
+                    @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory, Class<? extends Message> msgCls) {
                         throw new IgniteException("Failed to read message, node is not started.");
                     }
                 };
@@ -880,4 +880,4 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
             ((IgniteKernal)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6911884/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index e8bd8a1..116998e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -76,7 +76,9 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
 import org.apache.ignite.internal.util.nio.GridDirectParser;
 import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
 import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
 import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
+import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
 import org.apache.ignite.internal.util.nio.GridNioMetricsListener;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -284,7 +286,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final int DFLT_SELECTORS_CNT = Math.min(4, Runtime.getRuntime().availableProcessors());
 
     /** Node ID meta for session. */
-    private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
+    public static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
 
     /** Message tracker meta for session. */
     private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
@@ -1575,29 +1577,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
-                MessageFormatter msgFormatter = new MessageFormatter() {
-                    private MessageFormatter impl;
+                GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() {
+                    private MessageFormatter formatter;
 
-                    @Override public MessageWriter writer() {
-                        if (impl == null)
-                            impl = getSpiContext().messageFormatter();
+                    @Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory) {
+                        if (formatter == null)
+                            formatter = getSpiContext().messageFormatter();
 
-                        assert impl != null;
+                        assert formatter != null;
 
-                        return impl.writer();
+                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+
+                         // TODO: DIRECT - class is null
+                        return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory, null) : null;
                     }
+                };
 
-                    @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
-                        if (impl == null)
-                            impl = getSpiContext().messageFormatter();
+                GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() {
+                    private MessageFormatter formatter;
 
-                        assert impl != null;
+                    @Override public MessageWriter writer(GridNioSession ses) {
+                        if (formatter == null)
+                            formatter = getSpiContext().messageFormatter();
+
+                        assert formatter != null;
+
+                        UUID rmtNodeId = ses.meta(NODE_ID_META);
 
-                        return impl.reader(factory, msgCls);
+                        return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
                     }
                 };
 
-                GridDirectParser parser = new GridDirectParser(msgFactory, msgFormatter);
+                GridDirectParser parser = new GridDirectParser(msgFactory, readerFactory);
 
                 IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
                     @Override public boolean apply(Message msg) {
@@ -1658,7 +1669,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .metricsListener(metricsLsnr)
                         .writeTimeout(sockWriteTimeout)
                         .filters(filters)
-                        .messageFormatter(msgFormatter)
+                        .writerFactory(writerFactory)
                         .skipRecoveryPredicate(skipRecoveryPred)
                         .messageQueueSizeListener(queueSizeMonitor)
                         .build();
@@ -1918,7 +1929,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     UUID nodeId = null;
 
-                    if (!client.async() && !locNode.version().equals(node.version()))
+                    if (!client.async())
                         nodeId = node.id();
 
                     retry = client.sendMessage(nodeId, msg, ackC);
@@ -2591,7 +2602,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         buf.order(ByteOrder.nativeOrder());
 
-                        boolean written = msg.writeTo(buf, getSpiContext().messageFormatter().writer());
+                        boolean written = msg.writeTo(buf, null);
 
                         assert written;
 
@@ -2932,25 +2943,34 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
-                MessageFormatter msgFormatter = new MessageFormatter() {
-                    private MessageFormatter impl;
+                GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() {
+                    private MessageFormatter formatter;
 
-                    @Override public MessageWriter writer() {
-                        if (impl == null)
-                            impl = getSpiContext().messageFormatter();
+                    @Override public MessageWriter writer(GridNioSession ses) {
+                        if (formatter == null)
+                            formatter = getSpiContext().messageFormatter();
 
-                        assert impl != null;
+                        assert formatter != null;
 
-                        return impl.writer();
+                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+
+                        return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
                     }
+                };
 
-                    @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
-                        if (impl == null)
-                            impl = getSpiContext().messageFormatter();
+                GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() {
+                    private MessageFormatter formatter;
 
-                        assert impl != null;
+                    @Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory) {
+                        if (formatter == null)
+                            formatter = getSpiContext().messageFormatter();
+
+                        assert formatter != null;
+
+                        UUID rmtNodeId = ses.meta(NODE_ID_META);
 
-                        return impl.reader(factory, msgCls);
+                         // TODO: DIRECT - class is null
+                        return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory, null) : null;
                     }
                 };
 
@@ -2959,8 +2979,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     log,
                     endpoint,
                     srvLsnr,
-                    msgFormatter,
-                    new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true),
+                    writerFactory,
+                    new GridNioCodecFilter(new GridDirectParser(msgFactory, readerFactory), log, true),
                     new GridConnectionBytesVerifyFilter(log)
                 );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6911884/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index d9334eb..dc1a7e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -492,12 +492,12 @@ public class GridSpiTestContext implements IgniteSpiContext {
     @Override public MessageFormatter messageFormatter() {
         if (formatter == null) {
             formatter = new MessageFormatter() {
-                @Override public MessageWriter writer() {
+                @Override public MessageWriter writer(UUID rmtNodeId) {
                     return new DirectMessageWriter();
                 }
 
-                @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
-                    return new DirectMessageReader(factory);
+                @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory, Class<? extends Message> msgCls) {
+                    return new DirectMessageReader(msgFactory);
                 }
             };
         }


Mime
View raw message