ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [37/50] [abbrv] incubator-ignite git commit: gg-9791 - Communication fixes
Date Tue, 24 Feb 2015 10:06:03 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
index 5c1578c..d1938bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
@@ -271,7 +271,7 @@ public class GridDataLoadRequest extends MessageAdapter {
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeMap("ldrParticipants", ldrParticipants, Type.UUID, Type.IGNITE_UUID))
+                if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
                     return false;
 
                 writer.incrementState();
@@ -318,20 +318,20 @@ public class GridDataLoadRequest extends MessageAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 cacheName = reader.readString("cacheName");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 clsLdrId = reader.readIgniteUuid("clsLdrId");
@@ -339,7 +339,7 @@ public class GridDataLoadRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 2:
                 colBytes = reader.readByteArray("colBytes");
@@ -347,7 +347,7 @@ public class GridDataLoadRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 3:
                 byte depModeOrd;
@@ -359,7 +359,7 @@ public class GridDataLoadRequest extends MessageAdapter {
 
                 depMode = DeploymentMode.fromOrdinal(depModeOrd);
 
-                readState++;
+                reader.incrementState();
 
             case 4:
                 forceLocDep = reader.readBoolean("forceLocDep");
@@ -367,7 +367,7 @@ public class GridDataLoadRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 5:
                 ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership");
@@ -375,15 +375,15 @@ public class GridDataLoadRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 6:
-                ldrParticipants = reader.readMap("ldrParticipants", Type.UUID, Type.IGNITE_UUID, false);
+                ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 7:
                 reqId = reader.readLong("reqId");
@@ -391,7 +391,7 @@ public class GridDataLoadRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 8:
                 resTopicBytes = reader.readByteArray("resTopicBytes");
@@ -399,7 +399,7 @@ public class GridDataLoadRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 9:
                 sampleClsName = reader.readString("sampleClsName");
@@ -407,7 +407,7 @@ public class GridDataLoadRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 10:
                 skipStore = reader.readBoolean("skipStore");
@@ -415,7 +415,7 @@ public class GridDataLoadRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 11:
                 updaterBytes = reader.readByteArray("updaterBytes");
@@ -423,7 +423,7 @@ public class GridDataLoadRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 12:
                 userVer = reader.readString("userVer");
@@ -431,7 +431,7 @@ public class GridDataLoadRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
index 84f11d5..4c0b52f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
@@ -118,20 +118,20 @@ public class GridDataLoadResponse extends MessageAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 errBytes = reader.readByteArray("errBytes");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 forceLocDep = reader.readBoolean("forceLocDep");
@@ -139,7 +139,7 @@ public class GridDataLoadResponse extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 2:
                 reqId = reader.readLong("reqId");
@@ -147,7 +147,7 @@ public class GridDataLoadResponse extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
index 5e6917e..f52c6de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
@@ -141,23 +141,23 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        if (!super.readFrom(buf))
+        if (!super.readFrom(buf, reader))
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 errBytes = reader.readByteArray("errBytes");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 fileId = reader.readIgniteUuid("fileId");
@@ -165,7 +165,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 2:
                 id = reader.readLong("id");
@@ -173,7 +173,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
index b9be6a3..37e5881 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
@@ -199,20 +199,20 @@ public final class IgfsBlockKey extends MessageAdapter implements Externalizable
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 affKey = reader.readIgniteUuid("affKey");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 blockId = reader.readLong("blockId");
@@ -220,7 +220,7 @@ public final class IgfsBlockKey extends MessageAdapter implements Externalizable
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 2:
                 evictExclude = reader.readBoolean("evictExclude");
@@ -228,7 +228,7 @@ public final class IgfsBlockKey extends MessageAdapter implements Externalizable
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 3:
                 fileId = reader.readIgniteUuid("fileId");
@@ -236,7 +236,7 @@ public final class IgfsBlockKey extends MessageAdapter implements Externalizable
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java
index 006ca39..fbf2796 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java
@@ -99,7 +99,7 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeMap("blocks", blocks, Type.MSG, Type.BYTE_ARR))
+                if (!writer.writeMap("blocks", blocks, MessageCollectionItemType.MSG, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
@@ -122,23 +122,23 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        if (!super.readFrom(buf))
+        if (!super.readFrom(buf, reader))
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
-                blocks = reader.readMap("blocks", Type.MSG, Type.BYTE_ARR, false);
+                blocks = reader.readMap("blocks", MessageCollectionItemType.MSG, MessageCollectionItemType.BYTE_ARR, false);
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 fileId = reader.readIgniteUuid("fileId");
@@ -146,7 +146,7 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 2:
                 id = reader.readLong("id");
@@ -154,7 +154,7 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
index ecfd876..53626d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java
@@ -63,7 +63,7 @@ public abstract class IgfsCommunicationMessage extends MessageAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
index e02f1a3..2196525 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
@@ -139,23 +139,23 @@ public class IgfsDeleteMessage extends IgfsCommunicationMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        if (!super.readFrom(buf))
+        if (!super.readFrom(buf, reader))
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 errBytes = reader.readByteArray("errBytes");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 id = reader.readIgniteUuid("id");
@@ -163,7 +163,7 @@ public class IgfsDeleteMessage extends IgfsCommunicationMessage {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java
index e736520..2b558fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java
@@ -310,20 +310,20 @@ public class IgfsFileAffinityRange extends MessageAdapter implements Externaliza
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 affKey = reader.readIgniteUuid("affKey");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 done = reader.readBoolean("done");
@@ -331,7 +331,7 @@ public class IgfsFileAffinityRange extends MessageAdapter implements Externaliza
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 2:
                 endOff = reader.readLong("endOff");
@@ -339,7 +339,7 @@ public class IgfsFileAffinityRange extends MessageAdapter implements Externaliza
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 3:
                 startOff = reader.readLong("startOff");
@@ -347,7 +347,7 @@ public class IgfsFileAffinityRange extends MessageAdapter implements Externaliza
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 4:
                 status = reader.readInt("status");
@@ -355,7 +355,7 @@ public class IgfsFileAffinityRange extends MessageAdapter implements Externaliza
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java
index f915c83..9223009 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java
@@ -100,7 +100,7 @@ public class IgfsFragmentizerRequest extends IgfsCommunicationMessage {
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeCollection("fragmentRanges", fragmentRanges, Type.MSG))
+                if (!writer.writeCollection("fragmentRanges", fragmentRanges, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -111,31 +111,31 @@ public class IgfsFragmentizerRequest extends IgfsCommunicationMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        if (!super.readFrom(buf))
+        if (!super.readFrom(buf, reader))
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 fileId = reader.readIgniteUuid("fileId");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
-                fragmentRanges = reader.readCollection("fragmentRanges", Type.MSG);
+                fragmentRanges = reader.readCollection("fragmentRanges", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java
index d0cc4d1..9ba6920 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java
@@ -81,23 +81,23 @@ public class IgfsFragmentizerResponse extends IgfsCommunicationMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        if (!super.readFrom(buf))
+        if (!super.readFrom(buf, reader))
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 fileId = reader.readIgniteUuid("fileId");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java
index 2f8b941..0fb683c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java
@@ -104,23 +104,23 @@ public class IgfsSyncMessage extends IgfsCommunicationMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        if (!super.readFrom(buf))
+        if (!super.readFrom(buf, reader))
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 order = reader.readLong("order");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 res = reader.readBoolean("res");
@@ -128,7 +128,7 @@ public class IgfsSyncMessage extends IgfsCommunicationMessage {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java
index 7fd99e3..1da84c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java
@@ -128,20 +128,20 @@ public class GridTaskResultRequest extends MessageAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 taskId = reader.readIgniteUuid("taskId");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 topicBytes = reader.readByteArray("topicBytes");
@@ -149,7 +149,7 @@ public class GridTaskResultRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java
index 9636dc8..4545beb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java
@@ -158,20 +158,20 @@ public class GridTaskResultResponse extends MessageAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 err = reader.readString("err");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 finished = reader.readBoolean("finished");
@@ -179,7 +179,7 @@ public class GridTaskResultResponse extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 2:
                 found = reader.readBoolean("found");
@@ -187,7 +187,7 @@ public class GridTaskResultResponse extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 3:
                 resBytes = reader.readByteArray("resBytes");
@@ -195,7 +195,7 @@ public class GridTaskResultResponse extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java
index 264b73c..0c96ce5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java
@@ -78,20 +78,20 @@ public class GridStreamerCancelRequest extends MessageAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 cancelledFutId = reader.readIgniteUuid("cancelledFutId");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java
index c4d01c8..2f983d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java
@@ -186,7 +186,7 @@ public class GridStreamerExecutionRequest extends MessageAdapter {
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMap("ldrParticipants", ldrParticipants, Type.UUID, Type.IGNITE_UUID))
+                if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
                     return false;
 
                 writer.incrementState();
@@ -209,20 +209,20 @@ public class GridStreamerExecutionRequest extends MessageAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 batchBytes = reader.readByteArray("batchBytes");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 clsLdrId = reader.readIgniteUuid("clsLdrId");
@@ -230,7 +230,7 @@ public class GridStreamerExecutionRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 2:
                 byte depModeOrd;
@@ -242,7 +242,7 @@ public class GridStreamerExecutionRequest extends MessageAdapter {
 
                 depMode = DeploymentMode.fromOrdinal(depModeOrd);
 
-                readState++;
+                reader.incrementState();
 
             case 3:
                 forceLocDep = reader.readBoolean("forceLocDep");
@@ -250,15 +250,15 @@ public class GridStreamerExecutionRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 4:
-                ldrParticipants = reader.readMap("ldrParticipants", Type.UUID, Type.IGNITE_UUID, false);
+                ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 5:
                 sampleClsName = reader.readString("sampleClsName");
@@ -266,7 +266,7 @@ public class GridStreamerExecutionRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 6:
                 userVer = reader.readString("userVer");
@@ -274,7 +274,7 @@ public class GridStreamerExecutionRequest extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java
index 2260921..6c47179 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java
@@ -104,20 +104,20 @@ public class GridStreamerResponse extends MessageAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 errBytes = reader.readByteArray("errBytes");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 futId = reader.readIgniteUuid("futId");
@@ -125,7 +125,7 @@ public class GridStreamerResponse extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/util/GridByteArrayList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridByteArrayList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridByteArrayList.java
index 85bd7c8..833bb7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridByteArrayList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridByteArrayList.java
@@ -436,20 +436,20 @@ public class GridByteArrayList extends MessageAdapter implements Externalizable
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 data = reader.readByteArray("data");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 size = reader.readInt("size");
@@ -457,7 +457,7 @@ public class GridByteArrayList extends MessageAdapter implements Externalizable
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
index 439063e..64bb7ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
@@ -534,20 +534,20 @@ public class GridLongList extends MessageAdapter implements Externalizable {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 arr = reader.readLongArray("arr");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 idx = reader.readInt("idx");
@@ -555,7 +555,7 @@ public class GridLongList extends MessageAdapter implements Externalizable {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index b76c5d4..7b82d92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -31,6 +31,9 @@ public class GridDirectParser implements GridNioParser {
     /** Message metadata key. */
     private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Reader metadata key. */
+    private static final int READER_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
     /** */
     private final MessageFactory msgFactory;
 
@@ -54,16 +57,24 @@ public class GridDirectParser implements GridNioParser {
         throws IOException, IgniteCheckedException {
         MessageAdapter msg = ses.removeMeta(MSG_META_KEY);
 
+        MessageReader reader = null;
+
         if (msg == null && buf.hasRemaining()) {
             msg = msgFactory.create(buf.get());
 
-            msg.setReader(formatter.reader(msgFactory));
+            ses.addMeta(READER_META_KEY, reader = formatter.reader(msgFactory));
         }
 
         boolean finished = false;
 
-        if (buf.hasRemaining())
-            finished = msg.readFrom(buf);
+        if (buf.hasRemaining()) {
+            if (reader == null)
+                reader = ses.meta(READER_META_KEY);
+
+            assert reader != null;
+
+            finished = msg.readFrom(buf, reader);
+        }
 
         if (finished)
             return msg;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 0d5649a..af9c9b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -141,6 +141,10 @@ public class GridNioServer<T> {
     @GridToStringExclude
     private MessageFormatter formatter;
 
+    /** */
+    @GridToStringExclude
+    private IgnitePredicate<MessageAdapter> skipRecoveryPred;
+
     /** Static initializer ensures single-threaded execution of workaround. */
     static {
         // This is a workaround for JDK bug (NPE in Selector.open()).
@@ -169,6 +173,7 @@ public class GridNioServer<T> {
      * @param daemon Daemon flag to create threads.
      * @param metricsLsnr Metrics listener.
      * @param formatter Message formatter.
+     * @param skipRecoveryPred Skip recovery predicate.
      * @param filters Filters for this server.
      * @throws IgniteCheckedException If failed.
      */
@@ -189,6 +194,7 @@ public class GridNioServer<T> {
         boolean daemon,
         GridNioMetricsListener metricsLsnr,
         MessageFormatter formatter,
+        IgnitePredicate<MessageAdapter> skipRecoveryPred,
         GridNioFilter... filters
     ) throws IgniteCheckedException {
         A.notNull(addr, "addr");
@@ -254,6 +260,8 @@ public class GridNioServer<T> {
         this.directMode = directMode;
         this.metricsLsnr = metricsLsnr;
         this.formatter = formatter;
+
+        this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.<MessageAdapter>alwaysFalse();
     }
 
     /**
@@ -351,7 +359,8 @@ public class GridNioServer<T> {
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
-        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
+        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
+            skipRecoveryPred.apply(msg));
 
         send0(impl, fut, false);
 
@@ -404,7 +413,8 @@ public class GridNioServer<T> {
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
-        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg);
+        NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
+            skipRecoveryPred.apply(msg));
 
         if (lsnr != null) {
             fut.listenAsync(lsnr);
@@ -438,7 +448,7 @@ public class GridNioServer<T> {
             for (GridNioFuture<?> fut : futs) {
                 fut.messageThread(true);
 
-                ((NioOperationFuture)fut).resetMessage(ses0);
+                ((NioOperationFuture)fut).resetSession(ses0);
             }
 
             ses0.resend(futs);
@@ -1781,6 +1791,9 @@ public class GridNioServer<T> {
         /** */
         private Map<Integer, ?> meta;
 
+        /** */
+        private boolean skipRecovery;
+
         /**
          * Creates registration request for a given socket channel.
          *
@@ -1847,9 +1860,10 @@ public class GridNioServer<T> {
          * @param ses Session to change.
          * @param op Requested operation.
          * @param commMsg Direct message.
+         * @param skipRecovery Skip recovery flag.
          */
         NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op,
-            MessageAdapter commMsg) {
+            MessageAdapter commMsg, boolean skipRecovery) {
             assert ses != null;
             assert op != null;
             assert op != NioOperation.REGISTER;
@@ -1858,6 +1872,7 @@ public class GridNioServer<T> {
             this.ses = ses;
             this.op = op;
             this.commMsg = commMsg;
+            this.skipRecovery = skipRecovery;
         }
 
         /**
@@ -1884,11 +1899,9 @@ public class GridNioServer<T> {
         /**
          * @param ses New session instance.
          */
-        private void resetMessage(GridSelectorNioSessionImpl ses) {
+        private void resetSession(GridSelectorNioSessionImpl ses) {
             assert commMsg != null;
 
-//            commMsg = commMsg.clone();
-
             this.ses = ses;
         }
 
@@ -1932,7 +1945,7 @@ public class GridNioServer<T> {
 
         /** {@inheritDoc} */
         @Override public boolean skipRecovery() {
-            return commMsg != null && commMsg.skipRecovery();
+            return skipRecovery;
         }
 
         /** {@inheritDoc} */
@@ -2090,6 +2103,9 @@ public class GridNioServer<T> {
         /** Message formatter. */
         private MessageFormatter formatter;
 
+        /** Skip recovery predicate. */
+        private IgnitePredicate<MessageAdapter> skipRecoveryPred;
+
         /**
          * Finishes building the instance.
          *
@@ -2114,6 +2130,7 @@ public class GridNioServer<T> {
                 daemon,
                 metricsLsnr,
                 formatter,
+                skipRecoveryPred,
                 filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
             );
 
@@ -2316,5 +2333,15 @@ public class GridNioServer<T> {
 
             return this;
         }
+
+        /**
+         * @param skipRecoveryPred Skip recovery predicate.
+         * @return This for chaining.
+         */
+        public Builder<T> skipRecoveryPredicate(IgnitePredicate<MessageAdapter> skipRecoveryPred) {
+            this.skipRecoveryPred = skipRecoveryPred;
+
+            return this;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
index 2402d5e..d3d3f5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.plugin.extensions.communication;
 
-import org.jetbrains.annotations.*;
-
 import java.io.*;
 import java.nio.*;
 
@@ -26,22 +24,6 @@ import java.nio.*;
  * Base class for all communication messages.
  */
 public abstract class MessageAdapter implements Serializable {
-    /** Message reader. */
-    protected MessageReader reader;
-
-    /** Current read state. */
-    protected int readState;
-
-    /**
-     * @param reader Message reader.
-     */
-    public final void setReader(MessageReader reader) {
-        assert this.reader == null;
-        assert reader != null;
-
-        this.reader = reader;
-    }
-
     /**
      * Writes this message to provided byte buffer.
      *
@@ -55,9 +37,10 @@ public abstract class MessageAdapter implements Serializable {
      * Reads this message from provided byte buffer.
      *
      * @param buf Byte buffer.
+     * @param reader Reader.
      * @return Whether message was fully read.
      */
-    public abstract boolean readFrom(ByteBuffer buf);
+    public abstract boolean readFrom(ByteBuffer buf, MessageReader reader);
 
     /**
      * Gets message type.
@@ -72,94 +55,4 @@ public abstract class MessageAdapter implements Serializable {
      * @return Fields count.
      */
     public abstract byte fieldsCount();
-
-    /**
-     * Defines whether recovery for this message should be skipped.
-     *
-     * @return Whether recovery for this message should be skipped.
-     */
-    public boolean skipRecovery() {
-        return false;
-    }
-
-    /**
-     * Enum representing possible types of collection items.
-     */
-    public enum Type {
-        /** Byte. */
-        BYTE,
-
-        /** Short. */
-        SHORT,
-
-        /** Integer. */
-        INT,
-
-        /** Long. */
-        LONG,
-
-        /** Float. */
-        FLOAT,
-
-        /** Double. */
-        DOUBLE,
-
-        /** Character. */
-        CHAR,
-
-        /** Boolean. */
-        BOOLEAN,
-
-        /** Byte array. */
-        BYTE_ARR,
-
-        /** Short array. */
-        SHORT_ARR,
-
-        /** Integer array. */
-        INT_ARR,
-
-        /** Long array. */
-        LONG_ARR,
-
-        /** Float array. */
-        FLOAT_ARR,
-
-        /** Double array. */
-        DOUBLE_ARR,
-
-        /** Character array. */
-        CHAR_ARR,
-
-        /** Boolean array. */
-        BOOLEAN_ARR,
-
-        /** String. */
-        STRING,
-
-        /** Bit set. */
-        BIT_SET,
-
-        /** UUID. */
-        UUID,
-
-        /** Ignite UUID. */
-        IGNITE_UUID,
-
-        /** Message. */
-        MSG;
-
-        /** Enum values. */
-        private static final Type[] VALS = values();
-
-        /**
-         * Efficiently gets enumerated value from its ordinal.
-         *
-         * @param ord Ordinal value.
-         * @return Enumerated value.
-         */
-        @Nullable public static Type fromOrdinal(int ord) {
-            return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
new file mode 100644
index 0000000..ecb79cb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.extensions.communication;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Enum representing possible types of collection items.
+ */
+public enum MessageCollectionItemType {
+    /** Byte. */
+    BYTE,
+
+    /** Short. */
+    SHORT,
+
+    /** Integer. */
+    INT,
+
+    /** Long. */
+    LONG,
+
+    /** Float. */
+    FLOAT,
+
+    /** Double. */
+    DOUBLE,
+
+    /** Character. */
+    CHAR,
+
+    /** Boolean. */
+    BOOLEAN,
+
+    /** Byte array. */
+    BYTE_ARR,
+
+    /** Short array. */
+    SHORT_ARR,
+
+    /** Integer array. */
+    INT_ARR,
+
+    /** Long array. */
+    LONG_ARR,
+
+    /** Float array. */
+    FLOAT_ARR,
+
+    /** Double array. */
+    DOUBLE_ARR,
+
+    /** Character array. */
+    CHAR_ARR,
+
+    /** Boolean array. */
+    BOOLEAN_ARR,
+
+    /** String. */
+    STRING,
+
+    /** Bit set. */
+    BIT_SET,
+
+    /** UUID. */
+    UUID,
+
+    /** Ignite UUID. */
+    IGNITE_UUID,
+
+    /** Message. */
+    MSG;
+
+    /** Enum values. */
+    private static final MessageCollectionItemType[] VALS = values();
+
+    /**
+     * Efficiently gets enumerated value from its ordinal.
+     *
+     * @param ord Ordinal value.
+     * @return Enumerated value.
+     */
+    @Nullable public static MessageCollectionItemType fromOrdinal(int ord) {
+        return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
index dae8845..8eeb7b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
@@ -213,7 +213,7 @@ public interface MessageReader {
      * @param itemCls Array component class.
      * @return Array of objects.
      */
-    public <T> T[] readObjectArray(String name, MessageAdapter.Type itemType, Class<T> itemCls);
+    public <T> T[] readObjectArray(String name, MessageCollectionItemType itemType, Class<T> itemCls);
 
     /**
      * Reads collection.
@@ -222,7 +222,7 @@ public interface MessageReader {
      * @param itemType Collection item type.
      * @return Collection.
      */
-    public <C extends Collection<?>> C readCollection(String name, MessageAdapter.Type itemType);
+    public <C extends Collection<?>> C readCollection(String name, MessageCollectionItemType itemType);
 
     /**
      * Reads map.
@@ -233,8 +233,8 @@ public interface MessageReader {
      * @param linked Whether {@link LinkedHashMap} should be created.
      * @return Map.
      */
-    public <M extends Map<?, ?>> M readMap(String name, MessageAdapter.Type keyType, MessageAdapter.Type valType,
-        boolean linked);
+    public <M extends Map<?, ?>> M readMap(String name, MessageCollectionItemType keyType,
+        MessageCollectionItemType valType, boolean linked);
 
     /**
      * Tells whether last invocation of any of {@code readXXX(...)}
@@ -244,4 +244,16 @@ public interface MessageReader {
      * @return Whether last value was fully read.
      */
     public boolean isLastRead();
+
+    /**
+     * Gets current read state.
+     *
+     * @return Read state.
+     */
+    public int state();
+
+    /**
+     * Increments read state.
+     */
+    public void incrementState();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
index 0e35b62..3251102 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
@@ -240,7 +240,7 @@ public interface MessageWriter {
      * @param itemType Array component type.
      * @return Whether array was fully written.
      */
-    public <T> boolean writeObjectArray(String name, T[] arr, MessageAdapter.Type itemType);
+    public <T> boolean writeObjectArray(String name, T[] arr, MessageCollectionItemType itemType);
 
     /**
      * Writes collection.
@@ -250,7 +250,7 @@ public interface MessageWriter {
      * @param itemType Collection item type.
      * @return Whether value was fully written.
      */
-    public <T> boolean writeCollection(String name, Collection<T> col, MessageAdapter.Type itemType);
+    public <T> boolean writeCollection(String name, Collection<T> col, MessageCollectionItemType itemType);
 
     /**
      * Writes map.
@@ -261,8 +261,8 @@ public interface MessageWriter {
      * @param valType Map value type.
      * @return Whether value was fully written.
      */
-    public <K, V> boolean writeMap(String name, Map<K, V> map, MessageAdapter.Type keyType,
-        MessageAdapter.Type valType);
+    public <K, V> boolean writeMap(String name, Map<K, V> map, MessageCollectionItemType keyType,
+        MessageCollectionItemType valType);
 
     /**
      * @return Whether header of current message is already written.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingRequest.java
index d17fd23..487b58f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingRequest.java
@@ -78,20 +78,20 @@ public class JobStealingRequest extends MessageAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
         if (!reader.beforeMessageRead())
             return false;
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 delta = reader.readInt("delta");
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 255822e..eda4f5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1506,6 +1506,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 GridDirectParser parser = new GridDirectParser(messageFactory, messageFormatter);
 
+                IgnitePredicate<MessageAdapter> skipRecoveryPred = new IgnitePredicate<MessageAdapter>() {
+                    @Override public boolean apply(MessageAdapter msg) {
+                        return msg instanceof RecoveryLastReceivedMessage;
+                    }
+                };
+
                 GridNioServer<MessageAdapter> srvr =
                     GridNioServer.<MessageAdapter>builder()
                         .address(locHost)
@@ -1526,6 +1532,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .filters(new GridNioCodecFilter(parser, log, true),
                             new GridConnectionBytesVerifyFilter(log))
                         .messageFormatter(messageFormatter)
+                        .skipRecoveryPredicate(skipRecoveryPred)
                         .build();
 
                 boundTcpPort = port;
@@ -3056,7 +3063,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         }
 
         /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf) {
+        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
             if (buf.remaining() < 32)
                 return false;
 
@@ -3134,7 +3141,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         }
 
         /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf) {
+        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
             if (buf.remaining() < 8)
                 return false;
 
@@ -3148,13 +3155,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             return RECOVERY_LAST_ID_MSG_TYPE;
         }
 
-        @Override public byte fieldsCount() {
-            return 0; // TODO: implement.
-        }
-
         /** {@inheritDoc} */
-        @Override public boolean skipRecovery() {
-            return true;
+        @Override public byte fieldsCount() {
+            return 0;
         }
 
         /** {@inheritDoc} */
@@ -3209,7 +3212,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         }
 
         /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf) {
+        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
             if (buf.remaining() < 16)
                 return false;
 
@@ -3225,8 +3228,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             return NODE_ID_MSG_TYPE;
         }
 
+        /** {@inheritDoc} */
         @Override public byte fieldsCount() {
-            return 0; // TODO: implement.
+            return 0;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 3f19bef..45e2bb7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -151,7 +151,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
         }
 
         /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf) {
+        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index f9add5a..ba8082b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -225,7 +225,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf) {
+        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java
index dbc8394..80944ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java
@@ -113,7 +113,7 @@ class GridTestMessage extends MessageAdapter implements Externalizable {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java
index 9104d20..dea2c79 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java
@@ -134,17 +134,17 @@ public class GridTestMessage extends MessageAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf) {
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
-        switch (readState) {
+        switch (reader.state()) {
             case 0:
                 srcNodeId = reader.readUuid(null);
 
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 1:
                 msgId = reader.readLong(null);
@@ -152,7 +152,7 @@ public class GridTestMessage extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 2:
                 resId = reader.readLong(null);
@@ -160,7 +160,7 @@ public class GridTestMessage extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
 
             case 3:
                 payload = reader.readByteArray(null);
@@ -168,7 +168,7 @@ public class GridTestMessage extends MessageAdapter {
                 if (!reader.isLastRead())
                     return false;
 
-                readState++;
+                reader.incrementState();
         }
 
         return true;


Mime
View raw message